Today I found an interesting Python package that makes it very easy to set up a parallel computing script. I had a Python script that analyzes a molecular dynamics trajectory, and it was taking almost 60 seconds per snapshot. It was not feasible to analyze thousands of them at this speed. So, googling a bit, I found the
processing Python package (
http://pyprocessing.berlios.de/). Installing it was as simple as running:
> sudo easy_install processing
It provides a lot of features to perform threading on a multicore node, on a cluster with several nodes, or even across the network. I'm not an expert on this and I thought that it would be a very complex task to get my script running in parallel. BUT it was quite a big surprise when I had it working with fewer than 10 lines of code changed!
In the package website you can find some examples and test code. ex_workers.py had the answers to my problems. I'll write a fake chunk of code to illustrate how simple it was:
MY SERIAL SCRIPT:
if __name__ == "__main__":
    # Serial analysis of a molecular dynamics trajectory: every frame of
    # every trajectory file is passed to the Controller, one after another.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory

    # NOTE(review): GlobalParameters is never imported in this script;
    # presumably Parameters(pfile) makes it available -- confirm.
    pfile = 'parameters.conf'
    Parameters(pfile)

    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef

    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)

    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog,
                      sysPdb, GlobalParameters.SExtension)
    for trajFile in traj:
        # Prints are written in a form that is valid on both Python 2 and 3
        # and emits the same text as the original print statements (the
        # double space reproduces the separator the old statement inserted).
        print("Working on file  %s" % trajFile.fname)
        for i, frame in enumerate(trajFile):
            print("+ Frame %s" % i)
            t0 = time.time()
            main.update(frame)
            # Wall-clock seconds spent on this frame
            print(time.time() - t0)

    # NOTE(review): indentation was lost in the paste; saveGrids() is placed
    # after both loops as a final save -- confirm against the real script.
    main.saveGrids()
if __name__ == "__main__":
    # Serial analysis of a molecular dynamics trajectory: every frame of
    # every trajectory file is passed to the Controller, one after another.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory

    # NOTE(review): GlobalParameters is never imported in this script;
    # presumably Parameters(pfile) makes it available -- confirm.
    pfile = 'parameters.conf'
    Parameters(pfile)

    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef

    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)

    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog,
                      sysPdb, GlobalParameters.SExtension)
    for trajFile in traj:
        # Prints are written in a form that is valid on both Python 2 and 3
        # and emits the same text as the original print statements (the
        # double space reproduces the separator the old statement inserted).
        print("Working on file  %s" % trajFile.fname)
        for i, frame in enumerate(trajFile):
            print("+ Frame %s" % i)
            t0 = time.time()
            main.update(frame)
            # Wall-clock seconds spent on this frame
            print(time.time() - t0)

    # NOTE(review): indentation was lost in the paste; saveGrids() is placed
    # after both loops as a final save -- confirm against the real script.
    main.saveGrids()
if __name__ == "__main__":
    # Serial analysis of a molecular dynamics trajectory: every frame of
    # every trajectory file is passed to the Controller, one after another.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory

    # NOTE(review): GlobalParameters is never imported in this script;
    # presumably Parameters(pfile) makes it available -- confirm.
    pfile = 'parameters.conf'
    Parameters(pfile)

    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef

    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)

    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog,
                      sysPdb, GlobalParameters.SExtension)
    for trajFile in traj:
        # Prints are written in a form that is valid on both Python 2 and 3
        # and emits the same text as the original print statements (the
        # double space reproduces the separator the old statement inserted).
        print("Working on file  %s" % trajFile.fname)
        for i, frame in enumerate(trajFile):
            print("+ Frame %s" % i)
            t0 = time.time()
            main.update(frame)
            # Wall-clock seconds spent on this frame
            print(time.time() - t0)

    # NOTE(review): indentation was lost in the paste; saveGrids() is placed
    # after both loops as a final save -- confirm against the real script.
    main.saveGrids()
if __name__ == "__main__":
    # Serial analysis of a molecular dynamics trajectory: every frame of
    # every trajectory file is passed to the Controller, one after another.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory

    # NOTE(review): GlobalParameters is never imported in this script;
    # presumably Parameters(pfile) makes it available -- confirm.
    pfile = 'parameters.conf'
    Parameters(pfile)

    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef

    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)

    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog,
                      sysPdb, GlobalParameters.SExtension)
    for trajFile in traj:
        # Prints are written in a form that is valid on both Python 2 and 3
        # and emits the same text as the original print statements (the
        # double space reproduces the separator the old statement inserted).
        print("Working on file  %s" % trajFile.fname)
        for i, frame in enumerate(trajFile):
            print("+ Frame %s" % i)
            t0 = time.time()
            main.update(frame)
            # Wall-clock seconds spent on this frame
            print(time.time() - t0)

    # NOTE(review): indentation was lost in the paste; saveGrids() is placed
    # after both loops as a final save -- confirm against the real script.
    main.saveGrids()
if __name__ == "__main__":
    # Serial analysis of a molecular dynamics trajectory: every frame of
    # every trajectory file is passed to the Controller, one after another.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory

    # NOTE(review): GlobalParameters is never imported in this script;
    # presumably Parameters(pfile) makes it available -- confirm.
    pfile = 'parameters.conf'
    Parameters(pfile)

    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef

    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)

    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog,
                      sysPdb, GlobalParameters.SExtension)
    for trajFile in traj:
        # Prints are written in a form that is valid on both Python 2 and 3
        # and emits the same text as the original print statements (the
        # double space reproduces the separator the old statement inserted).
        print("Working on file  %s" % trajFile.fname)
        for i, frame in enumerate(trajFile):
            print("+ Frame %s" % i)
            t0 = time.time()
            main.update(frame)
            # Wall-clock seconds spent on this frame
            print(time.time() - t0)

    # NOTE(review): indentation was lost in the paste; saveGrids() is placed
    # after both loops as a final save -- confirm against the real script.
    main.saveGrids()
import MyMainClass
import TrajectoryParser
# Set up some variables and prepare some system files
# (omitted: this is an illustrative sketch, `args` is a placeholder)

# Initialize the main class that will take every snapshot and do some
# analysis on it
main = MyMainClass(args)

# Loop over trajectory: each item yielded by the parser is itself iterable
# and yields the snapshots to analyse.
trajectory = TrajectoryParser(args)
for traj_file in trajectory:  # renamed from `file`, which shadows a builtin
    for snap in traj_file:
        main.analyzeSnapshot(snap)

# Some last processing of the results, once every snapshot has been analysed
main.processResults()
PARALLELIZED VERSION:
added in blue, removed in red
import MyMainClass
import TrajectoryParser
from processing import Queue, Process, cpuCount
# Set up some variables and prepare some system files
# (omitted: this is an illustrative sketch, `args` is a placeholder)
# Initialize the main class that will take every snapshot and do some analysis on it.
# It is created before the workers are started because its bound method
# main.analyzeSnapshot is what gets placed on the job queue further down.
main = MyMainClass(args)
# Define the function that each worker process will run.
def worker(queue):
    """Consume jobs from *queue* until the 'STOP' sentinel is received.

    Each job is a ``(func, args)`` pair; it is executed as ``func(args)``,
    i.e. *args* is handed over as one single positional argument. Return
    values of the jobs are discarded.

    The queue is drained by exactly one loop: a worker returns as soon as
    it reads its own 'STOP'. A second loop here would block on
    ``queue.get`` waiting for another sentinel that is never sent.
    """
    # iter(callable, sentinel) keeps calling queue.get() and stops cleanly
    # once it returns 'STOP'.
    for func, args in iter(queue.get, 'STOP'):
        func(args)
# Initialize the queue and the worker processes
NUMPROCESSES = cpuCount()  # Use every core in the node by default; the user can set another value
# Bounded queue: without a max size the queue could grow huge and occupy a
# lot of memory while the workers lag behind the producer.
queue = Queue(maxsize=NUMPROCESSES)

# Keep a handle on every worker so that they can be joined later.
workers = [Process(target=worker, args=(queue,)) for _ in range(NUMPROCESSES)]
for proc in workers:
    proc.start()

# Loop over trajectory
trajectory = TrajectoryParser(args)
for traj_file in trajectory:  # renamed from `file`, which shadows a builtin
    for snap in traj_file:
        # The serial version called main.analyzeSnapshot(snap) right here.
        # Because the queue has a maximum size, put() waits until a slot
        # is free.
        queue.put((main.analyzeSnapshot, snap))

# Terminate workers: one 'STOP' sentinel per process
for _ in range(NUMPROCESSES):
    queue.put('STOP')

# Wait until every worker has consumed its sentinel and exited; otherwise
# the final processing below could start while snapshots are still being
# analysed.
for proc in workers:
    proc.join()

# Some last processing of the results
main.processResults()
Note how simple it was. main is an object which calls some other objects and methods, and I did not have to pass any other info to the threads, as they all share the same memory (I don't know how complex it would be otherwise). I adapted the worker function to my single-argument call (analyzeSnapshot(snap)). In the examples on the website, they have a slightly different version. They also use a second queue to store the results. In my case, the analyzeSnapshot method stores the partial results in a numpy.memmap array, so I don't have any direct output to gather.
Hope it helps someone someday :)
At least it will help me in the future when I forget all this.
if __name__ == "__main__":
import Biskit as bi
import time
from Parameters import Parameters
from Controller import Controller
from parseTrajectory import Trajectory
pfile = 'parameters.conf'
Parameters(pfile)
# Prepare system
sysPdb = bi.PDBModel(GlobalParameters.SSys)
sysPdb.loadAmberTopology(GlobalParameters.STop)
sysRef = GlobalParameters.SRef
# Initialize Controller
main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)
# Loop on trajectory
# Must be centered trajectory!
traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog, sysPdb, GlobalParameters.SExtension)
for trajFile in traj:
print "Working on file ",trajFile.fname
for i,frame in enumerate(trajFile):
print "+ Frame",i
t0=time.time()
main.update(frame)
print time.time()-t0
main.saveGrid import Biskit as bi
import time
from Parameters import Parameters
from Controller import Controller
from parseTrajectory import Traject