This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e4780046c1c Fix CloudwatchTaskHandler not deleting local logs after
streaming (#62985)
e4780046c1c is described below
commit e4780046c1c107e3648d64945d7869523676a85b
Author: Sam Dumont <[email protected]>
AuthorDate: Mon Mar 9 15:27:43 2026 +0100
Fix CloudwatchTaskHandler not deleting local logs after streaming (#62985)
* Fix CloudwatchTaskHandler not deleting local logs after streaming
In Airflow 3, `CloudwatchTaskHandler` streams logs to CloudWatch in
real time via structlog processors, so `upload()` was a no-op. However,
`delete_local_copy` was never honoured, causing local log files to
accumulate indefinitely on shared storage (e.g. EFS).
Changes:
- `CloudWatchRemoteLogIO.upload()` now deletes the local log directory
when `delete_local_copy` is True.
- `CloudwatchTaskHandler.close()` calls `upload()` with the rendered
log path stored during `set_context()`.
- A path traversal guard prevents deletion outside `base_log_folder`.
* chore: rename newsfragment to PR #62985
* Remove provider newsfragment (not used for providers)
* fix: align close() guard and tests with S3/GCS/HDFS convention
Use a `hasattr(self, "ti")` guard in `close()`, like the S3/GCS/HDFS handlers.
Fix tests to be version-agnostic by asserting against the handler's own
`log_relative_path` instead of a hardcoded filename format.
---
.../amazon/aws/log/cloudwatch_task_handler.py | 34 ++++++-
.../amazon/aws/log/test_cloudwatch_task_handler.py | 112 ++++++++++++++++++++-
2 files changed, 138 insertions(+), 8 deletions(-)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index ab32a5ffae7..055cfd44933 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -22,6 +22,7 @@ import copy
import json
import logging
import os
+import shutil
from collections.abc import Generator
from datetime import date, datetime, timedelta, timezone
from functools import cached_property
@@ -166,10 +167,27 @@ class CloudWatchRemoteLogIO(LoggingMixin): # noqa: D101
self.handler.flush()
def upload(self, path: os.PathLike | str, ti: RuntimeTI):
- # No-op, as we upload via the processor as we go
- # But we need to give the handler time to finish off its business
+ """Upload the given log path to the remote storage."""
+ # No batch upload — logs stream in real-time. Flush pending events and
clean up.
self.close()
- return
+ if self.delete_local_copy:
+ base = self.base_log_folder.resolve()
+ raw = Path(path)
+ local_path = (raw if raw.is_absolute() else base / raw).resolve()
+ try:
+ local_path.relative_to(base)
+ except ValueError:
+ self.log.warning(
+ "Skipping deletion: path %s is outside base_log_folder %s",
+ local_path,
+ base,
+ )
+ return
+ parent = local_path.parent
+ if parent.exists():
+ shutil.rmtree(parent, ignore_errors=True)
+ if parent.exists():
+ self.log.warning("Failed to delete local log dir: %s",
parent)
def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
messages, logs = self.stream(relative_path, ti)
@@ -261,10 +279,12 @@ class CloudwatchTaskHandler(FileTaskHandler,
LoggingMixin):
self.log_group = split_arn[6]
self.region_name = split_arn[3]
self.closed = False
+ self.log_relative_path: str = ""
self.io = CloudWatchRemoteLogIO(
base_log_folder=base_log_folder,
log_group_arn=log_group_arn,
+ delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
)
@cached_property
@@ -281,8 +301,9 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
def set_context(self, ti: TaskInstance, *, identifier: str | None = None):
super().set_context(ti)
self.io.log_stream_name = self._render_filename(ti, ti.try_number)
-
self.handler = self.io.handler
+ self.ti = ti
+ self.log_relative_path = self.io.log_stream_name
def close(self):
"""Close the handler responsible for the upload of the local log file
to Cloudwatch."""
@@ -295,6 +316,11 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
if self.handler is not None:
self.handler.close()
+ if hasattr(self, "ti"):
+ try:
+ self.io.upload(self.log_relative_path, self.ti)
+ except Exception:
+ self.log.exception("Failed to delete local log after streaming
to CloudWatch")
# Mark closed so we don't double write if close is called twice
self.closed = True
diff --git
a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index 9cf1b0be6ed..cb5830b36cc 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -21,6 +21,7 @@ import logging
import textwrap
import time
from datetime import datetime as dt, timedelta, timezone
+from pathlib import Path
from unittest import mock
from unittest.mock import ANY, call
@@ -111,6 +112,8 @@ class TestCloudRemoteLogIO:
airflow.sdk.log._ActiveLoggingConfig.set(self.subject, None)
else:
monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG",
self.subject)
+ # Clear @cache'd processors so this test picks up self.subject.
+ airflow.sdk.log.logging_processors.cache_clear()
try:
procs = airflow.sdk.log.logging_processors(colors=False,
json_output=False)
except TypeError:
@@ -137,6 +140,57 @@ class TestCloudRemoteLogIO:
processors.clear()
processors.extend(old_processors)
structlog.configure(processors=old_processors,
logger_factory=logger_factory)
+ # Clear @cache to avoid cross-test contamination.
+ airflow.sdk.log.logging_processors.cache_clear()
+
+ def test_upload_deletes_local_log_dir_when_delete_local_copy_true(self):
+ """upload() with delete_local_copy=True deletes the local log parent
directory."""
+ assert self.subject.delete_local_copy is True # attrs default
+ dag_dir = self.local_log_location / "dag_id=a"
+ assert dag_dir.exists()
+
+ with mock.patch.object(self.subject, "close"):
+ self.subject.upload(self.task_log_path, self.ti)
+
+ assert not dag_dir.exists()
+ # Close watchtower handler; conf_vars needed because base_log_folder
differs from tmp_path.
+ with conf_vars({("logging", "base_log_folder"):
self.local_log_location.as_posix()}):
+ self.subject.handler.close()
+
+ def test_upload_does_not_delete_when_delete_local_copy_false(self):
+ """upload() with delete_local_copy=False leaves local files
untouched."""
+ self.subject.delete_local_copy = False
+ dag_dir = self.local_log_location / "dag_id=a"
+ assert dag_dir.exists()
+
+ with mock.patch.object(self.subject, "close"):
+ self.subject.upload(self.task_log_path, self.ti)
+
+ assert dag_dir.exists()
+ with conf_vars({("logging", "base_log_folder"):
self.local_log_location.as_posix()}):
+ self.subject.handler.close()
+
+ def test_upload_handles_absolute_path(self):
+ """upload() resolves an absolute path to the correct parent
directory."""
+ dag_dir = self.local_log_location / "dag_id=a"
+ assert dag_dir.exists()
+ absolute_path = self.local_log_location / self.task_log_path
+
+ with mock.patch.object(self.subject, "close"):
+ self.subject.upload(absolute_path, self.ti)
+
+ assert not dag_dir.exists()
+ with conf_vars({("logging", "base_log_folder"):
self.local_log_location.as_posix()}):
+ self.subject.handler.close()
+
+ def test_upload_skips_deletion_outside_base_log_folder(self):
+ """upload() logs a warning and skips deletion for paths outside
base_log_folder."""
+ outside_path = "/tmp/evil/../../../etc/passwd"
+ with conf_vars({("logging", "base_log_folder"):
self.local_log_location.as_posix()}):
+ with mock.patch.object(self.subject, "close"):
+ self.subject.upload(outside_path, self.ti)
+ dag_dir = self.local_log_location / "dag_id=a"
+ assert dag_dir.exists()
@time_machine.travel(datetime(2025, 3, 27, 21, 58, 1, 2345), tick=False)
def test_log_message(self):
@@ -404,11 +458,61 @@ class TestCloudwatchTaskHandler:
def test_close_prevents_duplicate_calls(self):
with mock.patch("watchtower.CloudWatchLogHandler.close") as
mock_log_handler_close:
with
mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
- self.cloudwatch_task_handler.set_context(self.ti)
- for _ in range(5):
- self.cloudwatch_task_handler.close()
+ with mock.patch.object(self.cloudwatch_task_handler.io,
"upload"):
+ self.cloudwatch_task_handler.set_context(self.ti)
+ for _ in range(5):
+ self.cloudwatch_task_handler.close()
+
+ mock_log_handler_close.assert_called_once()
+
+ def test_set_context_stores_ti_and_log_relative_path(self):
+ """set_context() stores the task instance and rendered log path for
use in close()."""
+ self.cloudwatch_task_handler.set_context(self.ti)
+
+ assert self.cloudwatch_task_handler.ti is self.ti
+ assert self.cloudwatch_task_handler.log_relative_path != ""
+ assert (
+ self.cloudwatch_task_handler.log_relative_path ==
self.cloudwatch_task_handler.io.log_stream_name
+ )
+
+ def test_close_calls_upload_once(self):
+ """close() calls io.upload() exactly once, even when called multiple
times."""
+ with
mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
+ with mock.patch.object(self.cloudwatch_task_handler.io, "upload")
as mock_upload:
+ with mock.patch("watchtower.CloudWatchLogHandler.close"):
+ self.cloudwatch_task_handler.set_context(self.ti)
+ for _ in range(3):
+ self.cloudwatch_task_handler.close()
+
+ mock_upload.assert_called_once_with(
+ self.cloudwatch_task_handler.log_relative_path, self.ti
+ )
+
+ def test_close_skips_upload_without_set_context(self):
+ """close() without a prior set_context() should not call
io.upload()."""
+ with mock.patch.object(self.cloudwatch_task_handler.io, "upload") as
mock_upload:
+ self.cloudwatch_task_handler.close()
+ mock_upload.assert_not_called()
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="delete_local_copy only
relevant for Airflow 3")
+ def test_close_deletes_local_log_dir(self):
+ """close() after set_context() deletes the local log directory."""
+ log_file_path = Path(self.local_log_location) / self.remote_log_stream
+ log_file_path.parent.mkdir(parents=True, exist_ok=True)
+ log_file_path.write_text("some log content")
+ assert log_file_path.parent.exists()
+
+ with conf_vars({("logging", "delete_local_logs"): "True"}):
+ handler = CloudwatchTaskHandler(
+ self.local_log_location,
+
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
+ )
+ with
mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
+ with mock.patch("watchtower.CloudWatchLogHandler.close"):
+ handler.set_context(self.ti)
+ handler.close()
- mock_log_handler_close.assert_called_once()
+ assert not log_file_path.parent.exists()
def test_filename_template_for_backward_compatibility(self):
# filename_template arg support for running the latest provider on
airflow 2