Tim Peters <[email protected]> added the comment:
First thing: the code uses the global name `outputer` for two different
things, as the name of a module function and as the global name given to the
Process object running that function. At least on Windows under Python 3.6.4
that confusion prevents the program from running. So rename one of them.
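For concreteness, a minimal sketch of the repaired shape (the function body
and the surrounding names are my guesses here, since the original script
isn't quoted in this message):

    import multiprocessing as mp

    def outputer(q):  # the module-level function keeps the name
        for item in iter(q.get, None):
            print(item)

    if __name__ == "__main__":
        q = mp.Queue()
        # The Process object gets a different name, so the function's
        # global name is never rebound:
        outputer_proc = mp.Process(target=outputer, args=(q,))
        outputer_proc.start()
        q.put(1)
        q.put(None)
        outputer_proc.join()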
Then comes the pain ;-) A multiprocessing queue is a rather complex object
under the covers, and the docs don't really spell out all the details. Maybe
they should.
The docs do vaguely sketch that a "feeder thread" is created in each process
using an mp.queue, which feeds objects you .put() from an internal buffer into
an interprocess pipe. The internal buffer is needed in case you .put() so many
objects so fast that feeding them into a pipe directly would cause the OS pipe
functions to fail.
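You can watch the buffer absorbing .put() calls without any consumer at all.
A tiny demo (mine, not from the issue):

    import multiprocessing as mp
    import time

    if __name__ == "__main__":
        q = mp.Queue()
        start = time.perf_counter()
        for i in range(100000):
            q.put(i)  # returns almost at once - lands in the feeder buffer
        print("100000 puts took %.3f seconds" % (time.perf_counter() - start))
        # Without this, interpreter shutdown would wait for the feeder
        # thread to finish flushing the buffer into the (unread, finite)
        # pipe:
        q.cancel_join_thread()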
And that happens in your case: you have 10 producers running at full speed
overwhelming a single slow consumer. _Most_ of the data enqueued by
output_queue.put(i+1) is sitting in those internal buffers most of the time,
and the base interprocess pipe doesn't know anything about them for the
duration.
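Here's a stripped-down reconstruction of the kind of program I believe is at
issue (the names and counts are my guesses at the original's shape):

    import multiprocessing as mp
    import time

    def worker(task_queue, output_queue):
        for i in iter(task_queue.get, None):
            output_queue.put(i + 1)  # usually lands in the feeder buffer
            task_queue.task_done()
        task_queue.task_done()       # account for the None sentinel too

    def consumer(output_queue):
        for item in iter(output_queue.get, None):
            time.sleep(0.01)         # deliberately slow

    if __name__ == "__main__":
        task_queue = mp.JoinableQueue()
        output_queue = mp.Queue()
        workers = [mp.Process(target=worker,
                              args=(task_queue, output_queue))
                   for _ in range(10)]
        for w in workers:
            w.start()
        cons = mp.Process(target=consumer, args=(output_queue,))
        cons.start()
        for i in range(1000):
            task_queue.put(i)
        for _ in workers:
            task_queue.put(None)
        task_queue.join()       # all tasks *taken*, but results may still
                                # be sitting in workers' feeder buffers ...
        output_queue.put(None)  # ... so this None can reach the pipe first,
                                # and the consumer can quit early
        cons.join()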
The practical consequence: while the queue always reflects the order in which
objects were .put() within a single process, there's no guarantee about
ordering _across_ processes. Objects are fed from internal buffers into the
shared pipe whenever a process's feeder thread happens to wake up and see that
the pipe isn't "too full". task_queue.task_done() only records that an object
has been taken off of task_queue and _given_ to output_queue.put(i+1); most of
the time, the latter just sticks i+1 into an internal buffer because
output_queue's shared pipe is too full to accept another object.
Given that this is how things actually work, what you can do instead is add:
    for w in workers:
        w.join()
somewhere before output_queue.put(None). A worker process doesn't end until
its feeder thread(s) complete feeding all the internal buffer objects into
pipes, so .join()'ing a worker is the one "obvious" way to guarantee that all
the worker's .put() actions have wholly completed.
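In the reconstruction above, the tail of the main block then becomes:

    task_queue.join()
    for w in workers:
        w.join()            # a worker's exit implies its buffer was drained
    output_queue.put(None)  # now the sentinel really is the last item
    cons.join()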
In which case, there's no point to using a JoinableQueue at all - .task_done()
no longer serves any real purpose in the code.
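Continuing with my reconstruction, a plain Queue then does the whole job:

    import multiprocessing as mp
    import time

    def worker(task_queue, output_queue):
        for i in iter(task_queue.get, None):
            output_queue.put(i + 1)  # no task_done() needed anymore

    def consumer(output_queue):
        for item in iter(output_queue.get, None):
            time.sleep(0.01)

    if __name__ == "__main__":
        task_queue = mp.Queue()      # plain Queue, not JoinableQueue
        output_queue = mp.Queue()
        workers = [mp.Process(target=worker,
                              args=(task_queue, output_queue))
                   for _ in range(10)]
        for w in workers:
            w.start()
        cons = mp.Process(target=consumer, args=(output_queue,))
        cons.start()
        for i in range(1000):
            task_queue.put(i)
        for _ in workers:
            task_queue.put(None)
        for w in workers:
            w.join()                 # replaces task_queue.join() entirely
        output_queue.put(None)       # guaranteed to be the last item now
        cons.join()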
----------
nosy: +tim.peters
_______________________________________
Python tracker <[email protected]>
<https://bugs.python.org/issue32382>
_______________________________________