This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8e2a348abe23fd59d554c6d4fbe237a282df16be Author: Yaming Zhang <[email protected]> AuthorDate: Wed Aug 6 02:06:25 2025 -0700 Fix type error with TIH when reading served log (#54114) (cherry picked from commit 3df18a07a1c2bb0f85ebdf7d889d687c70d39b47) --- airflow-core/src/airflow/utils/log/file_task_handler.py | 4 ++-- airflow-core/tests/unit/utils/test_log_handlers.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 4aa802f37c8..37149b05d6c 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -867,13 +867,13 @@ class FileTaskHandler(logging.Handler): def _read_from_logs_server( self, - ti: TaskInstance, + ti: TaskInstance | TaskInstanceHistory, worker_log_rel_path: str, ) -> LogResponse: sources: LogSourceInfo = [] log_streams: list[RawLogStream] = [] try: - log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER + log_type = LogType.TRIGGER if getattr(ti, "triggerer_job", False) else LogType.WORKER url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type) response = _fetch_logs_from_service(url, rel_path) if response.status_code == 403: diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index b4be1c3f895..d3a2512a067 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -557,6 +557,18 @@ class TestFileTaskLogHandler: assert extract_events(logs, False) == expected_logs assert metadata == {"end_of_log": True, "log_pos": 3} + @pytest.mark.parametrize("is_tih", [False, True]) + def test_read_served_logs(self, is_tih, create_task_instance): + ti = create_task_instance( + state=TaskInstanceState.SUCCESS, + hostname="test_hostname", + ) + if is_tih: + ti = TaskInstanceHistory(ti, ti.state) + fth = FileTaskHandler("") + sources, _ = fth._read_from_logs_server(ti, "test.log") + assert len(sources) > 0 + def test_add_triggerer_suffix(self): sample = "any/path/to/thing.txt" assert FileTaskHandler.add_triggerer_suffix(sample) == sample + ".trigger"
