commit:     a69c1b853a47346192950c91b088163490287350
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Feb  3 21:27:45 2024 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Feb  7 00:49:26 2024 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=a69c1b85

process.spawn: Use multiprocessing.Process for returnproc

Use multiprocessing.Process for returnproc, so that
fork will stop being used when Python makes "spawn"
the default multiprocessing start method.

Continue to use _start_fork when returnproc is not
enabled, for backward compatibility. Ultimately,
_start_fork can be removed at the same time as the
returnpid parameter.

The _setup_pipes_after_fork wrapper prevents a
"Bad file descriptor" error by making fd_pipes
inheritable on exec for bug 923755. ForkProcess
does not handle this because its target function
does not necessarily exec.

Bug: https://bugs.gentoo.org/916566
Bug: https://bugs.gentoo.org/923755
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/_emerge/SpawnProcess.py |  4 +-
 lib/portage/process.py      | 92 ++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py
index 716e94d665..b63afae01c 100644
--- a/lib/_emerge/SpawnProcess.py
+++ b/lib/_emerge/SpawnProcess.py
@@ -241,7 +241,9 @@ class SpawnProcess(SubProcess):
         got_pty, master_fd, slave_fd = _create_pty_or_pipe(copy_term_size=stdout_pipe)
         return (master_fd, slave_fd)
 
-    def _spawn(self, args: list[str], **kwargs) -> portage.process.Process:
+    def _spawn(
+        self, args: list[str], **kwargs
+    ) -> portage.process.MultiprocessingProcess:
         spawn_func = portage.process.spawn
 
         if self._selinux_type is not None:

diff --git a/lib/portage/process.py b/lib/portage/process.py
index b223ecb887..20327b38bc 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -19,7 +19,7 @@ import warnings
 
 from dataclasses import dataclass
 from functools import lru_cache
-from typing import Any, Optional, Callable
+from typing import Any, Optional, Callable, Union
 
 from portage import os
 from portage import _encodings
@@ -28,6 +28,7 @@ import portage
 
 portage.proxy.lazyimport.lazyimport(
     globals(),
+    "portage.util._async.ForkProcess:ForkProcess",
     "portage.util._eventloop.global_event_loop:global_event_loop",
     "portage.util.futures:asyncio",
     "portage.util:dump_traceback,writemsg,writemsg_level",
@@ -296,12 +297,19 @@ class AbstractProcess:
 
 class Process(AbstractProcess):
     """
-    An object that wraps OS processes created by spawn.
-    In the future, spawn will return objects of a different type
-    but with a compatible interface to this class, in order
-    to encapsulate implementation-dependent objects like
-    multiprocessing.Process which are designed to manage
-    the process lifecycle and need to persist until it exits.
+    An object that wraps OS processes which do not have an
+    associated multiprocessing.Process instance. Ultimately,
+    we need to stop using os.fork() to create these processes
+    because it is unsafe for threaded processes as discussed
+    in https://github.com/python/cpython/issues/84559.
+
+    Note that if subprocess.Popen is used without pass_fds
+    or preexec_fn parameters, then it avoids using os.fork()
+    by instead using posix_spawn. This approach is not used
+    by spawn because it needs to execute Python code prior
+    to exec, so it instead uses multiprocessing.Process,
+    which only uses os.fork() when the multiprocessing start
+    method is fork.
     """
 
     def __init__(self, pid: int):
@@ -461,7 +469,7 @@ def spawn(
     unshare_mount=False,
     unshare_pid=False,
     warn_on_large_env=False,
-):
+) -> Union[int, MultiprocessingProcess, list[int]]:
     """
     Spawns a given command.
 
@@ -479,8 +487,8 @@ def spawn(
     @param returnpid: Return the Process IDs for a successful spawn.
     NOTE: This requires the caller clean up all the PIDs, otherwise spawn will clean them.
     @type returnpid: Boolean
-    @param returnproc: Return a Process object for a successful spawn (conflicts with logfile parameter).
-    NOTE: This requires the caller to asynchronously wait for the Process.
+    @param returnproc: Return a MultiprocessingProcess instance (conflicts with logfile parameter).
+    NOTE: This requires the caller to asynchronously wait for the MultiprocessingProcess instance.
     @type returnproc: Boolean
     @param uid: User ID to spawn as; useful for dropping privilages
     @type uid: Integer
@@ -626,7 +634,9 @@ def spawn(
     # fork, so that the result is cached in the main process.
     bool(groups)
 
-    pid = _start_fork(
+    start_func = _start_proc if returnproc else _start_fork
+
+    pid = start_func(
         _exec_wrapper,
         args=(
             binary,
@@ -652,6 +662,10 @@ def spawn(
         close_fds=close_fds,
     )
 
+    if returnproc:
+        # _start_proc returns a MultiprocessingProcess instance.
+        return pid
+
     if not isinstance(pid, int):
         raise AssertionError(f"fork returned non-integer: {repr(pid)}")
 
@@ -673,8 +687,6 @@ def spawn(
                 stacklevel=1,
             )
         return mypids
-    if returnproc:
-        return Process(mypids[0])
 
     # Otherwise we clean them up.
     while mypids:
@@ -1373,6 +1385,60 @@ def _start_fork(
     return pid
 
 
+class _setup_pipes_after_fork:
+    def __init__(self, target, fd_pipes):
+        self._target = target
+        self._fd_pipes = fd_pipes
+
+    def __call__(self, *args, **kwargs):
+        for fd in set(self._fd_pipes.values()):
+            os.set_inheritable(fd, True)
+        _setup_pipes(self._fd_pipes, close_fds=False, inheritable=True)
+        return self._target(*args, **kwargs)
+
+
+def _start_proc(
+    target: Callable[..., None],
+    args: Optional[tuple[Any, ...]] = (),
+    kwargs: Optional[dict[str, Any]] = {},
+    fd_pipes: Optional[dict[int, int]] = None,
+    close_fds: Optional[bool] = False,
+) -> MultiprocessingProcess:
+    """
+    Execute the target function using multiprocessing.Process.
+    If the close_fds parameter is True then NotImplementedError
+    is raised, since it is risky to forcefully close file
+    descriptors that have references (bug 374335), and PEP 446
+    should ensure that any relevant file descriptors are
+    non-inheritable and therefore automatically closed on exec.
+    """
+    if close_fds:
+        raise NotImplementedError(
+            "close_fds is not supported (since file descriptors are non-inheritable by default for exec)"
+        )
+
+    # Manage fd_pipes inheritance for spawn/exec (bug 923755),
+    # which ForkProcess does not handle because its target
+    # function does not necessarily exec.
+    if fd_pipes and multiprocessing.get_start_method() == "fork":
+        target = _setup_pipes_after_fork(target, fd_pipes)
+        fd_pipes = None
+
+    proc = ForkProcess(
+        scheduler=global_event_loop(),
+        target=target,
+        args=args,
+        kwargs=kwargs,
+        fd_pipes=fd_pipes,
+        create_pipe=False,  # Pipe creation is delegated to the caller (see bug 923750).
+    )
+    proc.start()
+
+    # ForkProcess conveniently holds a MultiprocessingProcess
+    # instance that is suitable to return here.
+    return proc._proc
+
+
 def find_binary(binary):
     """
     Given a binary name, find the binary in PATH

Reply via email to