The asyncio.create_subprocess_exec function is essential for
using subprocesses in coroutines, so add support to do this
for python2. This paves the way for extensive use of coroutines
in portage, since coroutines are well-suited for many portage
tasks that involve subprocesses.

Bug: https://bugs.gentoo.org/662388
---
 .../util/futures/asyncio/test_subprocess_exec.py   | 116 +++++++++------------
 lib/portage/util/futures/_asyncio/__init__.py      |  52 +++++++++
 lib/portage/util/futures/_asyncio/process.py       |  82 +++++++++++++++
 lib/portage/util/futures/_asyncio/streams.py       |  51 +++++++++
 4 files changed, 237 insertions(+), 64 deletions(-)
 create mode 100644 lib/portage/util/futures/_asyncio/process.py
 create mode 100644 lib/portage/util/futures/_asyncio/streams.py

diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 5a812ba6a..15adfbfd6 100644
--- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -3,61 +3,17 @@
 
 import os
 import subprocess
-
-try:
-       from asyncio import create_subprocess_exec
-except ImportError:
-       create_subprocess_exec = None
+import sys
 
 from portage.process import find_binary
 from portage.tests import TestCase
 from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.util.futures import asyncio
+from portage.util.futures._asyncio import create_subprocess_exec
+from portage.util.futures._asyncio.streams import _reader as reader
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from portage.util.futures.executor.fork import ForkExecutor
 from portage.util.futures.unix_events import DefaultEventLoopPolicy
-from _emerge.PipeReader import PipeReader
-
-
-def reader(input_file, loop=None):
-       """
-       Asynchronously read a binary input file.
-
-       @param input_file: binary input file
-       @type input_file: file
-       @param loop: event loop
-       @type loop: EventLoop
-       @return: bytes
-       @rtype: asyncio.Future (or compatible)
-       """
-       loop = asyncio._wrap_loop(loop)
-       future = loop.create_future()
-       _Reader(future, input_file, loop)
-       return future
-
-
-class _Reader(object):
-       def __init__(self, future, input_file, loop):
-               self._future = future
-               self._pipe_reader = PipeReader(
-                       input_files={'input_file':input_file}, scheduler=loop)
-
-               self._future.add_done_callback(self._cancel_callback)
-               self._pipe_reader.addExitListener(self._eof)
-               self._pipe_reader.start()
-
-       def _cancel_callback(self, future):
-               if future.cancelled():
-                       self._cancel()
-
-       def _eof(self, pipe_reader):
-               self._pipe_reader = None
-               self._future.set_result(pipe_reader.getvalue())
-
-       def _cancel(self):
-               if self._pipe_reader is not None and self._pipe_reader.poll() 
is None:
-                       self._pipe_reader.removeExitListener(self._eof)
-                       self._pipe_reader.cancel()
-                       self._pipe_reader = None
 
 
 class SubprocessExecTestCase(TestCase):
@@ -76,9 +32,6 @@ class SubprocessExecTestCase(TestCase):
                                
self.assertFalse(global_event_loop().is_closed())
 
        def testEcho(self):
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
-
                args_tuple = (b'hello', b'world')
                echo_binary = find_binary("echo")
                self.assertNotEqual(echo_binary, None)
@@ -98,7 +51,8 @@ class SubprocessExecTestCase(TestCase):
                                        proc = loop.run_until_complete(
                                                create_subprocess_exec(
                                                echo_binary, *args_tuple,
-                                               stdin=devnull, 
stdout=stdout_pw, stderr=stdout_pw))
+                                               stdin=devnull, 
stdout=stdout_pw, stderr=stdout_pw,
+                                               loop=loop))
 
                                # This belongs exclusively to the subprocess 
now.
                                stdout_pw.close()
@@ -110,6 +64,41 @@ class SubprocessExecTestCase(TestCase):
                                        loop.run_until_complete(proc.wait()), 
os.EX_OK)
                                self.assertEqual(
                                        
tuple(loop.run_until_complete(output).split()), args_tuple)
+
+                               @coroutine
+                               def test_coroutine(loop=None):
+                                       proc = (yield 
create_subprocess_exec(echo_binary, *args_tuple,
+                                                       stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                                                       loop=loop))
+
+                                       out, err = (yield proc.communicate())
+                                       self.assertEqual(tuple(out.split()), 
args_tuple)
+                                       self.assertEqual(proc.returncode, 
os.EX_OK)
+
+                                       proc = (yield create_subprocess_exec(
+                                                       'bash', '-c', 'echo 
foo; echo bar 1>&2;',
+                                                       stdout=subprocess.PIPE, 
stderr=subprocess.PIPE,
+                                                       loop=loop))
+
+                                       out, err = (yield proc.communicate())
+                                       self.assertEqual(out, b'foo\n')
+                                       self.assertEqual(err, b'bar\n')
+                                       self.assertEqual(proc.returncode, 
os.EX_OK)
+
+                                       proc = (yield create_subprocess_exec(
+                                                       'bash', '-c', 'echo 
foo; echo bar 1>&2;',
+                                                       stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                                                       loop=loop))
+
+                                       out, err = (yield proc.communicate())
+                                       self.assertEqual(out, b'foo\nbar\n')
+                                       self.assertEqual(err, None)
+                                       self.assertEqual(proc.returncode, 
os.EX_OK)
+
+                                       coroutine_return('success')
+
+                               self.assertEqual('success',
+                                       
loop.run_until_complete(test_coroutine(loop=loop)))
                        finally:
                                if output is not None and not output.done():
                                        output.cancel()
@@ -119,9 +108,6 @@ class SubprocessExecTestCase(TestCase):
                self._run_test(test)
 
        def testCat(self):
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
-
                stdin_data = b'hello world'
                cat_binary = find_binary("cat")
                self.assertNotEqual(cat_binary, None)
@@ -143,9 +129,9 @@ class SubprocessExecTestCase(TestCase):
                        output = None
                        try:
                                proc = loop.run_until_complete(
-                                       create_subprocess_exec(
-                                       cat_binary,
-                                       stdin=stdin_pr, stdout=stdout_pw, 
stderr=stdout_pw))
+                                       create_subprocess_exec(cat_binary,
+                                       stdin=stdin_pr, stdout=stdout_pw, 
stderr=stdout_pw,
+                                       loop=loop))
 
                                # These belong exclusively to the subprocess 
now.
                                stdout_pw.close()
@@ -178,8 +164,8 @@ class SubprocessExecTestCase(TestCase):
                requires an AbstractEventLoop.connect_read_pipe implementation
                (and a ReadTransport implementation for it to return).
                """
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+               if sys.version_info.major < 3:
+                       self.skipTest('ReadTransport not implemented for 
python2')
 
                args_tuple = (b'hello', b'world')
                echo_binary = find_binary("echo")
@@ -192,7 +178,8 @@ class SubprocessExecTestCase(TestCase):
                                        create_subprocess_exec(
                                        echo_binary, *args_tuple,
                                        stdin=devnull,
-                                       stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+                                       stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                                       loop=loop))
 
                        self.assertEqual(
                                
tuple(loop.run_until_complete(proc.stdout.read()).split()),
@@ -207,8 +194,8 @@ class SubprocessExecTestCase(TestCase):
                requires an AbstractEventLoop.connect_write_pipe implementation
                (and a WriteTransport implementation for it to return).
                """
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+               if sys.version_info.major < 3:
+                       self.skipTest('WriteTransport not implemented for 
python2')
 
                stdin_data = b'hello world'
                cat_binary = find_binary("cat")
@@ -220,7 +207,8 @@ class SubprocessExecTestCase(TestCase):
                                create_subprocess_exec(
                                cat_binary,
                                stdin=subprocess.PIPE,
-                               stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+                               stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                               loop=loop))
 
                        # This buffers data when necessary to avoid blocking.
                        proc.stdin.write(stdin_data)
diff --git a/lib/portage/util/futures/_asyncio/__init__.py 
b/lib/portage/util/futures/_asyncio/__init__.py
index acfd59396..e3f678086 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -20,6 +20,9 @@ __all__ = (
        'wait',
 )
 
+import subprocess
+import sys
+
 try:
        import asyncio as _real_asyncio
 except ImportError:
@@ -45,6 +48,7 @@ from portage.util.futures.futures import (
        InvalidStateError,
        TimeoutError,
 )
+from portage.util.futures._asyncio.process import _Process
 from portage.util.futures._asyncio.tasks import (
        ALL_COMPLETED,
        FIRST_COMPLETED,
@@ -105,6 +109,48 @@ def set_child_watcher(watcher):
     return get_event_loop_policy().set_child_watcher(watcher)
 
 
+def create_subprocess_exec(*args, **kwargs):
+       """
+       Create a subprocess.
+
+       @param args: program and arguments
+       @type args: str
+       @param stdin: stdin file descriptor
+       @type stdin: file or int
+       @param stdout: stdout file descriptor
+       @type stdout: file or int
+       @param stderr: stderr file descriptor
+       @type stderr: file or int
+       @param close_fds: close file descriptors
+       @type close_fds: bool
+       @param loop: asyncio.AbstractEventLoop (or compatible)
+       @type loop: event loop
+       @type kwargs: varies
+       @param kwargs: subprocess.Popen parameters
+       @rtype: asyncio.Future (or compatible)
+       @return: subset of asyncio.subprocess.Process interface
+       """
+       loop = _wrap_loop(kwargs.pop('loop', None))
+       if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
+               # Use the real asyncio loop and create_subprocess_exec.
+               return _real_asyncio.create_subprocess_exec(*args, 
loop=loop._loop, **kwargs)
+
+       if sys.version_info < (3, 4):
+               # Python 3.4 and later implement PEP 446, which makes newly
+               # created file descriptors non-inheritable by default.
+               kwargs.setdefault('close_fds', True)
+
+       result = loop.create_future()
+
+       result.set_result(_Process(subprocess.Popen(
+               args,
+               stdin=kwargs.pop('stdin', None),
+               stdout=kwargs.pop('stdout', None),
+               stderr=kwargs.pop('stderr', None), **kwargs), loop))
+
+       return result
+
+
 class Task(Future):
        """
        Schedule the execution of a coroutine: wrap it in a future. A task
@@ -127,6 +173,12 @@ def ensure_future(coro_or_future, loop=None):
        @rtype: asyncio.Future (or compatible)
        @return: an instance of Future
        """
+       loop = _wrap_loop(loop)
+       if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
+               # Use the real asyncio loop and ensure_future.
+               return _real_asyncio.ensure_future(
+                       coro_or_future, loop=loop._loop)
+
        if isinstance(coro_or_future, Future):
                return coro_or_future
        raise NotImplementedError
diff --git a/lib/portage/util/futures/_asyncio/process.py 
b/lib/portage/util/futures/_asyncio/process.py
new file mode 100644
index 000000000..3e448da9c
--- /dev/null
+++ b/lib/portage/util/futures/_asyncio/process.py
@@ -0,0 +1,82 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       'portage.util.futures:asyncio',
+)
+from portage.util.futures._asyncio.streams import _reader
+
+
+class _Process(object):
+       """
+       Emulate a subset of the asyncio.subprocess.Process interface,
+       for python2.
+       """
+       def __init__(self, proc, loop):
+               """
+               @param proc: process instance
+               @type proc: subprocess.Popen
+               @param loop: asyncio.AbstractEventLoop (or compatible)
+               @type loop: event loop
+               """
+               self._proc = proc
+               self._loop = loop
+               self.terminate = proc.terminate
+               self.kill = proc.kill
+               self.send_signal = proc.send_signal
+               self.pid = proc.pid
+               self._waiters = []
+               loop._asyncio_child_watcher.\
+                       add_child_handler(self.pid, self._proc_exit)
+
+       @property
+       def returncode(self):
+               return self._proc.returncode
+
+       def communicate(self):
+               """
+               Read data from stdout and stderr, until end-of-file is reached.
+               Wait for process to terminate.
+
+               @return: tuple (stdout_data, stderr_data)
+               @rtype: asyncio.Future (or compatible)
+               """
+               futures = []
+               for input_file in (self._proc.stdout, self._proc.stderr):
+                       if input_file is None:
+                               future = self._loop.create_future()
+                               future.set_result(None)
+                       else:
+                               future = _reader(input_file, loop=self._loop)
+                       futures.append(future)
+
+               result = self._loop.create_future()
+               asyncio.ensure_future(asyncio.wait(futures + [self.wait()], 
loop=self._loop),
+                       loop=self._loop).add_done_callback(
+                       lambda waiter: None if result.cancelled() else
+                       result.set_result(tuple(future.result() for future in 
futures)))
+               return result
+
+       def wait(self):
+               """
+               Wait for child process to terminate. Set and return returncode 
attribute.
+
+               @return: returncode
+               @rtype: asyncio.Future (or compatible)
+               """
+               waiter = self._loop.create_future()
+               if self.returncode is None:
+                       self._waiters.append(waiter)
+                       waiter.add_done_callback(lambda waiter: 
self._waiters.remove(waiter)
+                               if waiter.cancelled() else None)
+               else:
+                       waiter.set_result(self.returncode)
+               return waiter
+
+       def _proc_exit(self, pid, returncode):
+               self._proc.returncode = returncode
+               waiters = self._waiters
+               self._waiters = []
+               for waiter in waiters:
+                       waiter.set_result(returncode)
diff --git a/lib/portage/util/futures/_asyncio/streams.py 
b/lib/portage/util/futures/_asyncio/streams.py
new file mode 100644
index 000000000..95d626dea
--- /dev/null
+++ b/lib/portage/util/futures/_asyncio/streams.py
@@ -0,0 +1,51 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       '_emerge.PipeReader:PipeReader',
+       'portage.util.futures:asyncio',
+)
+
+
+def _reader(input_file, loop=None):
+       """
+       Asynchronously read a binary input file, and close it when
+       it reaches EOF.
+
+       @param input_file: binary input file descriptor
+       @type input_file: file or int
+       @param loop: asyncio.AbstractEventLoop (or compatible)
+       @type loop: event loop
+       @return: bytes
+       @rtype: asyncio.Future (or compatible)
+       """
+       loop = asyncio._wrap_loop(loop)
+       future = loop.create_future()
+       _Reader(future, input_file, loop)
+       return future
+
+
+class _Reader(object):
+       def __init__(self, future, input_file, loop):
+               self._future = future
+               self._pipe_reader = PipeReader(
+                       input_files={'input_file':input_file}, scheduler=loop)
+
+               self._future.add_done_callback(self._cancel_callback)
+               self._pipe_reader.addExitListener(self._eof)
+               self._pipe_reader.start()
+
+       def _cancel_callback(self, future):
+               if future.cancelled():
+                       self._cancel()
+
+       def _eof(self, pipe_reader):
+               self._pipe_reader = None
+               self._future.set_result(pipe_reader.getvalue())
+
+       def _cancel(self):
+               if self._pipe_reader is not None and self._pipe_reader.poll() 
is None:
+                       self._pipe_reader.removeExitListener(self._eof)
+                       self._pipe_reader.cancel()
+                       self._pipe_reader = None
-- 
2.16.4


Reply via email to