On Sun, 26 Mar 2017 03:13:11 -0700 Zac Medico <zmed...@gentoo.org> wrote:
> Implement the add_done_callback and remove_done_callback methods, > since they are required in order to make further progress toward > asyncio compatibility. > > Also implement the AbstractEventLoop create_future method for the > EventLoop class, so that it returns an instance of _EventLoopFuture. > EventLoop currently does not implement some of the > asyncio.AbstractEventLoop methods that asyncio.Future requires for > its add_done_callback implementation, and the create_future method > conveniently solves this problem. > > X-Gentoo-bug: 591760 > X-Gentoo-bug-url: https://bugs.gentoo.org/show_bug.cgi?id=591760 > --- > pym/portage/tests/ebuild/test_ipc_daemon.py | 3 +- > .../tests/util/eventloop/test_call_soon_fifo.py | 6 +- > pym/portage/tests/util/futures/__init__.py | 0 > pym/portage/tests/util/futures/__test__.py | 0 > .../tests/util/futures/test_done_callback.py | 35 +++++++++ > pym/portage/util/_async/SchedulerInterface.py | 3 +- > pym/portage/util/_eventloop/EventLoop.py | 14 ++++ > pym/portage/util/futures/futures.py | 82 > ++++++++++++++++++++-- 8 files changed, 132 insertions(+), 11 > deletions(-) create mode 100644 > pym/portage/tests/util/futures/__init__.py create mode 100644 > pym/portage/tests/util/futures/__test__.py create mode 100644 > pym/portage/tests/util/futures/test_done_callback.py > > diff --git a/pym/portage/tests/ebuild/test_ipc_daemon.py > b/pym/portage/tests/ebuild/test_ipc_daemon.py index 68f139a..fc79165 > 100644 --- a/pym/portage/tests/ebuild/test_ipc_daemon.py > +++ b/pym/portage/tests/ebuild/test_ipc_daemon.py > @@ -16,7 +16,6 @@ from portage.util import ensure_dirs > from portage.util._async.ForkProcess import ForkProcess > from portage.util._async.TaskScheduler import TaskScheduler > from portage.util._eventloop.global_event_loop import > global_event_loop -from portage.util.futures.futures import Future > from _emerge.SpawnProcess import SpawnProcess > from _emerge.EbuildBuildDir import EbuildBuildDir > from 
_emerge.EbuildIpcDaemon import EbuildIpcDaemon > @@ -150,7 +149,7 @@ class IpcDaemonTestCase(TestCase): > self._run_done.set_result(True) > > def _run(self, event_loop, task_scheduler, timeout): > - self._run_done = Future() > + self._run_done = event_loop.create_future() > timeout_id = event_loop.timeout_add(timeout, > self._timeout_callback, task_scheduler) > task_scheduler.addExitListener(self._exit_callback) > diff --git a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py > b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py index > 5ecc13f..f970c67 100644 --- > a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py +++ > b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py @@ -7,22 > +7,22 @@ import random from portage import os > from portage.tests import TestCase > from portage.util._eventloop.global_event_loop import > global_event_loop -from portage.util.futures.futures import Future > + > > class CallSoonFifoTestCase(TestCase): > > def testCallSoonFifo(self): > > + event_loop = global_event_loop() > inputs = [random.random() for index in range(10)] > outputs = [] > - finished = Future() > + finished = event_loop.create_future() > > def add_output(value): > outputs.append(value) > if len(outputs) == len(inputs): > finished.set_result(True) > > - event_loop = global_event_loop() > for value in inputs: > event_loop.call_soon(functools.partial(add_output, > value)) > diff --git a/pym/portage/tests/util/futures/__init__.py > b/pym/portage/tests/util/futures/__init__.py new file mode 100644 > index 0000000..e69de29 > diff --git a/pym/portage/tests/util/futures/__test__.py > b/pym/portage/tests/util/futures/__test__.py new file mode 100644 > index 0000000..e69de29 > diff --git a/pym/portage/tests/util/futures/test_done_callback.py > b/pym/portage/tests/util/futures/test_done_callback.py new file mode > 100644 index 0000000..76b727b > --- /dev/null > +++ b/pym/portage/tests/util/futures/test_done_callback.py > @@ -0,0 +1,35 @@ > +# Copyright 
2017 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +from portage.tests import TestCase > +from portage.util._eventloop.global_event_loop import > global_event_loop + > + > +class FutureDoneCallbackTestCase(TestCase): > + > + def testFutureDoneCallback(self): > + > + event_loop = global_event_loop() > + > + def done_callback(finished): > + done_callback_called.set_result(True) > + > + done_callback_called = event_loop.create_future() > + finished = event_loop.create_future() > + finished.add_done_callback(done_callback) > + event_loop.call_soon(finished.set_result, True) > + event_loop.run_until_complete(done_callback_called) > + > + def done_callback2(finished): > + done_callback2_called.set_result(True) > + > + done_callback_called = event_loop.create_future() > + done_callback2_called = event_loop.create_future() > + finished = event_loop.create_future() > + finished.add_done_callback(done_callback) > + finished.add_done_callback(done_callback2) > + finished.remove_done_callback(done_callback) > + event_loop.call_soon(finished.set_result, True) > + event_loop.run_until_complete(done_callback2_called) > + > + self.assertFalse(done_callback_called.done()) > diff --git a/pym/portage/util/_async/SchedulerInterface.py > b/pym/portage/util/_async/SchedulerInterface.py index > 6028fd9..21420ae 100644 --- > a/pym/portage/util/_async/SchedulerInterface.py +++ > b/pym/portage/util/_async/SchedulerInterface.py @@ -13,7 +13,8 @@ > class SchedulerInterface(SlotObject): > _event_loop_attrs = ("IO_ERR", "IO_HUP", "IO_IN", > "IO_NVAL", "IO_OUT", "IO_PRI", > - "call_soon", "call_soon_threadsafe", > "child_watch_add", > + "call_soon", "call_soon_threadsafe", > + "child_watch_add", "create_future", > "idle_add", "io_add_watch", "iteration", > "run_until_complete", "source_remove", "timeout_add") > > diff --git a/pym/portage/util/_eventloop/EventLoop.py > b/pym/portage/util/_eventloop/EventLoop.py index 308157b..712838e > 100644 --- 
a/pym/portage/util/_eventloop/EventLoop.py > +++ b/pym/portage/util/_eventloop/EventLoop.py > @@ -22,6 +22,11 @@ try: > except ImportError: > import dummy_threading as threading > > +import portage > +portage.proxy.lazyimport.lazyimport(globals(), > + 'portage.util.futures.futures:_EventLoopFuture', > +) > + > from portage import OrderedDict > from portage.util import writemsg_level > from ..SlotObject import SlotObject > @@ -157,6 +162,15 @@ class EventLoop(object): > self._sigchld_src_id = None > self._pid = os.getpid() > > + def create_future(self): > + """ > + Create a Future object attached to the loop. This > returns > + an instance of _EventLoopFuture, because EventLoop > is currently > + missing some of the asyncio.AbstractEventLoop > methods that > + asyncio.Future requires. > + """ > + return _EventLoopFuture(loop=self) > + > def _new_source_id(self): > """ > Generate a new source id. This method is thread-safe. > diff --git a/pym/portage/util/futures/futures.py > b/pym/portage/util/futures/futures.py index c648f10..dd913a1 100644 > --- a/pym/portage/util/futures/futures.py > +++ b/pym/portage/util/futures/futures.py > @@ -23,10 +23,6 @@ except ImportError: > > from portage.exception import PortageException > > - _PENDING = 'PENDING' > - _CANCELLED = 'CANCELLED' > - _FINISHED = 'FINISHED' > - > class Error(PortageException): > pass > > @@ -37,12 +33,40 @@ except ImportError: > class InvalidStateError(Error): > pass > > - class Future(object): > + Future = None > + > +from portage.util._eventloop.global_event_loop import > global_event_loop + > +_PENDING = 'PENDING' > +_CANCELLED = 'CANCELLED' > +_FINISHED = 'FINISHED' > + > +class _EventLoopFuture(object): > + """ > + This class provides (a subset of) the asyncio.Future > interface, for > + use with the EventLoop class, because EventLoop is > currently > + missing some of the asyncio.AbstractEventLoop > methods that > + asyncio.Future requires. 
> + """ > > # Class variables serving as defaults for instance > variables. _state = _PENDING > _result = None > _exception = None > + _loop = None > + > + def __init__(self, loop=None): > + """Initialize the future. > + > + The optional loop argument allows explicitly > setting the event > + loop object used by the future. If it's not > provided, the future uses > + the default event loop. > + """ > + if loop is None: > + self._loop = global_event_loop() > + else: > + self._loop = loop > + self._callbacks = [] > > def cancel(self): > """Cancel the future and schedule callbacks. > @@ -54,8 +78,27 @@ except ImportError: > if self._state != _PENDING: > return False > self._state = _CANCELLED > + self._schedule_callbacks() > return True > > + def _schedule_callbacks(self): > + """Internal: Ask the event loop to call all > callbacks. + > + The callbacks are scheduled to be called as > soon as possible. Also > + clears the callback list. > + """ > + callbacks = self._callbacks[:] > + if not callbacks: > + return > + > + self._callbacks[:] = [] > + for callback in callbacks: > + self._loop.call_soon(callback, self) > + > + def cancelled(self): > + """Return True if the future was > cancelled.""" > + return self._state == _CANCELLED > + > def done(self): > """Return True if the future is done. > > @@ -93,6 +136,29 @@ except ImportError: > raise InvalidStateError('Exception > is not set.') return self._exception > > + def add_done_callback(self, fn): > + """Add a callback to be run when the future > becomes done. + > + The callback is called with a single > argument - the future object. If > + the future is already done when this is > called, the callback is > + scheduled with call_soon. > + """ > + if self._state != _PENDING: > + self._loop.call_soon(fn, self) > + else: > + self._callbacks.append(fn) > + > + def remove_done_callback(self, fn): > + """Remove all instances of a callback from > the "call when done" list. + > + Returns the number of callbacks removed. 
> + """ > + filtered_callbacks = [f for f in > self._callbacks if f != fn] > + removed_count = len(self._callbacks) - > len(filtered_callbacks) > + if removed_count: > + self._callbacks[:] = > filtered_callbacks > + return removed_count > + > def set_result(self, result): > """Mark the future done and set its result. > > @@ -103,6 +169,7 @@ except ImportError: > raise InvalidStateError('{}: > {!r}'.format(self._state, self)) self._result = result > self._state = _FINISHED > + self._schedule_callbacks() > > def set_exception(self, exception): > """Mark the future done and set an exception. > @@ -116,3 +183,8 @@ except ImportError: > exception = exception() > self._exception = exception > self._state = _FINISHED > + self._schedule_callbacks() > + > + > +if Future is None: > + Future = _EventLoopFuture
Looks fine... /me ignoring the lack of parameter descriptions in the docstrings. -- Brian Dolbec <dolsen>