Charles-Francois Natali <[email protected]> added the comment:
I think those lockups are due to a race in the Pool shutdown code.
In Lib/multiprocessing/pool.py:
def close(self):
    debug('closing pool')
    if self._state == RUN:
        self._state = CLOSE
        self._worker_handler._state = CLOSE
        self._taskqueue.put(None)
We set the current state to CLOSE and put None on the taskqueue, so that
task_handler detects that we want to shut down and sends one None (a
sentinel) to the inqueue for each worker process.
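For reference, the tail of _handle_tasks looks roughly like this
(paraphrased, details elided); note that it sends exactly one sentinel per
worker present in the pool at that moment:

def _handle_tasks(taskqueue, put, outqueue, pool):
    ...
    # tell workers there is no more work: one sentinel per worker
    # process currently in the pool
    debug('task handler sending sentinel to workers')
    for p in pool:
        put(None)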
When a worker process receives this sentinel, it exits, and when Pool's join
method is called, each process is joined successfully.
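The matching check in the worker loop is, roughly (paraphrased):

def worker(inqueue, outqueue, ...):
    ...
    while ...:
        task = inqueue.get()
        if task is None:
            # sentinel received: this worker shuts down
            debug('worker got sentinel -- exiting')
            break
        ...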
Now, there's a problem, because of the worker_handler thread.
This thread constantly starts new worker processes to replace existing ones
that exited after completing their work:
def _handle_workers(pool):
    while pool._worker_handler._state == RUN and pool._state == RUN:
        pool._maintain_pool()
        time.sleep(0.1)
    debug('worker handler exiting')
where
def _maintain_pool(self):
    """Clean up any exited workers and start replacements for them.
    """
    if self._join_exited_workers():
        self._repopulate_pool()
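And _repopulate_pool unconditionally brings the pool back up to the
requested size; note that it never looks at pool._state. Roughly
(paraphrased, arguments elided):

def _repopulate_pool(self):
    # start as many new workers as have exited, regardless of
    # whether the pool is being shut down
    for i in range(self._processes - len(self._pool)):
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue, ...))
        self._pool.append(w)
        w.daemon = True
        w.start()
        debug('added worker')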
Imagine the following happens: worker_handler checks that the pool is still
running (state == RUN), but before calling _maintain_pool it is preempted
(the GIL is released), and Pool's close() method is called: the state is set
to CLOSE, None is put on the taskqueue, and the worker processes receive
their sentinels and exit.
Then, Pool's join is called:
def join(self):
    debug('joining pool')
    assert self._state in (CLOSE, TERMINATE)
    self._worker_handler.join()
    self._task_handler.join()
    self._result_handler.join()
    for p in self._pool:
        p.join()
join() first blocks until worker_handler exits. That thread sooner or later
resumes and calls _maintain_pool, which calls _repopulate_pool, which
creates brand-new worker processes.
Then worker_handler checks the current state, sees CLOSE, and exits.
At that point, Pool's join blocks here:

    for p in self._pool:
        p.join()

since the newly created processes never receive the sentinels (they were
already consumed by the previous worker processes)...
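To summarize the interleaving (main = the thread calling close()/join(),
handler = the worker_handler thread):

handler: checks pool._state == RUN  -> still true
main:    close() sets _state = CLOSE, puts None on taskqueue
         workers consume their sentinels and exit
main:    join() starts waiting on worker_handler
handler: resumes, _maintain_pool() -> _repopulate_pool()
         -> starts fresh workers (no sentinels left for them)
handler: re-checks the state, sees CLOSE, exits
main:    join() moves on to p.join() and blocks forever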
This race can be reproduced almost every time just by adding a one-second
sleep at the top of the loop:

def _handle_workers(pool):
    while pool._worker_handler._state == RUN and pool._state == RUN:
+       time.sleep(1)
        pool._maintain_pool()
        time.sleep(0.1)
    debug('worker handler exiting')
Then something as simple as this will block:

import multiprocessing

p = multiprocessing.Pool(3)
p.close()
p.join()
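For convenience, here is a self-contained way to trigger it without editing
pool.py, by monkey-patching the handler before the pool is created (a
sketch, assuming a CPython of this era where RUN is a module-level constant
in multiprocessing.pool and _handle_workers is a staticmethod):

import time
import multiprocessing.pool
from multiprocessing.pool import RUN

def _racy_handle_workers(pool):
    # Same loop as Pool._handle_workers, with a sleep inserted to
    # widen the window between the state check and _maintain_pool().
    while pool._worker_handler._state == RUN and pool._state == RUN:
        time.sleep(1)  # widen the race window
        pool._maintain_pool()
        time.sleep(0.1)

# Patch before the pool is created: the handler thread is started
# with this function as its target in Pool.__init__.
multiprocessing.pool.Pool._handle_workers = staticmethod(_racy_handle_workers)

if __name__ == '__main__':
    p = multiprocessing.pool.Pool(3)
    p.close()
    p.join()    # hangs when the race fires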
I still have to think of a clean way to solve this.
----------
nosy: +neologix
_______________________________________
Python tracker <[email protected]>
<http://bugs.python.org/issue8428>
_______________________________________