This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fdda4478e5 Fix check served logs logic (#41272)
fdda4478e5 is described below

commit fdda4478e5829c08aa5059f8ad86f92922313d19
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Aug 5 18:29:50 2024 -0700

    Fix check served logs logic (#41272)
---
 airflow/utils/log/file_task_handler.py | 16 ++++++------
 tests/utils/test_log_handlers.py       | 46 ++++++----------------------------
 2 files changed, 14 insertions(+), 48 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index 2ae15b454a..e99ffae0c9 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -381,28 +381,23 @@ class FileTaskHandler(logging.Handler):
         executor_messages: list[str] = []
         executor_logs: list[str] = []
         served_logs: list[str] = []
-        is_in_running_or_deferred = ti.state in (
-            TaskInstanceState.RUNNING,
-            TaskInstanceState.DEFERRED,
-        )
-        is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY
         with suppress(NotImplementedError):
             remote_messages, remote_logs = self._read_remote_logs(ti, 
try_number, metadata)
             messages_list.extend(remote_messages)
+        has_k8s_exec_pod = False
         if ti.state == TaskInstanceState.RUNNING:
             response = self._executor_get_task_log(ti, try_number)
             if response:
                 executor_messages, executor_logs = response
             if executor_messages:
                 messages_list.extend(executor_messages)
+                has_k8s_exec_pod = True
         if not (remote_logs and ti.state not in State.unfinished):
             # when finished, if we have remote logs, no need to check local
             worker_log_full_path = Path(self.local_base, worker_log_rel_path)
             local_messages, local_logs = 
self._read_from_local(worker_log_full_path)
             messages_list.extend(local_messages)
-        if (is_in_running_or_deferred or is_up_for_retry) and not 
executor_messages and not remote_logs:
-            # While task instance is still running and we don't have either 
executor nor remote logs, look for served logs
-            # This is for cases when users have not setup remote logging nor 
shared drive for logs
+        if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) 
and not has_k8s_exec_pod:
             served_messages, served_logs = self._read_from_logs_server(ti, 
worker_log_rel_path)
             messages_list.extend(served_messages)
         elif ti.state not in State.unfinished and not (local_logs or 
remote_logs):
@@ -422,7 +417,10 @@ class FileTaskHandler(logging.Handler):
         )
         log_pos = len(logs)
         messages = "".join([f"*** {x}\n" for x in messages_list])
-        end_of_log = ti.try_number != try_number or not 
is_in_running_or_deferred
+        end_of_log = ti.try_number != try_number or ti.state not in (
+            TaskInstanceState.RUNNING,
+            TaskInstanceState.DEFERRED,
+        )
         if metadata and "log_pos" in metadata:
             previous_chars = metadata["log_pos"]
             logs = logs[previous_chars:]  # Cut off previously passed log test 
as new tail
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index a9d9bfbabe..8d0b96435b 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -316,58 +316,26 @@ class TestFileTaskLogHandler:
         else:
             mock_k8s_get_task_log.assert_not_called()
 
-    # We are not testing TaskInstanceState.DEFERRED in this test because with 
the current testing setup,
-    # as it creates an inconsistent tests that succeeds in local but fails in 
CI. See https://github.com/apache/airflow/pull/39496#issuecomment-2149692239
-    # TODO: Fix the test setup so it is possible to test 
TaskInstanceState.DEFERRED as well.
-    @pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, 
TaskInstanceState.UP_FOR_RETRY])
-    def test__read_for_celery_executor_fallbacks_to_worker(self, state, 
create_task_instance):
+    def test__read_for_celery_executor_fallbacks_to_worker(self, 
create_task_instance):
         """Test for executors which do not have `get_task_log` method, it 
fallbacks to reading
-        log from worker if and only if remote logs aren't found"""
+        log from worker"""
         executor_name = "CeleryExecutor"
-        # Reading logs from worker should occur when the task is either 
running, deferred, or up for retry.
+
         ti = create_task_instance(
-            dag_id=f"dag_for_testing_celery_executor_log_read_{state}",
+            dag_id="dag_for_testing_celery_executor_log_read",
             task_id="task_for_testing_celery_executor_log_read",
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
         )
-        ti.try_number = 2
-        ti.state = state
+        ti.state = TaskInstanceState.RUNNING
         with conf_vars({("core", "executor"): executor_name}):
-            reload(executor_loader)
             fth = FileTaskHandler("")
-            fth._read_from_logs_server = mock.Mock()
-            fth._read_from_logs_server.return_value = ["this message"], 
["this\nlog\ncontent"]
-            actual = fth._read(ti=ti, try_number=2)
-            fth._read_from_logs_server.assert_called_once()
-            # If we are in the up for retry state, the log has ended.
-            expected_end_of_log = state in (TaskInstanceState.UP_FOR_RETRY)
-            assert actual == (
-                "*** this message\nthis\nlog\ncontent",
-                {"end_of_log": expected_end_of_log, "log_pos": 16},
-            )
 
-            # Previous try_number should return served logs when remote logs 
aren't implemented
             fth._read_from_logs_server = mock.Mock()
-            fth._read_from_logs_server.return_value = ["served logs 
try_number=1"], ["this\nlog\ncontent"]
+            fth._read_from_logs_server.return_value = ["this message"], 
["this\nlog\ncontent"]
             actual = fth._read(ti=ti, try_number=1)
             fth._read_from_logs_server.assert_called_once()
-            assert actual == (
-                "*** served logs try_number=1\nthis\nlog\ncontent",
-                {"end_of_log": True, "log_pos": 16},
-            )
-
-            # When remote_logs is implemented, previous try_number is from 
remote logs without reaching worker server
-            fth._read_from_logs_server.reset_mock()
-            fth._read_remote_logs = mock.Mock()
-            fth._read_remote_logs.return_value = ["remote logs"], 
["remote\nlog\ncontent"]
-            actual = fth._read(ti=ti, try_number=1)
-            fth._read_remote_logs.assert_called_once()
-            fth._read_from_logs_server.assert_not_called()
-            assert actual == (
-                "*** remote logs\nremote\nlog\ncontent",
-                {"end_of_log": True, "log_pos": 18},
-            )
+        assert actual == ("*** this message\nthis\nlog\ncontent", 
{"end_of_log": True, "log_pos": 16})
 
     @pytest.mark.parametrize(
         "remote_logs, local_logs, served_logs_checked",

Reply via email to