On Oct 7, 12:16 pm, MRAB <pyt...@mrabarnett.plus.com> wrote:
> Felix wrote:
> > Hello,
>
> > I keep running into a deadlock in a fairly simple parallel script
> > using multiprocessing.Queue for sending tasks and receiving results.
>
> > It seems that the workers cannot finish pushing buffered results into
> > the output queue when calling 'results.join_thread' while terminating,
> > but why is that? I tried calling 'results.close()' before joining the
> > workers in the main process, but it does not make a difference.
>
> > Is there something I am understanding wrong about the interface? Is
> > there a much better way to do what I am trying to do above?

> You can therefore get into a deadlock where:
>
> * Process A won't read from the queue until it has joined process B.
> * The join won't succeed until process B has terminated.
> * Process B won't terminate until it has finished writing to the queue.
> * Process B can't finish writing to the queue because it's full.
> * The queue is full because process A isn't reading from it.

I thought about that, but it seemed unlikely since I am not generating
too many results (a few thousand small tuples of ints). I also tried
to deal with it by reading as many results from the queue as were
available, then joining the workers, then reading again. This did not
work reliably, perhaps because the queue filled up again while I
was joining the individual workers.
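
One way to check whether join-before-read can really block is a small experiment like the one below. It is only a sketch under stated assumptions: `producer` and `join_before_drain` are hypothetical names, the item count is deliberately large so that the data exceeds the pipe buffer on common platforms, and the "fork" start method (POSIX only) is chosen so that the snippet runs without an `if __name__ == "__main__":` guard.

```python
import multiprocessing as mp

# Assumption: "fork" (POSIX only) so the sketch runs without a __main__ guard.
ctx = mp.get_context("fork")


def producer(n_items, results):
    """Hypothetical worker: queue n_items small tuples, then return."""
    for i in range(n_items):
        results.put((i, i))
    # On exit the process waits for its queue feeder thread to flush
    # all buffered items into the underlying pipe.


def join_before_drain(n_items=100_000, timeout=1.0):
    results = ctx.Queue()
    p = ctx.Process(target=producer, args=(n_items, results))
    p.start()

    # Joining first: nobody is reading yet, so the worker may be unable to exit.
    p.join(timeout)
    still_alive = p.is_alive()

    # Draining first and joining afterwards lets the worker finish.
    items = [results.get() for _ in range(n_items)]
    p.join()
    return still_alive, len(items), p.exitcode
```

If `still_alive` comes back true, the worker was still waiting to flush its results when the first join timed out, which is the situation described in the quoted list. How many items it takes to reach that point depends on the platform's pipe buffer size, so the threshold here is not a fixed number.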

In any case the core of the problem is the following:

A bunch of workers push an unknown number of results into a queue. The
main process needs to collect all those results.

What is the right way to implement that with multiprocessing? I tried
joining the workers and then reading everything available, but
as described above, that does not work reliably.

A dirty trick that works would be to read all results with a long
timeout and assume that no more results are coming once the queue
stays empty, but this is obviously fragile:

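# Empty is the exception raised by results.get() on timeout
# (queue.Empty; Queue.Empty on Python 2).
while True:
    try:
        res.append(results.get(True, LONG_TIMEOUT))
    except Empty:
        break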

It could be made somewhat better by joining the workers afterwards and
reading again, but another deadlock might happen.

What I am doing now is having the workers push a "DONE" flag on the
result queue when they end and reading results until all DONE flags
have arrived:


def work(tasks, results):
    for task in iter(tasks.get, 'STOP'):
        res = calc(*task)
        if res:
            results.put(res)
            tasks.put((task[0], res[1]))
            tasks.put((res[0], task[1]))
        # task_done() requires tasks to be a multiprocessing.JoinableQueue
        tasks.task_done()
    results.put('DONE')

And in main:

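res = []
# Each pass reads until one worker's DONE flag; repeat once per worker.
for i in range(opts.nprocs):
    res += list(iter(results.get, 'DONE'))

# All DONE flags have arrived, so the workers have nothing left to flush.
for p in procs:
    p.join()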

This seems to work, and as long as each worker's items reach the results
queue in the same order as that worker's puts (is this guaranteed?), it
should be stable. But is it the best/easiest way to do this?
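
For reference, the DONE-flag approach can be reduced to a self-contained sketch like the following. The names `worker` and `collect`, the worker count and the item counts are hypothetical stand-ins for the real `calc` workload, and the "fork" start method (POSIX only) is an assumption made so that the snippet runs without an `if __name__ == "__main__":` guard. As far as the multiprocessing documentation states, objects enqueued by the same process arrive in the expected order relative to each other, so each worker's flag should arrive after that worker's own results; items from different workers may interleave.

```python
import multiprocessing as mp

# Assumption: "fork" (POSIX only) so the sketch runs without a __main__ guard.
ctx = mp.get_context("fork")

DONE = "DONE"  # sentinel each worker sends as its final message


def worker(worker_id, n_items, results):
    """Hypothetical worker: emit n_items small tuples, then the sentinel."""
    for i in range(n_items):
        results.put((worker_id, i))
    results.put(DONE)


def collect(nprocs=4, n_items=5000):
    results = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(w, n_items, results))
             for w in range(nprocs)]
    for p in procs:
        p.start()

    collected = []
    pending = nprocs  # workers that have not yet sent their sentinel
    while pending:
        item = results.get()
        if item == DONE:
            pending -= 1
        else:
            collected.append(item)

    # Join only after the queue has been drained, so no worker is left
    # blocked while flushing its results.
    for p in procs:
        p.join()
    return collected
```

Counting sentinels in a single loop is equivalent to calling `iter(results.get, 'DONE')` once per worker; it just makes the number of outstanding workers explicit.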
-- 
http://mail.python.org/mailman/listinfo/python-list
