Implement the Future add_done_callback and remove_done_callback methods, which are required to make further progress toward asyncio compatibility.
Also implement the AbstractEventLoop create_future method for the EventLoop class, so that it returns an instance of _EventLoopFuture. EventLoop currently does not implement some of the asyncio.AbstractEventLoop methods that asyncio.Future requires for its add_done_callback implementation, and the create_future method conveniently solves this problem. X-Gentoo-bug: 591760 X-Gentoo-bug-url: https://bugs.gentoo.org/show_bug.cgi?id=591760 --- pym/portage/tests/ebuild/test_ipc_daemon.py | 3 +- .../tests/util/eventloop/test_call_soon_fifo.py | 6 +- pym/portage/tests/util/futures/__init__.py | 0 pym/portage/tests/util/futures/__test__.py | 0 .../tests/util/futures/test_done_callback.py | 35 +++++++++ pym/portage/util/_async/SchedulerInterface.py | 3 +- pym/portage/util/_eventloop/EventLoop.py | 14 ++++ pym/portage/util/futures/futures.py | 82 ++++++++++++++++++++-- 8 files changed, 132 insertions(+), 11 deletions(-) create mode 100644 pym/portage/tests/util/futures/__init__.py create mode 100644 pym/portage/tests/util/futures/__test__.py create mode 100644 pym/portage/tests/util/futures/test_done_callback.py diff --git a/pym/portage/tests/ebuild/test_ipc_daemon.py b/pym/portage/tests/ebuild/test_ipc_daemon.py index 68f139a..fc79165 100644 --- a/pym/portage/tests/ebuild/test_ipc_daemon.py +++ b/pym/portage/tests/ebuild/test_ipc_daemon.py @@ -16,7 +16,6 @@ from portage.util import ensure_dirs from portage.util._async.ForkProcess import ForkProcess from portage.util._async.TaskScheduler import TaskScheduler from portage.util._eventloop.global_event_loop import global_event_loop -from portage.util.futures.futures import Future from _emerge.SpawnProcess import SpawnProcess from _emerge.EbuildBuildDir import EbuildBuildDir from _emerge.EbuildIpcDaemon import EbuildIpcDaemon @@ -150,7 +149,7 @@ class IpcDaemonTestCase(TestCase): self._run_done.set_result(True) def _run(self, event_loop, task_scheduler, timeout): - self._run_done = Future() + self._run_done = 
event_loop.create_future() timeout_id = event_loop.timeout_add(timeout, self._timeout_callback, task_scheduler) task_scheduler.addExitListener(self._exit_callback) diff --git a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py index 5ecc13f..f970c67 100644 --- a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py +++ b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py @@ -7,22 +7,22 @@ import random from portage import os from portage.tests import TestCase from portage.util._eventloop.global_event_loop import global_event_loop -from portage.util.futures.futures import Future + class CallSoonFifoTestCase(TestCase): def testCallSoonFifo(self): + event_loop = global_event_loop() inputs = [random.random() for index in range(10)] outputs = [] - finished = Future() + finished = event_loop.create_future() def add_output(value): outputs.append(value) if len(outputs) == len(inputs): finished.set_result(True) - event_loop = global_event_loop() for value in inputs: event_loop.call_soon(functools.partial(add_output, value)) diff --git a/pym/portage/tests/util/futures/__init__.py b/pym/portage/tests/util/futures/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pym/portage/tests/util/futures/__test__.py b/pym/portage/tests/util/futures/__test__.py new file mode 100644 index 0000000..e69de29 diff --git a/pym/portage/tests/util/futures/test_done_callback.py b/pym/portage/tests/util/futures/test_done_callback.py new file mode 100644 index 0000000..76b727b --- /dev/null +++ b/pym/portage/tests/util/futures/test_done_callback.py @@ -0,0 +1,35 @@ +# Copyright 2017 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage.tests import TestCase +from portage.util._eventloop.global_event_loop import global_event_loop + + +class FutureDoneCallbackTestCase(TestCase): + + def testFutureDoneCallback(self): + + event_loop = global_event_loop() + + def 
done_callback(finished): + done_callback_called.set_result(True) + + done_callback_called = event_loop.create_future() + finished = event_loop.create_future() + finished.add_done_callback(done_callback) + event_loop.call_soon(finished.set_result, True) + event_loop.run_until_complete(done_callback_called) + + def done_callback2(finished): + done_callback2_called.set_result(True) + + done_callback_called = event_loop.create_future() + done_callback2_called = event_loop.create_future() + finished = event_loop.create_future() + finished.add_done_callback(done_callback) + finished.add_done_callback(done_callback2) + finished.remove_done_callback(done_callback) + event_loop.call_soon(finished.set_result, True) + event_loop.run_until_complete(done_callback2_called) + + self.assertFalse(done_callback_called.done()) diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py index 6028fd9..21420ae 100644 --- a/pym/portage/util/_async/SchedulerInterface.py +++ b/pym/portage/util/_async/SchedulerInterface.py @@ -13,7 +13,8 @@ class SchedulerInterface(SlotObject): _event_loop_attrs = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI", - "call_soon", "call_soon_threadsafe", "child_watch_add", + "call_soon", "call_soon_threadsafe", + "child_watch_add", "create_future", "idle_add", "io_add_watch", "iteration", "run_until_complete", "source_remove", "timeout_add") diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 308157b..712838e 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -22,6 +22,11 @@ try: except ImportError: import dummy_threading as threading +import portage +portage.proxy.lazyimport.lazyimport(globals(), + 'portage.util.futures.futures:_EventLoopFuture', +) + from portage import OrderedDict from portage.util import writemsg_level from ..SlotObject import SlotObject @@ -157,6 +162,15 @@ class 
EventLoop(object): self._sigchld_src_id = None self._pid = os.getpid() + def create_future(self): + """ + Create a Future object attached to the loop. This returns + an instance of _EventLoopFuture, because EventLoop is currently + missing some of the asyncio.AbstractEventLoop methods that + asyncio.Future requires. + """ + return _EventLoopFuture(loop=self) + def _new_source_id(self): """ Generate a new source id. This method is thread-safe. diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py index c648f10..dd913a1 100644 --- a/pym/portage/util/futures/futures.py +++ b/pym/portage/util/futures/futures.py @@ -23,10 +23,6 @@ except ImportError: from portage.exception import PortageException - _PENDING = 'PENDING' - _CANCELLED = 'CANCELLED' - _FINISHED = 'FINISHED' - class Error(PortageException): pass @@ -37,12 +33,40 @@ except ImportError: class InvalidStateError(Error): pass - class Future(object): + Future = None + +from portage.util._eventloop.global_event_loop import global_event_loop + +_PENDING = 'PENDING' +_CANCELLED = 'CANCELLED' +_FINISHED = 'FINISHED' + +class _EventLoopFuture(object): + """ + This class provides (a subset of) the asyncio.Future interface, for + use with the EventLoop class, because EventLoop is currently + missing some of the asyncio.AbstractEventLoop methods that + asyncio.Future requires. + """ # Class variables serving as defaults for instance variables. _state = _PENDING _result = None _exception = None + _loop = None + + def __init__(self, loop=None): + """Initialize the future. + + The optional loop argument allows explicitly setting the event + loop object used by the future. If it's not provided, the future uses + the default event loop. + """ + if loop is None: + self._loop = global_event_loop() + else: + self._loop = loop + self._callbacks = [] def cancel(self): """Cancel the future and schedule callbacks. 
@@ -54,8 +78,27 @@ except ImportError: if self._state != _PENDING: return False self._state = _CANCELLED + self._schedule_callbacks() return True + def _schedule_callbacks(self): + """Internal: Ask the event loop to call all callbacks. + + The callbacks are scheduled to be called as soon as possible. Also + clears the callback list. + """ + callbacks = self._callbacks[:] + if not callbacks: + return + + self._callbacks[:] = [] + for callback in callbacks: + self._loop.call_soon(callback, self) + + def cancelled(self): + """Return True if the future was cancelled.""" + return self._state == _CANCELLED + def done(self): """Return True if the future is done. @@ -93,6 +136,29 @@ except ImportError: raise InvalidStateError('Exception is not set.') return self._exception + def add_done_callback(self, fn): + """Add a callback to be run when the future becomes done. + + The callback is called with a single argument - the future object. If + the future is already done when this is called, the callback is + scheduled with call_soon. + """ + if self._state != _PENDING: + self._loop.call_soon(fn, self) + else: + self._callbacks.append(fn) + + def remove_done_callback(self, fn): + """Remove all instances of a callback from the "call when done" list. + + Returns the number of callbacks removed. + """ + filtered_callbacks = [f for f in self._callbacks if f != fn] + removed_count = len(self._callbacks) - len(filtered_callbacks) + if removed_count: + self._callbacks[:] = filtered_callbacks + return removed_count + def set_result(self, result): """Mark the future done and set its result. @@ -103,6 +169,7 @@ except ImportError: raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = _FINISHED + self._schedule_callbacks() def set_exception(self, exception): """Mark the future done and set an exception. 
@@ -116,3 +183,8 @@ except ImportError: exception = exception() self._exception = exception self._state = _FINISHED + self._schedule_callbacks() + + +if Future is None: + Future = _EventLoopFuture -- 2.10.2