New submission from Dan O'Reilly:

As initially discussed on python-ideas, it would be nice if there was a way to 
query the concurrent.futures Executor objects for information about their 
internal state - number of total/active/idle workers, number of 
total/active/waiting tasks, which tasks are active, which are waiting, etc. 
Some of this information can be determined today by examining the Executor's 
internal variables, but not all.

I'm attaching a patch that makes a first attempt at adding this support. 
Currently it adds quite a few methods, though these could be consolidated 
somewhat if that's preferable. Here's what I've added, along with possible 
consolidations:

worker_count(): Total number of workers currently in the pool
active_worker_count(): Number of workers currently processing a work item
idle_worker_count(): Number of workers not processing a work item
(Possible consolidation: worker_counts(): returns a dict containing 
total/active/idle keys mapped to the above.)

task_count(): Total number of tasks currently being handled by the pool
active_task_count(): Number of tasks currently being processed by workers 
(Possibly redundant - it will always match active_worker_count())
waiting_task_count(): Number of submitted tasks not yet being processed by a 
worker
(Possible consolidation: task_counts(): returns a dict containing 
total/active/waiting keys mapped to the above.)

active_tasks(): A set of WorkItem objects currently being processed by a worker.
waiting_tasks(): A list of WorkItem objects currently waiting to be processed 
by a worker.
(Possible consolidation: get_tasks(): returns a dict containing active/waiting 
keys mapped to the above.)

A WorkItem is an object containing the function object, args tuple, and kwargs 
dict submitted to the Executor.
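
To make the shapes above concrete, here is a rough sketch of what a WorkItem 
and one of the consolidated dict-returning methods might look like. The 
namedtuple layout, the field names (fn, args, kwargs), and the free function 
worker_counts(total, active) are illustrative assumptions, not the code in the 
patch:

```python
from collections import namedtuple

# Hypothetical shape for the WorkItem described above: the submitted
# callable plus its positional and keyword arguments.
WorkItem = namedtuple("WorkItem", ["fn", "args", "kwargs"])


def worker_counts(total, active):
    """Hypothetical consolidated form: one dict instead of three methods."""
    return {"total": total, "active": active, "idle": total - active}


item = WorkItem(fn=pow, args=(2, 10), kwargs={})
result = item.fn(*item.args, **item.kwargs)
counts = worker_counts(total=4, active=1)
```

The same dict pattern would apply to task_counts() and get_tasks(), with 
total/active/waiting and active/waiting keys respectively.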

ThreadPoolExecutor notes:

For ThreadPoolExecutor, most of this information is made available by changing 
the worker threads from functions into class instances, and maintaining a small 
amount of extra state on the instance. The added overhead for users who don't 
care about introspection should be minimal. Note that for waiting_tasks(), we 
have to introspect the underlying queue.Queue. This is done by locking the 
queue's internal mutex and iterating over the queue's internal deque. There was 
some concern on the mailing list about doing this, so I wanted to call it out. 
Alternatively, we could implement waiting_tasks() by maintaining a separate 
data structure (a deque?) of work items alongside the actual queue. However, 
this adds memory overhead and implementation complexity (to keep the new deque 
in sync with the queue's contents).
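
The two ideas in the paragraph above (a worker class carrying a little state, 
and a locked read of the queue's deque) can be sketched roughly as follows. 
This is a simplified stand-in, not the patch: the Worker class, its 
current_item attribute, the None sentinel, and snapshot_waiting() are 
hypothetical names, and the mutex and queue attributes of queue.Queue are 
undocumented CPython implementation details:

```python
import queue
import threading


class Worker(threading.Thread):
    """Worker thread that records the item it is currently running."""

    def __init__(self, work_queue):
        super().__init__(daemon=True)
        self.work_queue = work_queue
        self.current_item = None  # the "small amount of extra state"

    def run(self):
        while True:
            item = self.work_queue.get()
            if item is None:  # hypothetical shutdown sentinel
                return
            self.current_item = item
            try:
                fn, args, kwargs = item
                fn(*args, **kwargs)
            finally:
                self.current_item = None


def snapshot_waiting(work_queue):
    """Copy the queued items while holding the queue's own lock."""
    # Assumption: relies on queue.Queue's internal `mutex` and `queue` deque.
    with work_queue.mutex:
        return list(work_queue.queue)


started, release = threading.Event(), threading.Event()


def blocking_task():
    started.set()              # signal that the worker picked this task up
    release.wait(timeout=5)    # stay "active" until the main thread says go


q = queue.Queue()
q.put((blocking_task, (), {}))
q.put((len, ("queued",), {}))
worker = Worker(q)
worker.start()
started.wait(timeout=5)

active_during = worker.current_item    # the blocking task's tuple
waiting_during = snapshot_waiting(q)   # only the second item is still queued

release.set()
q.put(None)
worker.join(timeout=5)
active_after = worker.current_item     # None once the worker has finished
```

The snapshot is only a point-in-time copy; a worker may dequeue an item 
immediately after the lock is released.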

ProcessPoolExecutor notes:

ProcessPoolExecutor uses both a dict and a multiprocessing.Queue internally. 
Every submitted work item is placed into the dict (which is called 
_pending_work_items), keyed on a unique work_id. However, only WORKER_COUNT + 1 
items are actually placed into the multiprocessing.Queue at a time. This, along 
with the added IPC complexity and cost, makes the implementation approach a bit 
different from ThreadPoolExecutor. 

Every method except worker_count() and task_count() requires changes in the 
worker implementation - it now has to send the work_id of the work item it's 
about to process back to the parent. It does this via a 
multiprocessing.SimpleQueue that's already being used to send the result of the 
work item to the parent. The parent process will then store that work_id in a 
set called _active_work_items. When the actual result of a work item is sent 
back to the parent, the work_id (which is already included with the result) is 
removed from the _active_work_items set.
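
As a rough illustration of that bookkeeping, the sketch below simulates the 
message flow in a single process. The STARTED/RESULT tags, the tuple layout, 
and parent_drain() are assumptions made for the example, and queue.SimpleQueue 
stands in for the multiprocessing.SimpleQueue so that it runs without spawning 
child processes:

```python
import queue

# Hypothetical message tags; the patch may encode these differently.
STARTED = "started"
RESULT = "result"


def parent_drain(result_queue, active_work_items, results):
    """Apply every message currently in the queue to the parent's state."""
    while True:
        try:
            kind, work_id, payload = result_queue.get_nowait()
        except queue.Empty:
            return
        if kind == STARTED:
            # The child is about to run this work item.
            active_work_items.add(work_id)
        else:
            # The result arrived, so the item is no longer active.
            active_work_items.discard(work_id)
            results[work_id] = payload


result_queue = queue.SimpleQueue()  # stand-in for the multiprocessing queue
active_work_items = set()
results = {}

# The child announces work_id 0 before processing it.
result_queue.put((STARTED, 0, None))
parent_drain(result_queue, active_work_items, results)
active_after_start = set(active_work_items)

# The child later sends the result, which already carries the work_id.
result_queue.put((RESULT, 0, 42))
parent_drain(result_queue, active_work_items, results)
active_after_result = set(active_work_items)
```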

The active_tasks() method can build its set by iterating over the work_ids in 
the _active_work_items set and looking up the corresponding WorkItem in the 
_pending_work_items dict. waiting_tasks() can iterate over the 
_pending_work_items dict and build a list containing any item whose work_id 
isn't present in the _active_work_items set. That list is then sorted by 
work_id so that it reflects the order in which the tasks will be placed into 
the queue. The main source of added overhead for non-users of introspection is 
the cost of sending the work_id back to the parent process prior to actually 
processing a WorkItem in the child, along with the small amount of memory used 
to store the _active_work_items set (which will never be greater than 
MAX_WORKERS in size). In practice I don't think this will have a noticeable 
performance impact, except perhaps for cases where there are many tasks which 
execute very quickly.
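
A minimal sketch of the two lookups described above, assuming the pending 
items live in a dict keyed by work_id and the active work_ids live in a set. 
The free functions and the string placeholders standing in for WorkItem 
objects are illustrative, not the patch's code:

```python
def active_tasks(pending_work_items, active_work_ids):
    """Set of work items whose work_id has been reported as started."""
    return {
        pending_work_items[work_id]
        for work_id in active_work_ids
        if work_id in pending_work_items
    }


def waiting_tasks(pending_work_items, active_work_ids):
    """Pending work items that are not active, ordered by work_id."""
    return [
        pending_work_items[work_id]
        for work_id in sorted(pending_work_items)
        if work_id not in active_work_ids
    ]


# Strings stand in for WorkItem objects; the keys are work_ids.
pending = {2: "item-c", 0: "item-a", 3: "item-d", 1: "item-b"}
active_ids = {0, 1}

active = active_tasks(pending, active_ids)
waiting = waiting_tasks(pending, active_ids)
```

Sorting the keys before filtering gives the same order as sorting the filtered 
list by work_id.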

Also note that right now there are no docs included in the patch. I want some 
consensus on the API to be reached prior to writing them.

----------
components: Library (Lib)
files: introspect_executors.diff
keywords: patch
messages: 225941
nosy: dan.oreilly
priority: normal
severity: normal
status: open
title: ProcessPoolExecutor/ThreadPoolExecutor should provide introspection APIs
type: enhancement
versions: Python 3.5
Added file: http://bugs.python.org/file36480/introspect_executors.diff

_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue22281>
_______________________________________