Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil merged PR #44465: URL: https://github.com/apache/airflow/pull/44465 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1864034492 ## task_sdk/tests/execution_time/test_supervisor.py: ## @@ -430,6 +432,216 @@ def test_heartbeat_failures_handling(self, monkeypatch, mocker, captured_logs, t } in captured_logs +class TestWatchedSubprocessKill: +@pytest.fixture +def mock_process(self, mocker): +process = mocker.Mock(spec=psutil.Process) +process.pid = 12345 +return process + +@pytest.fixture +def watched_subprocess(self, mocker, mock_process): +proc = WatchedSubprocess( +ti_id=TI_ID, +pid=12345, +stdin=mocker.Mock(), +client=mocker.Mock(), +process=mock_process, +) +# Mock the selector +mock_selector = mocker.Mock(spec=selectors.DefaultSelector) +mock_selector.select.return_value = [] + +# Set the selector on the process +proc.selector = mock_selector +return proc + +@pytest.mark.parametrize( +["signal_to_send", "wait_side_effect", "expected_signals"], +[ +pytest.param( +signal.SIGINT, +[0], +[signal.SIGINT], +id="SIGINT-success-without-escalation", +), +pytest.param( +signal.SIGINT, +[psutil.TimeoutExpired(0.1), 0], +[signal.SIGINT, signal.SIGTERM], +id="SIGINT-escalates-to-SIGTERM", +), +pytest.param( +signal.SIGINT, +[ +psutil.TimeoutExpired(0.1), # SIGINT times out +psutil.TimeoutExpired(0.1), # SIGTERM times out +0, # SIGKILL succeeds +], +[signal.SIGINT, signal.SIGTERM, signal.SIGKILL], +id="SIGINT-escalates-to-SIGTERM-then-SIGKILL", +), +pytest.param( +signal.SIGTERM, +[ +psutil.TimeoutExpired(0.1), # SIGTERM times out +0, # SIGKILL succeeds +], +[signal.SIGTERM, signal.SIGKILL], +id="SIGTERM-escalates-to-SIGKILL", +), +pytest.param( +signal.SIGKILL, +[0], +[signal.SIGKILL], +id="SIGKILL-success-without-escalation", +), +], +) +def test_force_kill_escalation( +self, +watched_subprocess, +mock_process, +mocker, +signal_to_send, +wait_side_effect, +expected_signals, +captured_logs, +): +"""Test escalation path for SIGINT, SIGTERM, and SIGKILL when force=True.""" +# Mock the process 
wait method to return the exit code or raise an exception +mock_process.wait.side_effect = wait_side_effect + +watched_subprocess.kill(signal_to_send=signal_to_send, escalation_delay=0.1, force=True) + +# Check that the correct signals were sent +mock_process.send_signal.assert_has_calls([mocker.call(sig) for sig in expected_signals]) + +# Check that the process was waited on for each signal +mock_process.wait.assert_has_calls([mocker.call(timeout=0)] * len(expected_signals)) + +## Validate log messages +# If escalation occurred, we should see a warning log for each signal sent +if len(expected_signals) > 1: +assert { +"event": "Process did not terminate in time; escalating", +"level": "warning", Review Comment: This could be changed to debug too! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1864004089 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) + +self._exit_code = self._process.wait(timeout=escalation_delay) Review Comment: Updated in https://github.com/apache/airflow/pull/44465/commits/1e7dc464f5982e4b7b2b8c3a5f5585d6da53b83d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1864004174 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: Review Comment: sure, changed the approach in https://github.com/apache/airflow/pull/44465/commits/1e7dc464f5982e4b7b2b8c3a5f5585d6da53b83d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863860334 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: interesting :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863580386 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1278-L1291 `psutil` manages some of that for us -- i.e. we don't have to check it explicitly ourselves (re SIGTERM): ```py def send_signal(self, sig): """Send a signal *sig* to process pre-emptively checking whether PID has been reused (see signal module constants) . On Windows only SIGTERM is valid and is treated as an alias for kill(). 
""" if POSIX: self._send_signal(sig) else: # pragma: no cover self._raise_if_pid_reused() if sig != signal.SIGTERM and not self.is_running(): msg = "process no longer exists" raise NoSuchProcess(self.pid, self._name, msg=msg) self._proc.send_signal(sig) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863580386 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1278-L1291 psutil manages some of that for us: ```py def send_signal(self, sig): """Send a signal *sig* to process pre-emptively checking whether PID has been reused (see signal module constants) . On Windows only SIGTERM is valid and is treated as an alias for kill(). 
""" if POSIX: self._send_signal(sig) else: # pragma: no cover self._raise_if_pid_reused() if sig != signal.SIGTERM and not self.is_running(): msg = "process no longer exists" raise NoSuchProcess(self.pid, self._name, msg=msg) self._proc.send_signal(sig) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Not mentioning fork that also does not **really** work well on MacOS (because on MacOS system libraries might start threads which does not play well with threads https://github.com/python/cpython/issues/77906), so if we would like to have explicit support for "production" MacOS (and later Windows) for "workers" - we will have to make some serious `ifs` anyway. BTW. 
In Python 3.14 there are some changes coming that **might** finally make it possible to use fork() in production on MacOS https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods - maybe we should already use "get_context()" and "set_start_method" to explicitly set "fork" on systems that support it (minus MacOS :) - as the change of the default start method will be a breaking change in Python 3.14 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Not mentioning that also `fork` does not **really** work well on MacOS (because on MacOS system libraries might start threads which does not play well with threads https://github.com/python/cpython/issues/77906), so if we would like to have explicit support for "production" MacOS (and later Windows) for "workers" - we will have to make some serious `ifs` anyway. BTW. 
In Python 3.14 there are some changes coming that **might** finally make it possible to use fork() in production on MacOS https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods - maybe we should already use "get_context()" and "set_start_method" to explicitly set "fork" on systems that support it (minus MacOS :) - as the change of the default start method will be a breaking change in Python 3.14 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Not mentioning fork that also does not **really** work well on MacOS (because on MacOS system libraries might start threads which does not play well with threads https://github.com/python/cpython/issues/77906), so if we would like to have explicit support for MacOS (and later Windows) for "workers" - we will have to make some serious `ifs` anyway. BTW. 
In Python 3.14 there are some changes coming that **might** finally make it possible to use fork() in production on MacOS https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods - maybe we should already use "get_context()" and "set_start_method" to explicitly set "fork" on systems that support it (minus MacOS :) - as the change of the default start method will be a breaking change in Python 3.14 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Not mentioning fork that also does not **really** work well on MacOS (because on MacOS system libraries might start threads which does not play well with threads https://github.com/python/cpython/issues/77906), so if we would like to have explicit support for MacOS (and later Windows) for "workers" - we will have to make some serious ifs BTW. 
In Python 3.14 there are some changes coming that **might** finally make it possible to use fork() in production on MacOS https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods - maybe we should already use "get_context()" and "set_start_method" to explicitly set "fork" on systems that support it (minus MacOS :) - as the change of the default start method will be a breaking change in Python 3.14 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Not to mention that `fork` also does not **really** work well on MacOS (because on MacOS system libraries might start threads, which does not play well with fork https://github.com/python/cpython/issues/77906), so if we would like to have explicit support for MacOS (and later Windows) for "workers" - we will have to add some serious `if`s anyway. BTW.
In Python 3.14 there are some changes coming that **might** finally make it possible to use fork() in production on MacOS https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods - maybe we should already use "get_context()" and "set_start_method" to explicitly set "fork" on systems that support it (minus MacOS :) - as the change of the default start method will be a breaking change in Python 3.14 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863564022 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: > Though we should try and support non-UNIX (i.e. Windows) which doesn't have process groups. SIGTERM does not work on Windows either, so we will have to do it completely differently if we want to support Windows. See for example here: https://stackoverflow.com/questions/47306805/signal-sigterm-not-received-by-subprocess-on-windows -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
ashb commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863177225 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: Review Comment: That's fine I think, as SIGKILL is uncatchable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
ashb commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863171511 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Though we should try and support non-UNIX (i.e. Windows) which doesn't have process groups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
ashb commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863168995 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Yes we should create a new process group, there is already a todo for that, so that will be in a different PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
ashb commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863167664 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) + +self._exit_code = self._process.wait(timeout=escalation_delay) Review Comment: No not really. The flow could be something like this: 1. We send SIGINT to the process. 2. It catches it, and send a log message 3. The first call to `self.selector.select` would catch that, then enter wait 4. 
The subprocess now sends a request to, for example, update an XCom value, and blocks waiting to read the response 5. The supervisor is blocking in wait 6. Supervisor wait times out, we then send sigkill. Remember, the socket being closed (which also happens when the process is hard killed with kill -9) counts as an event in the selector. We want to call this again. ```python events = self.selector.select(timeout=escalation_delay) self._process_file_object_events(events) self._check_subprocess_exit() ``` and I suspect all three of those calls should be moved into `_process_file_object_events` and it renamed, so that the call is something like: ```python def _service_subprocess(max_time: float): events = self.selector.select(timeout=max_time) for key, _ in events: socket_handler = key.data self._check_subprocess_exit() ``` That way `_monitor_subprocess` and `kill` can both simply call `_service_subprocess`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863114700 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Following discussion we had in https://github.com/apache/airflow/pull/41329#issuecomment-2506804789. -> should we create a new process group when we start task via supervisor and send the signal to process group ? This handles way better situation where one of the child processes does not propagate the signals (which happens for example by default in `bash`). 
Typical approach here is to start a new process group with same id as the process it starts (setpgid(0,0)) right after we fork from supervisor: ```python # Check if we are not a group leader already (We should not be) if os.getpid() != os.getsid(0): # and create a new process group where we are the leader os.setpgid(0, 0) ``` And then instead of sending signal to process you send it to process group ```python os.killpg(gid, signal.SIGTERM) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863120669 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: This is, for example, what Ctrl-C does by default when you have an interactive process - the INT signal (SIGINT) is sent to the foreground process group that the shell sets up for the foreground process - not **just** to the process. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
potiuk commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1863114700 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +force: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param force: If True, send the signal immediately without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if not force: +with suppress(ProcessLookupError): +self._process.send_signal(signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +self._process.send_signal(sig) Review Comment: Following discussion we had in https://github.com/apache/airflow/pull/41329#issuecomment-2506804789. -> should we create a new process group when we start task via supervisor and send the signal to process group ? This handles way better situation where one of the child processes does not propagate the signals (which happens for example by default in bash). 
Typical approach here is to start a new process group with same id as the process it starts (setpgid(0,0)) right after we fork from supervisor: ```python # Check if we are not a group leader already (We should not be) if os.getpid() != os.getsid(0): # and create a new process group where we are the leader os.setpgid(0, 0) ``` And then instead of sending signal to process you send it to process group ```python os.killpg(gid, signal.SIGTERM) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862814924 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) + +self._exit_code = self._process.wait(timeout=escalation_delay) Review Comment: Does https://github.com/apache/airflow/pull/44465/commits/21fa9cbe3313fbca77964d1f83853c8f823b246d work? or do you think we should move it after `wait`? 
I did it before, assuming most processes would already have exited by the time `self._process.wait` returns or raises an exception -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862814924 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) + +self._exit_code = self._process.wait(timeout=escalation_delay) Review Comment: does https://github.com/apache/airflow/pull/44465/commits/21fa9cbe3313fbca77964d1f83853c8f823b246d work? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862802176 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: Review Comment: it is also called here though: https://github.com/apache/airflow/blob/840018a4264a841acbd41ca72f07938331abd3b8/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L330-L339 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862773600 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) Review Comment: oh nice, done in https://github.com/apache/airflow/pull/44465/commits/e6105241a0d56d4fac90cdfe96e75e379551ebd1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
ashb commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862732883 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: Review Comment: This "should" be impossible to hit, assuming it's from the wait, not the os.kill call, as even if the process has exited, the process hangs around as a Zombie (on Linux this shows up as `<defunct>` in `ps aux`) until something `wait`s on it. And as long as our process is alive, nothing else should wait on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
ashb commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862731293 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) Review Comment: In which case, use `self._process.send_signal(sig)` -- it does the same thing: https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1278-L1291 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862730091 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) + +self._exit_code = self._process.wait(timeout=escalation_delay) +log.debug("Process exited", pid=self.pid, exit_code=self._exit_code, signal=sig.name) +return +except psutil.TimeoutExpired: +log.warning("Process did not terminate in time; escalating", pid=self.pid, signal=sig.name) +except psutil.NoSuchProcess: +log.debug("Process already terminated", 
pid=self.pid) +self._exit_code = 0 Review Comment: https://github.com/apache/airflow/pull/44465/commits/63ac85ef632c26a3a5b2f989a2c5cb804cebd3b2 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: Review Comment: https://github.com/apache/airflow/pull/44465/commits/63ac85ef632c26a3a5b2f989a2c5cb804cebd3b2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862728650 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: Review Comment: I went back and forth with it -- as i was playing around with the default. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862727780 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) + +self._exit_code = self._process.wait(timeout=escalation_delay) +log.debug("Process exited", pid=self.pid, exit_code=self._exit_code, signal=sig.name) +return +except psutil.TimeoutExpired: +log.warning("Process did not terminate in time; escalating", pid=self.pid, signal=sig.name) +except psutil.NoSuchProcess: +log.debug("Process already terminated", 
pid=self.pid) +self._exit_code = 0 Review Comment: hmm, my initial thought was that the process could have terminated, but yeah that's an assumption -- will change it to -1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862726361 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) Review Comment: We could get some free niceties by directly calling `process.{kill,terminate}` -- some additional checks and cross-platform : https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1326-L1334 ```py def kill(self): """Kill the current process with SIGKILL pre-emptively checking whether PID has been reused. 
""" if POSIX: self._send_signal(signal.SIGKILL) else: # pragma: no cover self._raise_if_pid_reused() self._proc.kill() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
kaxil commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862726361 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) Review Comment: We could get some free goodies by directly calling `process.{kill,terminate}` -- some additional checks and cross-platform : https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1326-L1334 ```py def kill(self): """Kill the current process with SIGKILL pre-emptively checking whether PID has been reused. 
""" if POSIX: self._send_signal(signal.SIGKILL) else: # pragma: no cover self._raise_if_pid_reused() self._proc.kill() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]
ashb commented on code in PR #44465: URL: https://github.com/apache/airflow/pull/44465#discussion_r1862719032 ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: Review Comment: Let's call this `force` ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the process without escalation. +""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] +if signal_to_send in escalation_path: +# Start from `initial_signal` +escalation_path = escalation_path[escalation_path.index(signal_to_send) :] + +for sig in escalation_path: +try: +if sig == signal.SIGKILL: +self._process.kill() +elif sig == signal.SIGTERM: +self._process.terminate() +else: +os.kill(self.pid, sig) + +self._exit_code = self._process.wait(timeout=escalation_delay) Review Comment: We probably want to service the sockets while we are waiting to kill it (collect any logs, and it may well be trying to report the final state of the task before it exits which we will need to handle!) ## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ## @@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: str | os.PathLike[str], self.stdin.write(msg.model_dump_json().encode()) self.stdin.write(b"\n") -def kill(self, signal: signal.Signals = signal.SIGINT): +def kill( +self, +signal_to_send: signal.Signals = signal.SIGINT, +escalation_delay: float = 5.0, +bypass_escalation: bool = False, +): +""" +Attempt to terminate the subprocess with a given signal. + +If the process does not exit within `escalation_delay` seconds, escalate to SIGTERM and eventually SIGKILL if necessary. + +:param signal_to_send: The signal to send initially (default is SIGINT). +:param escalation_delay: Time in seconds to wait before escalating to a stronger signal. +:param bypass_escalation: If True, send the signal directly to the process without escalation.
+""" if self._exit_code is not None: return -with suppress(ProcessLookupError): -os.kill(self.pid, signal) +if bypass_escalation: +with suppress(ProcessLookupError): +os.kill(self.pid, signal_to_send) +return + +# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL +