This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e4780046c1c Fix CloudwatchTaskHandler not deleting local logs after streaming (#62985)
e4780046c1c is described below

commit e4780046c1c107e3648d64945d7869523676a85b
Author: Sam Dumont <[email protected]>
AuthorDate: Mon Mar 9 15:27:43 2026 +0100

    Fix CloudwatchTaskHandler not deleting local logs after streaming (#62985)
    
    * Fix CloudwatchTaskHandler not deleting local logs after streaming
    
    In Airflow 3, `CloudwatchTaskHandler` streams logs to CloudWatch in
    real-time via structlog processors, so `upload()` was a no-op. However,
    `delete_local_copy` was never honoured, causing local log files to
    accumulate indefinitely on shared storage (e.g. EFS).
    
    Changes:
    - `CloudWatchRemoteLogIO.upload()` now deletes the local log directory
      when `delete_local_copy` is True.
    - `CloudwatchTaskHandler.close()` calls `upload()` with the rendered
      log path stored during `set_context()`.
    - Path traversal guard prevents deletion outside `base_log_folder`.
    
    * chore: rename newsfragment to PR #62985
    
    * Remove provider newsfragment (not used for providers)
    
    * fix: align close() guard and tests with S3/GCS/HDFS convention
    
    Use hasattr(self, "ti") guard in close() like S3/GCS/HDFS handlers.
    Fix tests to be version-agnostic by asserting against handler's own
    log_relative_path instead of hardcoded filename format.
---
 .../amazon/aws/log/cloudwatch_task_handler.py      |  34 ++++++-
 .../amazon/aws/log/test_cloudwatch_task_handler.py | 112 ++++++++++++++++++++-
 2 files changed, 138 insertions(+), 8 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index ab32a5ffae7..055cfd44933 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -22,6 +22,7 @@ import copy
 import json
 import logging
 import os
+import shutil
 from collections.abc import Generator
 from datetime import date, datetime, timedelta, timezone
 from functools import cached_property
@@ -166,10 +167,27 @@ class CloudWatchRemoteLogIO(LoggingMixin):  # noqa: D101
         self.handler.flush()
 
     def upload(self, path: os.PathLike | str, ti: RuntimeTI):
-        # No-op, as we upload via the processor as we go
-        # But we need to give the handler time to finish off its business
+        """Upload the given log path to the remote storage."""
+        # No batch upload — logs stream in real-time. Flush pending events and clean up.
         self.close()
-        return
+        if self.delete_local_copy:
+            base = self.base_log_folder.resolve()
+            raw = Path(path)
+            local_path = (raw if raw.is_absolute() else base / raw).resolve()
+            try:
+                local_path.relative_to(base)
+            except ValueError:
+                self.log.warning(
+                    "Skipping deletion: path %s is outside base_log_folder %s",
+                    local_path,
+                    base,
+                )
+                return
+            parent = local_path.parent
+            if parent.exists():
+                shutil.rmtree(parent, ignore_errors=True)
+                if parent.exists():
+                    self.log.warning("Failed to delete local log dir: %s", parent)
 
     def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
         messages, logs = self.stream(relative_path, ti)
@@ -261,10 +279,12 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
         self.log_group = split_arn[6]
         self.region_name = split_arn[3]
         self.closed = False
+        self.log_relative_path: str = ""
 
         self.io = CloudWatchRemoteLogIO(
             base_log_folder=base_log_folder,
             log_group_arn=log_group_arn,
+            delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
         )
 
     @cached_property
@@ -281,8 +301,9 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
     def set_context(self, ti: TaskInstance, *, identifier: str | None = None):
         super().set_context(ti)
         self.io.log_stream_name = self._render_filename(ti, ti.try_number)
-
         self.handler = self.io.handler
+        self.ti = ti
+        self.log_relative_path = self.io.log_stream_name
 
     def close(self):
         """Close the handler responsible for the upload of the local log file to Cloudwatch."""
@@ -295,6 +316,11 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
 
         if self.handler is not None:
             self.handler.close()
+        if hasattr(self, "ti"):
+            try:
+                self.io.upload(self.log_relative_path, self.ti)
+            except Exception:
+                self.log.exception("Failed to delete local log after streaming to CloudWatch")
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index 9cf1b0be6ed..cb5830b36cc 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -21,6 +21,7 @@ import logging
 import textwrap
 import time
 from datetime import datetime as dt, timedelta, timezone
+from pathlib import Path
 from unittest import mock
 from unittest.mock import ANY, call
 
@@ -111,6 +112,8 @@ class TestCloudRemoteLogIO:
                 airflow.sdk.log._ActiveLoggingConfig.set(self.subject, None)
             else:
                 monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", self.subject)
+            # Clear @cache'd processors so this test picks up self.subject.
+            airflow.sdk.log.logging_processors.cache_clear()
             try:
                 procs = airflow.sdk.log.logging_processors(colors=False, json_output=False)
             except TypeError:
@@ -137,6 +140,57 @@ class TestCloudRemoteLogIO:
             processors.clear()
             processors.extend(old_processors)
             structlog.configure(processors=old_processors, logger_factory=logger_factory)
+            # Clear @cache to avoid cross-test contamination.
+            airflow.sdk.log.logging_processors.cache_clear()
+
+    def test_upload_deletes_local_log_dir_when_delete_local_copy_true(self):
+        """upload() with delete_local_copy=True deletes the local log parent directory."""
+        assert self.subject.delete_local_copy is True  # attrs default
+        dag_dir = self.local_log_location / "dag_id=a"
+        assert dag_dir.exists()
+
+        with mock.patch.object(self.subject, "close"):
+            self.subject.upload(self.task_log_path, self.ti)
+
+        assert not dag_dir.exists()
+        # Close watchtower handler; conf_vars needed because base_log_folder differs from tmp_path.
+        with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}):
+            self.subject.handler.close()
+
+    def test_upload_does_not_delete_when_delete_local_copy_false(self):
+        """upload() with delete_local_copy=False leaves local files untouched."""
+        self.subject.delete_local_copy = False
+        dag_dir = self.local_log_location / "dag_id=a"
+        assert dag_dir.exists()
+
+        with mock.patch.object(self.subject, "close"):
+            self.subject.upload(self.task_log_path, self.ti)
+
+        assert dag_dir.exists()
+        with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}):
+            self.subject.handler.close()
+
+    def test_upload_handles_absolute_path(self):
+        """upload() resolves an absolute path to the correct parent directory."""
+        dag_dir = self.local_log_location / "dag_id=a"
+        assert dag_dir.exists()
+        absolute_path = self.local_log_location / self.task_log_path
+
+        with mock.patch.object(self.subject, "close"):
+            self.subject.upload(absolute_path, self.ti)
+
+        assert not dag_dir.exists()
+        with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}):
+            self.subject.handler.close()
+
+    def test_upload_skips_deletion_outside_base_log_folder(self):
+        """upload() logs a warning and skips deletion for paths outside base_log_folder."""
+        outside_path = "/tmp/evil/../../../etc/passwd"
+        with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}):
+            with mock.patch.object(self.subject, "close"):
+                self.subject.upload(outside_path, self.ti)
+        dag_dir = self.local_log_location / "dag_id=a"
+        assert dag_dir.exists()
 
     @time_machine.travel(datetime(2025, 3, 27, 21, 58, 1, 2345), tick=False)
     def test_log_message(self):
@@ -404,11 +458,61 @@ class TestCloudwatchTaskHandler:
     def test_close_prevents_duplicate_calls(self):
         with mock.patch("watchtower.CloudWatchLogHandler.close") as mock_log_handler_close:
             with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
-                self.cloudwatch_task_handler.set_context(self.ti)
-                for _ in range(5):
-                    self.cloudwatch_task_handler.close()
+                with mock.patch.object(self.cloudwatch_task_handler.io, "upload"):
+                    self.cloudwatch_task_handler.set_context(self.ti)
+                    for _ in range(5):
+                        self.cloudwatch_task_handler.close()
+
+                    mock_log_handler_close.assert_called_once()
+
+    def test_set_context_stores_ti_and_log_relative_path(self):
+        """set_context() stores the task instance and rendered log path for use in close()."""
+        self.cloudwatch_task_handler.set_context(self.ti)
+
+        assert self.cloudwatch_task_handler.ti is self.ti
+        assert self.cloudwatch_task_handler.log_relative_path != ""
+        assert (
+            self.cloudwatch_task_handler.log_relative_path == self.cloudwatch_task_handler.io.log_stream_name
+        )
+
+    def test_close_calls_upload_once(self):
+        """close() calls io.upload() exactly once, even when called multiple times."""
+        with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
+            with mock.patch.object(self.cloudwatch_task_handler.io, "upload") as mock_upload:
+                with mock.patch("watchtower.CloudWatchLogHandler.close"):
+                    self.cloudwatch_task_handler.set_context(self.ti)
+                    for _ in range(3):
+                        self.cloudwatch_task_handler.close()
+
+                    mock_upload.assert_called_once_with(
+                        self.cloudwatch_task_handler.log_relative_path, self.ti
+                    )
+
+    def test_close_skips_upload_without_set_context(self):
+        """close() without a prior set_context() should not call io.upload()."""
+        with mock.patch.object(self.cloudwatch_task_handler.io, "upload") as mock_upload:
+            self.cloudwatch_task_handler.close()
+            mock_upload.assert_not_called()
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="delete_local_copy only relevant for Airflow 3")
+    def test_close_deletes_local_log_dir(self):
+        """close() after set_context() deletes the local log directory."""
+        log_file_path = Path(self.local_log_location) / self.remote_log_stream
+        log_file_path.parent.mkdir(parents=True, exist_ok=True)
+        log_file_path.write_text("some log content")
+        assert log_file_path.parent.exists()
+
+        with conf_vars({("logging", "delete_local_logs"): "True"}):
+            handler = CloudwatchTaskHandler(
+                self.local_log_location,
+                f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
+            )
+        with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
+            with mock.patch("watchtower.CloudWatchLogHandler.close"):
+                handler.set_context(self.ti)
+                handler.close()
 
-                mock_log_handler_close.assert_called_once()
+        assert not log_file_path.parent.exists()
 
     def test_filename_template_for_backward_compatibility(self):
         # filename_template arg support for running the latest provider on airflow 2

Reply via email to