This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 033aeb1a7c1 Upload task logs even when final state update fails (#67935)
033aeb1a7c1 is described below
commit 033aeb1a7c1d0d2e58756014a660b76047e8bd89
Author: manipatnam <[email protected]>
AuthorDate: Tue Jun 16 04:09:37 2026 +0530
Upload task logs even when final state update fails (#67935)
When a task subprocess exits, the supervisor updates the task's terminal
state and then uploads the remote logs. If the state update raised (e.g. a
transient API error), log upload was skipped entirely — yet a failed state
update is exactly when the logs are most needed for debugging.
Run the log upload in a `finally` block so it always happens, while the
original state-update exception still propagates to the caller.
Co-authored-by: AI Assistant <[email protected]>
---
.../src/airflow/sdk/execution_time/supervisor.py | 12 +++++----
.../task_sdk/execution_time/test_supervisor.py | 31 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 3ab7eb1f8d6..c4b9c72420a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1422,11 +1422,13 @@ class ActivitySubprocess(WatchedSubprocess):
# If it hasn't, assume it's failed
self._exit_code = self._exit_code if self._exit_code is not None else 1
- self.update_task_state_if_needed()
-
- # Now at the last possible moment, when all logs and comms with the subprocess has finished, lets
- # upload the remote logs
- self._upload_logs()
+ try:
+ self.update_task_state_if_needed()
+ finally:
+ # Now at the last possible moment, when all logs and comms with the subprocess has finished,
+ # lets upload the remote logs. Run this in a `finally` so the logs are uploaded even if the
+ # state update above raised — a failed state update is exactly when the logs matter most.
+ self._upload_logs()
return self._exit_code
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 41a16161606..57ac9f39b50 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -3580,6 +3580,37 @@ def test_log_upload_failures_are_non_fatal(mocker):
)
+def test_logs_uploaded_even_when_state_update_fails(mocker):
+ """`wait()` must upload remote logs even if the final state update raises.
+
+ A failed state update (e.g. a transient API error) is exactly when the logs
+ matter most for debugging, so `_upload_logs()` runs in a `finally` block and
+ the original exception still propagates to the caller.
+ """
+ proc = ActivitySubprocess(
+ process_log=mocker.MagicMock(),
+ id=TI_ID,
+ pid=12345,
+ stdin=mocker.MagicMock(),
+ client=mocker.MagicMock(),
+ process=mocker.MagicMock(),
+ )
+ # Leave `_exit_code` unset so `wait()` doesn't short-circuit; the no-op
+ # `_monitor_subprocess` mock leaves it as None and `wait()` defaults it to 1.
+ mocker.patch.object(ActivitySubprocess, "_monitor_subprocess")
+ mocker.patch.object(
+ ActivitySubprocess,
+ "update_task_state_if_needed",
+ side_effect=httpx.ConnectError("connection refused"),
+ )
+ upload_logs = mocker.patch.object(ActivitySubprocess, "_upload_logs")
+
+ with pytest.raises(httpx.ConnectError):
+ proc.wait()
+
+ upload_logs.assert_called_once_with()
+
+
def test_remote_logging_conn_sets_process_context(monkeypatch, mocker):
"""
Test that _remote_logging_conn sets _AIRFLOW_PROCESS_CONTEXT=client.