On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico <zmed...@gentoo.org> wrote:

> The iter_completed function is similar to asyncio.as_completed, but
> takes an iterator of futures as input, and includes support for
> max_jobs and max_load parameters. The default values for max_jobs
> and max_load correspond to multiprocessing.cpu_count().
>
> Example usage for async_aux_get:
>
>   import portage
>   from portage.util.futures.iter_completed import iter_completed
>
>   portdb = portage.portdb
>   future_cpv = {}
>

I'm not sure I grasp the purpose of this dict. Can't we just modify
async_aux_get to return the cpv along with the result in the future?


>
>   def future_generator():
>     for cpv in portdb.cp_list('sys-apps/portage'):
>       future = portdb.async_aux_get(cpv, portage.auxdbkeys)
>       future_cpv[id(future)] = cpv
>       yield future
>
>
for cpv in portdb.cp_list('...'):
   yield portdb.async_aux_get(cpv, portage.auxdbkeys)


>   for future in iter_completed(future_generator()):
>     cpv = future_cpv.pop(id(future))
>     try:
>       result = future.result()
>     except KeyError as e:
>       # aux_get failed
>       print('error:', cpv, e)
>     else:
>       print(cpv, result)
>

for future in iter_completed(future_generator()):
  try:
    cpv, result = future.result()
  except KeyError as e:
    print('error', cpv, e)


Or do we expect callers of this API to need other things to key off of?

-A


> See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
> Bug: https://bugs.gentoo.org/648790
> ---
>  .../tests/util/futures/test_iter_completed.py      | 50 ++++++++++++
>  pym/portage/util/_async/FuturePollTask.py          | 27 ++++++
>  pym/portage/util/futures/iter_completed.py         | 63 ++++++++++++++
>  pym/portage/util/futures/wait.py                   | 95 ++++++++++++++++++++++
>  4 files changed, 235 insertions(+)
>  create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py
>  create mode 100644 pym/portage/util/_async/FuturePollTask.py
>  create mode 100644 pym/portage/util/futures/iter_completed.py
>  create mode 100644 pym/portage/util/futures/wait.py
>
> diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
> new file mode 100644
> index 000000000..6607d871c
> --- /dev/null
> +++ b/pym/portage/tests/util/futures/test_iter_completed.py
> @@ -0,0 +1,50 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import time
> +from portage.tests import TestCase
> +from portage.util._async.ForkProcess import ForkProcess
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +from portage.util.futures.iter_completed import iter_completed
> +
> +
> +class SleepProcess(ForkProcess):
> +       __slots__ = ('future', 'seconds')
> +       def _start(self):
> +               self.addExitListener(self._future_done)
> +               ForkProcess._start(self)
> +
> +       def _future_done(self, task):
> +               self.future.set_result(self.seconds)
> +
> +       def _run(self):
> +               time.sleep(self.seconds)
> +
> +
> +class IterCompletedTestCase(TestCase):
> +
> +       def testIterCompleted(self):
> +
> +               # Mark this as todo, since we don't want to fail if heavy
> system
> +               # load causes the tasks to finish in an unexpected order.
> +               self.todo = True
> +
> +               loop = global_event_loop()
> +               tasks = [
> +                       SleepProcess(seconds=0.200),
> +                       SleepProcess(seconds=0.100),
> +                       SleepProcess(seconds=0.001),
> +               ]
> +
> +               expected_order = sorted(task.seconds for task in tasks)
> +
> +               def future_generator():
> +                       for task in tasks:
> +                               task.future = loop.create_future()
> +                               task.scheduler = loop
> +                               task.start()
> +                               yield task.future
> +
> +               for seconds, future in zip(expected_order,
> iter_completed(future_generator(),
> +                       max_jobs=None, max_load=None, loop=loop)):
> +                       self.assertEqual(seconds, future.result())
> diff --git a/pym/portage/util/_async/FuturePollTask.py b/pym/portage/util/_async/FuturePollTask.py
> new file mode 100644
> index 000000000..6b7cdf7d5
> --- /dev/null
> +++ b/pym/portage/util/_async/FuturePollTask.py
> @@ -0,0 +1,27 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import os
> +import signal
> +
> +from _emerge.AbstractPollTask import AbstractPollTask
> +
> +
> +class FuturePollTask(AbstractPollTask):
> +       """
> +       Wraps a Future in an AsynchronousTask, which is useful for
> +       scheduling with TaskScheduler.
> +       """
> +       __slots__ = ('future',)
> +       def _start(self):
> +               self.future.add_done_callback(self._done_callback)
> +
> +       def _done_callback(self, future):
> +               if future.cancelled():
> +                       self.cancelled = True
> +                       self.returncode = -signal.SIGINT
> +               elif future.exception() is None:
> +                       self.returncode = os.EX_OK
> +               else:
> +                       self.returncode = 1
> +               self.wait()
> diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
> new file mode 100644
> index 000000000..0540cc986
> --- /dev/null
> +++ b/pym/portage/util/futures/iter_completed.py
> @@ -0,0 +1,63 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import multiprocessing
> +
> +from portage.util._async.FuturePollTask import FuturePollTask
> +from portage.util._async.TaskScheduler import TaskScheduler
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +from portage.util.futures.wait import wait, FIRST_COMPLETED
> +
> +
> +def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
> +       """
> +       This is similar to asyncio.as_completed, but takes an iterator of
> +       futures as input, and includes support for max_jobs and max_load
> +       parameters.
> +
> +       @param futures: iterator of asyncio.Future (or compatible)
> +       @type futures: iterator
> +       @param max_jobs: max number of futures to process concurrently
> (default
> +               is multiprocessing.cpu_count())
> +       @type max_jobs: int
> +       @param max_load: max load allowed when scheduling a new future,
> +               otherwise schedule no more than 1 future at a time (default
> +               is multiprocessing.cpu_count())
> +       @type max_load: int or float
> +       @param loop: event loop
> +       @type loop: EventLoop
> +       @return: iterator of futures that are done
> +       @rtype: iterator
> +       """
> +       loop = loop or global_event_loop()
> +       max_jobs = max_jobs or multiprocessing.cpu_count()
> +       max_load = max_load or multiprocessing.cpu_count()
> +
> +       future_map = {}
> +       def task_generator():
> +               for future in futures:
> +                       future_map[id(future)] = future
> +                       yield FuturePollTask(future=future)
> +
> +       scheduler = TaskScheduler(
> +               task_generator(),
> +               max_jobs=max_jobs,
> +               max_load=max_load,
> +               event_loop=loop)
> +
> +       try:
> +               scheduler.start()
> +
> +               # scheduler should ensure that future_map is non-empty
> until
> +               # task_generator is exhausted
> +               while future_map:
> +                       done, pending = loop.run_until_complete(
> +                               wait(*list(future_map.values()),
> return_when=FIRST_COMPLETED))
> +                       for future in done:
> +                               del future_map[id(future)]
> +                               yield future
> +
> +       finally:
> +               # cleanup in case of interruption by SIGINT, etc
> +               scheduler.cancel()
> +               scheduler.wait()
> diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py
> new file mode 100644
> index 000000000..a65a25ac4
> --- /dev/null
> +++ b/pym/portage/util/futures/wait.py
> @@ -0,0 +1,95 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +try:
> +       from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
> +except ImportError:
> +       ALL_COMPLETED = 'ALL_COMPLETED'
> +       FIRST_COMPLETED ='FIRST_COMPLETED'
> +       FIRST_EXCEPTION = 'FIRST_EXCEPTION'
> +
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +
> +
> +# Use **kwargs since python2.7 does not allow arguments with defaults
> +# to follow *futures.
> +def wait(*futures, **kwargs):
> +       """
> +       Use portage's internal EventLoop to emulate asyncio.wait:
> +       https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
> +
> +       @param future: future to wait for
> +       @type future: asyncio.Future (or compatible)
> +       @param timeout: number of seconds to wait (wait indefinitely if
> +               not specified)
> +       @type timeout: int or float
> +       @param return_when: indicates when this function should return,
> must
> +               be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or
> +               FIRST_EXCEPTION (default is ALL_COMPLETED)
> +       @type return_when: object
> +       @param loop: event loop
> +       @type loop: EventLoop
> +       @return: tuple of (done, pending).
> +       @rtype: asyncio.Future (or compatible)
> +       """
> +       if not futures:
> +               raise TypeError("wait() missing 1 required positional
> argument: 'future'")
> +       loop = kwargs.pop('loop', None)
> +       timeout = kwargs.pop('timeout', None)
> +       return_when = kwargs.pop('return_when', ALL_COMPLETED)
> +       if kwargs:
> +               raise TypeError("wait() got an unexpected keyword argument
> '{}'".\
> +                       format(next(iter(kwargs))))
> +       loop = loop or global_event_loop()
> +       result_future = loop.create_future()
> +       _Waiter(futures, timeout, return_when, result_future, loop)
> +       return result_future
> +
> +
> +class _Waiter(object):
> +       def __init__(self, futures, timeout, return_when, result_future,
> loop):
> +               self._futures = futures
> +               self._completed = set()
> +               self._exceptions = set()
> +               self._return_when = return_when
> +               self._result_future = result_future
> +               self._loop = loop
> +               self._ready = False
> +               self._timeout = None
> +               for future in self._futures:
> +                       future.add_done_callback(self._done_callback)
> +               if timeout is not None:
> +                       self._timeout = loop.call_later(timeout,
> self._timeout_callback)
> +
> +       def _timeout_callback(self):
> +               if not self._ready:
> +                       self._ready = True
> +                       self._ready_callback()
> +
> +       def _done_callback(self, future):
> +               if future.cancelled() or future.exception() is None:
> +                       self._completed.add(id(future))
> +               else:
> +                       self._exceptions.add(id(future))
> +               if not self._ready and (
> +                       (self._return_when is FIRST_COMPLETED and
> self._completed) or
> +                       (self._return_when is FIRST_EXCEPTION and
> self._exceptions) or
> +                       (len(self._futures) == len(self._completed) +
> len(self._exceptions))):
> +                       self._ready = True
> +                       # use call_soon in case multiple callbacks
> complete in quick succession
> +                       self._loop.call_soon(self._ready_callback)
> +
> +       def _ready_callback(self):
> +               if self._timeout is not None:
> +                       self._timeout.cancel()
> +                       self._timeout = None
> +               done = []
> +               pending = []
> +               done_ids = self._completed.union(self._exceptions)
> +               for future in self._futures:
> +                       if id(future) in done_ids:
> +                               done.append(future)
> +                       else:
> +                               pending.append(future)
> +                               future.remove_done_callback(
> self._done_callback)
> +               self._result_future.set_result((done, pending))
> --
> 2.13.6
>
>
>

Reply via email to