Time for me to ask something...

I've got a scenario where I need to pass data in both directions of a
multiprocessing program.  Specifically, what I'm working with is a test
runner from a homegrown testing harness (an aside having nothing to do
with the question: if it were me, I would build this starting with
pytest, but the codebase predates pytest and I'm only willing to do so
much surgery at the moment!).  The workflow is: the main program either
uses its arguments to figure out which tests to run or scans for them,
building up a list of files which is then turned into a list of Test
class instances.
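
(For context, a Test instance is basically a small container along
these lines; heavily simplified, the real class carries a lot more
state:)

    class Test:
        def __init__(self, path):
            self.path = path       # the file this test came from
            self.outcome = None    # filled in when the test is run

        def run(self):
            ...                    # execute the test, recording results on self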

This is passed to a set of workers through a multiprocessing.Queue.
Each worker pulls items off the queue and runs them, which causes the
Test instance to be updated with results.  This part is easy and looks
just like the threaded version of the runner; since everything is
gathered and enqueued before the workers are fired up, knowing when
things are done is straightforward.  For example, you can use the
sentinel trick of writing a None for each worker at the end, so each one
in turn picks up one of those and quits.  That's not even necessary, it
seems.
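
(Spelled out, that trick is just something like this; worker_loop is a
hypothetical name, not my real code:)

    def worker_loop(queue):
        # stops as soon as the sentinel (None) is pulled off the queue
        for test in iter(queue.get, None):
            test.run()

    # producer side: one sentinel per worker, after the real work items
    for _ in range(jobs):
        queue.put(None)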

The mp version (which I'm doing just as a personal exercise) has one
difference: the main program wants to go through the results and produce
a summary report, which means it needs access to the modified Test
instances.  Unlike the threaded version, the list of instances is not
shared between the processes.  I figured I'd send the modified instances
back through another queue which the main process reads.  So now, after
all this verbiage, the question: how does the main process, acting as a
consumer on the resultq, read this queue effectively in the presence of
multiple producers?

If it does q.get in a loop, it will eventually block when the queue is
empty.  If it does q.get_nowait, it _might_ see an empty queue while
some tests are still pending that haven't been written to the queue yet
(especially since the pickling that takes place on the queue apparently
introduces a little delay).
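
(q.get also takes a timeout, which lands somewhere in between; a rough
sketch, though picking a good timeout value feels arbitrary:)

    import queue as stdlib_queue   # just for the Empty exception

    try:
        t = resultq.get(timeout=1.0)   # block, but only for a second
    except stdlib_queue.Empty:
        pass                           # nothing yet; maybe still being pickled
    else:
        tests.append(t)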

A somewhat simplified view of what I have now is:

    from multiprocessing import Queue, JoinableQueue

    queue = JoinableQueue()
    resultq = Queue()
    procs = [RunTest(queue=queue, resultq=resultq) for _ in range(jobs)]
    total_num_tests = len(tests)
    for t in tests:
        queue.put(t)
    for p in procs:
        p.daemon = True
        p.start()
    queue.join()

    # collect back the modified test instances
    tests = []
    for t in iter(resultq.get, None):
        tests.append(t)
        # because q.get() blocks, we need a way to break out
        # q.empty() is claimed not to be reliable in mp.
        if len(tests) >= total_num_tests:
            break
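
(The worker side isn't shown; roughly, and equally simplified, RunTest
is a Process subclass that does something like this:)

    from multiprocessing import Process

    class RunTest(Process):
        def __init__(self, queue, resultq):
            super().__init__()
            self.queue = queue
            self.resultq = resultq

        def run(self):
            # no sentinel handling here: the daemon workers just block in
            # get() once the queue is drained and get cleaned up at exit
            while True:
                test = self.queue.get()    # blocks waiting for the next test
                test.run()                 # updates the instance with results
                self.resultq.put(test)     # ship the modified instance back
                self.queue.task_done()     # lets queue.join() return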

This actually works fine: it relies on knowing how many tests there are
and assuming that once we've collected the same number, everything is
done.  But it doesn't sound entirely "clean" to me.  What if a test is
"lost" due to an error, so its modified Test instance is never written
to the result queue?  I could use a sentinel, but who's going to write
it?  This seems to suffer from the same problem as the non-blocking get,
in that a worker could write a sentinel when *it* is done, but other
workers might still be running their last test and we'd conclude the
queue has been drained a bit too early.  I did play around with setting
up a barrier across the workers, so they're all finished before each
writes a sentinel, but that still blocked in the q.get (mea culpa: I'm
violating the guidelines and not posting code for that - I didn't save
that particular hack).
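
(To be concrete about the per-worker-sentinel idea: the consumer would
have to count Nones, one per worker, instead of stopping at the first;
something like the following, assuming each worker writes a None to
resultq as it exits:)

    # not what I have now, just the shape of the idea
    finished_workers = 0
    tests = []
    while finished_workers < jobs:
        t = resultq.get()
        if t is None:
            finished_workers += 1    # one worker has written its sentinel
        else:
            tests.append(t)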

I've read a little about multiprocessing.Manager and multiprocessing.Pool
while looking for other approaches; both seemed to have limitations that
made them awkward for this case, probably because I'm failing to
understand something.
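
(The Pool version I was looking at would, I think, come out something
like this, with the modified instances coming back as return values
instead of through a queue; run_one here is hypothetical:)

    from multiprocessing import Pool

    def run_one(test):
        test.run()        # run it in the worker process...
        return test       # ...and the modified instance is pickled back

    with Pool(processes=jobs) as pool:
        tests = pool.map(run_one, tests)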


So... what else should I be trying to make this a little cleaner?

thanks,

-- mats