Duncan Booth wrote:
> Simon Forman wrote:
> > If you need help understanding it please ask questions.  I, for one,
> > would be happy to comment it for you to explain how it works.  It's so
> > nice and elegant that I've already cut-and-pasted it into my own
> > "notebook" of cool useful python "patterns" to use in the future.
> >
> > Reread it slowly, think about what it's doing, if questions arise write
> > them down and ask them.  Duncan's code is beautiful.  It's well worth
> > your time to understand it.
> If you convert what I wrote into an example which actually runs then do
> repost it. It might also be easier to understand since I'm sure I'll have
> made some mistakes or omitted some critical bits (such as the import
> lines).

A pleasure.

There was one missing close-parenthesis in the Thread() call, but other
than that (and the implied imports and variables) everything was good.

I added some dummy code to simulate long-running requests, and I made
the run() function return its Thread's name, but other than that it's
basically the code you posted.  It's neat to increase NUMTHREADS and
watch the script run faster. :)

When the time comes to use this in something I'll probably wrap it up
in a class, and add some error handling to the run() function (or
method).  But in the meantime it's quite elegant and useful as is.
Definitely not not Scottish ;-)

Thanks Duncan.


from Queue import Queue
from threading import Thread, currentThread

# Imports for the dummy testing func.
from time import sleep
from random import random


def dummy_func(secs):
    '''
    Sleep for secs seconds then return secs.
    (Dummy function to simulate requests.)
    '''
    sleep(secs)
    return secs

# Some fake requests to pass to dummy_func().
dummy_requests = [
    5 * random() for notused in xrange(100)
    ]

# Dummy handle_response() for demo purposes.
def handle_response(resp): print resp

def run(request, response, func=dummy_func):
    '''
    Get items from the request Queue, process them
    with func(), put the results along with the
    Thread's name into the response Queue.

    Stop running once an item is None.
    '''
    name = currentThread().getName()
    while 1:
        item = request.get()
        if item is None:
            break
        response.put((name, func(item)))

# Create two Queues for the requests and responses
requestQueue = Queue()
responseQueue = Queue()

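# Number of worker threads.  (3 matches the example output below;
# increase it to watch the script run faster.)
NUMTHREADS = 3

# Pool of NUMTHREADS Threads that run run().
thread_pool = [
    Thread(
        target=run,
        args=(requestQueue, responseQueue),
        name="Thread %i" % i
        )
    for i in range(NUMTHREADS)
    ]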

# Start the threads.
for t in thread_pool: t.start()

# Queue up the requests.
for item in dummy_requests: requestQueue.put(item)

# Shut down the threads after all requests end.
# (Put one None "sentinel" for each thread.)
for t in thread_pool: requestQueue.put(None)

# Get and handle each response.
for notused in xrange(len(dummy_requests)):
    response = responseQueue.get()
    handle_response(response)

# Don't end the program prematurely.
# (Note that because Queue.get() is blocking by
# default this isn't strictly necessary.  But if
# you were, say, handling responses in another
# thread, you'd want something like this in your
# main thread.)
for t in thread_pool: t.join()

Example output from a run of the above script:

('Thread 0', 0.10915462751068916)
('Thread 0', 0.29428189134629135)
('Thread 1', 1.6234285192453246)
('Thread 0', 3.195799156145096)
('Thread 1', 2.7641123440885367)
('Thread 2', 4.7810243032096862)
('Thread 2', 1.1752965020601662)
('Thread 1', 2.0727863018148924)
('Thread 0', 4.8127195859913252)
('Thread 1', 2.4780495377626242)
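
For what it's worth, here is a rough sketch of the "wrap it up in a class and add some error handling" idea mentioned earlier.  It is only one way it might look, not a finished design.  Assumptions: it is written for Python 3 (so the module is spelled queue and the thread helpers are current_thread() and .name), and the names WorkerPool, submit(), results(), close() and reciprocal() are made up for illustration.  As in the script above, None is reserved as the shutdown sentinel, so it can't be submitted as a request.

```python
from queue import Queue          # Python 3 spelling of the Queue module
from threading import Thread, current_thread


class WorkerPool:
    """Hypothetical wrapper around the request/response Queue pattern."""

    def __init__(self, func, num_threads=3):
        self.func = func
        self.requests = Queue()
        self.responses = Queue()
        self.pending = 0          # submitted items not yet collected
        self.threads = [
            Thread(target=self._run, name="Thread %i" % i)
            for i in range(num_threads)
        ]
        for t in self.threads:
            t.start()

    def _run(self):
        name = current_thread().name
        while True:
            item = self.requests.get()
            if item is None:      # sentinel: stop this worker
                return
            try:
                result, error = self.func(item), None
            except Exception as exc:
                # Report the failure instead of letting the thread die.
                result, error = None, exc
            self.responses.put((name, result, error))

    def submit(self, item):
        """Queue one request (call from the main thread only)."""
        self.pending += 1
        self.requests.put(item)

    def results(self):
        """Yield one (thread name, result, error) tuple per submitted item."""
        while self.pending:
            self.pending -= 1
            yield self.responses.get()

    def close(self):
        """Send one sentinel per thread, then wait for them all to exit."""
        for t in self.threads:
            self.requests.put(None)
        for t in self.threads:
            t.join()


def reciprocal(x):
    return 1.0 / x   # raises ZeroDivisionError for x == 0


pool = WorkerPool(reciprocal, num_threads=2)
for x in (1, 2, 0, 4):
    pool.submit(x)
outcomes = list(pool.results())
pool.close()
```

Each entry in outcomes carries either a result or the exception that func() raised, so one bad request doesn't kill its worker thread and leave the main thread blocked waiting for a response that never arrives.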


