# Copyright (c) 2013 Commissariat à l'énergie atomique et aux énergies alternatives (CEA)
# Copyright (c) 2013 Centre national de la recherche scientifique (CNRS)
# Copyright (c) 2020 Simons Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You may obtain a copy of the License at
# https://www.gnu.org/licenses/gpl-3.0.txt
#
# Authors: Olivier Parcollet, Nils Wentzell
import os,sys,time,pickle
import triqs.utility.mpi as mpi
class DistributionOnNodes:
"""
Distribution of the calculation of a function over the nodes.
Derive from it and reimplement :
- treate : will be called by the MASTER each time a point is computed. this function will typically
store it, and possibly affect the list of points waiting to be computed, as returned by next...
NB : "None" result is ignored.
- next : next point to compute. If it returns None (which is not the same as finished())
the computation is ignored.
- finished() : Whether the calculation is finished
- the_function : function to be computed, with the argument given by next()
In the module DistributionOnNodeTest is a test example
"""
SleepTime = 1
#def treate(self,x,node_where_computed): pass
#def next(self): return None
#def finished(self): return True #None
#def the_function(self,x): return x
    def run(self):
        """
        Run the distributed calculation. Must be called on every node.
        """
mpi.barrier()
if mpi.size==1 : # single machine. Avoid the fork
while not(self.finished()):
n = next(self)
if n!=None :
self.treate(self.the_function(n),0)
return
# Code for multiprocessor machines
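        # Protocol implemented below: the master (rank 0) loops until the work is done.
        # For each idle worker it sends a "finished" flag, then (if not finished) a point
        # to compute, and posts a non-blocking receive for the result. To also use its own
        # core, the master forks a child process that computes one point and writes its
        # result to the pickle file "res_master"; the master reads it back once the child
        # has exited. Workers (rank > 0) execute the "else" branch below: receive the flag,
        # then a point, compute the_function on it, and send the result back to rank 0.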
RequestList,pid = [],0 # the pid of the child on the master
node_running,node_stopped= mpi.size*[False],mpi.size*[False]
if mpi.rank==0 :
while not(self.finished()) or pid or [n for n in node_running if n] != [] :
                # Handle the requests that have finished
def keep_request(r) :
#if not(mpi.test(r)) : return True
#if r.message !=None : self.treate(*r.message)
#node_running[r.status.source] = False
T = r.test()
if T is None : return True
value = T[0]
if value !=None : self.treate(*value)
node_running[T[1].source] = False
return False
RequestList = list(filter(keep_request,RequestList))
                # Send a new calculation to each idle node, or tell it to "stop"
for node in [ n for n in range(1,mpi.size) if not(node_running[n] or node_stopped[n]) ] :
#open('tmp','a').write("master : comm to node %d %s\n"%(node,self.finished()))
mpi.send(self.finished(),node)
if not(self.finished()) :
mpi.send(next(self),node) # send the data for the computation
node_running[node] = True
RequestList.append(mpi.irecv(node)) #Post the receive
else :
node_stopped[node] = True
                # Check whether the child process forked by the master has finished.
                # os.waitpid returns a (pid, status) pair; the pid entry is non-zero
                # only once the child has exited, so test that entry rather than the
                # (always-truthy) tuple itself.
                if not(pid) or os.waitpid(pid,os.WNOHANG)[0] :
if pid :
                        RR = pickle.load(open("res_master",'rb'))
if RR != None : self.treate(*RR)
if not(self.finished()) :
                        pid = os.fork()
currently_calculated_by_master = next(self)
if pid==0 : # we are on the child
if currently_calculated_by_master :
res = self.the_function(currently_calculated_by_master)
else:
res = None
                            pickle.dump((res,mpi.rank),open('res_master','wb'))
os._exit(0) # Cf python doc. Used for child only.
else : pid=0
if (pid): time.sleep(self.SleepTime) # so that most of the time is for the actual calculation on the master
else : # not master
while not(mpi.recv(0)) : # master will first send a finished flag
omega = mpi.recv(0)
if omega ==None :
res = None
else :
res = self.the_function(omega)
mpi.send((res,mpi.rank),0)
mpi.barrier()
#########################################
class DistributionOnNodesOneStack(DistributionOnNodes) :
    """
    A special case of distribution, for when one has a given stack of points to compute.
    Reimplement just the_function. A usage sketch follows the class definition.
    """
    def __init__(self,the_stack) :
self.__l = the_stack
self.__l.reverse()
self.__result = []
    def treate(self,x,node):
self.__result.append((node,x))
    def finished(self):
return self.__l==[]
def __next__(self):
return self.__l.pop()
    def result(self) : return self.__result
    def run(self):
DistributionOnNodes.run(self)
return self.result()
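# Typical use of DistributionOnNodesOneStack (sketch only; the names Solve,
# expensive_solve and list_of_points are hypothetical, not part of this module):
#
#   class Solve(DistributionOnNodesOneStack):
#       def the_function(self, x): return expensive_solve(x)
#
#   results = Solve(list_of_points).run()   # on the master: list of (node, result) pairs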
# TEST ONLY
class DistributionOnNodesTest(DistributionOnNodesOneStack) :
    """
    Trivial test distribution: the_function maps x to (x, x+1, x+2).
    """
    def the_function(self,x):
        return (x,x+1,x+2)
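# The self-test below should be launched under MPI with several processes,
# e.g. (assuming a standard MPI launcher):  mpirun -np 4 python <this file>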
if __name__ == '__main__' :
d = DistributionOnNodesTest(list(range(21)))
d.run()
if mpi.rank==0 :
print(d.result())