commit:     055c66ec9482064aaaf51bfb6b01e260ea27808e
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Feb  2 16:01:18 2024 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Feb  2 16:01:18 2024 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=055c66ec

SpawnProcess: Use spawn returnproc parameter

Migrate SpawnProcess to use the spawn returnproc parameter,
and make adaptations to descendant classes as needed. Introduce a
portage.process.MultiprocessingProcess class for ForkProcess to wrap
multiprocessing.Process instances, which is needed because ForkProcess
inherits from SpawnProcess and its _spawn override must now return a
process object instead of a list of pids. Use portage.process.Process
to wrap the pid in EbuildMetadataPhase, so that returnproc support in
the doebuild function can be reserved for a later commit.

Bug: https://bugs.gentoo.org/916566
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/_emerge/EbuildMetadataPhase.py      |   4 +-
 lib/_emerge/SpawnProcess.py             |  16 ++---
 lib/_emerge/SubProcess.py               |  25 +++----
 lib/portage/package/ebuild/doebuild.py  |   4 +-
 lib/portage/process.py                  | 120 ++++++++++++++++++++++++++++----
 lib/portage/util/_async/ForkProcess.py  |  87 +++--------------------
 lib/portage/util/_async/PopenProcess.py |   5 +-
 7 files changed, 143 insertions(+), 118 deletions(-)

diff --git a/lib/_emerge/EbuildMetadataPhase.py b/lib/_emerge/EbuildMetadataPhase.py
index 8905a058fc..a7c9650d74 100644
--- a/lib/_emerge/EbuildMetadataPhase.py
+++ b/lib/_emerge/EbuildMetadataPhase.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2020 Gentoo Authors
+# Copyright 1999-2024 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 from _emerge.SubProcess import SubProcess
@@ -137,7 +137,7 @@ class EbuildMetadataPhase(SubProcess):
             self._async_wait()
             return
 
-        self.pid = retval[0]
+        self._proc = portage.process.Process(retval[0])
 
     def _output_handler(self):
         while True:

diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py
index 40740df9aa..7f4a23892b 100644
--- a/lib/_emerge/SpawnProcess.py
+++ b/lib/_emerge/SpawnProcess.py
@@ -1,4 +1,4 @@
-# Copyright 2008-2023 Gentoo Authors
+# Copyright 2008-2024 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import functools
@@ -123,24 +123,16 @@ class SpawnProcess(SubProcess):
                 kwargs[k] = v
 
         kwargs["fd_pipes"] = fd_pipes
-        kwargs["returnpid"] = True
+        kwargs["returnproc"] = True
         kwargs.pop("logfile", None)
 
-        retval = self._spawn(self.args, **kwargs)
+        self._proc = self._spawn(self.args, **kwargs)
 
         if slave_fd is not None:
             os.close(slave_fd)
         if null_input is not None:
             os.close(null_input)
 
-        if isinstance(retval, int):
-            # spawn failed
-            self.returncode = retval
-            self._async_wait()
-            return
-
-        self.pid = retval[0]
-
         if not fd_pipes:
             self._registered = True
             self._async_waitpid()
@@ -232,7 +224,7 @@ class SpawnProcess(SubProcess):
         got_pty, master_fd, slave_fd = 
_create_pty_or_pipe(copy_term_size=stdout_pipe)
         return (master_fd, slave_fd)
 
-    def _spawn(self, args, **kwargs):
+    def _spawn(self, args: list[str], **kwargs) -> portage.process.Process:
         spawn_func = portage.process.spawn
 
         if self._selinux_type is not None:

diff --git a/lib/_emerge/SubProcess.py b/lib/_emerge/SubProcess.py
index b734591d11..029bbc3f44 100644
--- a/lib/_emerge/SubProcess.py
+++ b/lib/_emerge/SubProcess.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2020 Gentoo Authors
+# Copyright 1999-2024 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import logging
@@ -12,12 +12,16 @@ import errno
 
 
 class SubProcess(AbstractPollTask):
-    __slots__ = ("pid",) + ("_dummy_pipe_fd", "_files", "_waitpid_id")
+    __slots__ = ("_dummy_pipe_fd", "_files", "_proc", "_waitpid_id")
 
     # This is how much time we allow for waitpid to succeed after
     # we've sent a kill signal to our subprocess.
     _cancel_timeout = 1  # seconds
 
+    @property
+    def pid(self):
+        return self._proc.pid
+
     def _poll(self):
         # Simply rely on _async_waitpid_cb to set the returncode.
         return self.returncode
@@ -58,15 +62,11 @@ class SubProcess(AbstractPollTask):
         if self.returncode is not None:
             self._async_wait()
         elif self._waitpid_id is None:
-            self._waitpid_id = self.pid
-            self.scheduler._asyncio_child_watcher.add_child_handler(
-                self.pid, self._async_waitpid_cb
-            )
-
-    def _async_waitpid_cb(self, pid, returncode):
-        if pid != self.pid:
-            raise AssertionError(f"expected pid {self.pid}, got {pid}")
-        self.returncode = returncode
+            self._waitpid_id = asyncio.ensure_future(self._proc.wait(), 
self.scheduler)
+            self._waitpid_id.add_done_callback(self._async_waitpid_cb)
+
+    def _async_waitpid_cb(self, future):
+        self.returncode = future.result()
         self._async_wait()
 
     def _orphan_process_warn(self):
@@ -80,7 +80,8 @@ class SubProcess(AbstractPollTask):
         self._registered = False
 
         if self._waitpid_id is not None:
-            
self.scheduler._asyncio_child_watcher.remove_child_handler(self._waitpid_id)
+            if not self._waitpid_id.done():
+                self._waitpid_id.cancel()
             self._waitpid_id = None
 
         if self._files is not None:

diff --git a/lib/portage/package/ebuild/doebuild.py b/lib/portage/package/ebuild/doebuild.py
index ed604415da..e10b884e08 100644
--- a/lib/portage/package/ebuild/doebuild.py
+++ b/lib/portage/package/ebuild/doebuild.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2023 Gentoo Authors
+# Copyright 2010-2024 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 __all__ = ["doebuild", "doebuild_environment", "spawn", "spawnebuild"]
@@ -2094,7 +2094,7 @@ def spawn(
         mysettings.configdict["env"]["LOGNAME"] = logname
 
     try:
-        if keywords.get("returnpid"):
+        if keywords.get("returnpid") or keywords.get("returnproc"):
             return spawn_func(mystring, env=mysettings.environ(), **keywords)
 
         proc = EbuildSpawnProcess(

diff --git a/lib/portage/process.py b/lib/portage/process.py
index 6ec52efc4a..01426179d7 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -29,6 +29,7 @@ import portage
 portage.proxy.lazyimport.lazyimport(
     globals(),
     "portage.util._eventloop.global_event_loop:global_event_loop",
+    "portage.util.futures:asyncio",
     "portage.util:dump_traceback,writemsg,writemsg_level",
 )
 
@@ -279,7 +280,21 @@ def calc_env_stats(env) -> EnvStats:
 env_too_large_warnings = 0
 
 
-class Process:
+class AbstractProcess:
+    def send_signal(self, sig):
+        """Send a signal to the process."""
+        if self.returncode is not None:
+            # Skip signalling a process that we know has already died.
+            return
+
+        try:
+            os.kill(self.pid, sig)
+        except ProcessLookupError:
+            # Suppress the race condition error; bpo-40550.
+            pass
+
+
+class Process(AbstractProcess):
     """
     An object that wraps OS processes created by spawn.
     In the future, spawn will return objects of a different type
@@ -289,7 +304,7 @@ class Process:
     the process lifecycle and need to persist until it exits.
     """
 
-    def __init__(self, pid):
+    def __init__(self, pid: int):
         self.pid = pid
         self.returncode = None
         self._exit_waiters = []
@@ -323,25 +338,106 @@ class Process:
                 waiter.set_result(returncode)
         self._exit_waiters = None
 
-    def send_signal(self, sig):
-        """Send a signal to the process."""
+    def terminate(self):
+        """Terminate the process with SIGTERM"""
+        self.send_signal(signal.SIGTERM)
+
+    def kill(self):
+        """Kill the process with SIGKILL"""
+        self.send_signal(signal.SIGKILL)
+
+
+class MultiprocessingProcess(AbstractProcess):
+    """
+    An object that wraps OS processes created by multiprocessing.Process.
+    """
+
+    # Number of seconds between poll attempts for process exit status
+    # (after the sentinel has become ready).
+    _proc_join_interval = 0.1
+
+    def __init__(self, proc: multiprocessing.Process):
+        self._proc = proc
+        self.pid = proc.pid
+        self.returncode = None
+        self._exit_waiters = []
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} {self.pid}>"
+
+    async def wait(self):
+        """
+        Wait for the child process to terminate.
+
+        Set and return the returncode attribute.
+        """
         if self.returncode is not None:
-            # Skip signalling a process that we know has already died.
-            return
+            return self.returncode
 
+        loop = global_event_loop()
+        if not self._exit_waiters:
+            asyncio.ensure_future(self._proc_join(), 
loop=loop).add_done_callback(
+                self._proc_join_done
+            )
+        waiter = loop.create_future()
+        self._exit_waiters.append(waiter)
+        return await waiter
+
+    async def _proc_join(self):
+        loop = global_event_loop()
+        sentinel_reader = loop.create_future()
+        proc = self._proc
+        loop.add_reader(
+            proc.sentinel,
+            lambda: sentinel_reader.done() or sentinel_reader.set_result(None),
+        )
         try:
-            os.kill(self.pid, sig)
-        except ProcessLookupError:
-            # Suppress the race condition error; bpo-40550.
-            pass
+            await sentinel_reader
+        finally:
+            # If multiprocessing.Process supports the close method, then
+            # access to proc.sentinel will raise ValueError if the
+            # sentinel has been closed. In this case it's not safe to call
+            # remove_reader, since the file descriptor may have been closed
+            # and then reallocated to a concurrent coroutine. When the
+            # close method is not supported, proc.sentinel remains open
+            # until proc's finalizer is called.
+            try:
+                loop.remove_reader(proc.sentinel)
+            except ValueError:
+                pass
+
+        # Now that proc.sentinel is ready, poll until process exit
+        # status has become available.
+        while True:
+            proc.join(0)
+            if proc.exitcode is not None:
+                break
+            await asyncio.sleep(self._proc_join_interval, loop=loop)
+
+    def _proc_join_done(self, future):
+        # The join task should never be cancelled, so let it raise
+        # asyncio.CancelledError here if that somehow happens.
+        future.result()
+
+        self.returncode = self._proc.exitcode
+        if hasattr(self._proc, "close"):
+            self._proc.close()
+        self._proc = None
+
+        for waiter in self._exit_waiters:
+            if not waiter.cancelled():
+                waiter.set_result(self.returncode)
+        self._exit_waiters = None
 
     def terminate(self):
         """Terminate the process with SIGTERM"""
-        self.send_signal(signal.SIGTERM)
+        if self._proc is not None:
+            self._proc.terminate()
 
     def kill(self):
         """Kill the process with SIGKILL"""
-        self.send_signal(signal.SIGKILL)
+        if self._proc is not None:
+            self._proc.kill()
 
 
 def spawn(

diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 780545be0e..711bd2a7ba 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -1,13 +1,14 @@
-# Copyright 2012-2023 Gentoo Authors
+# Copyright 2012-2024 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import fcntl
-import functools
 import multiprocessing
 import warnings
 import signal
 import sys
 
+from typing import Optional
+
 import portage
 from portage import os
 from portage.cache.mappings import slot_dict_class
@@ -23,17 +24,11 @@ class ForkProcess(SpawnProcess):
         "kwargs",
         "target",
         "_child_connection",
-        "_proc",
-        "_proc_join_task",
     )
 
     _file_names = ("connection", "slave_fd")
     _files_dict = slot_dict_class(_file_names, prefix="")
 
-    # Number of seconds between poll attempts for process exit status
-    # (after the sentinel has become ready).
-    _proc_join_interval = 0.1
-
     _HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", 
False)
 
     def _start(self):
@@ -58,9 +53,8 @@ class ForkProcess(SpawnProcess):
                 duplex=self._HAVE_SEND_HANDLE
             )
 
-        retval = self._spawn(self.args, fd_pipes=self.fd_pipes)
+        self._proc = self._spawn(self.args, fd_pipes=self.fd_pipes)
 
-        self.pid = retval[0]
         self._registered = True
 
         if self._child_connection is None:
@@ -133,7 +127,9 @@ class ForkProcess(SpawnProcess):
 
         await super()._main(build_logger, pipe_logger, loop=loop)
 
-    def _spawn(self, args, fd_pipes=None, **kwargs):
+    def _spawn(
+        self, args: list[str], fd_pipes: Optional[dict[int, int]] = None, 
**kwargs
+    ) -> portage.process.MultiprocessingProcess:
         """
         Override SpawnProcess._spawn to fork a subprocess that calls
         self._run(). This uses multiprocessing.Process in order to leverage
@@ -175,7 +171,7 @@ class ForkProcess(SpawnProcess):
                 # Handle fd_pipes in _main instead.
                 fd_pipes = None
 
-            self._proc = multiprocessing.Process(
+            proc = multiprocessing.Process(
                 target=self._bootstrap,
                 args=(
                     self._child_connection,
@@ -186,19 +182,12 @@ class ForkProcess(SpawnProcess):
                     kwargs,
                 ),
             )
-            self._proc.start()
+            proc.start()
         finally:
             if stdin_dup is not None:
                 os.close(stdin_dup)
 
-        self._proc_join_task = asyncio.ensure_future(
-            self._proc_join(self._proc, loop=self.scheduler), 
loop=self.scheduler
-        )
-        self._proc_join_task.add_done_callback(
-            functools.partial(self._proc_join_done, self._proc)
-        )
-
-        return [self._proc.pid]
+        return portage.process.MultiprocessingProcess(proc)
 
     def _cancel(self):
         if self._proc is None:
@@ -206,64 +195,10 @@ class ForkProcess(SpawnProcess):
         else:
             self._proc.terminate()
 
-    def _async_wait(self):
-        if self._proc_join_task is None:
-            super()._async_wait()
-
-    def _async_waitpid(self):
-        if self._proc_join_task is None:
-            super()._async_waitpid()
-
-    async def _proc_join(self, proc, loop=None):
-        sentinel_reader = self.scheduler.create_future()
-        self.scheduler.add_reader(
-            proc.sentinel,
-            lambda: sentinel_reader.done() or sentinel_reader.set_result(None),
-        )
-        try:
-            await sentinel_reader
-        finally:
-            # If multiprocessing.Process supports the close method, then
-            # access to proc.sentinel will raise ValueError if the
-            # sentinel has been closed. In this case it's not safe to call
-            # remove_reader, since the file descriptor may have been closed
-            # and then reallocated to a concurrent coroutine. When the
-            # close method is not supported, proc.sentinel remains open
-            # until proc's finalizer is called.
-            try:
-                self.scheduler.remove_reader(proc.sentinel)
-            except ValueError:
-                pass
-
-        # Now that proc.sentinel is ready, poll until process exit
-        # status has become available.
-        while True:
-            proc.join(0)
-            if proc.exitcode is not None:
-                break
-            await asyncio.sleep(self._proc_join_interval, loop=loop)
-
-    def _proc_join_done(self, proc, future):
-        future.cancelled() or future.result()
-        self._was_cancelled()
-        if self.returncode is None:
-            self.returncode = proc.exitcode
-
-        self._proc = None
-        if hasattr(proc, "close"):
-            proc.close()
-        self._proc_join_task = None
-        self._async_wait()
-
     def _unregister(self):
         super()._unregister()
         if self._proc is not None:
-            if self._proc.is_alive():
-                self._proc.terminate()
-            self._proc = None
-        if self._proc_join_task is not None:
-            self._proc_join_task.cancel()
-            self._proc_join_task = None
+            self._proc.terminate()
 
     @staticmethod
     def _bootstrap(child_connection, have_send_handle, fd_pipes, target, args, 
kwargs):

diff --git a/lib/portage/util/_async/PopenProcess.py b/lib/portage/util/_async/PopenProcess.py
index c9bca1c524..a0e532e278 100644
--- a/lib/portage/util/_async/PopenProcess.py
+++ b/lib/portage/util/_async/PopenProcess.py
@@ -1,6 +1,7 @@
-# Copyright 2012-2021 Gentoo Authors
+# Copyright 2012-2024 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
+import portage
 from _emerge.SubProcess import SubProcess
 
 
@@ -11,7 +12,7 @@ class PopenProcess(SubProcess):
     )
 
     def _start(self):
-        self.pid = self.proc.pid
+        self._proc = portage.process.Process(self.proc.pid)
         self._registered = True
 
         if self.pipe_reader is None:

Reply via email to