This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6137c94e0e9 Add callback_execution_timeout config for deadline
callbacks (#66609)
6137c94e0e9 is described below
commit 6137c94e0e9203d26c4df2697147a66b73e8b5d7
Author: Sean Ghaeli <[email protected]>
AuthorDate: Mon Jun 1 10:00:21 2026 -0700
Add callback_execution_timeout config for deadline callbacks (#66609)
Introduces a new [callbacks] callback_execution_timeout configuration
option that sets a maximum duration, in seconds, for deadline callback
execution. When set to a positive value, callbacks that exceed this timeout
are terminated (SIGTERM, escalating to SIGKILL after 5 seconds). Defaults
to 0 (no timeout) to preserve existing behavior.
---
.../src/airflow/config_templates/config.yml | 15 +++++
.../sdk/execution_time/callback_supervisor.py | 26 +++++++++
.../execution_time/test_callback_supervisor.py | 66 ++++++++++++++++++++++
3 files changed, 107 insertions(+)
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index ae47f132b08..0852865cabb 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -3197,3 +3197,18 @@ profiling:
type: boolean
example: ~
default: "False"
+
+callbacks:
+ description: |
+ Configuration for callbacks (deadline alerts, etc.).
+ options:
+ callback_execution_timeout:
+ description: |
+ Maximum execution time in seconds for deadline callbacks.
+ Set to a positive integer to enable. When a callback exceeds this duration,
+ it is terminated with SIGTERM, followed by SIGKILL if it does not exit within 5 seconds.
+ 0 (the default) or a negative value disables the timeout.
+ version_added: 3.3.0
+ type: integer
+ example: "300"
+ default: "0"
diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
index 579833f413d..7679c2328f9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
@@ -18,6 +18,7 @@
from __future__ import annotations
+import signal
import sys
import time
from importlib import import_module
@@ -246,16 +247,41 @@ class CallbackSubprocess(WatchedSubprocess):
self._exit_code = self._exit_code if self._exit_code is not None else 1
return self._exit_code
+ def _get_callback_execution_timeout(self) -> int:
+ """
+ Return the ``[callbacks] callback_execution_timeout`` config value, in seconds.
+
+ Falls back to 0 when the option is not set; only a positive value enables the timeout.
+ """
+ from airflow.sdk.configuration import conf
+
+ return conf.getint("callbacks", "callback_execution_timeout",
fallback=0)
+
def _monitor_subprocess(self):
"""
Monitor the subprocess until it exits.
A simplified version of ActivitySubprocess._monitor_subprocess()
without heartbeating
or timeout handling, just process output monitoring and stuck-socket
cleanup.
+
+ If ``[callbacks] callback_execution_timeout`` is set to a positive
value, the subprocess
+ is killed after that many seconds.
"""
+ timeout = self._get_callback_execution_timeout()
+ start_monotonic = time.monotonic()
+
while self._exit_code is None or self._open_sockets:
self._service_subprocess(max_wait_time=MIN_HEARTBEAT_INTERVAL)
+ # Enforce execution timeout if configured.
+ if timeout > 0 and self._exit_code is None:
+ elapsed = time.monotonic() - start_monotonic
+ if elapsed > timeout:
+ log.warning(
+ "Callback execution timeout exceeded; terminating
subprocess",
+ pid=self.pid,
+ timeout_seconds=timeout,
+ elapsed_seconds=elapsed,
+ )
+ self.kill(signal.SIGTERM, escalation_delay=5.0, force=True)
+ break
+
# If the process has exited but sockets remain open, apply a
timeout
# to prevent hanging indefinitely on stuck sockets.
if self._exit_code is not None and self._open_sockets:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
index 523a7d0c604..b0f82cd37da 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
@@ -19,6 +19,7 @@
from __future__ import annotations
+import signal
import socket
from dataclasses import dataclass
from operator import attrgetter
@@ -239,3 +240,68 @@ class TestCallbackHandleRequest:
if client_mock:
mock_client_method.assert_called_once_with(*client_mock.args,
**client_mock.kwargs)
+
+
+class TestCallbackExecutionTimeout:
+ """Tests for the callback_execution_timeout config enforcement."""
+
+ @pytest.fixture
+ def callback_subprocess(self, mocker):
+ read_end, write_end = socket.socketpair()
+ proc = CallbackSubprocess(
+ process_log=mocker.MagicMock(),
+ id="12345678-1234-5678-1234-567812345678",
+ pid=12345,
+ stdin=write_end,
+ client=mocker.Mock(),
+ process=mocker.Mock(),
+ )
+ yield proc
+ read_end.close()
+ write_end.close()
+
+ def test_timeout_zero_does_not_kill(self, callback_subprocess, mocker):
+ """When timeout=0, no kill is issued regardless of how long the
subprocess runs."""
+ proc = callback_subprocess
+
+ # Simulate the subprocess exiting normally (exit code 0) on the third service call
+ call_count = 0
+
+ def fake_service_subprocess(self_arg, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ if call_count >= 3:
+ proc._exit_code = 0
+ return proc._exit_code
+
+ mocker.patch.object(
+ CallbackSubprocess, "_service_subprocess", autospec=True,
side_effect=fake_service_subprocess
+ )
+ mocker.patch.object(
+ CallbackSubprocess, "_get_callback_execution_timeout",
autospec=True, return_value=0
+ )
+ mock_kill = mocker.patch.object(CallbackSubprocess, "kill",
autospec=True)
+
+ proc._monitor_subprocess()
+
+ mock_kill.assert_not_called()
+ assert proc._exit_code == 0
+
+ def test_timeout_kills_long_running_subprocess(self, callback_subprocess,
mocker):
+ """When timeout>0 and the subprocess exceeds the timeout, it is
killed once with SIGTERM and a 5s SIGKILL escalation delay."""
+ proc = callback_subprocess
+
+ # Fake monotonic clock: read once at loop start, then once per timeout check.
+ # A 4th monotonic() call would raise StopIteration, so the loop must stop after kill().
+ time_values = iter([100.0, 100.0, 106.0]) # start, 1st check (0s), 2nd check: elapsed 6s >
5s timeout
+
mocker.patch("airflow.sdk.execution_time.callback_supervisor.time.monotonic",
side_effect=time_values)
+
+ # Subprocess never exits on its own (_exit_code is never set)
+ mocker.patch.object(CallbackSubprocess, "_service_subprocess",
autospec=True, return_value=None)
+ mocker.patch.object(
+ CallbackSubprocess, "_get_callback_execution_timeout",
autospec=True, return_value=5
+ )
+ mock_kill = mocker.patch.object(CallbackSubprocess, "kill",
autospec=True)
+
+ proc._monitor_subprocess()
+
+ mock_kill.assert_called_once_with(proc, signal.SIGTERM,
escalation_delay=5.0, force=True)