Parallel Processing in Python with processing

There has been a lot of discussion about concurrency on the Python blogs lately. I first clued into the arguments when I read Bruce Eckel’s critique of Python 3000 and Guido’s discussion of why the GIL persists in Python. Ignoring the philosophical question of processes vs. threads, you will, as a practical matter, probably need to use multiple processes in Python to scale your programs to more than one core.

I just finished reading the free first issue of Python Magazine, where Doug Hellmann reviewed three options for running processes in parallel in Python: the subprocess module, the parallel python package, and the processing module. (The article is very good, as is the rest of the magazine. Go read it! I only wish that the US dollar were doing a little better as the magazine is priced in CAN$.)

In a bit of serendipity, Fredrik Lundh posted an article where he optimizes a log parsing program in Python, starting from a single process implementation, moving up to threaded and multi-process implementations. Remembering the processing module from Doug’s article, I decided to take a whack at porting one of Fredrik’s versions to use it.

The processing module is designed to be a nearly drop-in replacement for threading, so I started with the threaded version of the log parser. The changes were very simple, as you can see from the diff:

--- wf-4.py 2007-10-07 00:08:04.000000000 -0500
+++ wf-4-processing.py 2007-10-07 00:37:12.000000000 -0500
@@ -1,6 +1,7 @@
 import re, sys
 from collections import defaultdict
-import threading, Queue
+import processing
+

 # 1: 2.7 seconds
 # 2: 2.5 seconds
@@ -28,14 +29,13 @@
         if not s:
             break

-class Worker(threading.Thread):
+class Worker(processing.Process):
     def run(self):
         while 1:
             chunk = queue.get()
             if chunk is None:
                 break
-            result.append(process(*chunk))
-            queue.task_done()
+            result_queue.put(process(*chunk))

 # --------------------------------------------------------------------

@@ -51,7 +51,8 @@
 except:
     count = 2

-queue = Queue.Queue()
+queue = processing.Queue()
+result_queue = processing.Queue()
 result = []

 for i in range(count):
@@ -59,10 +60,13 @@
     w.setDaemon(1)
     w.start()

+chunk_count = 0
 for chunk in getchunks(FILE):
     queue.put((FILE, chunk))
+    chunk_count += 1

-queue.join()
+for i in range(chunk_count):
+    result.append(result_queue.get())

 count = defaultdict(int)
 for item in result:

Basically, I had to add a results queue to pass the parsed log results back to the parent process, since the child processes no longer had direct access to a list object in the parent memory space. I could have used processing.Namespace to create a shared list, but a queue to feed back to the parent seemed more natural. It also provided a convenient way to test when all of the workers were finished: for every chunk sent out, look for one item in the return queue.
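For readers on a current Python, the same results-queue pattern can be sketched with the standard library's multiprocessing module, which grew out of processing. This is only an illustration under a few assumptions: process() here is a made-up word counter rather than the real log parser, the helper names are hypothetical, and the fork start method is requested explicitly so the sketch runs on a POSIX system without an `if __name__ == "__main__":` guard (on Windows you would keep the default start method and add that guard).

```python
import multiprocessing as mp

# Assumption: POSIX system. "fork" keeps the sketch free of a __main__ guard.
ctx = mp.get_context("fork")

def process(chunk):
    """Hypothetical stand-in for the log parser: count words in one chunk."""
    counts = {}
    for word in chunk.split():
        counts[word] = counts.get(word, 0) + 1
    return counts

def worker(task_queue, result_queue):
    # Same loop as Worker.run() in the diff: None is the shutdown signal.
    while True:
        chunk = task_queue.get()
        if chunk is None:
            break
        result_queue.put(process(chunk))

def count_in_parallel(chunks, workers=2):
    task_queue = ctx.Queue()
    result_queue = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(task_queue, result_queue),
                         daemon=True)
             for _ in range(workers)]
    for p in procs:
        p.start()

    # Send the work out, remembering how many results to expect.
    chunk_count = 0
    for chunk in chunks:
        task_queue.put(chunk)
        chunk_count += 1

    # One result comes back per chunk; merge them in the parent.
    total = {}
    for _ in range(chunk_count):
        for word, n in result_queue.get().items():
            total[word] = total.get(word, 0) + n

    # Tell every worker to stop, then wait for them to exit.
    for _ in procs:
        task_queue.put(None)
    for p in procs:
        p.join()
    return total
```

Calling count_in_parallel(["a b a", "b c", "a"]) merges the per-chunk dictionaries in the parent, just as the result list is merged into count at the end of the patched script.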

Update: Based on Fredrik’s comment below, I’ve updated to his new test version which reports the wallclock time difference with both time() and clock(). clock() doesn’t measure the right quantity on OS X, so with time() the numbers make more sense now.
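The difference between the two clocks is easy to see in a small sketch. On Unix, time.clock() reported CPU time used by the calling process, and it was removed in Python 3.8, so this version uses time.perf_counter() for wall time and time.process_time() for CPU time instead. The timed() helper is hypothetical, and the sleep merely stands in for a parent that waits while its children do the work.

```python
import time

def timed(func):
    """Return (wall_seconds, cpu_seconds) for a single call of func()."""
    wall_start = time.perf_counter()   # wall-clock time
    cpu_start = time.process_time()    # CPU time of this process only
    func()
    return (time.perf_counter() - wall_start,
            time.process_time() - cpu_start)

# A parent that mostly waits uses almost no CPU time of its own,
# even though real time passes.
wall, cpu = timed(lambda: time.sleep(0.2))
print("wall: %.3f s, cpu: %.3f s" % (wall, cpu))
```

A benchmark that only reads the parent's CPU time will therefore under-report any run where the heavy lifting happens in other processes.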

The speed improvement was pretty good! Here are the times on my MacBook 1.83 GHz Core Duo system for other versions that were posted, as well as the processing version:

wf-1-gala.py: 6.40 sec (original by Santiago Gala)
wf-2.py: 2.43 sec (optimized, single threaded version)
wf-3.py: 2.25 sec (chunked version)
wf-4.py: 2.10 sec (chunked version with threads)
wf-5.py: 4.95 sec (2 processes using the subprocess module)
wf-6.py: 1.28 sec (2 processes, subprocess module, using memory-mapped files)
wf-4-processing.py: 1.34 sec (chunked version with processing module, 2 worker processes)

I should note that to get wf-5.py and wf-6.py to run, I had to add a call to flush() on the file object in the putobject() function. The flush() was shown in the blog entry but was missing from the source files as originally posted; that has since been fixed. Strangely, wf-5.py does very poorly, but wf-6.py does much better.

So it looks like the processing version does almost as well as the final, fancy version of the log parser, but with a lot less code. I think that counts as a win. :)

You can download wf-4-processing.py here, and the README over here explains how to download the test data set.

Update: As another aside, I saw very little benefit, and usually some penalty, when increasing the number of processes in any of the multi-process tests beyond two. I suspect this is because, unlike Linux, OS X is a little sluggish with process creation.

Update #2: I finally tried this code on Ubuntu (single proc virtual machine, so no benchmarks) and discovered that the default implementation of processing.Queue uses a POSIX message queue when available. The objects passed around in this program are too large for such a queue, so you need to explicitly ask for a PipeQueue. The code has been updated to reflect this.

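Update #3: And as proof that you can screw up even dead-easy parallel tasks, the above code in fact has a deadlock. A Queue.Queue object has no length limit by default, so put() never blocks. However, processing.PipeQueue.put() can block if the pipe fills up. Because the parent sends out every chunk before it reads any results, the workers can stall writing results while the parent stalls writing chunks.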

The obvious way around this would be to fix processing.PipeQueue so that it has the same semantics as Queue.Queue. Ironically, one could solve that problem by having an extra thread in each process that monitored the pipes. Perhaps something like select() could be used instead to avoid having to introduce threads. That’s probably not portable to Windows, so more thinking here might be required.

Update #4: The processing test code in fact includes a program called test_worker.py which shows how to avoid this problem using a thread in the main process. It would be nice if there were a way to hide that in the Queue implementation, though.
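The idea of a feeder thread in the main process can be sketched as follows. This is not the code from test_worker.py. To keep it runnable anywhere, worker threads and bounded queue.Queue objects stand in for the worker processes and the pipes, with a deliberately tiny capacity imitating a pipe that fills up quickly. All names here are hypothetical.

```python
import queue
import threading

def process(chunk):
    # Hypothetical stand-in for the real per-chunk work.
    return sum(chunk)

def worker(task_queue, result_queue):
    while True:
        chunk = task_queue.get()
        if chunk is None:
            break
        # Blocks while result_queue is full, like a write to a full pipe.
        result_queue.put(process(chunk))

def run(chunks, workers=2, capacity=1):
    # A small maxsize imitates a pipe with limited buffer space.
    task_queue = queue.Queue(maxsize=capacity)
    result_queue = queue.Queue(maxsize=capacity)
    threads = [threading.Thread(target=worker,
                                args=(task_queue, result_queue),
                                daemon=True)
               for _ in range(workers)]
    for t in threads:
        t.start()

    def feed():
        # Runs in its own thread, so a blocked put() here cannot stop
        # the main thread from draining results.
        for chunk in chunks:
            task_queue.put(chunk)
        for _ in threads:
            task_queue.put(None)

    feeder = threading.Thread(target=feed, daemon=True)
    feeder.start()

    # The main thread only reads, so the workers always make progress.
    results = [result_queue.get() for _ in range(len(chunks))]
    feeder.join()
    for t in threads:
        t.join()
    return results
```

If feed() were called directly from the main thread before the reading loop, the same five-chunk input with capacity=1 would stall: both workers block putting results and the main thread blocks putting the next chunk.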

17 Responses to “Parallel Processing in Python with processing”

  1. Fredrik Says:

    Cool! The large wf-5 slowdown is pretty surprising, the fact that memory mapping is faster on a Mach/BSD-based kernel than on Windows less so. But I guess it’s time to try this on a couple of multi-core big-iron Unix boxes. I’ll see what I can find ;-)

  2. Fredrik Says:

    (… and I completely missed that your processing version was based on wf-4 and not wf-6. Wow.)

  3. Paddy3118 Says:

    So,
    Could it be that there is little need for the threading module and that its place in the standard library should be replaced by the processing module?

    The GIL gets in the way of Python threads, but I guess we need more evidence: if sub-processes have to swap a lot of data, they might be at a disadvantage.

    - Paddy.

  4. Fredrik Says:

    (Ahem. … and I also completely missed that I never got around to fixing the time calculations to use *wall time* also on Unixoid machines. This means that your multiprocess results are probably quite a bit lower than they should be. Sorry for that. I’ve fixed the code in my repository so they report both walltime and process time for the main script.)

  5. Fredrik Says:

    “Could it be that there is little need for the threading module and that its place in the standard library should be replaced by the processing module?”

    Not really (there are still plenty of uses for ordinary threads), but processing would definitely be an excellent addition to the standard library.

  6. Fredrik Says:

    “clock() doesn’t work so well on OS X, so with time() the numbers make more sense now.”

    clock() works well on OS X, but it only measures process time for the main script, not wall time (I got suspicious when wf-6.py reported 0.02 seconds on an 8-core machine ;-)

  7. Jesse Says:

    The more I play and work with the processing module, the more I want to do a PEP to get it into the stdlib. Thanks for this

  8. stan Says:

    I think starting the PEP process (well, with the author’s blessing) would be a great idea. This really does belong in the standard library alongside the threading module, so we have a simple alternative to threads when appropriate. Certainly, when forking on a single machine, it seems perfect.

    I need to play more to see how easy working with remote clients is. The remote execution problem is harder, since there is no inter-machine fork (unless you are using some kind of single-system-image operating system). You have to start separate processes on the other machines, worry about authentication, and all that mess. Here the slightly more awkward approach of Parallel Python seems sensible since it works the same way for local or remote worker processes.

    Too bad there is no obvious way to streamline the processing module for remote usage. I do have this strange desire to try to use Stackless with this module to distribute tasklets automatically to worker threads…

  9. Paul Boddie Says:

    I had to try this with the pprocess module [1], too, and I’ve uploaded my version here:

    http://www.boddie.org.uk/python/pprocess/pwf-4.py

    I’ve taken the liberty to divide the chunks into sizes proportional to the number of processes involved. See below for a discussion of this modification.

    Running on a 2 CPU machine I get 0.95s with 1 computation process, 0.60s with 2 processes, 0.62s with 4 processes. Meanwhile, taking wf-4.py as a comparison, I get 1.07s for 1 thread, 1.01s for 2 threads, 1.00s for 4 threads. This is on a machine with decent I/O and with the disk cache presumably nicely warmed up. Thus, the threaded version doesn’t really get much benefit when running with lots of threads.

    If I use the default chunk size, more parallel invocations or communications are required, whether or not processes are reused or spawned anew. With the pprocess module, this has a large impact on the performance, probably because the inter-process communications could do with some optimisation.

    Still, if you diff wf-4.py with pwf-4.py, you’ll see that I actually remove code because of certain conveniences provided in the pprocess module, as described in the tutorial [2]

    [1] http://www.python.org/pypi/pprocess
    [2] http://www.boddie.org.uk/python/pprocess/tutorial.html

  10. stan Says:

    pprocess looks pretty neat! Definitely a different approach than trying to emulate Thread(). The common use cases seem to be very streamlined.

  11. stan Says:

    Oh, and I finally hacked processing so that processing.PipeQueue has infinite queue depth. So on a dual core Linux system, you get the following results:

    wf-1-gala.py 3.422
    wf-2.py 1.049
    wf-3.py 1.223
    wf-4.py 1.103086
    wf-5.py 0.7303121
    wf-6.py 0.5413789
    wf-4-processing.py 0.7166

    So as you might have expected, processing is on par with the multi-process version that used pipes (wf-5) and slower than the version (wf-6) which used shared memory.

  12. Peer Says:

    So where can we find your modified processing.PipeQueue code? Did you submit it to the processing developers?

  13. stan Says:

    I haven’t posted it yet since I need to take another pass through it to make sure I didn’t break anything. Then I can send it to the processing developer to see what he thinks.

  14. Jesse Says:

    I pinged the processing module author - he’s game for a PEP to be put together

  15. Valery Says:

    Paul Boddie wrote really an excellent module. I am happy to use it already.

    I have finally run into a situation where the function I supply as the first argument to the pmap call returns a class instance. When I try to access the result, an exception is thrown. The unlucky class is “SQLSoup” from the SqlSoup extension of the well-known SQLAlchemy ( http://www.sqlalchemy.org/docs/04/documentation.html#plugins_sqlsoup)

    I can’t reproduce situation yet in a simpler form. Maybe anyone could give me a hint?

    Valery

  16. stan Says:

    It is possible that the object doesn’t pickle properly, which is how pretty much all the parallel processing modules pass arguments from one process to the other. Without knowing more about the details of your problem, that’s a total guess.

  17. Paul Boddie Says:

    I responded to Valery about the issues with pickling certain things. SQLAlchemy does some exotic things which don’t help with pickling, and I suggested avoiding passing such objects between processes (or across “process boundaries”, if you like). Without knowing more, I can’t say whether that’s an acceptable solution or not, however.
