Source code for triqs_maxent.maxent_loop

# TRIQS application maxent
# Copyright (C) 2018 Gernot J. Kraberger
# Copyright (C) 2018 Simons Foundation
# Authors: Gernot J. Kraberger and Manuel Zingl
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.



from .alpha_meshes import LogAlphaMesh
from .cost_functions import MaxEntCostFunction, BryanCostFunction
from .minimizers import LevenbergMinimizer
from .logtaker import Logtaker, VerbosityFlags
from .maxent_result import MaxEntResult
from .analyzers import *
from .probabilities import NormalLogProbability
from .functions import PlusMinusEntropy, PlusMinusH_of_v, view_real
from datetime import datetime
import numpy as np


class MaxEntLoop(object):
    r""" The main class running the MaxEnt optimization and :math:`\alpha` loop

    Parameters
    ----------
    cost_function : CostFunction or str
        the cost function, e.g. :py:class:`.MaxEntCostFunction` (the default)
        or :py:class:`.BryanCostFunction`. This is :math:`\alpha` dependent
        and has to be minimized for each :math:`\alpha`.
        It can also be a string, i.e. 'normal' for ``MaxEntCostFunction``,
        'plusminus' for ``MaxEntCostFunction`` with
        :py:class:`.PlusMinusEntropy`, or 'bryan' for ``BryanCostFunction``.
    minimizer : Minimizer
        the minimizer, e.g. :py:class:`.LevenbergMinimizer` (the default).
        This actually minimizes the ``cost_function`` for a particular
        :math:`\alpha`.
    alpha_mesh : AlphaMesh
        the array of :math:`\alpha` values that should be calculated.
        The convention is to start with the largest :math:`\alpha` value.
        The default is a :py:class:`.LogAlphaMesh`.
    probability : Probability or str
        the function, e.g. :py:class:`.NormalLogProbability`, that is used
        to calculate the :math:`\log` of the probability of a particular
        :math:`\alpha`. If 'normal', the :py:class:`.NormalLogProbability`
        is used. Default: ``None``
    analyzers : list of Analyzer
        the list of analyzers that are used to choose the one true spectral
        function from :math:`A_\alpha(\omega)`.
        Default: ``[LineFitAnalyzer(), Chi2CurvatureAnalyzer(), EntropyAnalyzer()]``
        and additionally ``[BryanAnalyzer(), ClassicAnalyzer()]`` if
        ``probability`` is not ``None``
    logtaker : Logtaker
        for processing the log
    G_threshold : float
        if all values of the data Green function have an absolute value
        below this threshold, the calculation is not performed
    reduce_singular_space : float
        a threshold for reducing the singular space, see
        :py:meth:`.KernelSVD.reduce_singular_space`.
    A_init : array
        the initial spectral function (i.e., the starting value of the
        optimization for the first :math:`\alpha` value); if it is ``None``,
        the default model is used
    interactive : bool
        whether pressing Ctrl+C during the run should show a menu to decide
        what to do instead of immediately exiting (default: True)
    scale_alpha : float or str
        scale all alpha values by this factor; if ``'Ndata'``, they are
        scaled by the number of data points (the default)
    """

    def __init__(self,
                 cost_function=None,
                 minimizer=None,
                 alpha_mesh=None,
                 probability=None,
                 analyzers=None,
                 logtaker=None,
                 G_threshold=1.e-10,
                 reduce_singular_space=1.e-14,
                 A_init=None,
                 interactive=True,
                 scale_alpha='Ndata'):

        self.cost_function = cost_function
        self.minimizer = minimizer
        self.alpha_mesh = alpha_mesh
        self.logtaker = logtaker
        self.probability = probability
        self.analyzers = analyzers
        self.G_threshold = G_threshold

        # supply default values
        if self.cost_function is None:
            self.cost_function = MaxEntCostFunction()
        else:
            if isinstance(self.cost_function, str):
                if self.cost_function.lower() == 'normal':
                    self.cost_function = MaxEntCostFunction()
                elif self.cost_function.lower() == 'plusminus':
                    self.cost_function = MaxEntCostFunction(
                        S=PlusMinusEntropy(), H_of_v=PlusMinusH_of_v())
                elif self.cost_function.lower() == 'bryan':
                    self.cost_function = BryanCostFunction()
                else:
                    raise Exception(
                        'Unknown cost_function str {}.'.format(
                            self.cost_function))
        if self.minimizer is None:
            self.minimizer = LevenbergMinimizer()
        if self.alpha_mesh is None:
            self.alpha_mesh = LogAlphaMesh()
        if self.logtaker is None:
            self.logtaker = Logtaker()
        if isinstance(self.probability, str):
            if self.probability.lower() == 'normal':
                self.probability = NormalLogProbability()
        if self.analyzers is None:
            self.analyzers = \
                [LineFitAnalyzer(), Chi2CurvatureAnalyzer(), EntropyAnalyzer()]
            if self.probability is not None:
                self.analyzers += [BryanAnalyzer(), ClassicAnalyzer()]
        self.interactive = interactive
        self.A_init = A_init
        self.reduce_singular_space = reduce_singular_space
        self.scale_alpha = scale_alpha

    ####### Main functionality #######

    def run(self, result=None, matrix_element=None, complex_index=None):
        """ Run the MaxEnt loop

        Parameters
        ----------
        result : MaxEntResult
            the object in which the result should be saved; especially useful
            for elementwise calculations, where the results from different
            matrix elements should be written into only one result object.
            If ``None``, a new ``MaxEntResult`` is generated.
        matrix_element : tuple
            the index of the matrix element that should be calculated.
            This is needed when a ``result`` is given in an elementwise
            calculation, so that the result can be written for the correct
            matrix element
        complex_index : int
            the index of the complex number, either 0 (real) or 1 (imaginary);
            this is needed for complex elementwise calculations

        Returns
        -------
        result : :py:class:`.MaxEntResult`
            the result of the calculation
        """

        # check if all entries of G are below the threshold; then
        # no calculation should be performed
        if np.max(np.abs(self.G)) < self.G_threshold:
            if result is not None and matrix_element is not None:
                result.zero_elements.append(matrix_element)
            self.logtaker.error_message(
                'G below threshold, not performing the calculation.')
            return None

        self.logtaker.welcome_message()

        assert self.err is not None, 'No error specified'

        self.K.reduce_singular_space(self.reduce_singular_space)

        # calculate minimal chi2
        A_min = np.linalg.lstsq(self.K.K, self.G, rcond=-1)[0]
        if np.any(np.iscomplex(A_min)):
            A_min = view_real(A_min)
        chi2_min = self.chi2(A_min).f()
        self.logtaker.message(VerbosityFlags.Header,
                              "Minimal chi2: {}", chi2_min)

        # the initial value of v
        H = np.empty(self.chi2.input_size)
        right_side = (
            self.D.D if self.A_init is None else self.A_init) * self.omega.delta
        right_side_slice = [np.newaxis] * H.ndim
        for i in self.chi2.axes_preference[:right_side.ndim]:
            right_side_slice[i] = slice(None, None)
        H[:] = right_side[tuple(right_side_slice)]
        v = self.H_of_v(H).inv()

        # set up result
        if result is None:
            result = MaxEntResult()
        # set the default analyzer name to the first analyzer
        if result._default_analyzer_name is None:
            try:
                result._default_analyzer_name = self.analyzers[0].name
            except:
                pass

        if self.scale_alpha is None:
            scale_alpha = 1.0
        elif isinstance(self.scale_alpha, str):
            if self.scale_alpha.lower() == "ndata":
                scale_alpha = len(self.G)
            else:
                raise Exception(
                    "Unknown value {} for scale_alpha".format(
                        self.scale_alpha))
            self.logtaker.message(
                VerbosityFlags.Header,
                'scaling alpha by a factor {} (number of data points)'.format(
                    scale_alpha))
        else:
            scale_alpha = self.scale_alpha
            self.logtaker.message(
                VerbosityFlags.Header,
                'scaling alpha by a factor {}'.format(scale_alpha))

        self.check_consistency()

        result.start_timing(matrix_element=matrix_element,
                            complex_index=complex_index)

        any_not_converged = False

        # the main loop
        for i_alpha, alpha in enumerate(self.alpha_mesh):
            try:
                self.cost_function.set_alpha(alpha * scale_alpha)

                # minimize the cost function
                v = self.minimizer.minimize(self.cost_function, v)
                Q_min = self.cost_function(v)

                # report
                self.logtaker.message(
                    VerbosityFlags.AlphaLoop,
                    "alpha[{:" +
                    str(int(np.ceil(np.log10(len(self.alpha_mesh))))) +
                    "d}] = {:16.8e}, chi2 = {:16.8e}, n_iter={:8d}{}",
                    i_alpha, alpha * scale_alpha, Q_min.chi2.f(),
                    self.minimizer.n_iter_last,
                    ' ' if self.minimizer.converged else '!')

                if not self.minimizer.converged:
                    any_not_converged = True

                if self.probability is None:
                    result.add_result(Q_min,
                                      matrix_element=matrix_element,
                                      complex_index=complex_index)
                else:
                    result.add_result(Q_min, self.probability(Q_min).f(),
                                      matrix_element=matrix_element,
                                      complex_index=complex_index)
            except KeyboardInterrupt:
                if not self.interactive:
                    raise
                print()
                print('Program interrupted.')
                print(' c continue with next alpha (this alpha will be tossed!)')
                print(' a continue with alpha analyzer (end alpha loop)')
                print(' r return immediately')
                print(' x exit code immediately')
                inp = input('How to proceed? ')
                if inp == 'c':
                    pass
                elif inp == 'a':
                    break
                elif inp == 'r':
                    return result
                elif inp == 'x':
                    raise

        if any_not_converged:
            self.logtaker.message(
                VerbosityFlags.AlphaLoop,
                "\n! ... The minimizer did not converge. "
                "Results might be wrong.\n")

        run_time = result.end_timing(matrix_element=matrix_element,
                                     complex_index=complex_index)
        self.logtaker.message(VerbosityFlags.Timing,
                              "MaxEnt loop finished in {}", run_time)

        # analyze to get the one true spectral function from A(alpha)
        result.analyze(self.analyzers,
                       matrix_element=matrix_element,
                       complex_index=complex_index)

        return result
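
    # A hedged sketch of elementwise use of ``run`` (illustrative only, not
    # part of the module; in practice a higher-level driver sets up a result
    # object with the proper matrix structure and supplies the per-element
    # data, and the matrix indices below are made up for the example):
    #
    #     result = None
    #     for elem in [(0, 0), (1, 1)]:
    #         # ... set loop.G and loop.err to the data of element ``elem`` ...
    #         result = loop.run(result=result, matrix_element=elem)
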
    ####### Helper functions #######
    def check_consistency(self):
        r""" check whether all child objects are consistent,
        e.g., whether they all have the same ``omega`` values etc.
        Raises an error if not.

        This is automatically done before ``run``\ ning.
        """

        assert self.chi2 is self.cost_function.chi2
        assert self.S is self.cost_function.S
        assert self.D is self.cost_function.D
        assert self.K is self.cost_function.K
        assert self.G is self.cost_function.G
        assert self.err is self.cost_function.err
        assert self.omega is self.cost_function.omega
        assert self.data_variable is self.cost_function.data_variable
        assert self.H_of_v is self.cost_function.H_of_v
        assert self.A_of_H is self.cost_function.A_of_H
        assert self.cost_function.K is self.H_of_v.K
        assert self.chi2.K is self.H_of_v.K
        assert np.all(self.K.omega == self.omega)
        assert np.all(self.D.omega == self.omega)
        assert np.all(self.S.omega == self.omega)
        assert np.all(self.H_of_v.omega == self.omega)
        assert np.all(self.A_of_H.omega == self.omega)
        assert np.all(self.K.data_variable == self.data_variable)
        assert self.K is self.H_of_v.K
        assert np.all(self.chi2.data_variable == self.data_variable)
        assert np.all(self.H_of_v.K.data_variable == self.data_variable)
        assert np.all(self.H_of_v.D.D == self.D.D)
        assert np.all(self.S.D.D == self.D.D)
    def set_verbosity(self, verbosity=None, add=None, remove=None,
                      change_callback=True):
        """ Set the verbosity

        Parameters
        ----------
        verbosity : :py:class:`.VerbosityFlags`
            if not None, the verbosity is set to this value; e.g.::

                VerbosityFlags.Header | VerbosityFlags.Timing

            to show just the header and the timing information.
        add : :py:class:`.VerbosityFlags`
            if not None, the given verbosity flags are added to the verbosity
            flags already set. If ``verbosity`` is also not None, this is
            performed *after* setting ``verbosity``.
        remove : :py:class:`.VerbosityFlags`
            if not None, the given verbosity flags are removed from the
            verbosity flags already set. If ``verbosity`` or ``add`` is also
            not None, this is performed after the other two.
        change_callback : bool
            whether to turn the callback in the minimizer on or off as needed
            according to the given verbosity flags (the minimizer is faster
            if it does not have to produce verbose info)
        """

        if verbosity is not None:
            self.logtaker.verbose = verbosity
        if add is not None:
            self.logtaker.verbose |= add
        if remove is not None:
            self.logtaker.verbose &= ~remove

        # generating the verbose message in the minimizer takes some
        # time that can be saved, therefore we explicitly set the
        # callback to None if not needed
        if change_callback:
            if self.logtaker.verbose & VerbosityFlags.SolverDetails:
                self.minimizer.verbose_callback = \
                    self.logtaker.solver_verbose_callback
            else:
                self.minimizer.verbose_callback = None
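
    # Example (taken from the docstring above; purely illustrative): show only
    # the header and the timing information,
    #
    #     loop.set_verbosity(VerbosityFlags.Header | VerbosityFlags.Timing)
    #
    # or add/remove individual flags without resetting the rest,
    #
    #     loop.set_verbosity(add=VerbosityFlags.SolverDetails)
    #     loop.set_verbosity(remove=VerbosityFlags.AlphaLoop)
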
    def get_K(self):
        return self.cost_function.K

    def set_K(self, K, update_chi2=True, update_H_of_v=True, update_Q=True):
        self.cost_function.set_K(K,
                                 update_chi2=update_chi2,
                                 update_H_of_v=update_H_of_v,
                                 update_Q=update_Q)

    K = property(get_K, set_K)

    def get_G(self):
        return self.cost_function.G

    def set_G(self, G, update_chi2=True, update_Q=True):
        self.cost_function.set_G(G,
                                 update_chi2=update_chi2,
                                 update_Q=update_Q)

    G = property(get_G, set_G)

    def get_err(self):
        return self.cost_function.err

    def set_err(self, err, update_chi2=True, update_Q=True):
        self.cost_function.set_err(err,
                                   update_chi2=update_chi2,
                                   update_Q=update_Q)

    err = property(get_err, set_err)

    def get_omega(self):
        return self.cost_function.omega

    def set_omega(self, omega, update_K=True, update_chi2=True,
                  update_D=True, update_S=True, update_H_of_v=True,
                  update_A_of_H=True, update_Q=True):
        self.cost_function.set_omega(omega,
                                     update_K=update_K,
                                     update_chi2=update_chi2,
                                     update_D=update_D,
                                     update_S=update_S,
                                     update_H_of_v=update_H_of_v,
                                     update_A_of_H=update_A_of_H,
                                     update_Q=update_Q)

    omega = property(get_omega, set_omega)

    # ``data_variable`` is a name we use for tau
    # because in general we might want to use a different kernel
    # and then the supplied data is not G(tau) but, e.g., G(iw)
    def get_data_variable(self):
        return self.cost_function.data_variable

    def set_data_variable(self, data_variable, update_K=True,
                          update_chi2=True, update_Q=True,
                          update_H_of_v=True):
        self.cost_function.set_data_variable(data_variable,
                                             update_K=update_K,
                                             update_chi2=update_chi2,
                                             update_Q=update_Q,
                                             update_H_of_v=update_H_of_v)

    data_variable = property(get_data_variable, set_data_variable)

    def get_D(self):
        return self.cost_function.D

    def set_D(self, D, update_S=True, update_H_of_v=True,
              update_A_of_H=True, update_Q=True):
        self.cost_function.set_D(D,
                                 update_S=update_S,
                                 update_H_of_v=update_H_of_v,
                                 update_A_of_H=update_A_of_H,
                                 update_Q=update_Q)

    D = property(get_D, set_D)

    def get_chi2(self):
        return self.cost_function.chi2

    def set_chi2(self, chi2, update_Q=True):
        self.cost_function.set_chi2(chi2, update_Q=update_Q)

    chi2 = property(get_chi2, set_chi2)

    def get_S(self):
        return self.cost_function.S

    def set_S(self, S, update_Q=True):
        self.cost_function.set_S(S, update_Q=update_Q)

    S = property(get_S, set_S)

    def get_H_of_v(self):
        return self.cost_function.H_of_v

    def set_H_of_v(self, H_of_v, update_Q=True):
        self.cost_function.set_H_of_v(H_of_v, update_Q=update_Q)

    H_of_v = property(get_H_of_v, set_H_of_v)

    def get_A_of_H(self):
        return self.cost_function.A_of_H

    def set_A_of_H(self, A_of_H, update_Q=True):
        self.cost_function.set_A_of_H(A_of_H, update_Q=update_Q)

    A_of_H = property(get_A_of_H, set_A_of_H)
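
# A minimal usage sketch (not part of the module; the kernel ``K``, the data
# ``G`` and the error ``err`` are assumed to have been constructed elsewhere,
# e.g. by a higher-level interface, and all values shown are illustrative):
#
#     loop = MaxEntLoop(probability='normal')
#     loop.K = K
#     loop.G = G
#     loop.err = err
#     loop.set_verbosity(add=VerbosityFlags.Timing)
#     result = loop.run()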