For readability, it's desirable to make asynchronous code use coroutines to avoid callbacks when possible. For python2 compatibility, generators that yield Futures can be used to implement coroutines.
Add a compat_coroutine module which provides a @coroutine decorator and a coroutine_return function that can be used to return a value from a generator. The decorated function returns a Future which is done when the generator is exhausted. Usage is very similar to asyncio coroutine usage in python3.4 (see unit tests). Bug: https://bugs.gentoo.org/660426 --- [PATCH v2] fixed to support decoration of object methods, and added a unit test using this support to demonstrate interaction between multiple coroutines .../tests/util/futures/test_compat_coroutine.py | 122 +++++++++++++++++++++ pym/portage/util/futures/compat_coroutine.py | 96 ++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py create mode 100644 pym/portage/util/futures/compat_coroutine.py diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py new file mode 100644 index 0000000000..f9de409ae4 --- /dev/null +++ b/pym/portage/tests/util/futures/test_compat_coroutine.py @@ -0,0 +1,122 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import ( + coroutine, + coroutine_return, +) +from portage.tests import TestCase + + +class CompatCoroutineTestCase(TestCase): + + def test_returning_coroutine(self): + @coroutine + def returning_coroutine(): + coroutine_return('success') + yield None + + self.assertEqual('success', + asyncio.get_event_loop().run_until_complete(returning_coroutine())) + + def test_raising_coroutine(self): + + class TestException(Exception): + pass + + @coroutine + def raising_coroutine(): + raise TestException('exception') + yield None + + self.assertRaises(TestException, + asyncio.get_event_loop().run_until_complete, raising_coroutine()) + + def test_cancelled_coroutine(self): + + @coroutine + def endlessly_sleeping_coroutine(loop=None): + loop = asyncio._wrap_loop(loop) + yield loop.create_future() + + loop = asyncio.get_event_loop() + future = endlessly_sleeping_coroutine(loop=loop) + loop.call_soon(future.cancel) + + self.assertRaises(asyncio.CancelledError, + loop.run_until_complete, future) + + def test_sleeping_coroutine(self): + @coroutine + def sleeping_coroutine(): + for i in range(3): + x = yield asyncio.sleep(0, result=i) + self.assertEqual(x, i) + + asyncio.get_event_loop().run_until_complete(sleeping_coroutine()) + + def test_method_coroutine(self): + + class Cubby(object): + + _empty = object() + + def __init__(self, loop): + self._loop = loop + self._value = self._empty + self._waiters = [] + + def _notify(self): + waiters = self._waiters + self._waiters = [] + for waiter in waiters: + waiter.set_result(None) + + def _wait(self): + waiter = self._loop.create_future() + self._waiters.append(waiter) + return waiter + + @coroutine + def read(self): + while self._value is self._empty: + yield self._wait() + + value = self._value + self._value = self._empty + self._notify() + coroutine_return(value) + + @coroutine + def write(self, value): + while self._value is not self._empty: + yield self._wait() + + self._value = value + self._notify() + + @coroutine + def writer_coroutine(cubby, values, sentinel): + for value in values: + yield cubby.write(value) + yield cubby.write(sentinel) + + @coroutine + def reader_coroutine(cubby, sentinel): + results = [] + while True: + result = yield cubby.read() + if result == sentinel: + break + results.append(result) + coroutine_return(results) + + loop = asyncio.get_event_loop() + cubby = Cubby(loop) + values = list(range(3)) + writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop) + reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop) + loop.run_until_complete(asyncio.wait([writer, reader])) + + self.assertEqual(reader.result(), values) diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py new file mode 100644 index 0000000000..32909f4b4c --- /dev/null +++ b/pym/portage/util/futures/compat_coroutine.py @@ -0,0 +1,96 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage.util.futures import asyncio +import functools + + +def coroutine(generator_func): + """ + A decorator for a generator function that behaves as coroutine function. + The generator should yield a Future instance in order to wait for it, + and the result becomes the result of the current yield-expression, + via the PEP 342 generator send() method. + + The decorated function returns a Future which is done when the generator + is exhausted. The generator can return a value via the coroutine_return + function. + """ + # Note that functools.partial does not work for decoration of + # methods, since it doesn't implement the descriptor protocol. + # This problem is solve by defining a wrapper function. + @functools.wraps(generator_func) + def wrapped(*args, **kwargs): + return _generator_future(generator_func, *args, **kwargs) + return wrapped + + +def coroutine_return(result=None): + """ + Return a result from the current coroutine. + """ + raise _CoroutineReturnValue(result) + + +def _generator_future(generator_func, *args, **kwargs): + """ + Call generator_func with the given arguments, and return a Future + that is done when the resulting generation is exhausted. If is a + keyword argument named 'loop' is given, then it is used instead of + the default event loop. + """ + loop = asyncio._wrap_loop(kwargs.get('loop')) + result = loop.create_future() + _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop) + return result + + +class _CoroutineReturnValue(Exception): + def __init__(self, result): + self.result = result + + +class _GeneratorTask(object): + """ + Asynchronously executes the generator to completion, waiting for + the result of each Future that it yields, and sending the result + to the generator. + """ + def __init__(self, generator, result, loop): + self._generator = generator + self._result = result + self._loop = loop + result.add_done_callback(self._cancel_callback) + loop.call_soon(self._next) + + def _cancel_callback(self, result): + if result.cancelled(): + self._generator.close() + + def _next(self, previous=None): + if self._result.cancelled(): + return + try: + if previous is None: + future = next(self._generator) + elif previous.cancelled(): + self._generator.throw(asyncio.CancelledError()) + future = next(self._generator) + elif previous.exception() is None: + future = self._generator.send(previous.result()) + else: + self._generator.throw(previous.exception()) + future = next(self._generator) + + except _CoroutineReturnValue as e: + if not self._result.cancelled(): + self._result.set_result(e.result) + except StopIteration: + if not self._result.cancelled(): + self._result.set_result(None) + except Exception as e: + if not self._result.cancelled(): + self._result.set_exception(e) + else: + future = asyncio.ensure_future(future, loop=self._loop) + future.add_done_callback(self._next) -- 2.13.6