The iter_completed function is similar to asyncio.as_completed, but it takes an iterator of futures as input and supports max_jobs and max_load parameters. Both max_jobs and max_load default to multiprocessing.cpu_count().
Example usage for async_aux_get: import portage from portage.util.futures.iter_completed import iter_completed portdb = portage.portdb future_cpv = {} def future_generator(): for cpv in portdb.cp_list('sys-apps/portage'): future = portdb.async_aux_get(cpv, portage.auxdbkeys) future_cpv[id(future)] = cpv yield future for future in iter_completed(future_generator()): cpv = future_cpv.pop(id(future)) try: result = future.result() except KeyError as e: # aux_get failed print('error:', cpv, e) else: print(cpv, result) See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed Bug: https://bugs.gentoo.org/648790 --- .../tests/util/futures/test_iter_completed.py | 50 ++++++++++++ pym/portage/util/_async/FuturePollTask.py | 27 ++++++ pym/portage/util/futures/iter_completed.py | 63 ++++++++++++++ pym/portage/util/futures/wait.py | 95 ++++++++++++++++++++++ 4 files changed, 235 insertions(+) create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py create mode 100644 pym/portage/util/_async/FuturePollTask.py create mode 100644 pym/portage/util/futures/iter_completed.py create mode 100644 pym/portage/util/futures/wait.py diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py new file mode 100644 index 000000000..6607d871c --- /dev/null +++ b/pym/portage/tests/util/futures/test_iter_completed.py @@ -0,0 +1,50 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import time +from portage.tests import TestCase +from portage.util._async.ForkProcess import ForkProcess +from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures.iter_completed import iter_completed + + +class SleepProcess(ForkProcess): + __slots__ = ('future', 'seconds') + def _start(self): + self.addExitListener(self._future_done) + ForkProcess._start(self) + + def _future_done(self, task): + 
self.future.set_result(self.seconds) + + def _run(self): + time.sleep(self.seconds) + + +class IterCompletedTestCase(TestCase): + + def testIterCompleted(self): + + # Mark this as todo, since we don't want to fail if heavy system + # load causes the tasks to finish in an unexpected order. + self.todo = True + + loop = global_event_loop() + tasks = [ + SleepProcess(seconds=0.200), + SleepProcess(seconds=0.100), + SleepProcess(seconds=0.001), + ] + + expected_order = sorted(task.seconds for task in tasks) + + def future_generator(): + for task in tasks: + task.future = loop.create_future() + task.scheduler = loop + task.start() + yield task.future + + for seconds, future in zip(expected_order, iter_completed(future_generator(), + max_jobs=None, max_load=None, loop=loop)): + self.assertEqual(seconds, future.result()) diff --git a/pym/portage/util/_async/FuturePollTask.py b/pym/portage/util/_async/FuturePollTask.py new file mode 100644 index 000000000..6b7cdf7d5 --- /dev/null +++ b/pym/portage/util/_async/FuturePollTask.py @@ -0,0 +1,27 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import os +import signal + +from _emerge.AbstractPollTask import AbstractPollTask + + +class FuturePollTask(AbstractPollTask): + """ + Wraps a Future in an AsynchronousTask, which is useful for + scheduling with TaskScheduler. 
+ """ + __slots__ = ('future',) + def _start(self): + self.future.add_done_callback(self._done_callback) + + def _done_callback(self, future): + if future.cancelled(): + self.cancelled = True + self.returncode = -signal.SIGINT + elif future.exception() is None: + self.returncode = os.EX_OK + else: + self.returncode = 1 + self.wait() diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py new file mode 100644 index 000000000..0540cc986 --- /dev/null +++ b/pym/portage/util/futures/iter_completed.py @@ -0,0 +1,63 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import multiprocessing + +from portage.util._async.FuturePollTask import FuturePollTask +from portage.util._async.TaskScheduler import TaskScheduler +from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures.wait import wait, FIRST_COMPLETED + + +def iter_completed(futures, max_jobs=None, max_load=None, loop=None): + """ + This is similar to asyncio.as_completed, but takes an iterator of + futures as input, and includes support for max_jobs and max_load + parameters. 
+ + @param futures: iterator of asyncio.Future (or compatible) + @type futures: iterator + @param max_jobs: max number of futures to process concurrently (default + is multiprocessing.cpu_count()) + @type max_jobs: int + @param max_load: max load allowed when scheduling a new future, + otherwise schedule no more than 1 future at a time (default + is multiprocessing.cpu_count()) + @type max_load: int or float + @param loop: event loop + @type loop: EventLoop + @return: iterator of futures that are done + @rtype: iterator + """ + loop = loop or global_event_loop() + max_jobs = max_jobs or multiprocessing.cpu_count() + max_load = max_load or multiprocessing.cpu_count() + + future_map = {} + def task_generator(): + for future in futures: + future_map[id(future)] = future + yield FuturePollTask(future=future) + + scheduler = TaskScheduler( + task_generator(), + max_jobs=max_jobs, + max_load=max_load, + event_loop=loop) + + try: + scheduler.start() + + # scheduler should ensure that future_map is non-empty until + # task_generator is exhausted + while future_map: + done, pending = loop.run_until_complete( + wait(*list(future_map.values()), return_when=FIRST_COMPLETED)) + for future in done: + del future_map[id(future)] + yield future + + finally: + # cleanup in case of interruption by SIGINT, etc + scheduler.cancel() + scheduler.wait() diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py new file mode 100644 index 000000000..a65a25ac4 --- /dev/null +++ b/pym/portage/util/futures/wait.py @@ -0,0 +1,95 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +try: + from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION +except ImportError: + ALL_COMPLETED = 'ALL_COMPLETED' + FIRST_COMPLETED ='FIRST_COMPLETED' + FIRST_EXCEPTION = 'FIRST_EXCEPTION' + +from portage.util._eventloop.global_event_loop import global_event_loop + + +# Use **kwargs since python2.7 does not allow arguments 
with defaults +# to follow *futures. +def wait(*futures, **kwargs): + """ + Use portage's internal EventLoop to emulate asyncio.wait: + https://docs.python.org/3/library/asyncio-task.html#asyncio.wait + + @param future: future to wait for + @type future: asyncio.Future (or compatible) + @param timeout: number of seconds to wait (wait indefinitely if + not specified) + @type timeout: int or float + @param return_when: indicates when this function should return, must + be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or + FIRST_EXCEPTION (default is ALL_COMPLETED) + @type return_when: object + @param loop: event loop + @type loop: EventLoop + @return: tuple of (done, pending). + @rtype: asyncio.Future (or compatible) + """ + if not futures: + raise TypeError("wait() missing 1 required positional argument: 'future'") + loop = kwargs.pop('loop', None) + timeout = kwargs.pop('timeout', None) + return_when = kwargs.pop('return_when', ALL_COMPLETED) + if kwargs: + raise TypeError("wait() got an unexpected keyword argument '{}'".\ + format(next(iter(kwargs)))) + loop = loop or global_event_loop() + result_future = loop.create_future() + _Waiter(futures, timeout, return_when, result_future, loop) + return result_future + + +class _Waiter(object): + def __init__(self, futures, timeout, return_when, result_future, loop): + self._futures = futures + self._completed = set() + self._exceptions = set() + self._return_when = return_when + self._result_future = result_future + self._loop = loop + self._ready = False + self._timeout = None + for future in self._futures: + future.add_done_callback(self._done_callback) + if timeout is not None: + self._timeout = loop.call_later(timeout, self._timeout_callback) + + def _timeout_callback(self): + if not self._ready: + self._ready = True + self._ready_callback() + + def _done_callback(self, future): + if future.cancelled() or future.exception() is None: + self._completed.add(id(future)) + else: + self._exceptions.add(id(future)) + 
if not self._ready and ( + (self._return_when is FIRST_COMPLETED and self._completed) or + (self._return_when is FIRST_EXCEPTION and self._exceptions) or + (len(self._futures) == len(self._completed) + len(self._exceptions))): + self._ready = True + # use call_soon in case multiple callbacks complete in quick succession + self._loop.call_soon(self._ready_callback) + + def _ready_callback(self): + if self._timeout is not None: + self._timeout.cancel() + self._timeout = None + done = [] + pending = [] + done_ids = self._completed.union(self._exceptions) + for future in self._futures: + if id(future) in done_ids: + done.append(future) + else: + pending.append(future) + future.remove_done_callback(self._done_callback) + self._result_future.set_result((done, pending)) -- 2.13.6