On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico <zmed...@gentoo.org> wrote:
> The iter_completed function is similar to asyncio.as_completed, but > takes an iterator of futures as input, and includes support for > max_jobs and max_load parameters. The default values for max_jobs > and max_load correspond to multiprocessing.cpu_count(). > > Example usage for async_aux_get: > > import portage > from portage.util.futures.iter_completed import iter_completed > > portdb = portage.portdb > future_cpv = {} > I'm not sure I grasp the purpose of this dict. Can't we just modify async_aux_get so that the future's result includes the cpv? > > def future_generator(): > for cpv in portdb.cp_list('sys-apps/portage'): > future = portdb.async_aux_get(cpv, portage.auxdbkeys) > future_cpv[id(future)] = cpv > yield future > > for cpv in portdb.cp_list('...'): yield portdb.async_aux_get(cpv, portage.auxdbkeys) > for future in iter_completed(future_generator()): > cpv = future_cpv.pop(id(future)) > try: > result = future.result() > except KeyError as e: > # aux_get failed > print('error:', cpv, e) > else: > print(cpv, result) > for future in iter_completed(future_generator()): try: cpv, result = future.result() except KeyError as e: print('error', cpv, e) Or do we expect callers to need other things to key off of in this API? 
-A > See: https://docs.python.org/3/library/asyncio-task.html# > asyncio.as_completed > Bug: https://bugs.gentoo.org/648790 > --- > .../tests/util/futures/test_iter_completed.py | 50 ++++++++++++ > pym/portage/util/_async/FuturePollTask.py | 27 ++++++ > pym/portage/util/futures/iter_completed.py | 63 ++++++++++++++ > pym/portage/util/futures/wait.py | 95 > ++++++++++++++++++++++ > 4 files changed, 235 insertions(+) > create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py > create mode 100644 pym/portage/util/_async/FuturePollTask.py > create mode 100644 pym/portage/util/futures/iter_completed.py > create mode 100644 pym/portage/util/futures/wait.py > > diff --git a/pym/portage/tests/util/futures/test_iter_completed.py > b/pym/portage/tests/util/futures/test_iter_completed.py > new file mode 100644 > index 000000000..6607d871c > --- /dev/null > +++ b/pym/portage/tests/util/futures/test_iter_completed.py > @@ -0,0 +1,50 @@ > +# Copyright 2018 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +import time > +from portage.tests import TestCase > +from portage.util._async.ForkProcess import ForkProcess > +from portage.util._eventloop.global_event_loop import global_event_loop > +from portage.util.futures.iter_completed import iter_completed > + > + > +class SleepProcess(ForkProcess): > + __slots__ = ('future', 'seconds') > + def _start(self): > + self.addExitListener(self._future_done) > + ForkProcess._start(self) > + > + def _future_done(self, task): > + self.future.set_result(self.seconds) > + > + def _run(self): > + time.sleep(self.seconds) > + > + > +class IterCompletedTestCase(TestCase): > + > + def testIterCompleted(self): > + > + # Mark this as todo, since we don't want to fail if heavy > system > + # load causes the tasks to finish in an unexpected order. 
> + self.todo = True > + > + loop = global_event_loop() > + tasks = [ > + SleepProcess(seconds=0.200), > + SleepProcess(seconds=0.100), > + SleepProcess(seconds=0.001), > + ] > + > + expected_order = sorted(task.seconds for task in tasks) > + > + def future_generator(): > + for task in tasks: > + task.future = loop.create_future() > + task.scheduler = loop > + task.start() > + yield task.future > + > + for seconds, future in zip(expected_order, > iter_completed(future_generator(), > + max_jobs=None, max_load=None, loop=loop)): > + self.assertEqual(seconds, future.result()) > diff --git a/pym/portage/util/_async/FuturePollTask.py > b/pym/portage/util/_async/FuturePollTask.py > new file mode 100644 > index 000000000..6b7cdf7d5 > --- /dev/null > +++ b/pym/portage/util/_async/FuturePollTask.py > @@ -0,0 +1,27 @@ > +# Copyright 2018 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +import os > +import signal > + > +from _emerge.AbstractPollTask import AbstractPollTask > + > + > +class FuturePollTask(AbstractPollTask): > + """ > + Wraps a Future in an AsynchronousTask, which is useful for > + scheduling with TaskScheduler. 
> + """ > + __slots__ = ('future',) > + def _start(self): > + self.future.add_done_callback(self._done_callback) > + > + def _done_callback(self, future): > + if future.cancelled(): > + self.cancelled = True > + self.returncode = -signal.SIGINT > + elif future.exception() is None: > + self.returncode = os.EX_OK > + else: > + self.returncode = 1 > + self.wait() > diff --git a/pym/portage/util/futures/iter_completed.py > b/pym/portage/util/futures/iter_completed.py > new file mode 100644 > index 000000000..0540cc986 > --- /dev/null > +++ b/pym/portage/util/futures/iter_completed.py > @@ -0,0 +1,63 @@ > +# Copyright 2018 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +import multiprocessing > + > +from portage.util._async.FuturePollTask import FuturePollTask > +from portage.util._async.TaskScheduler import TaskScheduler > +from portage.util._eventloop.global_event_loop import global_event_loop > +from portage.util.futures.wait import wait, FIRST_COMPLETED > + > + > +def iter_completed(futures, max_jobs=None, max_load=None, loop=None): > + """ > + This is similar to asyncio.as_completed, but takes an iterator of > + futures as input, and includes support for max_jobs and max_load > + parameters. 
> + > + @param futures: iterator of asyncio.Future (or compatible) > + @type futures: iterator > + @param max_jobs: max number of futures to process concurrently > (default > + is multiprocessing.cpu_count()) > + @type max_jobs: int > + @param max_load: max load allowed when scheduling a new future, > + otherwise schedule no more than 1 future at a time (default > + is multiprocessing.cpu_count()) > + @type max_load: int or float > + @param loop: event loop > + @type loop: EventLoop > + @return: iterator of futures that are done > + @rtype: iterator > + """ > + loop = loop or global_event_loop() > + max_jobs = max_jobs or multiprocessing.cpu_count() > + max_load = max_load or multiprocessing.cpu_count() > + > + future_map = {} > + def task_generator(): > + for future in futures: > + future_map[id(future)] = future > + yield FuturePollTask(future=future) > + > + scheduler = TaskScheduler( > + task_generator(), > + max_jobs=max_jobs, > + max_load=max_load, > + event_loop=loop) > + > + try: > + scheduler.start() > + > + # scheduler should ensure that future_map is non-empty > until > + # task_generator is exhausted > + while future_map: > + done, pending = loop.run_until_complete( > + wait(*list(future_map.values()), > return_when=FIRST_COMPLETED)) > + for future in done: > + del future_map[id(future)] > + yield future > + > + finally: > + # cleanup in case of interruption by SIGINT, etc > + scheduler.cancel() > + scheduler.wait() > diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/ > wait.py > new file mode 100644 > index 000000000..a65a25ac4 > --- /dev/null > +++ b/pym/portage/util/futures/wait.py > @@ -0,0 +1,95 @@ > +# Copyright 2018 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +try: > + from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION > +except ImportError: > + ALL_COMPLETED = 'ALL_COMPLETED' > + FIRST_COMPLETED ='FIRST_COMPLETED' > + FIRST_EXCEPTION = 'FIRST_EXCEPTION' > + 
> +from portage.util._eventloop.global_event_loop import global_event_loop > + > + > +# Use **kwargs since python2.7 does not allow arguments with defaults > +# to follow *futures. > +def wait(*futures, **kwargs): > + """ > + Use portage's internal EventLoop to emulate asyncio.wait: > + https://docs.python.org/3/library/asyncio-task.html#asyncio.wait > + > + @param future: future to wait for > + @type future: asyncio.Future (or compatible) > + @param timeout: number of seconds to wait (wait indefinitely if > + not specified) > + @type timeout: int or float > + @param return_when: indicates when this function should return, > must > + be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or > + FIRST_EXCEPTION (default is ALL_COMPLETED) > + @type return_when: object > + @param loop: event loop > + @type loop: EventLoop > + @return: tuple of (done, pending). > + @rtype: asyncio.Future (or compatible) > + """ > + if not futures: > + raise TypeError("wait() missing 1 required positional > argument: 'future'") > + loop = kwargs.pop('loop', None) > + timeout = kwargs.pop('timeout', None) > + return_when = kwargs.pop('return_when', ALL_COMPLETED) > + if kwargs: > + raise TypeError("wait() got an unexpected keyword argument > '{}'".\ > + format(next(iter(kwargs)))) > + loop = loop or global_event_loop() > + result_future = loop.create_future() > + _Waiter(futures, timeout, return_when, result_future, loop) > + return result_future > + > + > +class _Waiter(object): > + def __init__(self, futures, timeout, return_when, result_future, > loop): > + self._futures = futures > + self._completed = set() > + self._exceptions = set() > + self._return_when = return_when > + self._result_future = result_future > + self._loop = loop > + self._ready = False > + self._timeout = None > + for future in self._futures: > + future.add_done_callback(self._done_callback) > + if timeout is not None: > + self._timeout = loop.call_later(timeout, > self._timeout_callback) > + > + def 
_timeout_callback(self): > + if not self._ready: > + self._ready = True > + self._ready_callback() > + > + def _done_callback(self, future): > + if future.cancelled() or future.exception() is None: > + self._completed.add(id(future)) > + else: > + self._exceptions.add(id(future)) > + if not self._ready and ( > + (self._return_when is FIRST_COMPLETED and > self._completed) or > + (self._return_when is FIRST_EXCEPTION and > self._exceptions) or > + (len(self._futures) == len(self._completed) + > len(self._exceptions))): > + self._ready = True > + # use call_soon in case multiple callbacks > complete in quick succession > + self._loop.call_soon(self._ready_callback) > + > + def _ready_callback(self): > + if self._timeout is not None: > + self._timeout.cancel() > + self._timeout = None > + done = [] > + pending = [] > + done_ids = self._completed.union(self._exceptions) > + for future in self._futures: > + if id(future) in done_ids: > + done.append(future) > + else: > + pending.append(future) > + future.remove_done_callback( > self._done_callback) > + self._result_future.set_result((done, pending)) > -- > 2.13.6 > > >