commit:     142d08c0636b172fbc00a7f2b10dc07479a57e2d
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Mar  4 20:10:55 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Apr 11 01:44:34 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=142d08c0

Add minimal asyncio.AbstractEventLoop implementation (bug 649588)

This provides minimal interoperability with existing asyncio code,
by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy
class that makes asyncio use portage's internal event loop when an
instance is passed into asyncio.set_event_loop_policy(). The
get_event_loop() method of this policy returns an instance of a
_PortageEventLoop class that wraps portage's internal event loop and
implements asyncio's AbstractEventLoop interface.

The portage.util.futures.asyncio module refers to the real
asyncio module when available, and otherwise falls back to a
minimal implementation that works with python2.7. The included
EventLoopInForkTestCase demonstrates usage, and works with all
supported versions of python, including python2.7.

In python3.4 and later, API consumers can use asyncio coroutines,
since _PortageEventLoop is compatible with asyncio.Task!

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio}/__init__.py              |   0
 .../util/futures/asyncio/__test__.py}              |   0
 .../futures/asyncio/test_event_loop_in_fork.py     |  59 +++++++
 pym/portage/util/_eventloop/EventLoop.py           |  11 +-
 pym/portage/util/futures/__init__.py               |  11 ++
 pym/portage/util/futures/_asyncio.py               | 114 ++++++++++++
 pym/portage/util/futures/events.py                 | 191 +++++++++++++++++++++
 pym/portage/util/futures/futures.py                |   9 +-
 pym/portage/util/futures/unix_events.py            |  91 ++++++++++
 9 files changed, 477 insertions(+), 9 deletions(-)

diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/tests/util/futures/asyncio/__init__.py
similarity index 100%
copy from pym/portage/util/futures/__init__.py
copy to pym/portage/tests/util/futures/asyncio/__init__.py

diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/tests/util/futures/asyncio/__test__.py
similarity index 100%
copy from pym/portage/util/futures/__init__.py
copy to pym/portage/tests/util/futures/asyncio/__test__.py

diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
new file mode 100644
index 000000000..7868d792a
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
@@ -0,0 +1,59 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import multiprocessing
+import os
+
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+def fork_main(parent_conn, child_conn):
+       parent_conn.close()
+       loop = asyncio.get_event_loop()
+       # This fails with python's default event loop policy,
+       # see https://bugs.python.org/issue22087.
+       loop.run_until_complete(asyncio.sleep(0.1))
+
+
+def async_main(fork_exitcode, loop=None):
+       loop = loop or asyncio.get_event_loop()
+
+       # Since python2.7 does not support Process.sentinel, use Pipe to
+       # monitor for process exit.
+       parent_conn, child_conn = multiprocessing.Pipe()
+
+       def eof_callback(proc):
+               loop.remove_reader(parent_conn.fileno())
+               parent_conn.close()
+               proc.join()
+               fork_exitcode.set_result(proc.exitcode)
+
+       proc = multiprocessing.Process(target=fork_main, args=(parent_conn, child_conn))
+       loop.add_reader(parent_conn.fileno(), eof_callback, proc)
+       proc.start()
+       child_conn.close()
+
+
+class EventLoopInForkTestCase(TestCase):
+       """
+       The default asyncio event loop policy does not support loops
+       running in forks, see https://bugs.python.org/issue22087.
+       Portage's DefaultEventLoopPolicy supports forks.
+       """
+
+       def testEventLoopInForkTestCase(self):
+               initial_policy = asyncio.get_event_loop_policy()
+               if not isinstance(initial_policy, DefaultEventLoopPolicy):
+                       asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+               try:
+                       loop = asyncio.get_event_loop()
+                       fork_exitcode = loop.create_future()
+                       # Make async_main fork while the loop is running, which would
+                       # trigger https://bugs.python.org/issue22087 with asyncio's
+                       # default event loop policy.
+                       loop.call_soon(async_main, fork_exitcode)
+                       assert loop.run_until_complete(fork_exitcode) == os.EX_OK
+               finally:
+                       asyncio.set_event_loop_policy(initial_policy)

diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index 72eb407fc..d53a76ba1 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -23,8 +23,9 @@ except ImportError:
 
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
-       'portage.util.futures.futures:_EventLoopFuture',
+       'portage.util.futures.futures:Future',
        'portage.util.futures.executor.fork:ForkExecutor',
+       'portage.util.futures.unix_events:_PortageEventLoop',
 )
 
 from portage import OrderedDict
@@ -188,15 +189,13 @@ class EventLoop(object):
                self._sigchld_write = None
                self._sigchld_src_id = None
                self._pid = os.getpid()
+               self._asyncio_wrapper = _PortageEventLoop(loop=self)
 
        def create_future(self):
                """
-               Create a Future object attached to the loop. This returns
-               an instance of _EventLoopFuture, because EventLoop is currently
-               missing some of the asyncio.AbstractEventLoop methods that
-               asyncio.Future requires.
+               Create a Future object attached to the loop.
                """
-               return _EventLoopFuture(loop=self)
+               return Future(loop=self._asyncio_wrapper)
 
        def _new_source_id(self):
                """

diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/util/futures/__init__.py
index e69de29bb..0a5c103e6 100644
--- a/pym/portage/util/futures/__init__.py
+++ b/pym/portage/util/futures/__init__.py
@@ -0,0 +1,11 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'asyncio',
+)
+
+try:
+       import asyncio
+except ImportError:
+       from portage.util.futures import _asyncio as asyncio

diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py
new file mode 100644
index 000000000..02ab59999
--- /dev/null
+++ b/pym/portage/util/futures/_asyncio.py
@@ -0,0 +1,114 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'ensure_future',
+       'get_event_loop',
+       'get_event_loop_policy',
+       'set_event_loop_policy',
+       'sleep',
+       'Task',
+)
+
+try:
+       import threading
+except ImportError:
+       import dummy_threading as threading
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       'portage.util.futures.unix_events:DefaultEventLoopPolicy',
+)
+from portage.util.futures.futures import Future
+
+_lock = threading.Lock()
+_policy = None
+
+
+def get_event_loop_policy():
+       """
+       Get the current event loop policy.
+
+       @rtype: asyncio.AbstractEventLoopPolicy (or compatible)
+       @return: the current event loop policy
+       """
+       global _lock, _policy
+       with _lock:
+               if _policy is None:
+                       _policy = DefaultEventLoopPolicy()
+               return _policy
+
+
+def set_event_loop_policy(policy):
+       """
+       Set the current event loop policy. If policy is None, the default
+       policy is restored.
+
+       @type policy: asyncio.AbstractEventLoopPolicy or None
+       @param policy: new event loop policy
+       """
+       global _lock, _policy
+       with _lock:
+               _policy = policy or DefaultEventLoopPolicy()
+
+
+def get_event_loop():
+       """
+       Equivalent to calling get_event_loop_policy().get_event_loop().
+
+       @rtype: asyncio.AbstractEventLoop (or compatible)
+       @return: the event loop for the current context
+       """
+       return get_event_loop_policy().get_event_loop()
+
+
+class Task(Future):
+       """
+       Schedule the execution of a coroutine: wrap it in a future. A task
+       is a subclass of Future.
+       """
+       def __init__(self, coro, loop=None):
+               raise NotImplementedError
+
+
+def ensure_future(coro_or_future, loop=None):
+       """
+       Wrap a coroutine or an awaitable in a future.
+
+       If the argument is a Future, it is returned directly.
+
+       @type coro_or_future: coroutine or Future
+       @param coro_or_future: coroutine or future to wrap
+       @type loop: asyncio.AbstractEventLoop (or compatible)
+       @param loop: event loop
+       @rtype: asyncio.Future (or compatible)
+       @return: an instance of Future
+       """
+       if isinstance(coro_or_future, Future):
+               return coro_or_future
+       raise NotImplementedError
+
+
+def sleep(delay, result=None, loop=None):
+       """
+       Create a future that completes after a given time (in seconds). If
+       result is provided, it is produced to the caller when the future
+       completes.
+
+       @type delay: int or float
+       @param delay: delay seconds
+       @type result: object
+       @param result: result of the future
+       @type loop: asyncio.AbstractEventLoop (or compatible)
+       @param loop: event loop
+       @rtype: asyncio.Future (or compatible)
+       @return: an instance of Future
+       """
+       loop = loop or get_event_loop()
+       future = loop.create_future()
+       handle = loop.call_later(delay, future.set_result, result)
+       def cancel_callback(future):
+               if future.cancelled():
+                       handle.cancel()
+       future.add_done_callback(cancel_callback)
+       return future

diff --git a/pym/portage/util/futures/events.py b/pym/portage/util/futures/events.py
new file mode 100644
index 000000000..b772bc242
--- /dev/null
+++ b/pym/portage/util/futures/events.py
@@ -0,0 +1,191 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'AbstractEventLoopPolicy',
+       'AbstractEventLoop',
+)
+
+import socket
+import subprocess
+
+try:
+       from asyncio.events import (
+               AbstractEventLoop as _AbstractEventLoop,
+               AbstractEventLoopPolicy as _AbstractEventLoopPolicy,
+       )
+except ImportError:
+       _AbstractEventLoop = object
+       _AbstractEventLoopPolicy = object
+
+
+class AbstractEventLoopPolicy(_AbstractEventLoopPolicy):
+    """Abstract policy for accessing the event loop."""
+
+    def get_event_loop(self):
+        raise NotImplementedError
+
+    def set_event_loop(self, loop):
+        raise NotImplementedError
+
+    def new_event_loop(self):
+        raise NotImplementedError
+
+    def get_child_watcher(self):
+        raise NotImplementedError
+
+    def set_child_watcher(self, watcher):
+        raise NotImplementedError
+
+
+class AbstractEventLoop(_AbstractEventLoop):
+       """Abstract event loop."""
+
+       def run_forever(self):
+               raise NotImplementedError
+
+       def run_until_complete(self, future):
+               raise NotImplementedError
+
+       def stop(self):
+               raise NotImplementedError
+
+       def is_running(self):
+               raise NotImplementedError
+
+       def is_closed(self):
+               raise NotImplementedError
+
+       def close(self):
+               raise NotImplementedError
+
+       def shutdown_asyncgens(self):
+               raise NotImplementedError
+
+       def _timer_handle_cancelled(self, handle):
+               raise NotImplementedError
+
+       def call_soon(self, callback, *args):
+               return self.call_later(0, callback, *args)
+
+       def call_later(self, delay, callback, *args):
+               raise NotImplementedError
+
+       def call_at(self, when, callback, *args):
+               raise NotImplementedError
+
+       def time(self):
+               raise NotImplementedError
+
+       def create_future(self):
+               raise NotImplementedError
+
+       def create_task(self, coro):
+               raise NotImplementedError
+
+       def call_soon_threadsafe(self, callback, *args):
+               raise NotImplementedError
+
+       def run_in_executor(self, executor, func, *args):
+               raise NotImplementedError
+
+       def set_default_executor(self, executor):
+               raise NotImplementedError
+
+       def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
+               raise NotImplementedError
+
+       def getnameinfo(self, sockaddr, flags=0):
+               raise NotImplementedError
+
+       def create_connection(self, protocol_factory, host=None, port=None,
+                                                 ssl=None, family=0, proto=0, flags=0, sock=None,
+                                                 local_addr=None, server_hostname=None):
+               raise NotImplementedError
+
+       def create_server(self, protocol_factory, host=None, port=None,
+                                         family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+                                         sock=None, backlog=100, ssl=None, reuse_address=None,
+                                         reuse_port=None):
+               raise NotImplementedError
+
+       def create_unix_connection(self, protocol_factory, path,
+                                                          ssl=None, sock=None,
+                                                          server_hostname=None):
+               raise NotImplementedError
+
+       def create_unix_server(self, protocol_factory, path,
+                                                  sock=None, backlog=100, ssl=None):
+               raise NotImplementedError
+
+       def create_datagram_endpoint(self, protocol_factory,
+                                                                local_addr=None, remote_addr=None,
+                                                                family=0, proto=0, flags=0,
+                                                                reuse_address=None, reuse_port=None,
+                                                                allow_broadcast=None, sock=None):
+               raise NotImplementedError
+
+       def connect_read_pipe(self, protocol_factory, pipe):
+               raise NotImplementedError
+
+       def connect_write_pipe(self, protocol_factory, pipe):
+               raise NotImplementedError
+
+       def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
+                                                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                                                **kwargs):
+               raise NotImplementedError
+
+       def subprocess_exec(self, protocol_factory, *args, **kwargs):
+               for k in ('stdin', 'stdout', 'stderr'):
+                       kwargs.setdefault(k, subprocess.PIPE)
+               raise NotImplementedError
+
+       def add_writer(self, fd, callback, *args):
+               raise NotImplementedError
+
+       def remove_writer(self, fd):
+               raise NotImplementedError
+
+       def sock_recv(self, sock, nbytes):
+               raise NotImplementedError
+
+       def sock_sendall(self, sock, data):
+               raise NotImplementedError
+
+       def sock_connect(self, sock, address):
+               raise NotImplementedError
+
+       def sock_accept(self, sock):
+               raise NotImplementedError
+
+       def add_signal_handler(self, sig, callback, *args):
+               raise NotImplementedError
+
+       def remove_signal_handler(self, sig):
+               raise NotImplementedError
+
+       def set_task_factory(self, factory):
+               raise NotImplementedError
+
+       def get_task_factory(self):
+               raise NotImplementedError
+
+       def get_exception_handler(self):
+               raise NotImplementedError
+
+       def set_exception_handler(self, handler):
+               raise NotImplementedError
+
+       def default_exception_handler(self, context):
+               raise NotImplementedError
+
+       def call_exception_handler(self, context):
+               raise NotImplementedError
+
+       def get_debug(self):
+               raise NotImplementedError
+
+       def set_debug(self, enabled):
+               raise NotImplementedError
+

diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py
index cd56a27eb..14ff07dee 100644
--- a/pym/portage/util/futures/futures.py
+++ b/pym/portage/util/futures/futures.py
@@ -1,4 +1,4 @@
-# Copyright 2016 Gentoo Foundation
+# Copyright 2016-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 #
 # For compatibility with python versions which do not have the
@@ -41,7 +41,10 @@ except ImportError:
 
        Future = None
 
-from portage.util._eventloop.global_event_loop import global_event_loop
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       'portage.util._eventloop.global_event_loop:global_event_loop@_global_event_loop',
+)
 
 _PENDING = 'PENDING'
 _CANCELLED = 'CANCELLED'
@@ -69,7 +72,7 @@ class _EventLoopFuture(object):
                the default event loop.
                """
                if loop is None:
-                       self._loop = global_event_loop()
+                       self._loop = _global_event_loop()._asyncio_wrapper
                else:
                        self._loop = loop
                self._callbacks = []

diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
new file mode 100644
index 000000000..ed4c6e519
--- /dev/null
+++ b/pym/portage/util/futures/unix_events.py
@@ -0,0 +1,91 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'DefaultEventLoopPolicy',
+)
+
+from portage.util._eventloop.global_event_loop import (
+       global_event_loop as _global_event_loop,
+)
+from portage.util.futures import (
+       asyncio,
+       events,
+)
+from portage.util.futures.futures import Future
+
+
+class _PortageEventLoop(events.AbstractEventLoop):
+       """
+       Implementation of asyncio.AbstractEventLoop which wraps portage's
+       internal event loop.
+       """
+
+       def __init__(self, loop):
+               """
+               @type loop: EventLoop
+               @param loop: an instance of portage's internal event loop
+               """
+               self._loop = loop
+               self.call_soon = loop.call_soon
+               self.call_soon_threadsafe = loop.call_soon_threadsafe
+               self.call_later = loop.call_later
+               self.call_at = loop.call_at
+               self.is_closed = loop.is_closed
+               self.close = loop.close
+               self.create_future = loop.create_future
+               self.add_reader = loop.add_reader
+               self.remove_reader = loop.remove_reader
+               self.add_writer = loop.add_writer
+               self.remove_writer = loop.remove_writer
+               self.run_in_executor = loop.run_in_executor
+               self.time = loop.time
+               self.set_debug = loop.set_debug
+               self.get_debug = loop.get_debug
+
+       def run_until_complete(self, future):
+               """
+               Run the event loop until a Future is done.
+
+               @type future: asyncio.Future
+               @param future: a Future to wait for
+               @rtype: object
+               @return: the Future's result
+               @raise: the Future's exception
+               """
+               return self._loop.run_until_complete(
+                       asyncio.ensure_future(future, loop=self))
+
+       def create_task(self, coro):
+               """
+               Schedule a coroutine object.
+
+               @type coro: coroutine
+               @param coro: a coroutine to schedule
+               @rtype: asyncio.Task
+               @return: a task object
+               """
+               return asyncio.Task(coro, loop=self)
+
+
+class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
+       """
+       Implementation of asyncio.AbstractEventLoopPolicy based on portage's
+       internal event loop. This supports running event loops in forks,
+       which is not supported by the default asyncio event loop policy,
+       see https://bugs.python.org/issue22087.
+       """
+       def get_event_loop(self):
+               """
+               Get the event loop for the current context.
+
+               Returns an event loop object implementing the AbstractEventLoop
+               interface.
+
+               @rtype: asyncio.AbstractEventLoop (or compatible)
+               @return: the event loop for the current context
+               """
+               return _global_event_loop()._asyncio_wrapper
+
+
+DefaultEventLoopPolicy = _PortageEventLoopPolicy

Reply via email to