commit: a69c1b853a47346192950c91b088163490287350 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Sat Feb 3 21:27:45 2024 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Wed Feb 7 00:49:26 2024 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=a69c1b85
process.spawn: Use multiprocessing.Process for returnproc Use multiprocessing.Process for returnproc, so that fork will stop being used when python makes "spawn" the default multiprocessing start method. Continue to use _start_fork when returnproc is not enabled, for backward compatibility. Ultimately, it can be removed at the same time as the returnpid parameter. The _setup_pipes_after_fork wrapper prevents a "Bad file descriptor" error by making fd_pipes inheritable on exec for bug 923755. ForkProcess does not handle this because its target function does not necessarily exec. Bug: https://bugs.gentoo.org/916566 Bug: https://bugs.gentoo.org/923755 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/_emerge/SpawnProcess.py | 4 +- lib/portage/process.py | 92 ++++++++++++++++++++++++++++++++++++++------- 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py index 716e94d665..b63afae01c 100644 --- a/lib/_emerge/SpawnProcess.py +++ b/lib/_emerge/SpawnProcess.py @@ -241,7 +241,9 @@ class SpawnProcess(SubProcess): got_pty, master_fd, slave_fd = _create_pty_or_pipe(copy_term_size=stdout_pipe) return (master_fd, slave_fd) - def _spawn(self, args: list[str], **kwargs) -> portage.process.Process: + def _spawn( + self, args: list[str], **kwargs + ) -> portage.process.MultiprocessingProcess: spawn_func = portage.process.spawn if self._selinux_type is not None: diff --git a/lib/portage/process.py b/lib/portage/process.py index b223ecb887..20327b38bc 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -19,7 +19,7 @@ import warnings from dataclasses import dataclass from functools import lru_cache -from typing import Any, Optional, Callable +from typing import Any, Optional, Callable, Union from portage import os from portage import _encodings @@ -28,6 +28,7 @@ import portage portage.proxy.lazyimport.lazyimport( globals(), + "portage.util._async.ForkProcess:ForkProcess", "portage.util._eventloop.global_event_loop:global_event_loop", "portage.util.futures:asyncio", "portage.util:dump_traceback,writemsg,writemsg_level", @@ -296,12 +297,19 @@ class AbstractProcess: class Process(AbstractProcess): """ - An object that wraps OS processes created by spawn. - In the future, spawn will return objects of a different type - but with a compatible interface to this class, in order - to encapsulate implementation-dependent objects like - multiprocessing.Process which are designed to manage - the process lifecycle and need to persist until it exits. + An object that wraps OS processes which do not have an + associated multiprocessing.Process instance. Ultimately, + we need to stop using os.fork() to create these processes + because it is unsafe for threaded processes as discussed + in https://github.com/python/cpython/issues/84559. + + Note that if subprocess.Popen is used without pass_fds + or preexec_fn parameters, then it avoids using os.fork() + by instead using posix_spawn. This approach is not used + by spawn because it needs to execute python code prior + to exec, so it instead uses multiprocessing.Process, + which only uses os.fork() when the multiprocessing start + method is fork. """ def __init__(self, pid: int): @@ -461,7 +469,7 @@ def spawn( unshare_mount=False, unshare_pid=False, warn_on_large_env=False, -): +) -> Union[int, MultiprocessingProcess, list[int]]: """ Spawns a given command. @@ -479,8 +487,8 @@ def spawn( @param returnpid: Return the Process IDs for a successful spawn. NOTE: This requires the caller clean up all the PIDs, otherwise spawn will clean them. @type returnpid: Boolean - @param returnproc: Return a Process object for a successful spawn (conflicts with logfile parameter). - NOTE: This requires the caller to asynchronously wait for the Process. + @param returnproc: Return a MultiprocessingProcess instance (conflicts with logfile parameter). + NOTE: This requires the caller to asynchronously wait for the MultiprocessingProcess instance. @type returnproc: Boolean @param uid: User ID to spawn as; useful for dropping privilages @type uid: Integer @@ -626,7 +634,9 @@ def spawn( # fork, so that the result is cached in the main process. bool(groups) - pid = _start_fork( + start_func = _start_proc if returnproc else _start_fork + + pid = start_func( _exec_wrapper, args=( binary, @@ -652,6 +662,10 @@ def spawn( close_fds=close_fds, ) + if returnproc: + # _start_proc returns a MultiprocessingProcess instance. + return pid + if not isinstance(pid, int): raise AssertionError(f"fork returned non-integer: {repr(pid)}") @@ -673,8 +687,6 @@ def spawn( stacklevel=1, ) return mypids - if returnproc: - return Process(mypids[0]) # Otherwise we clean them up. while mypids: @@ -1373,6 +1385,60 @@ def _start_fork( return pid +class _setup_pipes_after_fork: + def __init__(self, target, fd_pipes): + self._target = target + self._fd_pipes = fd_pipes + + def __call__(self, *args, **kwargs): + for fd in set(self._fd_pipes.values()): + os.set_inheritable(fd, True) + _setup_pipes(self._fd_pipes, close_fds=False, inheritable=True) + return self._target(*args, **kwargs) + + +def _start_proc( + target: Callable[..., None], + args: Optional[tuple[Any, ...]] = (), + kwargs: Optional[dict[str, Any]] = {}, + fd_pipes: Optional[dict[int, int]] = None, + close_fds: Optional[bool] = False, +) -> MultiprocessingProcess: + """ + Execute the target function using multiprocess.Process. + If the close_fds parameter is True then NotImplementedError + is raised, since it is risky to forcefully close file + descriptors that have references (bug 374335), and PEP 446 + should ensure that any relevant file descriptors are + non-inheritable and therefore automatically closed on exec. + """ + if close_fds: + raise NotImplementedError( + "close_fds is not supported (since file descriptors are non-inheritable by default for exec)" + ) + + # Manage fd_pipes inheritance for spawn/exec (bug 923755), + # which ForkProcess does not handle because its target + # function does not necessarily exec. + if fd_pipes and multiprocessing.get_start_method() == "fork": + target = _setup_pipes_after_fork(target, fd_pipes) + fd_pipes = None + + proc = ForkProcess( + scheduler=global_event_loop(), + target=target, + args=args, + kwargs=kwargs, + fd_pipes=fd_pipes, + create_pipe=False, # Pipe creation is delegated to the caller (see bug 923750). + ) + proc.start() + + # ForkProcess conveniently holds a MultiprocessingProcess + # instance that is suitable to return here. + return proc._proc + + def find_binary(binary): """ Given a binary name, find the binary in PATH