There seems to be a lot of discussion about concurrency going around the Python blogs these days. I first clued into the arguments when I read Bruce Eckel’s critique of Python 3000 and Guido’s discussion of why the GIL persists in Python. Ignoring the philosophical question of processes vs. threads, you will, as a practical matter, probably need to use multiple processes in Python to scale your programs to more than one core.
I just finished reading the free first issue of Python Magazine, where Doug Hellmann reviewed three options for running processes in parallel in Python: the subprocess module, the parallel python package, and the processing module. (The article is very good, as is the rest of the magazine. Go read it! I only wish that the US dollar were doing a little better as the magazine is priced in CAN$.)
In a bit of serendipity, Fredrik Lundh posted an article where he optimizes a log parsing program in Python, starting from a single process implementation, moving up to threaded and multi-process implementations. Remembering the processing module from Doug’s article, I decided to take a whack at porting one of Fredrik’s versions to use it.
The processing module is designed to be a nearly drop-in replacement for threading, so I started with the threaded version of the log parser. The changes were very simple, as you can see from the diff:
--- wf-4.py 2007-10-07 00:08:04.000000000 -0500
+++ wf-4-processing.py 2007-10-07 00:37:12.000000000 -0500
@@ -1,6 +1,7 @@
 import re, sys
 from collections import defaultdict
-import threading, Queue
+import processing
+
 # 1: 2.7 seconds
 # 2: 2.5 seconds
@@ -28,14 +29,13 @@
         if not s:
             break
-class Worker(threading.Thread):
+class Worker(processing.Process):
     def run(self):
         while 1:
             chunk = queue.get()
             if chunk is None:
                 break
-            result.append(process(*chunk))
-            queue.task_done()
+            result_queue.put(process(*chunk))
 # --------------------------------------------------------------------
@@ -51,7 +51,8 @@
 except:
     count = 2
-queue = Queue.Queue()
+queue = processing.Queue()
+result_queue = processing.Queue()
 result = []
 for i in range(count):
@@ -59,10 +60,13 @@
     w.setDaemon(1)
     w.start()
+chunk_count = 0
 for chunk in getchunks(FILE):
     queue.put((FILE, chunk))
+    chunk_count += 1
-queue.join()
+for i in range(chunk_count):
+    result.append(result_queue.get())
 count = defaultdict(int)
 for item in result:
Basically, I had to add a results queue to pass the parsed log results back to the parent process, since the child processes no longer had direct access to a list object in the parent's memory space. I could have used processing.Namespace to create a shared list, but a queue to feed results back to the parent seemed more natural. It also provided a convenient way to tell when all of the workers were finished. (Send one chunk out, look for one item in the return queue.)
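The result-queue pattern described above can be sketched with the standard library's multiprocessing module, which is what the processing package became when it was added to Python 2.6. This is only a minimal sketch, not the code from wf-4-processing.py: the word-counting process() function, the count_words() helper, and the sample chunks are stand-ins made up for illustration, and the queues are passed to the workers as arguments rather than shared as globals.

```python
import multiprocessing as mp


def process(chunk):
    """Hypothetical stand-in for the log parser: count words in one chunk."""
    counts = {}
    for word in chunk.split():
        counts[word] = counts.get(word, 0) + 1
    return counts


def worker(work_queue, result_queue):
    # Pull chunks until the None sentinel arrives, sending each result back.
    while True:
        chunk = work_queue.get()
        if chunk is None:
            break
        result_queue.put(process(chunk))


def count_words(chunks, workers=2):
    work_queue = mp.Queue()
    result_queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(work_queue, result_queue))
             for _ in range(workers)]
    for p in procs:
        p.start()

    # Send the chunks out, remembering how many results to expect.
    chunk_count = 0
    for chunk in chunks:
        work_queue.put(chunk)
        chunk_count += 1
    for _ in procs:
        work_queue.put(None)  # one sentinel per worker

    # One result comes back per chunk sent, so count them off and merge.
    totals = {}
    for _ in range(chunk_count):
        for word, n in result_queue.get().items():
            totals[word] = totals.get(word, 0) + n

    for p in procs:
        p.join()
    return totals


if __name__ == "__main__":
    print(count_words(["a b a", "b c", "a"]))
```

Counting results against chunks sent plays the role that queue.join() played in the threaded version: once chunk_count results have arrived, every chunk has been handled.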
Update: Based on Fredrik’s comment below, I’ve updated to his new test version, which reports the wall-clock time difference with both time() and clock(). clock() doesn’t measure the right quantity on OS X, so with time() the numbers make more sense now.
The speed improvement was pretty good! Here are the times on my MacBook 1.83 GHz Core Duo system for the other versions that were posted, as well as the processing version:
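The gap between the two kinds of timer is easy to see in isolation. The sketch below assumes a modern Python 3, where time.clock() no longer exists (it was removed in Python 3.8), so time.perf_counter() stands in for wall-clock time and time.process_time() for the CPU time of the current process. The timed() helper is hypothetical, and a sleep stands in for time spent waiting on other processes.

```python
import time


def timed(func):
    """Run func() and return (result, wall_seconds, cpu_seconds)."""
    wall_start = time.perf_counter()   # elapsed real time
    cpu_start = time.process_time()    # CPU time used by this process only
    result = func()
    wall = time.perf_counter() - wall_start
    cpu = time.process_time() - cpu_start
    return result, wall, cpu


# Sleeping uses almost no CPU, so the two measurements diverge sharply.
_, wall, cpu = timed(lambda: time.sleep(0.2))
print("wall: %.3f sec, cpu: %.3f sec" % (wall, cpu))
```

A CPU-time measurement taken in the parent says little when the real work happens in child processes, which is one reason wall-clock time is the more useful number for benchmarks like these.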
wf-1-gala.py: 6.40 sec (original by Santiago Gala)
wf-2.py: 2.43 sec (optimized, single threaded version)
wf-3.py: 2.25 sec (chunked version)
wf-4.py: 2.10 sec (chunked version with threads)
wf-5.py: 4.95 sec (2 processes using the subprocess module)
wf-6.py: 1.28 sec (2 processes, subprocess module, using memory-mapped files)
wf-4-processing.py: 1.34 sec (chunked version with processing module, 2 worker processes)
I should note that to get wf-5.py and wf-6.py to run, I originally had to add a call to flush() on the file object in the putobject() function: the flush() was shown in the blog entry but missing from the actual source files. That flush problem is fixed now. Strangely, wf-5.py does very poorly, but wf-6.py does much better.
So it looks like the processing version does almost as well as the final, fancy version of the log parser, but with a lot less code. I think that counts as a win.
You can download wf-4-processing.py here, and the README over here explains how to download the test data set.
Update: As another aside, I saw very little benefit, and usually some penalty, when increasing the number of processes beyond two in any of the multi-process tests. I suspect this is because, unlike Linux, OS X is a little sluggish at process creation.
Update #2: I finally tried this code on Ubuntu (single proc virtual machine, so no benchmarks) and discovered that the default implementation of processing.Queue uses a POSIX message queue when available. The objects passed around in this program are too large for such a queue, so you need to explicitly ask for a PipeQueue. The code has been updated to reflect this.
Update #3: And as proof that you can screw up even dead-easy parallel tasks, the above code in fact has a deadlock. A Queue.Queue object (the one used in the threaded version) has no length limit by default, so put() never blocks. However, processing.PipeQueue.put() can block if the pipe fills up. Since the parent queues every chunk before it reads any results, the parent and the workers can end up blocked on each other.
The obvious way around this would be to fix processing.PipeQueue so that it has the same semantics as Queue.Queue. Ironically, one could solve that problem by having an extra thread in each process that monitored the pipes. Perhaps something like select() could be used instead to avoid having to introduce threads. That’s probably not portable to Windows, so more thinking might be required here.
Update #4: The processing test code in fact includes a program called test_worker.py which shows how to avoid this problem using a thread in the main process. It would be nice if there were a way to hide that in the Queue implementation, though.
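The idea of feeding the workers from a thread in the main process can be sketched as follows. This is not the code from test_worker.py, just a rough illustration using the standard library's multiprocessing module (the later name of processing). The squaring worker, the feed() and square_all() helpers, and the small capacity value are all made up for the example; the bounded queues are an assumption that stands in for a pipe that can fill up.

```python
import multiprocessing as mp
import threading


def worker(work_queue, result_queue):
    # Square each number until the None sentinel arrives (stand-in for real work).
    while True:
        item = work_queue.get()
        if item is None:
            break
        result_queue.put(item * item)


def feed(work_queue, items, workers):
    # Runs in a thread of the main process. put() may block when the queue
    # is full, but that no longer stops the main thread from reading results.
    for item in items:
        work_queue.put(item)
    for _ in range(workers):
        work_queue.put(None)  # one sentinel per worker


def square_all(items, workers=2, capacity=2):
    # Small bounded queues, so put() really can block in this sketch.
    work_queue = mp.Queue(maxsize=capacity)
    result_queue = mp.Queue(maxsize=capacity)
    procs = [mp.Process(target=worker, args=(work_queue, result_queue))
             for _ in range(workers)]
    for p in procs:
        p.start()

    feeder = threading.Thread(target=feed, args=(work_queue, items, workers))
    feeder.start()

    # Drain results while the feeder is still sending work.
    results = [result_queue.get() for _ in range(len(items))]

    feeder.join()
    for p in procs:
        p.join()
    return sorted(results)  # results arrive in no particular order


if __name__ == "__main__":
    print(square_all(list(range(10))))
```

With both queues bounded, sending every item from the main thread before reading any results would stall as soon as the queues filled. Moving the sends into feed() lets the main thread keep draining result_queue, so neither side waits on the other forever.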