potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863114700


##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str],
         self.stdin.write(msg.model_dump_json().encode())
         self.stdin.write(b"\n")
 
-    def kill(self, signal: signal.Signals = signal.SIGINT):
+    def kill(
+        self,
+        signal_to_send: signal.Signals = signal.SIGINT,
+        escalation_delay: float = 5.0,
+        force: bool = False,
+    ):
+        """
+        Attempt to terminate the subprocess with a given signal.
+
+        If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary.
+
+        :param signal_to_send: The signal to send initially (default is SIGINT).
+        :param escalation_delay: Time in seconds to wait before escalating to a stronger signal.
+        :param force: If True, send the signal immediately without escalation.
+        """
         if self._exit_code is not None:
             return
 
-        with suppress(ProcessLookupError):
-            os.kill(self.pid, signal)
+        if not force:
+            with suppress(ProcessLookupError):
+                self._process.send_signal(signal_to_send)
+            return
+
+        # Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+        escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+        if signal_to_send in escalation_path:
+            # Start from `initial_signal`
+            escalation_path = escalation_path[escalation_path.index(signal_to_send) :]
+
+        for sig in escalation_path:
+            try:
+                self._process.send_signal(sig)

Review Comment:
   Following the discussion we had in
   https://github.com/apache/airflow/pull/41329#issuecomment-2506804789:
   should we create a new process group when we start a task via the supervisor,
   and send the signal to the process group?
   
   This handles much better the situation where one of the child processes does
   not propagate signals (bash, for example, does not propagate them by default).
   
   The typical approach is to start a new process group with the same ID as the
   process it starts (setpgid(0, 0)) right after we fork from the supervisor:
   
   ```python
   import os

   # A session leader cannot change its process group (setpgid fails with EPERM),
   # so check that we are not one already (we should not be)
   if os.getpid() != os.getsid(0):
       # and create a new process group where we are the leader
       os.setpgid(0, 0)
   ```
   
   Then, instead of sending the signal to the process, you send it to the
   process group:
   
   ```python
   os.killpg(pgid, signal.SIGTERM)  # pgid equals the child's PID after setpgid(0, 0)
   ```
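
   To make the two steps concrete, here is a minimal, POSIX-only sketch of
   how they could fit together. It is not the supervisor code: it uses
   `subprocess.Popen` with `preexec_fn` as a stand-in for the "right after
   we fork" step, and the helper names `start_in_own_group` and
   `signal_group` are hypothetical, made up for this illustration.

   ```python
   import os
   import signal
   import subprocess
   import sys
   from contextlib import suppress


   def start_in_own_group(cmd):
       """Start cmd as the leader of a new process group (hypothetical helper)."""
       # preexec_fn runs in the child between fork and exec. setpgid(0, 0)
       # puts the child in a new group whose ID equals the child's own PID.
       return subprocess.Popen(cmd, preexec_fn=lambda: os.setpgid(0, 0))


   def signal_group(proc, sig=signal.SIGTERM):
       """Send sig to every process in proc's group (hypothetical helper)."""
       # The group ID is the leader's PID, so no extra bookkeeping is needed.
       # The group may already be gone, which is fine.
       with suppress(ProcessLookupError):
           os.killpg(proc.pid, sig)


   def run_demo():
       """Start a sleeping child, signal its group, return the group ID and exit code."""
       proc = start_in_own_group(
           [sys.executable, "-c", "import time; time.sleep(30)"]
       )
       pgid = os.getpgid(proc.pid)
       signal_group(proc, signal.SIGTERM)
       return proc.pid, pgid, proc.wait(timeout=10)
   ```

   Because the signal goes to the whole group, a grandchild started by a
   shell (for example `sh -c "sleep 30 & wait"`) receives it too, even when
   the shell itself does not forward it. Note that `preexec_fn` is documented
   as unsafe in the presence of threads; on Python 3.11+ the `process_group=0`
   argument of `subprocess.Popen` should achieve the same thing without it.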
   


