Source code for postprocessing.maxent_gf_imp

################################################################################
#
# TRIQS: a Toolbox for Research in Interacting Quantum Systems
#
# Copyright (C) 2016-2018, N. Wentzell
# Copyright (C) 2018-2019, Simons Foundation
#   author: N. Wentzell
#
# TRIQS is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# TRIQS is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# TRIQS. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Analytic continuation of the impurity Green's function to the impurity spectral
function using maxent.

Reads G_imp(i omega) from the h5 archive and writes A_imp(omega) back. See
the docstring of main() for more information.

Not mpi parallelized.

Author: Maximilian Merkel, Materials Theory Group, ETH Zurich, 2020 - 2022
"""

import sys
import time
import numpy as np

from triqs_maxent.elementwise_maxent import PoormanMaxEnt
from triqs_maxent.omega_meshes import HyperbolicOmegaMesh
from triqs_maxent.alpha_meshes import LogAlphaMesh
from triqs_maxent.logtaker import VerbosityFlags
from h5 import HDFArchive
from triqs.utility import mpi
from triqs.gf import BlockGf


def _read_h5(external_path, iteration):
    """
    Reads the h5 archive to get the impurity Green's functions.

    Parameters
    ----------
    external_path : string
        path to h5 archive
    iteration : int
        The iteration that is being read from, None corresponds to 'last_iter'

    Returns
    -------
    gf_imp_tau : list
        Impurity Green's function as block Green's function for each impurity
    """

    """Reads the block Green's function G(tau) from h5 archive."""

    h5_internal_path = 'DMFT_results/' + ('last_iter' if iteration is None
                                          else f'it_{iteration}')

    with HDFArchive(external_path, 'r') as archive:
        impurity_paths = [key for key in archive[h5_internal_path].keys()
                          if 'Gimp_time_' in key and 'orig' not in key]
        # Sorts impurity paths by their indices, not sure if necessary
        impurity_indices = [int(s[s.rfind('_')+1:]) for s in impurity_paths]
        impurity_paths = [impurity_paths[i] for i in np.argsort(impurity_indices)]

        gf_imp_tau = [archive[h5_internal_path][p] for p in impurity_paths]
    return gf_imp_tau


def _sum_greens_functions(block_gf, sum_spins):
    """
    Sums over spin channels if sum_spins. It combines "up" and "down" into one
    block "total", or for SOC, simply renames the blocks ud into "total".
    """

    if not sum_spins:
        return block_gf

    for ind in block_gf.indices:
        if ind.startswith('up_'):
            assert ind.replace('up', 'down') in block_gf.indices
        elif ind.startswith('down_'):
            assert ind.replace('down', 'up') in block_gf.indices
        elif not ind.startswith('ud_'):
            raise ValueError(f'Block {ind} in G(tau) has unknown spin type. '
                             + 'Check G(tau) or turn off sum_spins.')

    summed_gf_imp = {}

    for block_name, block in sorted(block_gf):
        if block_name.startswith('up_'):
            new_block_name = block_name.replace('up', 'total')
            opp_spin_block_name = block_name.replace('up', 'down')
            summed_gf_imp[new_block_name] = block + block_gf[opp_spin_block_name]
        elif block_name.startswith('ud_'):
            summed_gf_imp[block_name.replace('ud', 'total')] = block

    return BlockGf(name_list=summed_gf_imp.keys(), block_list=summed_gf_imp.values())


def _run_maxent(gf_imp_list, maxent_error, n_points_maxent, n_points_alpha,
                omega_min, omega_max, analyzer='LineFitAnalyzer'):
    """
    Runs maxent to get the spectral functions from the list of block GFs.
    """

    omega_mesh = HyperbolicOmegaMesh(omega_min=omega_min, omega_max=omega_max,
                                     n_points=n_points_maxent)

    mpi.report(f'Continuing impurities with blocks: ')
    imps_blocks = []
    for i, block_gf in enumerate(gf_imp_list):
        blocks = list(block_gf.indices)
        mpi.report('- Imp {}: {}'.format(i, blocks))
        for block in blocks:
            imps_blocks.append((i, block))
    mpi.report('-'*50)
    imps_blocks_indices = np.arange(len(imps_blocks))

    # Initialize collective results
    data_linefit = [0] * len(imps_blocks)
    data_chi2 =  [0] * len(imps_blocks)

    mpi.barrier()
    for i in mpi.slice_array(imps_blocks_indices):
        imp, block = imps_blocks[i]
        print(f"\nRank {mpi.rank}: solving impurity {imp+1} block '{block}'")

        solver = PoormanMaxEnt(use_complex=True)
        solver.set_G_tau(gf_imp_list[imp][block])
        solver.set_error(maxent_error)
        solver.omega = omega_mesh
        solver.alpha_mesh = LogAlphaMesh(alpha_min=1e-6, alpha_max=1e2,
                                         n_points=n_points_alpha)
        # silence output
        solver.maxent_diagonal.logtaker.verbose = VerbosityFlags.Quiet
        solver.maxent_offdiagonal.logtaker.verbose = VerbosityFlags.Quiet
        results = solver.run()

        n_orb = gf_imp_list[imp][block].target_shape[0]
        opt_alpha = np.zeros((n_orb, n_orb, 2), dtype=int)
        opt_alpha[:, :, :] = -1  # set results to -1 to distinguish them from 0
        for i_orb in range(n_orb):
            for j_orb in range(n_orb):
                for l_com in range(2):  # loop over complex numbers
                    if results.analyzer_results[i_orb][j_orb][l_com] == {}:
                        continue
                    opt_alpha[i_orb, j_orb,
                              l_com] = results.analyzer_results[i_orb][j_orb][l_com][analyzer]['alpha_index']

        print(
            f"Optimal alphas , Imp {imp+1} block '{block}': \n--- Real part ---\n", opt_alpha[:, :, 0])
        if np.any(opt_alpha[:, :, 1] != -1):
            print('Imp {i+1} block {block} Imag part ---\n', opt_alpha[:, :, 1])
        if np.any(opt_alpha[:, :, 0] == -1):
            print('(a -1 indicates that maxent did not run for this block due to symmetry)')

        # store unpacked data in flatted list / maxent res object not bcastable
        data_linefit[i] = results.get_A_out('LineFitAnalyzer')
        data_chi2[i] = results.get_A_out('Chi2CurvatureAnalyzer')

    # slow barrier to reduce CPU load of waiting ranks
    mpi.barrier(1000)
    # Synchronizes information between ranks
    for i in imps_blocks_indices:
        data_linefit[i] = mpi.all_reduce(data_linefit[i])
        data_chi2[i] = mpi.all_reduce(data_chi2[i])

    # final result list
    unpacked_results = [{'mesh': np.array(omega_mesh),
                         'Aimp_w_line_fit': {},
                         'Aimp_w_chi2_curvature': {}
                         } for _ in range(len(gf_imp_list))]

    for i in imps_blocks_indices:
        imp, block = imps_blocks[i]
        unpacked_results[imp]['Aimp_w_line_fit'][block] = data_linefit[i]
        unpacked_results[imp]['Aimp_w_chi2_curvature'][block] = data_chi2[i]

    return unpacked_results


def _write_spectral_function_to_h5(unpacked_results, external_path, iteration):
    """ Writes the mesh and the maxent result for each analyzer to h5 archive. """

    h5_internal_path = 'DMFT_results/' + ('last_iter' if iteration is None
                                          else f'it_{iteration}')

    with HDFArchive(external_path, 'a') as archive:
        for i, res in enumerate(unpacked_results):
            archive[h5_internal_path][f'Aimp_maxent_{i}'] = res


[docs]def main(external_path, iteration=None, sum_spins=False, maxent_error=0.02, n_points_maxent=200, n_points_alpha=50, omega_min=-20, omega_max=20): """ Main function that reads the impurity Greens (GF) function from h5, analytically continues it, writes the result back to the h5 archive and also returns the results. Parameters ---------- external_path : string Path to the h5 archive to read from and write to. iteration : int/string Iteration to read from and write to. Defaults to last_iter. sum_spins : bool Whether to sum over the spins or continue the impurity GF for the up and down spin separately, for example for magnetized results. maxent_error : float The error that is used for the analyzers. n_points_maxent : int Number of omega points on the hyperbolic mesh used in the continuation. n_points_alpha : int Number of points that the MaxEnt alpha parameter is varied on logarithmically. omega_min : float Lower end of range where the GF is being continued. Range has to comprise all features of the impurity GF for correct normalization. omega_max : float Upper end of range where the GF is being continued. See omega_min. Returns ------- maxent_results : list The omega mesh and impurity spectral function from two different analyzers in a dict for each impurity """ gf_imp_tau = [] if mpi.is_master_node(): start_time = time.time() gf_imp_tau = _read_h5(external_path, iteration) for i, gf in enumerate(gf_imp_tau): gf_imp_tau[i] = _sum_greens_functions(gf, sum_spins) gf_imp_tau = mpi.bcast(gf_imp_tau) maxent_results = _run_maxent(gf_imp_tau, maxent_error, n_points_maxent, n_points_alpha, omega_min, omega_max) if mpi.is_master_node(): _write_spectral_function_to_h5(maxent_results, external_path, iteration) total_time = time.time() - start_time mpi.report('-'*80, 'DONE') mpi.report(f'Total run time: {total_time:.0f} s.') return maxent_results
def _strtobool(val): """Convert a string representation of truth to true (1) or false (0). True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if 'val' is anything else. Copied from distutils.util in python 3.10. """ val = val.lower() if val in ('y', 'yes', 't', 'true', 'on', '1'): return 1 elif val in ('n', 'no', 'f', 'false', 'off', '0'): return 0 else: raise ValueError("invalid truth value {!r}".format(val)) if __name__ == '__main__': # Casts input parameters if len(sys.argv) > 2: if sys.argv[2].lower() == 'none': sys.argv[2] = None if len(sys.argv) > 3: sys.argv[3] = _strtobool(sys.argv[3]) if len(sys.argv) > 4: sys.argv[4] = float(sys.argv[4]) if len(sys.argv) > 5: sys.argv[5] = int(sys.argv[5]) if len(sys.argv) > 6: sys.argv[6] = int(sys.argv[6]) if len(sys.argv) > 7: sys.argv[7] = float(sys.argv[7]) if len(sys.argv) > 8: sys.argv[8] = float(sys.argv[8]) main(*sys.argv[1:])