This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 34466ce8833 [v3-0-test] Fix: Consistent no-log handling for tasks with 
try_number=0 in API and UI (#54749) (#55035) (#55162)
34466ce8833 is described below

commit 34466ce8833a75d0d14901bdb5e95a8ef6c96564
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Sep 3 15:40:36 2025 +0300

    [v3-0-test] Fix: Consistent no-log handling for tasks with try_number=0 in 
API and UI (#54749) (#55035) (#55162)
    
    (cherry picked from commit 22677defbeac3f952e337d98b8abdee42cc89fd2)
    
    Co-authored-by: Kumbha Lakshmi Narayana 
<[email protected]>
---
 airflow-core/src/airflow/utils/log/log_reader.py   | 31 ++++++++++++
 .../tests/unit/utils/log/test_log_reader.py        | 57 ++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/airflow-core/src/airflow/utils/log/log_reader.py 
b/airflow-core/src/airflow/utils/log/log_reader.py
index 9f61c2f730c..0ee0047aee0 100644
--- a/airflow-core/src/airflow/utils/log/log_reader.py
+++ b/airflow-core/src/airflow/utils/log/log_reader.py
@@ -19,11 +19,13 @@ from __future__ import annotations
 import logging
 import time
 from collections.abc import Generator, Iterator
+from datetime import datetime, timezone
 from functools import cached_property
 from typing import TYPE_CHECKING
 
 from airflow.configuration import conf
 from airflow.utils.helpers import render_log_filename
+from airflow.utils.log.file_task_handler import StructuredLogMessage
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -50,6 +52,24 @@ class TaskLogReader:
     STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 10
     """Number of empty loop iterations before stopping the stream"""
 
+    @staticmethod
+    def get_no_log_state_message(ti: TaskInstance | TaskInstanceHistory) -> 
Iterator[StructuredLogMessage]:
+        """Yield standardized no-log messages for a given TI state."""
+        msg = {
+            TaskInstanceState.SKIPPED: "Task was skipped — no logs available.",
+            TaskInstanceState.UPSTREAM_FAILED: "Task did not run because 
upstream task(s) failed.",
+        }.get(ti.state, "No logs available for this task.")
+
+        yield StructuredLogMessage(
+            timestamp=None,
+            event="::group::Log message source details",
+        )
+        yield StructuredLogMessage(timestamp=None, event="::endgroup::")
+        yield StructuredLogMessage(
+            timestamp=ti.updated_at or datetime.now(timezone.utc),
+            event=msg,
+        )
+
     def read_log_chunks(
         self,
         ti: TaskInstance | TaskInstanceHistory,
@@ -74,6 +94,11 @@ class TaskLogReader:
         contain information about the task log which can enable you read logs 
to the
         end.
         """
+        if try_number == 0:
+            msg = self.get_no_log_state_message(ti)  # returns 
StructuredLogMessage
+            # one message + tell the caller it's the end so stream stops
+            return msg, {"end_of_log": True}
+
         return self.log_handler.read(ti, try_number, metadata=metadata)
 
     def read_log_stream(
@@ -92,6 +117,12 @@ class TaskLogReader:
         if try_number is None:
             try_number = ti.try_number
 
+        # Handle skipped / upstream_failed case directly
+        if try_number == 0:
+            for msg in self.get_no_log_state_message(ti):
+                yield f"{msg.model_dump_json()}\n"
+            return
+
         for key in ("end_of_log", "max_offset", "offset", "log_pos"):
             # 
https://mypy.readthedocs.io/en/stable/typed_dict.html#supported-operations
             metadata.pop(key, None)  # type: ignore[misc]
diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py 
b/airflow-core/tests/unit/utils/log/test_log_reader.py
index cd8b430090a..a1d51ee06f0 100644
--- a/airflow-core/tests/unit/utils/log/test_log_reader.py
+++ b/airflow-core/tests/unit/utils/log/test_log_reader.py
@@ -329,3 +329,60 @@ class TestLogView:
 
         reader = TaskLogReader()
         assert reader.render_log_filename(scheduled_ti, 1) != 
reader.render_log_filename(manual_ti, 1)
+
+    @pytest.mark.parametrize(
+        "state,try_number,expected_event,use_self_ti",
+        [
+            (TaskInstanceState.SKIPPED, 0, "Task was skipped — no logs 
available.", False),
+            (
+                TaskInstanceState.UPSTREAM_FAILED,
+                0,
+                "Task did not run because upstream task(s) failed.",
+                False,
+            ),
+            (TaskInstanceState.SUCCESS, 1, "try_number=1.", True),
+        ],
+    )
+    def test_read_log_chunks_no_logs_and_normal(
+        self, create_task_instance, state, try_number, expected_event, 
use_self_ti
+    ):
+        task_log_reader = TaskLogReader()
+
+        if use_self_ti:
+            ti = copy.copy(self.ti)  # already prepared with log files
+        else:
+            ti = create_task_instance(dag_id="dag_no_logs", 
task_id="task_no_logs")
+
+        ti.state = state
+        logs, _ = task_log_reader.read_log_chunks(ti=ti, 
try_number=try_number, metadata={})
+        events = [log.event for log in logs]
+
+        assert any(expected_event in e for e in events)
+
+    @pytest.mark.parametrize(
+        "state,try_number,expected_event,use_self_ti",
+        [
+            (TaskInstanceState.SKIPPED, 0, "Task was skipped — no logs 
available.", False),
+            (
+                TaskInstanceState.UPSTREAM_FAILED,
+                0,
+                "Task did not run because upstream task(s) failed.",
+                False,
+            ),
+            (TaskInstanceState.SUCCESS, 1, "try_number=1.", True),
+        ],
+    )
+    def test_read_log_stream_no_logs_and_normal(
+        self, create_task_instance, state, try_number, expected_event, 
use_self_ti
+    ):
+        task_log_reader = TaskLogReader()
+
+        if use_self_ti:
+            ti = copy.copy(self.ti)  # session-bound TI with logs
+        else:
+            ti = create_task_instance(dag_id="dag_no_logs", 
task_id="task_no_logs")
+
+        ti.state = state
+        stream = task_log_reader.read_log_stream(ti=ti, try_number=try_number, 
metadata={})
+
+        assert any(expected_event in line for line in stream)

Reply via email to the mailing list.