ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862719032


##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
         self.stdin.write(msg.model_dump_json().encode())
         self.stdin.write(b"\n")
 
-    def kill(self, signal: signal.Signals = signal.SIGINT):
+    def kill(
+        self,
+        signal_to_send: signal.Signals = signal.SIGINT,
+        escalation_delay: float = 5.0,
+        bypass_escalation: bool = False,
+    ):
+        """
+        Attempt to terminate the subprocess with a given signal.
+
+        If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+        :param signal_to_send: The signal to send initially (default is 
SIGINT).
+        :param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+        :param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+        """
         if self._exit_code is not None:
             return
 
-        with suppress(ProcessLookupError):
-            os.kill(self.pid, signal)
+        if bypass_escalation:

Review Comment:
   Let's call this `force`.



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
         self.stdin.write(msg.model_dump_json().encode())
         self.stdin.write(b"\n")
 
-    def kill(self, signal: signal.Signals = signal.SIGINT):
+    def kill(
+        self,
+        signal_to_send: signal.Signals = signal.SIGINT,
+        escalation_delay: float = 5.0,
+        bypass_escalation: bool = False,
+    ):
+        """
+        Attempt to terminate the subprocess with a given signal.
+
+        If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+        :param signal_to_send: The signal to send initially (default is 
SIGINT).
+        :param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+        :param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+        """
         if self._exit_code is not None:
             return
 
-        with suppress(ProcessLookupError):
-            os.kill(self.pid, signal)
+        if bypass_escalation:
+            with suppress(ProcessLookupError):
+                os.kill(self.pid, signal_to_send)
+            return
+
+        # Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+        escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+        if signal_to_send in escalation_path:
+            # Start from `initial_signal`
+            escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+        for sig in escalation_path:
+            try:
+                if sig == signal.SIGKILL:
+                    self._process.kill()
+                elif sig == signal.SIGTERM:
+                    self._process.terminate()
+                else:
+                    os.kill(self.pid, sig)
+
+                self._exit_code = self._process.wait(timeout=escalation_delay)

Review Comment:
   We probably want to service the sockets while we are waiting to kill it 
(collect any logs; the task may well try to report its final state before it 
exits, which we will need to handle!)



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
         self.stdin.write(msg.model_dump_json().encode())
         self.stdin.write(b"\n")
 
-    def kill(self, signal: signal.Signals = signal.SIGINT):
+    def kill(
+        self,
+        signal_to_send: signal.Signals = signal.SIGINT,
+        escalation_delay: float = 5.0,
+        bypass_escalation: bool = False,
+    ):
+        """
+        Attempt to terminate the subprocess with a given signal.
+
+        If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+        :param signal_to_send: The signal to send initially (default is 
SIGINT).
+        :param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+        :param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+        """
         if self._exit_code is not None:
             return
 
-        with suppress(ProcessLookupError):
-            os.kill(self.pid, signal)
+        if bypass_escalation:
+            with suppress(ProcessLookupError):
+                os.kill(self.pid, signal_to_send)
+            return
+
+        # Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+        escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+        if signal_to_send in escalation_path:
+            # Start from `initial_signal`
+            escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+        for sig in escalation_path:
+            try:
+                if sig == signal.SIGKILL:
+                    self._process.kill()
+                elif sig == signal.SIGTERM:
+                    self._process.terminate()
+                else:
+                    os.kill(self.pid, sig)
+
+                self._exit_code = self._process.wait(timeout=escalation_delay)
+                log.debug("Process exited", pid=self.pid, 
exit_code=self._exit_code, signal=sig.name)
+                return
+            except psutil.TimeoutExpired:
+                log.warning("Process did not terminate in time; escalating", 
pid=self.pid, signal=sig.name)
+            except psutil.NoSuchProcess:
+                log.debug("Process already terminated", pid=self.pid)
+                self._exit_code = 0

Review Comment:
   `-1` might be better here.



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
         self.stdin.write(msg.model_dump_json().encode())
         self.stdin.write(b"\n")
 
-    def kill(self, signal: signal.Signals = signal.SIGINT):
+    def kill(
+        self,
+        signal_to_send: signal.Signals = signal.SIGINT,
+        escalation_delay: float = 5.0,
+        bypass_escalation: bool = False,
+    ):
+        """
+        Attempt to terminate the subprocess with a given signal.
+
+        If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+        :param signal_to_send: The signal to send initially (default is 
SIGINT).
+        :param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+        :param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+        """
         if self._exit_code is not None:
             return
 
-        with suppress(ProcessLookupError):
-            os.kill(self.pid, signal)
+        if bypass_escalation:
+            with suppress(ProcessLookupError):
+                os.kill(self.pid, signal_to_send)
+            return
+
+        # Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+        escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+        if signal_to_send in escalation_path:
+            # Start from `initial_signal`
+            escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+        for sig in escalation_path:
+            try:
+                if sig == signal.SIGKILL:
+                    self._process.kill()
+                elif sig == signal.SIGTERM:
+                    self._process.terminate()
+                else:
+                    os.kill(self.pid, sig)

Review Comment:
   Do we need to handle this differently, or can we just use `os.kill` always?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to