Packages for Multiprocessing

threading

Python has basic support for threading built in: for example, here’s a program that runs two threads, each of which prints out messages after sleeping a particular amount of time:

from threading import Thread
import time

class MessageThread(Thread):
    def __init__(self, message, sleep):
        self.message = message
        self.sleep = sleep
        Thread.__init__(self)                # remember to run Thread init!

    def run(self):                           # automatically run by 'start'
        i = 0
        while i < 50:
            i += 1
            print i, self.message

            time.sleep(self.sleep)

t1 = MessageThread("thread - 1", 1)
t2 = MessageThread("thread - 2", 2)

t1.start()
t2.start()

However, because of the Global Interpreter Lock (GIL) (http://docs.python.org/api/threads.html), CPU-intensive Python code running in multiple threads will not run faster on a dual-core CPU than it will on a single-core CPU.

Briefly, the idea is that the Python interpreter holds a global lock, and no Python code can be executed without holding that lock. (Code execution will still be interleaved, but no two Python instructions can execute at the same time.) Therefore, any Python code that you write (or GIL-naive C/C++ extension code) will not take advantage of multiple CPUs.
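To see the interleaving described above in practice, here is a minimal timing sketch. It is not from the original notes: the names count_down, time_sequential and time_threaded, and the workload size N, are made up for illustration, and the syntax is chosen so that it runs under both Python 2 and Python 3.

```python
import threading
import time

def count_down(n):
    """Pure-Python CPU-bound loop; it needs the GIL for every step."""
    while n > 0:
        n -= 1

def time_sequential(n, repeats=2):
    """Run count_down `repeats` times, one after another."""
    start = time.time()
    for _ in range(repeats):
        count_down(n)
    return time.time() - start

def time_threaded(n, repeats=2):
    """Run count_down in `repeats` threads started together."""
    threads = [threading.Thread(target=count_down, args=(n,))
               for _ in range(repeats)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

N = 2000000   # arbitrary workload size (an assumption, tune to taste)
print("sequential: %.2fs" % time_sequential(N))
print("threaded:   %.2fs" % time_threaded(N))
```

On a standard CPython build the two timings usually come out close to each other, even on a multi-core machine, because the threads take turns holding the GIL.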

This is intentional:

There is a long history of wrangling about the GIL, and there are a couple of good arguments for it. Briefly,

  • it dramatically simplifies writing C extension code, because by default, C extension code does not need to know anything about threads.
  • putting in locks appropriately to handle places where contention might occur is not only error-prone but makes the code quite slow; locks really affect performance.
  • threaded code is difficult to debug, and most people don’t need it, despite having been brainwashed to think that they do ;).

But we don’t care about that: we do want our code to run on multiple CPUs. So first, let’s dip back into C code: what do we have to do to make our C code release the GIL so that it can do a long computation?

Basically, just wrap I/O blocking code or CPU-intensive code in the following macros:

Py_BEGIN_ALLOW_THREADS

...Do some time-consuming operation...

Py_END_ALLOW_THREADS

This is actually pretty easy to do to your C code, and it does result in that code being run in parallel on multi-core CPUs.

The big problem with the GIL, however, is that it really means that you simply can’t write parallel code in Python without jumping through some kind of hoop. Below, we discuss a couple of these hoops ;).

Writing (and indicating) threadsafe C extensions

Suppose you had some CPU-expensive C code:

void waste_time() {
     int i, n = 0;                      /* n must be initialized before n++ */
     for (i = 0; i < 1024*1024*1024; i++) {
         if ((i % 2) == 0) n++;
     }
}

and you wrapped this in a Python function:

PyObject * waste_time_fn(PyObject * self, PyObject * args) {
     waste_time();
     Py_RETURN_NONE;                    /* must return a PyObject * */
}

Now, left like this, any call to waste_time_fn will cause all other Python threads in the same process to block, waiting for waste_time to finish. That’s silly, though: waste_time is clearly threadsafe, because it uses only local variables!

To tell Python that you are engaged in some expensive operations that are threadsafe, just enclose the waste_time code like so:

PyObject * waste_time_fn(PyObject * self, PyObject * args) {
     Py_BEGIN_ALLOW_THREADS

     waste_time();

     Py_END_ALLOW_THREADS

     Py_RETURN_NONE;                    /* the GIL is held again here */
}

This code will now be run in parallel when threading is used. One caveat: you can’t make any calls to the Python C API between Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS, because the thread does not hold the GIL there and the Python C API is not threadsafe without it.
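From the Python side, you can watch threads overlap whenever the underlying C code releases the GIL. The sketch below uses time.sleep as a stand-in, since CPython releases the GIL while sleeping; blocking_call and run_in_threads are hypothetical names, and this shows overlapping waits rather than a multi-core speedup of real computation.

```python
import threading
import time

def blocking_call(seconds):
    """Stand-in for C code wrapped in Py_BEGIN/END_ALLOW_THREADS."""
    time.sleep(seconds)   # CPython releases the GIL while sleeping

def run_in_threads(seconds, nthreads):
    """Start nthreads blocking calls together; return total wall time."""
    threads = [threading.Thread(target=blocking_call, args=(seconds,))
               for _ in range(nthreads)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

elapsed = run_in_threads(0.2, 4)
print("4 threads x 0.2s each took %.2fs" % elapsed)
```

If the calls were serialized, four of them would take about 0.8 seconds; because the GIL is released during each call, the total should stay close to 0.2 seconds.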

parallelpython

parallelpython is a system for controlling multiple Python processes on multiple machines. Here’s an example program:

#!/usr/bin/python
def isprime(n):
    """Returns True if n is prime and False otherwise"""
    import math

    if n < 2:
        return False
    if n == 2:
        return True
    max = int(math.ceil(math.sqrt(n)))
    i = 2
    while i <= max:
        if n % i == 0:
            return False
        i += 1
    return True

def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    return sum([x for x in xrange(2, n) if isprime(x)])

####

import sys, time

import pp
# Creates jobserver with specified number of workers
job_server = pp.Server(ncpus=int(sys.argv[1]))

print "Starting pp with", job_server.get_ncpus(), "workers"

start_time = time.time()

# Submit a job calculating sum_primes(input) for execution.
#
#    * sum_primes - the function
#    * (input,) - tuple with arguments for sum_primes
#    * (isprime,) - tuple with functions on which sum_primes depends
#
# Execution starts as soon as one of the workers becomes available

inputs = (100000, 100100, 100200, 100300, 100400, 100500, 100600, 100700)

jobs = []
for input in inputs:
    job = job_server.submit(sum_primes, (input,), (isprime,))
    jobs.append(job)

for job, input in zip(jobs, inputs):
    print "Sum of primes below", input, "is", job()

print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()

If you add “ppservers=('host1',)” (note the trailing comma, which makes it a tuple) to the line

pp.Server(...)

pp will check for parallelpython servers running on those other hosts and send jobs to them as well.

parallelpython works by literally sending the Python code across the network and evaluating it there! It seems to work well.
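The idea of shipping code as text and evaluating it on the other side can be sketched in a few lines. This is only an illustration of the general technique, not parallelpython’s actual implementation: pack_job, run_job and SUM_SQUARES_SRC are made-up names, and the “network” here is just a byte string passed between two functions.

```python
import pickle

# Source text of the function we want a "remote" worker to run.
SUM_SQUARES_SRC = '''
def sum_squares(n):
    return sum(x * x for x in range(n))
'''

def pack_job(source, func_name, args):
    """Serialize source code, entry-point name and arguments to bytes."""
    return pickle.dumps((source, func_name, args))

def run_job(payload):
    """What a worker might do: rebuild the function and call it."""
    source, func_name, args = pickle.loads(payload)
    namespace = {}
    exec(source, namespace)          # define the function in a fresh namespace
    return namespace[func_name](*args)

payload = pack_job(SUM_SQUARES_SRC, "sum_squares", (10,))
result = run_job(payload)            # in real life this runs on another host
print(result)
```

Evaluating code received from elsewhere is only sensible when you trust the sender, which is worth keeping in mind before exposing any such server on a network.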

Rpyc

Rpyc is a remote procedure call system built in (and tailored to) Python. It is basically a way to transparently control remote Python processes. For example, here’s some code that will connect to an Rpyc server and ask the server to calculate the first 500 prime numbers:

from Rpyc import SocketConnection

# connect to the "remote" server
c = SocketConnection("localhost")

# make sure it has the right code in its path
c.modules.sys.path.append('/u/t/dev/misc/rpyc')

# tell it to execute 'primestuff.get_n_primes'
primes = c.modules.primestuff.get_n_primes(500)
print primes[-20:]

Note that this is a synchronous connection, so the client waits for the result; you could also have it do the computation asynchronously, leaving the client free to request results from other servers.

In terms of parallel computing, the server has to be controlled directly, which makes it less than ideal. I think parallelpython is a better choice for straightforward number crunching.

pyMPI

pyMPI is a nice Python interface to the MPI (message-passing interface) library. MPI enables different processors to communicate with each other. I can’t demo pyMPI, because I couldn’t get it to work on my other machine, but here’s some example code that computes pi, iterating until successive estimates differ by less than 1e-5, on however many machines you have running MPI.

import random
import mpi

def computePi(nsamples):
    rank, size = mpi.rank, mpi.size
    oldpi, pi, mypi = 0.0,0.0,0.0

    done = False
    while(not done):
        inside = 0
        for i in xrange(nsamples):
            x = random.random()
            y = random.random()
            if ((x*x)+(y*y)<1):
                inside+=1

        oldpi = pi
        mypi = (inside * 1.0)/nsamples
        pi =  (4.0 / mpi.size) * mpi.allreduce(mypi, mpi.SUM)

        delta = abs(pi - oldpi)
        if(mpi.rank==0):
            print "pi:",pi," - delta:",delta
        if(delta < 0.00001):
            done = True
    return pi

if __name__=="__main__":
    pi = computePi(10000)
    if(mpi.rank==0):
        print "Computed value of pi on",mpi.size,"processors is",pi

One big problem with pyMPI is that documentation is essentially absent, but I can still make a few points ;).

First, the “magic” happens in the ‘allreduce’ function up above, which sums the results from all of the machines; the surrounding expression then divides by the number of machines (mpi.size).

Second, pyMPI takes the unusual approach of actually building an MPI-aware Python interpreter, so instead of running your scripts in normal Python, you run them using ‘pyMPI’.
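The reduction step in the example above can be imitated in a single process, which may help if you don’t have MPI running. This is a sketch under that assumption, with no MPI involved: local_fraction and combine are hypothetical names, and a plain list of per-process values stands in for what allreduce would gather.

```python
import random

def local_fraction(nsamples, rng):
    """Fraction of random points in the unit square that land inside the
    quarter circle; this plays the role of `mypi` on one process."""
    inside = 0
    for _ in range(nsamples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return float(inside) / nsamples

def combine(fractions):
    """Imitate (4.0 / size) * allreduce(mypi, SUM) for `size` processes."""
    size = len(fractions)
    return (4.0 / size) * sum(fractions)

# Pretend there are four processes, each with its own random stream.
fractions = [local_fraction(10000, random.Random(seed)) for seed in range(4)]
pi_estimate = combine(fractions)
print(pi_estimate)
```

Summing the per-process fractions and scaling by 4.0 / size is the same as averaging them and multiplying by four, which is why every process ends up with the same estimate after the reduction.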

multitask

multitask is not a multi-machine mechanism; it’s a library that implements cooperative multitasking around I/O operations. Briefly, whenever you’re going to do an I/O operation (like wait for more data from the network) you can tell multitask to yield to another thread of control. Here is a simple example where control is voluntarily yielded after a ‘print’:

import multitask

def printer(message):
    while True:
        print message
        yield

multitask.add(printer('hello'))
multitask.add(printer('goodbye'))
multitask.run()

Here’s another example from the home page (sock and handle_request are assumed to be defined elsewhere):

import multitask

def listener(sock):
    while True:
        conn, address = (yield multitask.accept(sock))    # WAIT
        multitask.add(client_handler(conn))

def client_handler(sock):
    while True:
        request = (yield multitask.recv(sock, 1024))      # WAIT
        if not request:
            break
        response = handle_request(request)
        yield multitask.send(sock, response)              # WAIT

multitask.add(listener(sock))
multitask.run()
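The cooperative scheduling behind the printer example can be sketched with plain generators. This is not multitask’s implementation, just a minimal round-robin loop under the assumption that every task is a generator that yields when it is willing to give up control; run_round_robin is a made-up name, and the printer here collects messages in a list and stops after a fixed count so that the run terminates.

```python
from collections import deque

def printer(message, count, log):
    """A task: record a message, then voluntarily give up control."""
    for _ in range(count):
        log.append(message)
        yield                      # hand control back to the scheduler

def run_round_robin(tasks):
    """Run generator-based tasks in turn until all have finished."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)             # resume the task until its next yield
        except StopIteration:
            continue               # task finished; drop it
        queue.append(task)         # otherwise reschedule it at the back

log = []
run_round_robin([printer("hello", 2, log), printer("goodbye", 2, log)])
print(log)
```

The messages come out alternating because each task runs only until its next yield. A task that never yields would starve all the others, which is the price of cooperative multitasking.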