Replace os.fork with multiprocessing.Process, in order to leverage
any pre-fork and post-fork interpreter housekeeping that it provides,
promoting a healthy state for the forked interpreter.

Since multiprocessing.Process closes sys.__stdin__, change code that
uses sys.__stdin__ to call the portage._get_stdin() compatibility
function instead.
In case there's a legitimate need to inherit stdin for things like
PROPERTIES=interactive support, create a temporary duplicate of
fd_pipes[0] when appropriate, and restore sys.stdin and sys.__stdin__
in the subprocess.

Bug: https://bugs.gentoo.org/730192
Signed-off-by: Zac Medico <zmed...@gentoo.org>
---
[PATCH v2]
* Use sentinel for all python versions
* Add _proc_join coroutine for non-blocking join

 lib/portage/process.py                 |   4 +-
 lib/portage/sync/controller.py         |   4 +-
 lib/portage/util/_async/ForkProcess.py | 146 +++++++++++++++++++------
 3 files changed, 119 insertions(+), 35 deletions(-)

diff --git a/lib/portage/process.py b/lib/portage/process.py
index 6af668db4..b7316c89d 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -1,5 +1,5 @@
 # portage.py -- core Portage functionality
-# Copyright 1998-2019 Gentoo Authors
+# Copyright 1998-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 
@@ -107,7 +107,7 @@ def sanitize_fds():
        if _set_inheritable is not None:
 
                whitelist = frozenset([
-                       sys.__stdin__.fileno(),
+                       portage._get_stdin().fileno(),
                        sys.__stdout__.fileno(),
                        sys.__stderr__.fileno(),
                ])
diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
index c4c72e73a..43bb5d520 100644
--- a/lib/portage/sync/controller.py
+++ b/lib/portage/sync/controller.py
@@ -1,4 +1,4 @@
-# Copyright 2014-2019 Gentoo Authors
+# Copyright 2014-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 from __future__ import print_function
@@ -231,7 +231,7 @@ class SyncManager(object):
                # Redirect command stderr to stdout, in order to prevent
                # spurious cron job emails (bug 566132).
                spawn_kwargs["fd_pipes"] = {
-                       0: sys.__stdin__.fileno(),
+                       0: portage._get_stdin().fileno(),
                        1: sys.__stdout__.fileno(),
                        2: sys.__stdout__.fileno()
                }
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index d84e93833..eb01a6232 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -1,37 +1,123 @@
-# Copyright 2012-2013 Gentoo Foundation
+# Copyright 2012-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
+import fcntl
+import functools
+import multiprocessing
 import signal
 import sys
-import traceback
 
 import portage
 from portage import os
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
 from _emerge.SpawnProcess import SpawnProcess
 
 class ForkProcess(SpawnProcess):
 
-       __slots__ = ()
+       __slots__ = ('_proc', '_proc_join_task')
+
+       # Number of seconds between poll attempts for process exit status
+       # (after the sentinel has become ready).
+       _proc_join_interval = 0.1
 
        def _spawn(self, args, fd_pipes=None, **kwargs):
                """
-               Fork a subprocess, apply local settings, and call fetch().
+               Override SpawnProcess._spawn to fork a subprocess that calls
+               self._run(). This uses multiprocessing.Process in order to leverage
+               any pre-fork and post-fork interpreter housekeeping that it provides,
+               promoting a healthy state for the forked interpreter.
                """
-
-               parent_pid = os.getpid()
-               pid = None
+               # Since multiprocessing.Process closes sys.__stdin__, create a
+               # temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
+               # be restored in the subprocess, in case this is needed for
+               # things like PROPERTIES=interactive support.
+               stdin_dup = None
                try:
-                       pid = os.fork()
+                       stdin_fd = fd_pipes.get(0)
+                       if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
+                               stdin_dup = os.dup(stdin_fd)
+                               fcntl.fcntl(stdin_dup, fcntl.F_SETFD,
+                                       fcntl.fcntl(stdin_fd, fcntl.F_GETFD))
+                               fd_pipes[0] = stdin_dup
+                       self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,))
+                       self._proc.start()
+               finally:
+                       if stdin_dup is not None:
+                               os.close(stdin_dup)
+
+               self._proc_join_task = asyncio.ensure_future(
+                       self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler)
+               self._proc_join_task.add_done_callback(
+                       functools.partial(self._proc_join_done, self._proc))
+
+               return [self._proc.pid]
+
+       def _cancel(self):
+               if self._proc is None:  # no child yet, or already joined/unregistered
+                       super(ForkProcess, self)._cancel()
+               else:
+                       self._proc.terminate()
+
+       def _async_wait(self):
+               if self._proc_join_task is None:  # else _proc_join_done calls this later
+                       super(ForkProcess, self)._async_wait()
 
-                       if pid != 0:
-                               if not isinstance(pid, int):
-                                       raise AssertionError(
-                                               "fork returned non-integer: %s" % (repr(pid),))
-                               return [pid]
+       def _async_waitpid(self):
+               if self._proc_join_task is None:  # else the join task collects exit status
+                       super(ForkProcess, self)._async_waitpid()
 
-                       rval = 1
+       @coroutine
+       def _proc_join(self, proc, loop=None):
+               sentinel_reader = self.scheduler.create_future()
+               self.scheduler.add_reader(proc.sentinel,
+                       lambda: sentinel_reader.done() or sentinel_reader.set_result(None))
+               try:
+                       yield sentinel_reader
+               finally:
+                       # If multiprocessing.Process supports the close method, then
+                       # access to proc.sentinel will raise ValueError if the
+                       # sentinel has been closed. In this case it's not safe to call
+                       # remove_reader, since the file descriptor may have been closed
+                       # and then reallocated to a concurrent coroutine. When the
+                       # close method is not supported, proc.sentinel remains open
+                       # until proc's finalizer is called.
                        try:
+                               self.scheduler.remove_reader(proc.sentinel)
+                       except ValueError:
+                               pass
+
+               # Now that proc.sentinel is ready, poll until process exit
+               # status has become available.
+               while True:
+                       proc.join(0)
+                       if proc.exitcode is not None:
+                               break
+                       yield asyncio.sleep(self._proc_join_interval, loop=loop)
+
+       def _proc_join_done(self, proc, future):
+               future.cancelled() or future.result()
+               self._was_cancelled()
+               if self.returncode is None:
+                       self.returncode = proc.exitcode
 
+               self._proc = None
+               if hasattr(proc, 'close') and proc.exitcode is not None:
+                       proc.close()  # close() raises ValueError if the child still runs
+               self._proc_join_task = None
+               self._async_wait()
+
+       def _unregister(self):
+               super(ForkProcess, self)._unregister()
+               if self._proc is not None:  # child still tracked: terminate it if alive
+                       if self._proc.is_alive():
+                               self._proc.terminate()
+                       self._proc = None
+               if self._proc_join_task is not None:  # stop polling for the child's exit
+                       self._proc_join_task.cancel()
+                       self._proc_join_task = None
+
+       def _bootstrap(self, fd_pipes):
                                # Use default signal handlers in order to avoid 
problems
                                # killing subprocesses as reported in bug 
#353239.
                                signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -52,24 +138,22 @@ class ForkProcess(SpawnProcess):
                                # (see _setup_pipes docstring).
                                portage.process._setup_pipes(fd_pipes, 
close_fds=False)
 
-                               rval = self._run()
-                       except SystemExit:
-                               raise
-                       except:
-                               traceback.print_exc()
-                               # os._exit() skips stderr flush!
-                               sys.stderr.flush()
-                       finally:
-                               os._exit(rval)
+                               # Since multiprocessing.Process closes 
sys.__stdin__ and
+                               # makes sys.stdin refer to os.devnull, restore 
it when
+                               # appropriate.
+                               if 0 in fd_pipes:
+                                       # It's possible that sys.stdin.fileno() 
is already 0,
+                                       # and in that case the above 
_setup_pipes call will
+                                       # have already updated its identity via 
dup2. Otherwise,
+                                       # perform the dup2 call now, and also 
copy the file
+                                       # descriptor flags.
+                                       if sys.stdin.fileno() != 0:
+                                               os.dup2(0, sys.stdin.fileno())
+                                               fcntl.fcntl(sys.stdin.fileno(), 
fcntl.F_SETFD,
+                                                       fcntl.fcntl(0, 
fcntl.F_GETFD))
+                                       sys.__stdin__ = sys.stdin
 
-               finally:
-                       if pid == 0 or (pid is None and os.getpid() != 
parent_pid):
-                               # Call os._exit() from a finally block in order
-                               # to suppress any finally blocks from earlier
-                               # in the call stack (see bug #345289). This
-                               # finally block has to be setup before the fork
-                               # in order to avoid a race condition.
-                               os._exit(1)
+                               sys.exit(self._run())
 
        def _run(self):
                raise NotImplementedError(self)
-- 
2.25.3


Reply via email to