Parallel Programming in Python

Today I found an interesting Python package that makes it very easy to set up a parallel computing script. I had a Python script that analyzes a molecular dynamics trajectory, and it was taking almost 60 seconds per snapshot. It was not feasible to analyze thousands of them at this speed. So, googling a bit, I found the processing Python package (http://pyprocessing.berlios.de/). Installing it was as simple as running:

> sudo easy_install processing

It provides a lot of features to perform threading on a multicore node, on a cluster with several nodes, or even over the network. I'm not an expert on this and I thought that it would be a very complex task to have my script running in parallel. BUT it was quite a big surprise when I had it working with fewer than 10 lines of code changed!

On the package website you can find some examples and test code. ex_workers.py had the answers to my problems. I'll write a fake chunk of code to illustrate how simple it was:

MY SERIAL SCRIPT:

if __name__ == "__main__":
    # Serial driver (Python 2 syntax): builds a Biskit PDBModel, hands it to
    # a Controller, then passes every trajectory frame to Controller.update()
    # one at a time, printing the seconds each update() call took.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory
 
    pfile = 'parameters.conf'
    # Called only for its side effects: the return value is discarded.
    # NOTE(review): GlobalParameters is never imported in this snippet;
    # presumably this call makes it available -- confirm.
    Parameters(pfile)
 
    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef
 
    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)
 
    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog, sysPdb,  GlobalParameters.SExtension)
    for trajFile in traj:
        print "Working on file ",trajFile.fname
        for i,frame in enumerate(trajFile):
            print "+ Frame",i
            # Time a single update() call and print the elapsed seconds.
            t0=time.time()
            main.update(frame)
            print time.time()-t0
    # Runs once, after every file and frame has been passed to update().
    main.saveGrids()
if __name__ == "__main__":
    # Serial driver (Python 2 syntax): builds a Biskit PDBModel, hands it to
    # a Controller, then passes every trajectory frame to Controller.update()
    # one at a time, printing the seconds each update() call took.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory
 
    pfile = 'parameters.conf'
    # Called only for its side effects: the return value is discarded.
    # NOTE(review): GlobalParameters is never imported in this snippet;
    # presumably this call makes it available -- confirm.
    Parameters(pfile)
 
    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef
 
    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)
 
    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog, sysPdb,  GlobalParameters.SExtension)
    for trajFile in traj:
        print "Working on file ",trajFile.fname
        for i,frame in enumerate(trajFile):
            print "+ Frame",i
            # Time a single update() call and print the elapsed seconds.
            t0=time.time()
            main.update(frame)
            print time.time()-t0
    # Runs once, after every file and frame has been passed to update().
    main.saveGrids()
if __name__ == "__main__":
    # Serial driver (Python 2 syntax): builds a Biskit PDBModel, hands it to
    # a Controller, then passes every trajectory frame to Controller.update()
    # one at a time, printing the seconds each update() call took.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory
 
    pfile = 'parameters.conf'
    # Called only for its side effects: the return value is discarded.
    # NOTE(review): GlobalParameters is never imported in this snippet;
    # presumably this call makes it available -- confirm.
    Parameters(pfile)
 
    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef
 
    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)
 
    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog, sysPdb,  GlobalParameters.SExtension)
    for trajFile in traj:
        print "Working on file ",trajFile.fname
        for i,frame in enumerate(trajFile):
            print "+ Frame",i
            # Time a single update() call and print the elapsed seconds.
            t0=time.time()
            main.update(frame)
            print time.time()-t0
    # Runs once, after every file and frame has been passed to update().
    main.saveGrids()
if __name__ == "__main__":
    # Serial driver (Python 2 syntax): builds a Biskit PDBModel, hands it to
    # a Controller, then passes every trajectory frame to Controller.update()
    # one at a time, printing the seconds each update() call took.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory
 
    pfile = 'parameters.conf'
    # Called only for its side effects: the return value is discarded.
    # NOTE(review): GlobalParameters is never imported in this snippet;
    # presumably this call makes it available -- confirm.
    Parameters(pfile)
 
    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef
 
    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)
 
    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog, sysPdb,  GlobalParameters.SExtension)
    for trajFile in traj:
        print "Working on file ",trajFile.fname
        for i,frame in enumerate(trajFile):
            print "+ Frame",i
            # Time a single update() call and print the elapsed seconds.
            t0=time.time()
            main.update(frame)
            print time.time()-t0
    # Runs once, after every file and frame has been passed to update().
    main.saveGrids()
if __name__ == "__main__":
    # Serial driver (Python 2 syntax): builds a Biskit PDBModel, hands it to
    # a Controller, then passes every trajectory frame to Controller.update()
    # one at a time, printing the seconds each update() call took.
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory
 
    pfile = 'parameters.conf'
    # Called only for its side effects: the return value is discarded.
    # NOTE(review): GlobalParameters is never imported in this snippet;
    # presumably this call makes it available -- confirm.
    Parameters(pfile)
 
    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef
 
    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)
 
    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog, sysPdb,  GlobalParameters.SExtension)
    for trajFile in traj:
        print "Working on file ",trajFile.fname
        for i,frame in enumerate(trajFile):
            print "+ Frame",i
            # Time a single update() call and print the elapsed seconds.
            t0=time.time()
            main.update(frame)
            print time.time()-t0
    # Runs once, after every file and frame has been passed to update().
    main.saveGrids()

import MyMainClass
import TrajectoryParser

# Simplified, illustrative version of the serial script.
# NOTE: `args` is a placeholder for the real constructor arguments; it is
# not defined anywhere in this snippet.
# Set up some variables and prepare some system files
# Initialize the main class that will take every snapshot and do some analysis on it
main = MyMainClass(args)

# Loop over trajectory
trajectory = TrajectoryParser(args)
for file in trajectory:
    for snap in file:
        # Snapshots are analyzed one after another, in iteration order.
        main.analyzeSnapshot(snap)

# Some last processing of the results
# Runs once, after the loops above have visited every snapshot.
main.processResults()   

 

PARALLELIZED VERSION:
added in blue, removed in red
 

import MyMainClass
import TrajectoryParser
from processing import Queue, Process, cpuCount

# Set up some variables and prepare some system files
# Initialize the main class that will take every snapshot and do some analysis on it
# NOTE: `args` is a placeholder for the real constructor arguments; it is
# not defined anywhere in this snippet.
main = MyMainClass(args)

# Define a function that each thread will perform
# Basically it will take the queue and grab one of the jobs pending and execute it
def worker(queue):
    """Run queued jobs until the 'STOP' sentinel is received.

    Every item taken from ``queue`` must be a ``(func, args)`` pair. The job
    is executed as ``func(args)``, i.e. ``args`` is passed as one single
    positional argument, and the return value of ``func`` is discarded.

    queue -- any object providing a ``get()`` method that returns the next
             pending item (or the string 'STOP' to end this worker).
    """
    # iter(callable, sentinel) calls queue.get() repeatedly and ends the loop
    # as soon as it returns 'STOP', so the sentinel itself is never unpacked.
    for item in iter(queue.get, 'STOP'):
        func, args = item
        func(args)

# Initialize the queue and the worker processes
NUMPROCESSES = cpuCount()        # This will take maximum number of cores in the node, else the user can specify
queue = Queue(maxsize=NUMPROCESSES) # If no max size is given, the queue will become huge and occupy a lot of memory
workers = []
for i in range(NUMPROCESSES):
    proc = Process(target=worker, args=(queue,))
    proc.start()
    workers.append(proc)   # keep the handle so it can be joined below

# Loop over trajectory
trajectory = TrajectoryParser(args)
for file in trajectory:
    for snap in file:
        # main.analyzeSnapshot(snap)   <- serial call, replaced by the queued job below
        queue.put((main.analyzeSnapshot, snap))  # If queue has maximum size, this will wait until one slot is empty.

# Terminate workers: one 'STOP' per process, because each worker leaves its
# loop on the first 'STOP' it reads.
for i in range(NUMPROCESSES):
    queue.put('STOP')

# Wait until every worker has finished its pending jobs and exited.
# Without this, the final processing below could start while snapshots
# are still being analyzed.
for proc in workers:
    proc.join()

# Some last processing of the results
main.processResults()


Note how simple it was. main is an object which will call some other objects and methods, and I did not have to pass any other info to the threads as they all share the same memory (I don't know how complex it would be otherwise). I adapted the worker function for my single argument (analyzeSnapshot(snap)). In the examples on the website, they have a slightly different version. They also use a second queue to store the results. In my case, the analyzeSnapshot method stores the partial results in a numpy.memmap array, so I don't have any direct output to gather.

Hope it helps someone someday :)
At least it will help me in the future when I forget all this.

 

if __name__ == "__main__":
    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Trajectory
 
    pfile = 'parameters.conf'
    Parameters(pfile)
 
    # Prepare system
    sysPdb = bi.PDBModel(GlobalParameters.SSys)
    sysPdb.loadAmberTopology(GlobalParameters.STop)
    sysRef = GlobalParameters.SRef
 
    # Initialize Controller
    main = Controller(sysPdb, GlobalParameters.SSolv, ref=sysRef)
 
    # Loop on trajectory
    # Must be centered trajectory!
    traj = Trajectory(GlobalParameters.STraj, GlobalParameters.SMDprog, sysPdb,  GlobalParameters.SExtension)
    for trajFile in traj:
        print "Working on file ",trajFile.fname
        for i,frame in enumerate(trajFile):
            print "+ Frame",i
            t0=time.time()
            main.update(frame)
            print time.time()-t0
    main.saveGrid    import Biskit as bi
    import time
    from Parameters import Parameters
    from Controller import Controller
    from parseTrajectory import Traject