This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 34466ce8833 [v3-0-test] Fix: Consistent no-log handling for tasks with
try_number=0 in API and UI (#54749) (#55035) (#55162)
34466ce8833 is described below
commit 34466ce8833a75d0d14901bdb5e95a8ef6c96564
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Sep 3 15:40:36 2025 +0300
[v3-0-test] Fix: Consistent no-log handling for tasks with try_number=0 in
API and UI (#54749) (#55035) (#55162)
(cherry picked from commit 22677defbeac3f952e337d98b8abdee42cc89fd2)
Co-authored-by: Kumbha Lakshmi Narayana
<[email protected]>
---
airflow-core/src/airflow/utils/log/log_reader.py | 31 ++++++++++++
.../tests/unit/utils/log/test_log_reader.py | 57 ++++++++++++++++++++++
2 files changed, 88 insertions(+)
diff --git a/airflow-core/src/airflow/utils/log/log_reader.py
b/airflow-core/src/airflow/utils/log/log_reader.py
index 9f61c2f730c..0ee0047aee0 100644
--- a/airflow-core/src/airflow/utils/log/log_reader.py
+++ b/airflow-core/src/airflow/utils/log/log_reader.py
@@ -19,11 +19,13 @@ from __future__ import annotations
import logging
import time
from collections.abc import Generator, Iterator
+from datetime import datetime, timezone
from functools import cached_property
from typing import TYPE_CHECKING
from airflow.configuration import conf
from airflow.utils.helpers import render_log_filename
+from airflow.utils.log.file_task_handler import StructuredLogMessage
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState
@@ -50,6 +52,24 @@ class TaskLogReader:
STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 10
"""Number of empty loop iterations before stopping the stream"""
+ @staticmethod
+ def get_no_log_state_message(ti: TaskInstance | TaskInstanceHistory) ->
Iterator[StructuredLogMessage]:
+ """Yield standardized no-log messages for a given TI state."""
+ msg = {
+ TaskInstanceState.SKIPPED: "Task was skipped — no logs available.",
+ TaskInstanceState.UPSTREAM_FAILED: "Task did not run because
upstream task(s) failed.",
+ }.get(ti.state, "No logs available for this task.")
+
+ yield StructuredLogMessage(
+ timestamp=None,
+ event="::group::Log message source details",
+ )
+ yield StructuredLogMessage(timestamp=None, event="::endgroup::")
+ yield StructuredLogMessage(
+ timestamp=ti.updated_at or datetime.now(timezone.utc),
+ event=msg,
+ )
+
def read_log_chunks(
self,
ti: TaskInstance | TaskInstanceHistory,
@@ -74,6 +94,11 @@ class TaskLogReader:
contain information about the task log which can enable you read logs
to the
end.
"""
+ if try_number == 0:
+ msg = self.get_no_log_state_message(ti) # returns
StructuredLogMessage
+ # one message + tell the caller it's the end so stream stops
+ return msg, {"end_of_log": True}
+
return self.log_handler.read(ti, try_number, metadata=metadata)
def read_log_stream(
@@ -92,6 +117,12 @@ class TaskLogReader:
if try_number is None:
try_number = ti.try_number
+ # Handle skipped / upstream_failed case directly
+ if try_number == 0:
+ for msg in self.get_no_log_state_message(ti):
+ yield f"{msg.model_dump_json()}\n"
+ return
+
for key in ("end_of_log", "max_offset", "offset", "log_pos"):
#
https://mypy.readthedocs.io/en/stable/typed_dict.html#supported-operations
metadata.pop(key, None) # type: ignore[misc]
diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py
b/airflow-core/tests/unit/utils/log/test_log_reader.py
index cd8b430090a..a1d51ee06f0 100644
--- a/airflow-core/tests/unit/utils/log/test_log_reader.py
+++ b/airflow-core/tests/unit/utils/log/test_log_reader.py
@@ -329,3 +329,60 @@ class TestLogView:
reader = TaskLogReader()
assert reader.render_log_filename(scheduled_ti, 1) !=
reader.render_log_filename(manual_ti, 1)
+
+ @pytest.mark.parametrize(
+ "state,try_number,expected_event,use_self_ti",
+ [
+ (TaskInstanceState.SKIPPED, 0, "Task was skipped — no logs
available.", False),
+ (
+ TaskInstanceState.UPSTREAM_FAILED,
+ 0,
+ "Task did not run because upstream task(s) failed.",
+ False,
+ ),
+ (TaskInstanceState.SUCCESS, 1, "try_number=1.", True),
+ ],
+ )
+ def test_read_log_chunks_no_logs_and_normal(
+ self, create_task_instance, state, try_number, expected_event,
use_self_ti
+ ):
+ task_log_reader = TaskLogReader()
+
+ if use_self_ti:
+ ti = copy.copy(self.ti) # already prepared with log files
+ else:
+ ti = create_task_instance(dag_id="dag_no_logs",
task_id="task_no_logs")
+
+ ti.state = state
+ logs, _ = task_log_reader.read_log_chunks(ti=ti,
try_number=try_number, metadata={})
+ events = [log.event for log in logs]
+
+ assert any(expected_event in e for e in events)
+
+ @pytest.mark.parametrize(
+ "state,try_number,expected_event,use_self_ti",
+ [
+ (TaskInstanceState.SKIPPED, 0, "Task was skipped — no logs
available.", False),
+ (
+ TaskInstanceState.UPSTREAM_FAILED,
+ 0,
+ "Task did not run because upstream task(s) failed.",
+ False,
+ ),
+ (TaskInstanceState.SUCCESS, 1, "try_number=1.", True),
+ ],
+ )
+ def test_read_log_stream_no_logs_and_normal(
+ self, create_task_instance, state, try_number, expected_event,
use_self_ti
+ ):
+ task_log_reader = TaskLogReader()
+
+ if use_self_ti:
+ ti = copy.copy(self.ti) # session-bound TI with logs
+ else:
+ ti = create_task_instance(dag_id="dag_no_logs",
task_id="task_no_logs")
+
+ ti.state = state
+ stream = task_log_reader.read_log_stream(ti=ti, try_number=try_number,
metadata={})
+
+ assert any(expected_event in line for line in stream)