This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6137c94e0e9 Add callback_execution_timeout config for deadline callbacks (#66609)
6137c94e0e9 is described below

commit 6137c94e0e9203d26c4df2697147a66b73e8b5d7
Author: Sean Ghaeli <[email protected]>
AuthorDate: Mon Jun 1 10:00:21 2026 -0700

    Add callback_execution_timeout config for deadline callbacks (#66609)
    
    Introduces a new [callbacks] callback_execution_timeout configuration
    option that sets a maximum duration, in seconds, for deadline callback
    execution. When set to a positive value, callbacks that exceed this
    timeout are terminated (SIGTERM, then SIGKILL after 5 seconds). Defaults
    to disabled (no timeout) to preserve existing behavior.
---
 .../src/airflow/config_templates/config.yml        | 15 +++++
 .../sdk/execution_time/callback_supervisor.py      | 26 +++++++++
 .../execution_time/test_callback_supervisor.py     | 66 ++++++++++++++++++++++
 3 files changed, 107 insertions(+)

diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index ae47f132b08..0852865cabb 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -3197,3 +3197,18 @@ profiling:
       type: boolean
       example: ~
       default: "False"
+
+callbacks:
+  description: |
+    Configuration for callbacks (deadline alerts, etc.).
+  options:
+    callback_execution_timeout:
+      description: |
+        Maximum execution time in seconds for deadline callbacks.
+        Set to a positive integer to enable. When a callback exceeds this duration,
+        it is terminated with SIGTERM followed by SIGKILL if it does not exit within 5 seconds.
+        0 (the default) or any negative value means no timeout.
+      version_added: 3.3.0
+      type: integer
+      example: "300"
+      default: "0"
diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
index 579833f413d..7679c2328f9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
@@ -18,6 +18,7 @@
 
 from __future__ import annotations
 
+import signal
 import sys
 import time
 from importlib import import_module
@@ -246,16 +247,41 @@ class CallbackSubprocess(WatchedSubprocess):
         self._exit_code = self._exit_code if self._exit_code is not None else 1
         return self._exit_code
 
+    def _get_callback_execution_timeout(self) -> int:
+        """Return ``[callbacks] callback_execution_timeout`` in seconds; 0 (no timeout) if unset."""
+        from airflow.sdk.configuration import conf
+
+        return conf.getint("callbacks", "callback_execution_timeout", fallback=0)
+
     def _monitor_subprocess(self):
         """
         Monitor the subprocess until it exits.
 
         A simplified version of ActivitySubprocess._monitor_subprocess() without heartbeating
         or timeout handling, just process output monitoring and stuck-socket cleanup.
+
+        Timeout handling is the exception: if ``[callbacks] callback_execution_timeout`` is positive, the
+        subprocess is sent SIGTERM after that many seconds, then SIGKILL if still alive 5 seconds later.
         """
+        timeout = self._get_callback_execution_timeout()
+        start_monotonic = time.monotonic()
+
         while self._exit_code is None or self._open_sockets:
             self._service_subprocess(max_wait_time=MIN_HEARTBEAT_INTERVAL)
 
+            # Enforce the execution timeout, if configured (checked once per service iteration).
+            if timeout > 0 and self._exit_code is None:
+                elapsed = time.monotonic() - start_monotonic
+                if elapsed > timeout:
+                    log.warning(
+                        "Callback execution timeout exceeded; terminating subprocess",
+                        pid=self.pid,
+                        timeout_seconds=timeout,
+                        elapsed_seconds=elapsed,
+                    )
+                    self.kill(signal.SIGTERM, escalation_delay=5.0, force=True)
+                    break  # Stop monitoring; the stuck-socket handling below is skipped.
+
             # If the process has exited but sockets remain open, apply a timeout
             # to prevent hanging indefinitely on stuck sockets.
             if self._exit_code is not None and self._open_sockets:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
index 523a7d0c604..b0f82cd37da 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
@@ -19,6 +19,7 @@
 
 from __future__ import annotations
 
+import signal
 import socket
 from dataclasses import dataclass
 from operator import attrgetter
@@ -239,3 +240,68 @@ class TestCallbackHandleRequest:
 
         if client_mock:
             mock_client_method.assert_called_once_with(*client_mock.args, **client_mock.kwargs)
+
+
+class TestCallbackExecutionTimeout:
+    """Tests for the callback_execution_timeout config enforcement."""
+
+    @pytest.fixture
+    def callback_subprocess(self, mocker):
+        read_end, write_end = socket.socketpair()
+        proc = CallbackSubprocess(
+            process_log=mocker.MagicMock(),
+            id="12345678-1234-5678-1234-567812345678",
+            pid=12345,
+            stdin=write_end,
+            client=mocker.Mock(),
+            process=mocker.Mock(),
+        )
+        yield proc
+        read_end.close()
+        write_end.close()
+
+    def test_timeout_zero_does_not_kill(self, callback_subprocess, mocker):
+        """When timeout=0, no kill is issued regardless of how long the subprocess runs."""
+        proc = callback_subprocess
+
+        # Simulate the subprocess exiting normally on the third service iteration
+        call_count = 0
+
+        def fake_service_subprocess(self_arg, **kwargs):
+            nonlocal call_count
+            call_count += 1
+            if call_count >= 3:
+                proc._exit_code = 0
+            return proc._exit_code
+
+        mocker.patch.object(
+            CallbackSubprocess, "_service_subprocess", autospec=True, side_effect=fake_service_subprocess
+        )
+        mocker.patch.object(
+            CallbackSubprocess, "_get_callback_execution_timeout", autospec=True, return_value=0
+        )
+        mock_kill = mocker.patch.object(CallbackSubprocess, "kill", autospec=True)
+
+        proc._monitor_subprocess()
+
+        mock_kill.assert_not_called()
+        assert proc._exit_code == 0
+
+    def test_timeout_kills_long_running_subprocess(self, callback_subprocess, mocker):
+        """When timeout>0 and the subprocess exceeds the timeout, it is killed."""
+        proc = callback_subprocess
+
+        # Simulate time progressing beyond the timeout
+        time_values = iter([100.0, 100.0, 106.0])  # start, 1st check (0s elapsed), 2nd check (6s > 5s)
+        mocker.patch("airflow.sdk.execution_time.callback_supervisor.time.monotonic", side_effect=time_values)
+
+        # Subprocess never exits on its own
+        mocker.patch.object(CallbackSubprocess, "_service_subprocess", autospec=True, return_value=None)
+        mocker.patch.object(
+            CallbackSubprocess, "_get_callback_execution_timeout", autospec=True, return_value=5
+        )
+        mock_kill = mocker.patch.object(CallbackSubprocess, "kill", autospec=True)
+
+        proc._monitor_subprocess()
+
+        mock_kill.assert_called_once_with(proc, signal.SIGTERM, escalation_delay=5.0, force=True)

Reply via email to