This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b069f1fdbf8 Fix error when retrieving logs of ti not run because of
upstream failures (#55517)
b069f1fdbf8 is described below
commit b069f1fdbf833488a8da885d62734258fc3a73eb
Author: Sebastián Ortega <[email protected]>
AuthorDate: Tue Sep 16 11:03:07 2025 +0200
Fix error when retrieving logs of ti not run because of upstream failures
(#55517)
* Fix TestFileTaskLogHandler tests
* Return placeholder message when requested the logs of tasks skipped
because of upstream failures
---
.../src/airflow/utils/log/file_task_handler.py | 5 ++++-
airflow-core/tests/unit/utils/test_log_handlers.py | 25 +++++++++++-----------
2 files changed, 17 insertions(+), 13 deletions(-)
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py
b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 3669aae122b..9954688a41f 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -741,7 +741,10 @@ class FileTaskHandler(logging.Handler):
if try_number is None:
try_number = task_instance.try_number
- if try_number == 0 and task_instance.state ==
TaskInstanceState.SKIPPED:
+ if try_number == 0 and task_instance.state in (
+ TaskInstanceState.SKIPPED,
+ TaskInstanceState.UPSTREAM_FAILED,
+ ):
logs = [StructuredLogMessage(event="Task was skipped, no logs
available.")]
return chain(logs), {"end_of_log": True}
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py
b/airflow-core/tests/unit/utils/test_log_handlers.py
index b5d939d52e7..6d2caaa8a43 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -79,7 +79,7 @@ from tests_common.test_utils.file_task_handler import (
)
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
-pytestmark = [pytest.mark.db_test, pytest.mark.xfail()]
+pytestmark = [pytest.mark.db_test]
DEFAULT_DATE = pendulum.datetime(2016, 1, 1)
TASK_LOGGER = "airflow.task"
@@ -156,7 +156,8 @@ class TestFileTaskLogHandler:
# Remove the generated tmp log file.
os.remove(log_filename)
- def test_file_task_handler_when_ti_is_skipped(self, dag_maker):
+ @pytest.mark.parametrize("ti_state", [TaskInstanceState.SKIPPED,
TaskInstanceState.UPSTREAM_FAILED])
+ def test_file_task_handler_when_ti_is_not_run(self, dag_maker, ti_state):
def task_callable(ti):
ti.log.info("test")
@@ -170,10 +171,10 @@ class TestFileTaskLogHandler:
ti = TaskInstance(task=task, run_id=dagrun.run_id,
dag_version_id=dag_version.id)
ti.try_number = 0
- ti.state = State.SKIPPED
+ ti.state = ti_state
- logger = ti.log
- ti.log.disabled = False
+ logger = logging.getLogger(TASK_LOGGER)
+ logger.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name ==
FILE_TASK_HANDLER), None
@@ -295,8 +296,8 @@ class TestFileTaskLogHandler:
ti.executor = executor_name
ti.try_number = 1
ti.state = TaskInstanceState.RUNNING
- logger = ti.log
- ti.log.disabled = False
+ logger = logging.getLogger(TASK_LOGGER)
+ logger.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name ==
FILE_TASK_HANDLER), None
@@ -344,8 +345,8 @@ class TestFileTaskLogHandler:
ti.try_number = 2
ti.state = State.RUNNING
- logger = ti.log
- ti.log.disabled = False
+ logger = logging.getLogger(TASK_LOGGER)
+ logger.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name ==
FILE_TASK_HANDLER), None
@@ -396,8 +397,8 @@ class TestFileTaskLogHandler:
ti.try_number = 1
ti.state = State.RUNNING
- logger = ti.log
- ti.log.disabled = False
+ logger = logging.getLogger(TASK_LOGGER)
+ logger.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name ==
FILE_TASK_HANDLER), None
@@ -413,7 +414,7 @@ class TestFileTaskLogHandler:
assert log_filename.endswith("1.log"), log_filename
# mock to generate 2000 lines of log, the total size is larger than
max_bytes_size
- for i in range(1, 2000):
+ for i in range(1, 3000):
logger.info("this is a Test. %s", i)
# this is the rotate log file