This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7e91517ffd0 Fail closed when supervisor IPC fails on a non-success
terminal state (#66573)
7e91517ffd0 is described below
commit 7e91517ffd01f0608f21349ad8aa86595617d220
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 19 15:40:18 2026 +0200
Fail closed when supervisor IPC fails on a non-success terminal state
(#66573)
* Fail closed when supervisor IPC fails on a non-success terminal state
When a task FAILED / SKIPPED / etc. and the IPC send of the
terminal-state message to the supervisor itself raised, the existing
finally block logged the exception and let run() return normally.
The task subprocess then exited with code 0, which the supervisor
final_state property maps to SUCCESS for an exit_code-0 process
without a _terminal_state (the supervisor never received the
message). A genuine task FAILURE was silently being upgraded to
SUCCESS on transient IPC failures, breaking downstream pipeline
correctness without any signal.
Exit non-zero from the finally block when the terminal state is
anything other than SUCCESS, so the supervisor's final_state
correctly classifies as FAILED (or UP_FOR_RETRY when retries are
configured). The SUCCESS exemption preserves the existing softening
for the legitimate scenario where the supervisor rejects the
terminal-state send with a 409 because the server already
terminalised the TI -- covered by
test_run_swallows_supervisor_terminal_send_failure, which continues
to pass.
New regression test: when the task fails and the supervisor IPC send
raises (BrokenPipeError simulating a dead Unix socket), run() now
raises SystemExit(1).
Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-006).
* Address review comments: defer fail-closed exit, narrow guard
- Narrow the fail-closed guard from `state != SUCCESS` to FAILED /
UP_FOR_RETRY only (kaxil). SKIPPED / UP_FOR_RESCHEDULE / DEFERRED
would otherwise be mismapped to FAILED by supervisor's final_state,
which is strictly worse than the default mapping.
- Stop calling sys.exit(1) inside run()'s finally — SystemExit is a
BaseException so main()'s `except Exception:` would not catch it
and finalize() would be skipped, silently dropping
on_failure_callback / on_retry_callback / listener hooks /
email_on_failure / email_on_retry on the same IPC failure (kaxil).
Signal via `_terminal_state_send_failed` on the ti and let main()
sys.exit(1) after finalize() has run.
- Remove the redundant inline `from ... import run` in the test
(Lee-W, kaxil) — `run` is already imported at module level.
- Rework the existing regression test to assert the new contract
(run() returns FAILED + sets the flag) and add a listener-based
test that locks in callbacks/listeners still firing on the
IPC-broken path (kaxil).
* Declare _terminal_state_send_failed on RuntimeTaskInstance for mypy
The fail-closed path in _handle_current_task_failure set
ti._terminal_state_send_failed = True
dynamically without a class-level declaration, so mypy raised
[attr-defined].
Add the field as a Pydantic PrivateAttr (bool, default False) matching the
existing _cached_template_context pattern. No runtime behavior change —
getattr(ti, '_terminal_state_send_failed', False) still returns False for
the unset case, now via the PrivateAttr default instead of the getattr
fallback.
---------
Co-authored-by: vatsrahul1001 <[email protected]>
---
.../src/airflow/sdk/execution_time/task_runner.py | 28 +++++
.../task_sdk/execution_time/test_task_runner.py | 127 +++++++++++++++++++++
2 files changed, 155 insertions(+)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 9cb766c9b2d..61e476e60b9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -183,6 +183,9 @@ class RuntimeTaskInstance(TaskInstance):
_cached_template_context: Context | None = None
"""The Task Instance context. This is used to cache
get_template_context."""
+ _terminal_state_send_failed: bool = False
+ """True when the supervisor IPC send for a non-success terminal state
raised; signals main() to sys.exit(1) after finalize() so the supervisor
doesn't misclassify the run as SUCCESS via exit code 0."""
+
_ti_context_from_server: Annotated[TIRunContext | None, Field(repr=False)]
= None
"""The Task Instance context from the API server, if any."""
@@ -1429,6 +1432,26 @@ def run(
"Failed to report terminal task state to supervisor",
state=state.value,
)
+ # Fail closed for FAILED / UP_FOR_RETRY: when the supervisor
+ # never receives the terminal-state message, exiting 0 would
+ # let the supervisor's final_state property default to
+ # SUCCESS (exit_code == 0 with no _terminal_state set),
+ # turning a real failure into a silent data-quality bug for
+ # every downstream task. We signal main() to sys.exit(1)
+ # AFTER finalize() runs, so on_failure_callback /
+ # on_retry_callback / listener hooks / email_on_failure /
+ # email_on_retry still fire. sys.exit(1) directly here would
+ # raise SystemExit, which is BaseException, not Exception —
+ # main()'s `except Exception:` would not catch it and
+ # finalize() at the call site would be skipped.
+ #
+ # SKIPPED / UP_FOR_RESCHEDULE / DEFERRED are intentionally
+ # not fail-closed: supervisor's final_state would misclassify
+ # them too, but exiting non-zero would map them to FAILED,
+ # which is strictly worse than the default. Those need a
+ # separate fix in supervisor's final_state.
+ if state in (TaskInstanceState.FAILED, TaskInstanceState.UP_FOR_RETRY):
+ ti._terminal_state_send_failed = True
# Return the message to make unit tests easier too
ti.state = state
@@ -2030,6 +2053,11 @@ def main():
state, _, error = run(ti, context, log)
context["exception"] = error
finalize(ti, state, context, log, error)
+ # If run() couldn't deliver a FAILED / UP_FOR_RETRY terminal
+ # state to the supervisor, fail closed now — finalize() has
+ # already run, so callbacks and listeners observed the state.
+ if getattr(ti, "_terminal_state_send_failed", False):
+ sys.exit(1)
except KeyboardInterrupt:
log.exception("Ctrl-c hit")
sys.exit(2)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 54afc412f56..d7b72b2c48f 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -475,6 +475,86 @@ def
test_run_swallows_supervisor_terminal_send_failure(create_runtime_ti, mock_s
assert error is None
+def test_run_signals_fail_closed_when_failure_terminal_send_fails(create_runtime_ti, mock_supervisor_comms):
+ """
+ When the task FAILS and the terminal-state send to the supervisor fails too
+ (e.g. broken Unix socket / supervisor crashed / IPC channel dead), `run()`
+ must signal to main() that the process should exit non-zero — otherwise
+ the supervisor's `final_state` property defaults exit_code-0-with-no-
+ terminal-state to SUCCESS, turning a transient IPC blip into a silent
+ data-quality bug downstream.
+
+ The signal is deferred to main() (via `_terminal_state_send_failed` on the
+ ti) so finalize() still runs first — on_failure_callback / listener hooks /
+ email_on_failure must observe the FAILED state before the process exits.
+ """
+
+ class FailingOperator(BaseOperator):
+ def execute(self, context):
+ raise RuntimeError("task body failed")
+
+ task = FailingOperator(task_id="failing")
+ runtime_ti = create_runtime_ti(task=task)
+ context = runtime_ti.get_template_context()
+ log = mock.MagicMock()
+
+ # Let the terminal-state send raise an IPC-level failure.
+ def send_side_effect(msg=None, **kwargs):
+ if isinstance(msg, TaskState):
+ raise BrokenPipeError("supervisor IPC broken")
+ return mock.DEFAULT
+
+ mock_supervisor_comms.send.side_effect = send_side_effect
+
+ # run() must not raise — fail-closed is signalled via the ti attribute
+ # so main() can sys.exit(1) only after finalize() has run.
+ state, _, _ = run(runtime_ti, context, log)
+
+ assert state == TaskInstanceState.FAILED
+ assert runtime_ti._terminal_state_send_failed is True
+
+
+@pytest.mark.parametrize(
+ ("state_when_send_fails", "should_fail_closed"),
+ [
+ (TaskInstanceState.SUCCESS, False),
+ (TaskInstanceState.SKIPPED, False),
+ ],
+)
+def test_run_does_not_signal_fail_closed_for_non_failed_states(
+ create_runtime_ti, mock_supervisor_comms, state_when_send_fails, should_fail_closed
+):
+ """
+ Only FAILED / UP_FOR_RETRY are fail-closed. SUCCESS is exempt (the existing
+ 409-rejection softening). SKIPPED is also exempt: supervisor's final_state
+ misclassifies it either way, and exiting non-zero would map it to FAILED,
+ which is strictly worse than the default mapping.
+ """
+
+ class Op(BaseOperator):
+ def execute(self, context):
+ if state_when_send_fails == TaskInstanceState.SKIPPED:
+ raise AirflowSkipException("skip")
+ return "ok"
+
+ task = Op(task_id="op")
+ runtime_ti = create_runtime_ti(task=task)
+ context = runtime_ti.get_template_context()
+ log = mock.MagicMock()
+
+ def send_side_effect(msg=None, **kwargs):
+ if isinstance(msg, (TaskState, SucceedTask)):
+ raise BrokenPipeError("supervisor IPC broken")
+ return mock.DEFAULT
+
+ mock_supervisor_comms.send.side_effect = send_side_effect
+
+ state, _, _ = run(runtime_ti, context, log)
+
+ assert state == state_when_send_fails
+ assert getattr(runtime_ti, "_terminal_state_send_failed", False) is should_fail_closed
+
+
def test_task_span_is_child_of_dag_run_span(make_ti_context):
"""Full trace hierarchy: dag_run → task_run.my_task (API server) →
worker.my_task (task runner)."""
# Single provider shared by all spans so contexts are compatible.
@@ -4076,6 +4156,53 @@ class TestTaskRunnerCallsListeners:
assert listener.state == [TaskInstanceState.RUNNING,
TaskInstanceState.FAILED]
assert listener.error == error
+ def test_task_runner_calls_listeners_failed_when_terminal_send_fails(
+ self, mocked_parse, mock_supervisor_comms, listener_manager
+ ):
+ """Callbacks/listeners must still fire when the FAILED terminal-state
+ IPC send to the supervisor fails. The fail-closed exit is deferred to
+ main() (signalled via `_terminal_state_send_failed` on the ti) so
+ finalize() runs first.
+ """
+ listener = self.CustomListener()
+ listener_manager(listener)
+
+ class CustomOperator(BaseOperator):
+ def execute(self, context):
+ raise RuntimeError("task body failed")
+
+ task = CustomOperator(task_id="failing_with_broken_ipc")
+ dag = get_inline_dag(dag_id="test_dag", task=task)
+ ti = TaskInstance(
+ id=uuid7(),
+ task_id=task.task_id,
+ dag_id=dag.dag_id,
+ run_id="test_run",
+ try_number=1,
+ dag_version_id=uuid7(),
+ )
+
+ runtime_ti = RuntimeTaskInstance.model_construct(
+ **ti.model_dump(exclude_unset=True), task=task, start_date=timezone.utcnow()
+ )
+
+ def send_side_effect(msg=None, **kwargs):
+ if isinstance(msg, TaskState):
+ raise BrokenPipeError("supervisor IPC broken")
+ return mock.DEFAULT
+
+ mock_supervisor_comms.send.side_effect = send_side_effect
+
+ log = mock.MagicMock()
+ context = runtime_ti.get_template_context()
+ state, _, error = run(runtime_ti, context, log)
+ finalize(runtime_ti, state, context, log, error)
+
+ assert state == TaskInstanceState.FAILED
+ assert runtime_ti._terminal_state_send_failed is True
+ assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.FAILED]
+ assert listener.error == error
+
def test_task_runner_calls_listeners_skipped(self, mocked_parse,
mock_supervisor_comms, listener_manager):
listener = self.CustomListener()
listener_manager(listener)