For readability, it's desirable to make asynchronous code use
coroutines to avoid callbacks when possible. For python2 compatibility,
generators that yield Futures can be used to implement coroutines.

Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).

Bug: https://bugs.gentoo.org/660426
---
[PATCH v2] fixed to support decoration of object methods, and added
a unit test using this support to demonstrate interaction between
multiple coroutines

 .../tests/util/futures/test_compat_coroutine.py    | 122 +++++++++++++++++++++
 pym/portage/util/futures/compat_coroutine.py       |  96 ++++++++++++++++
 2 files changed, 218 insertions(+)
 create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py
 create mode 100644 pym/portage/util/futures/compat_coroutine.py

diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py 
b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 0000000000..f9de409ae4
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,122 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+       coroutine,
+       coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+
+       def test_returning_coroutine(self):
+               @coroutine
+               def returning_coroutine():
+                       coroutine_return('success')
+                       yield None
+
+               self.assertEqual('success',
+                       
asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+       def test_raising_coroutine(self):
+
+               class TestException(Exception):
+                       pass
+
+               @coroutine
+               def raising_coroutine():
+                       raise TestException('exception')
+                       yield None
+
+               self.assertRaises(TestException,
+                       asyncio.get_event_loop().run_until_complete, 
raising_coroutine())
+
+       def test_cancelled_coroutine(self):
+
+               @coroutine
+               def endlessly_sleeping_coroutine(loop=None):
+                       loop = asyncio._wrap_loop(loop)
+                       yield loop.create_future()
+
+               loop = asyncio.get_event_loop()
+               future = endlessly_sleeping_coroutine(loop=loop)
+               loop.call_soon(future.cancel)
+
+               self.assertRaises(asyncio.CancelledError,
+                       loop.run_until_complete, future)
+
+       def test_sleeping_coroutine(self):
+               @coroutine
+               def sleeping_coroutine():
+                       for i in range(3):
+                               x = yield asyncio.sleep(0, result=i)
+                               self.assertEqual(x, i)
+
+               
asyncio.get_event_loop().run_until_complete(sleeping_coroutine())
+
+       def test_method_coroutine(self):
+
+               class Cubby(object):
+
+                       _empty = object()
+
+                       def __init__(self, loop):
+                               self._loop = loop
+                               self._value = self._empty
+                               self._waiters = []
+
+                       def _notify(self):
+                               waiters = self._waiters
+                               self._waiters = []
+                               for waiter in waiters:
+                                       waiter.set_result(None)
+
+                       def _wait(self):
+                               waiter = self._loop.create_future()
+                               self._waiters.append(waiter)
+                               return waiter
+
+                       @coroutine
+                       def read(self):
+                               while self._value is self._empty:
+                                       yield self._wait()
+
+                               value = self._value
+                               self._value = self._empty
+                               self._notify()
+                               coroutine_return(value)
+
+                       @coroutine
+                       def write(self, value):
+                               while self._value is not self._empty:
+                                       yield self._wait()
+
+                               self._value = value
+                               self._notify()
+
+               @coroutine
+               def writer_coroutine(cubby, values, sentinel):
+                       for value in values:
+                               yield cubby.write(value)
+                       yield cubby.write(sentinel)
+
+               @coroutine
+               def reader_coroutine(cubby, sentinel):
+                       results = []
+                       while True:
+                               result = yield cubby.read()
+                               if result == sentinel:
+                                       break
+                               results.append(result)
+                       coroutine_return(results)
+
+               loop = asyncio.get_event_loop()
+               cubby = Cubby(loop)
+               values = list(range(3))
+               writer = asyncio.ensure_future(writer_coroutine(cubby, values, 
None), loop=loop)
+               reader = asyncio.ensure_future(reader_coroutine(cubby, None), 
loop=loop)
+               loop.run_until_complete(asyncio.wait([writer, reader]))
+
+               self.assertEqual(reader.result(), values)
diff --git a/pym/portage/util/futures/compat_coroutine.py 
b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 0000000000..32909f4b4c
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,96 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+       """
+       A decorator for a generator function that behaves as coroutine function.
+       The generator should yield a Future instance in order to wait for it,
+       and the result becomes the result of the current yield-expression,
+       via the PEP 342 generator send() method.
+
+       The decorated function returns a Future which is done when the generator
+       is exhausted. The generator can return a value via the coroutine_return
+       function.
+       """
+       # Note that functools.partial does not work for decoration of
+       # methods, since it doesn't implement the descriptor protocol.
+       # This problem is solve by defining a wrapper function.
+       @functools.wraps(generator_func)
+       def wrapped(*args, **kwargs):
+               return _generator_future(generator_func, *args, **kwargs)
+       return wrapped
+
+
+def coroutine_return(result=None):
+       """
+       Return a result from the current coroutine.
+       """
+       raise _CoroutineReturnValue(result)
+
+
+def _generator_future(generator_func, *args, **kwargs):
+       """
+       Call generator_func with the given arguments, and return a Future
+       that is done when the resulting generation is exhausted. If is a
+       keyword argument named 'loop' is given, then it is used instead of
+       the default event loop.
+       """
+       loop = asyncio._wrap_loop(kwargs.get('loop'))
+       result = loop.create_future()
+       _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
+       return result
+
+
+class _CoroutineReturnValue(Exception):
+       def __init__(self, result):
+               self.result = result
+
+
+class _GeneratorTask(object):
+       """
+       Asynchronously executes the generator to completion, waiting for
+       the result of each Future that it yields, and sending the result
+       to the generator.
+       """
+       def __init__(self, generator, result, loop):
+               self._generator = generator
+               self._result = result
+               self._loop = loop
+               result.add_done_callback(self._cancel_callback)
+               loop.call_soon(self._next)
+
+       def _cancel_callback(self, result):
+               if result.cancelled():
+                       self._generator.close()
+
+       def _next(self, previous=None):
+               if self._result.cancelled():
+                       return
+               try:
+                       if previous is None:
+                               future = next(self._generator)
+                       elif previous.cancelled():
+                               self._generator.throw(asyncio.CancelledError())
+                               future = next(self._generator)
+                       elif previous.exception() is None:
+                               future = self._generator.send(previous.result())
+                       else:
+                               self._generator.throw(previous.exception())
+                               future = next(self._generator)
+
+               except _CoroutineReturnValue as e:
+                       if not self._result.cancelled():
+                               self._result.set_result(e.result)
+               except StopIteration:
+                       if not self._result.cancelled():
+                               self._result.set_result(None)
+               except Exception as e:
+                       if not self._result.cancelled():
+                               self._result.set_exception(e)
+               else:
+                       future = asyncio.ensure_future(future, loop=self._loop)
+                       future.add_done_callback(self._next)
-- 
2.13.6


Reply via email to