This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8e2a348abe23fd59d554c6d4fbe237a282df16be
Author: Yaming Zhang <[email protected]>
AuthorDate: Wed Aug 6 02:06:25 2025 -0700

    Fix type error with TIH when reading served log (#54114)
    
    (cherry picked from commit 3df18a07a1c2bb0f85ebdf7d889d687c70d39b47)
---
 airflow-core/src/airflow/utils/log/file_task_handler.py |  4 ++--
 airflow-core/tests/unit/utils/test_log_handlers.py      | 12 ++++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 4aa802f37c8..37149b05d6c 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -867,13 +867,13 @@ class FileTaskHandler(logging.Handler):
 
     def _read_from_logs_server(
         self,
-        ti: TaskInstance,
+        ti: TaskInstance | TaskInstanceHistory,
         worker_log_rel_path: str,
     ) -> LogResponse:
         sources: LogSourceInfo = []
         log_streams: list[RawLogStream] = []
         try:
-            log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
+            log_type = LogType.TRIGGER if getattr(ti, "triggerer_job", False) else LogType.WORKER
             url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
             response = _fetch_logs_from_service(url, rel_path)
             if response.status_code == 403:
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py
index b4be1c3f895..d3a2512a067 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -557,6 +557,18 @@ class TestFileTaskLogHandler:
         assert extract_events(logs, False) == expected_logs
         assert metadata == {"end_of_log": True, "log_pos": 3}
 
+    @pytest.mark.parametrize("is_tih", [False, True])
+    def test_read_served_logs(self, is_tih, create_task_instance):
+        ti = create_task_instance(
+            state=TaskInstanceState.SUCCESS,
+            hostname="test_hostname",
+        )
+        if is_tih:
+            ti = TaskInstanceHistory(ti, ti.state)
+        fth = FileTaskHandler("")
+        sources, _ = fth._read_from_logs_server(ti, "test.log")
+        assert len(sources) > 0
+
     def test_add_triggerer_suffix(self):
         sample = "any/path/to/thing.txt"
         assert FileTaskHandler.add_triggerer_suffix(sample) == sample + ".trigger"

Reply via email to