This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ca62b8f23d Fix Cloudwatch remote logging (#48774)
5ca62b8f23d is described below

commit 5ca62b8f23ddb23bfb468044dfc08ef1126031ff
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Apr 4 06:25:37 2025 -0700

    Fix Cloudwatch remote logging (#48774)
    
    There were three main issues:
    
    1) A circular loop that eventually fails due to call depth exceeded. This 
is because the handler was lazily initialized during the first log emission. 
But when the handler is created, some code downstream tries to log, and since 
there is no handler yet (because we're in the middle of creating it), it tries 
to create another one, and we were spinning ad infinitum.
    2) The stream name is not set on read, because we don't call set_context 
anywhere in the SDK path, and the processor doesn't have access to the TI 
anyway (which is used for the stream name). So a zero-byte stream name was 
being used and was causing a failure in Watchtower.
    3) read is also failing because it uses the relative_path as the stream 
name, which is almost right, but the name isn't sanitized (there are some 
characters that CloudWatch doesn't allow in a stream name). set_context used to 
sanitize the name and set it, but it isn't called in the SDK path.
---
 .../amazon/aws/log/cloudwatch_task_handler.py      | 13 ++++-
 .../amazon/aws/log/test_cloudwatch_task_handler.py | 65 ++++++++++++++--------
 task-sdk/src/airflow/sdk/log.py                    | 28 ++++++----
 3 files changed, 70 insertions(+), 36 deletions(-)

diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 37cff5bca19..6191cb82f50 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -116,8 +116,18 @@ class CloudWatchRemoteLogIO(LoggingMixin):  # noqa: D101
         import structlog.stdlib
 
         logRecordFactory = getLogRecordFactory()
+        # The handler MUST be initted here, before the processor is actually 
used to log anything.
+        # Otherwise, logging that occurs during the creation of the handler 
can create infinite loops.
+        _handler = self.handler
+        from airflow.sdk.log import relative_path_from_logger
 
         def proc(logger: structlog.typing.WrappedLogger, method_name: str, 
event: structlog.typing.EventDict):
+            if not logger or not (stream_name := 
relative_path_from_logger(logger)):
+                return event
+            # Only init the handler stream_name once. We cannot do it above 
when we init the handler because
+            # we don't yet know the log path at that point.
+            if not _handler.log_stream_name:
+                _handler.log_stream_name = stream_name.as_posix().replace(":", 
"_")
             name = event.get("logger_name") or event.get("logger", "")
             level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), 
logging.INFO)
             msg = copy.copy(event)
@@ -134,7 +144,7 @@ class CloudWatchRemoteLogIO(LoggingMixin):  # noqa: D101
                 ct = created.timestamp()
                 record.created = ct
                 record.msecs = int((ct - int(ct)) * 1000) + 0.0  # Copied from 
stdlib logging
-            self.handler.handle(record)
+            _handler.handle(record)
             return event
 
         return (proc,)
@@ -177,6 +187,7 @@ class CloudWatchRemoteLogIO(LoggingMixin):  # noqa: D101
         :param task_instance: the task instance to get logs about
         :return: string of all logs from the given log stream
         """
+        stream_name = stream_name.replace(":", "_")
         # If there is an end_date to the task instance, fetch logs until that 
date + 30 seconds
         # 30 seconds is an arbitrary buffer so that we don't miss any logs 
that were emitted
         end_time = (
diff --git 
a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py 
b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index dd765ab6945..3696f552d76 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -75,19 +75,24 @@ class TestCloudRemoteLogIO:
 
         self.remote_log_group = "log_group_name"
         self.region_name = "us-west-2"
+        self.task_log_path = "dag_id=a/0:0.log"
         self.local_log_location = tmp_path / "local-cloudwatch-log-location"
         self.local_log_location.mkdir()
+        # Create the local log file structure
+        task_log_path_parts = self.task_log_path.split("/")
+        dag_dir = self.local_log_location / task_log_path_parts[0]
+        dag_dir.mkdir()
+        task_log_file = dag_dir / task_log_path_parts[1]
+        task_log_file.touch()
 
         # The subject under test
         self.subject = CloudWatchRemoteLogIO(
             base_log_folder=self.local_log_location,
             
log_group_arn=f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
-            log_stream_name="dag_id=a/0.log",
         )
 
         conn = boto3.client("logs", region_name=self.region_name)
         conn.create_log_group(logGroupName=self.remote_log_group)
-        conn.create_log_stream(logGroupName=self.remote_log_group, 
logStreamName=self.subject.log_stream_name)
 
         processors = structlog.get_config()["processors"]
         old_processors = processors.copy()
@@ -108,7 +113,13 @@ class TestCloudRemoteLogIO:
                 raise structlog.DropEvent()
 
             processors[-1] = drop
-            structlog.configure(processors=processors)
+            structlog.configure(
+                processors=processors,
+                # Create a logger factory and pass in the file path we want it 
to use
+                # This is because we use the logger to determine the 
streamname/filepath
+                # in the CloudWatchRemoteLogIO processor.
+                
logger_factory=structlog.PrintLoggerFactory(file=task_log_file.open("w+")),
+            )
             yield
         finally:
             # remove LogCapture and restore original processors
@@ -118,32 +129,38 @@ class TestCloudRemoteLogIO:
 
     @time_machine.travel(datetime(2025, 3, 27, 21, 58, 1, 2345), tick=False)
     def test_log_message(self):
-        import structlog
+        # Use a context instead of a decorator on the test method because we 
need access to self to
+        # get the path from the setup method.
+        with conf_vars({("logging", "base_log_folder"): 
self.local_log_location.as_posix()}):
+            import structlog
 
-        log = structlog.get_logger()
-        log.info("Hi", foo="bar")
-        # We need to close in order to flush the logs etc.
-        self.subject.close()
+            log = structlog.get_logger()
+            log.info("Hi", foo="bar")
+            # We need to close in order to flush the logs etc.
+            self.subject.close()
 
-        logs = self.subject.read("dag_id=a/0.log", self.ti)
+            # Inside the Cloudwatch logger we swap colons for underscores 
since colons are not allowed in
+            # stream names.
+            stream_name = self.task_log_path.replace(":", "_")
+            logs = self.subject.read(stream_name, self.ti)
 
-        if AIRFLOW_V_3_0_PLUS:
-            from airflow.utils.log.file_task_handler import 
StructuredLogMessage
+            if AIRFLOW_V_3_0_PLUS:
+                from airflow.utils.log.file_task_handler import 
StructuredLogMessage
 
-            metadata, logs = logs
+                metadata, logs = logs
 
-            results = TypeAdapter(list[StructuredLogMessage]).dump_python(logs)
-            assert metadata == [
-                "Reading remote log from Cloudwatch log_group: log_group_name 
log_stream: dag_id=a/0.log"
-            ]
-            assert results == [
-                {
-                    "event": "Hi",
-                    "foo": "bar",
-                    "level": "info",
-                    "timestamp": datetime(2025, 3, 27, 21, 58, 1, 2000, 
tzinfo=TzInfo(0)),
-                },
-            ]
+                results = 
TypeAdapter(list[StructuredLogMessage]).dump_python(logs)
+                assert metadata == [
+                    f"Reading remote log from Cloudwatch log_group: 
log_group_name log_stream: {stream_name}"
+                ]
+                assert results == [
+                    {
+                        "event": "Hi",
+                        "foo": "bar",
+                        "level": "info",
+                        "timestamp": datetime(2025, 3, 27, 21, 58, 1, 2000, 
tzinfo=TzInfo(0)),
+                    },
+                ]
 
     def test_event_to_str(self):
         handler = self.subject
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index d209a14a7f4..8c6e6ff8f9d 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -496,26 +496,32 @@ def load_remote_log_handler() -> RemoteLogIO | None:
     return airflow.logging_config.REMOTE_TASK_LOG
 
 
-def upload_to_remote(logger: FilteringBoundLogger):
-    from airflow.configuration import conf
-
-    raw_logger = getattr(logger, "_logger")
-
-    if not raw_logger or not hasattr(raw_logger, "_file"):
+def relative_path_from_logger(logger) -> Path | None:
+    if not logger:
+        return None
+    if not hasattr(logger, "_file"):
         logger.warning("Unable to find log file, logger was of unexpected 
type", type=type(logger))
-        return
+        return None
 
-    fh = raw_logger._file
+    fh = logger._file
     fname = fh.name
 
     if fh.fileno() == 1 or not isinstance(fname, str):
         # Logging to stdout, or something odd about this logger, don't try to 
upload!
-        return
+        return None
+    from airflow.configuration import conf
+
     base_log_folder = conf.get("logging", "base_log_folder")
-    relative_path = Path(fname).relative_to(base_log_folder)
+    return Path(fname).relative_to(base_log_folder)
+
+
+def upload_to_remote(logger: FilteringBoundLogger):
+    raw_logger = getattr(logger, "_logger")
+
+    relative_path = relative_path_from_logger(raw_logger)
 
     handler = load_remote_log_handler()
-    if not handler:
+    if not handler or not relative_path:
         return
 
     log_relative_path = relative_path.as_posix()

Reply via email to