This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 75ca1714f96 fix: Support log level parsing from container output in `EcsRunTaskOperator` (#67180)
75ca1714f96 is described below

commit 75ca1714f963d75aa2f9af6f2ddb2605decf6670
Author: Zach Liu <[email protected]>
AuthorDate: Thu May 21 13:12:14 2026 -0400

    fix: Support log level parsing from container output in `EcsRunTaskOperator` (#67180)
    
    * Add log level detection for CloudWatch ECS task logs
    
    * Add log level detection to AwsTaskLogFetcher
---
 .../airflow/providers/amazon/aws/triggers/ecs.py   |   5 +-
 .../providers/amazon/aws/utils/task_log_fetcher.py |  48 +++++++-
 .../unit/amazon/aws/utils/test_task_log_fetcher.py | 124 ++++++++++++++++++++-
 3 files changed, 168 insertions(+), 9 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py b/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py
index ae38079ff53..3768af58a12 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py
@@ -26,7 +26,7 @@ from botocore.exceptions import ClientError, WaiterError
 from airflow.providers.amazon.aws.hooks.ecs import EcsHook
 from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
-from airflow.providers.amazon.aws.utils.task_log_fetcher import AwsTaskLogFetcher
+from airflow.providers.amazon.aws.utils.task_log_fetcher import AwsTaskLogFetcher, _parse_log_level
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
@@ -228,7 +228,8 @@ class TaskDoneTrigger(BaseTrigger):
 
             events = response["events"]
             for log_event in events:
-                self.log.info(AwsTaskLogFetcher.event_to_str(log_event))
+                level = _parse_log_level(log_event["message"])
+                self.log.log(level, AwsTaskLogFetcher.event_to_str(log_event))
 
             if len(events) == 0 or next_token == response["nextForwardToken"]:
                 return response["nextForwardToken"]
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py b/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py
index c509c2a7fa7..dc0c35aedb2 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py
@@ -17,6 +17,9 @@
 
 from __future__ import annotations
 
+import json
+import logging
+import re
 import time
 from collections.abc import Generator
 from datetime import datetime, timedelta, timezone
@@ -30,9 +33,49 @@ from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 if TYPE_CHECKING:
     from airflow.sdk.types import Logger
 
+_LOG_LEVEL_PATTERN = re.compile(
+    r"^\s*(?:\[)?(DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL)(?:\])?\s*[:\-]?\s*",
+    re.IGNORECASE,
+)
+_LOG_LEVEL_MAP: dict[str, int] = {
+    "DEBUG": logging.DEBUG,
+    "INFO": logging.INFO,
+    "WARNING": logging.WARNING,
+    "WARN": logging.WARNING,
+    "ERROR": logging.ERROR,
+    "CRITICAL": logging.CRITICAL,
+    "FATAL": logging.CRITICAL,
+}
+
+
+def _parse_log_level(message: str) -> int:
+    """
+    Detect the Python logging level from a CloudWatch log message.
+
+    Supports two formats:
+    1. JSON-structured logs with a ``levelname`` or ``level`` field.
+    2. Plain-text prefixes such as ``ERROR:``, ``[WARNING]``, ``CRITICAL -``, etc.
+
+    Returns ``logging.INFO`` when no known level is found (backwards-compatible).
+    """
+    stripped = message.strip()
+    if stripped.startswith("{"):
+        try:
+            parsed = json.loads(stripped)
+            level_str = parsed.get("levelname") or parsed.get("level") or ""
+            level = _LOG_LEVEL_MAP.get(level_str.upper(), -1)
+            if level != -1:
+                return level
+        except (json.JSONDecodeError, AttributeError):
+            pass
+    match = _LOG_LEVEL_PATTERN.match(message)
+    if match:
+        return _LOG_LEVEL_MAP.get(match.group(1).upper(), logging.INFO)
+    return logging.INFO
+
 
 class AwsTaskLogFetcher(Thread):
-    """Fetch Cloudwatch log events with specific interval and send the log events to the logger.info."""
+    """Fetch Cloudwatch log events with specific interval and forward them at the detected log level."""
 
     def __init__(
         self,
@@ -72,7 +115,8 @@ class AwsTaskLogFetcher(Thread):
                     # When a slight delay is added before logging the event, that solves the issue
                     # See https://github.com/apache/airflow/issues/40875
                     time.sleep(0.001)
-                self.logger.info(self.event_to_str(log_event))
+                level = _parse_log_level(log_event["message"])
+                self.logger.log(level, self.event_to_str(log_event))
                 prev_timestamp_event = current_timestamp_event
 
     def _get_log_events(self, skip_token: AwsLogsHook.ContinuationToken | None = None) -> Generator:
diff --git a/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py b/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py
index c6da6208373..66f883120eb 100644
--- a/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py
+++ b/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+import logging
 from datetime import timedelta
 from unittest import mock
 from unittest.mock import PropertyMock
@@ -25,7 +26,7 @@ import pytest
 from botocore.exceptions import ClientError
 
 from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-from airflow.providers.amazon.aws.utils.task_log_fetcher import AwsTaskLogFetcher
+from airflow.providers.amazon.aws.utils.task_log_fetcher import AwsTaskLogFetcher, _parse_log_level
 
 
 class TestAwsTaskLogFetcher:
@@ -67,11 +68,11 @@ class TestAwsTaskLogFetcher:
     def test_run(self, get_log_events_mock, event_is_set_mock):
         self.log_fetcher.run()
 
-        self.logger_mock.info.assert_has_calls(
+        self.logger_mock.log.assert_has_calls(
             [
-                mock.call("[2021-04-02 21:51:07,123] First"),
-                mock.call("[2021-04-02 21:52:47,456] Second"),
-                mock.call("[2021-04-02 21:54:27,789] Third"),
+                mock.call(logging.INFO, "[2021-04-02 21:51:07,123] First"),
+                mock.call(logging.INFO, "[2021-04-02 21:52:47,456] Second"),
+                mock.call(logging.INFO, "[2021-04-02 21:54:27,789] Third"),
             ]
         )
 
@@ -144,3 +145,116 @@ class TestAwsTaskLogFetcher:
     @mock.patch.object(AwsLogsHook, "conn")
     def test_get_last_log_messages_with_no_log_events(self, mock_conn):
         assert self.log_fetcher.get_last_log_messages(2) == []
+
+    @mock.patch(
+        "threading.Event.is_set",
+        side_effect=(False, True),
+    )
+    @mock.patch(
+        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
+        side_effect=(
+            iter(
+                [
+                    {
+                        "timestamp": 1617400267123,
+                        "message": '{"levelname": "ERROR", "message": "Something failed"}',
+                    },
+                    {
+                        "timestamp": 1617400367456,
+                        "message": "WARNING: disk space low",
+                    },
+                    {
+                        "timestamp": 1617400467789,
+                        "message": "Just a plain message",
+                    },
+                ]
+            ),
+        ),
+    )
+    def test_run_with_log_level_detection(self, get_log_events_mock, event_is_set_mock):
+        self.log_fetcher.run()
+
+        self.logger_mock.log.assert_has_calls(
+            [
+                mock.call(
+                    logging.ERROR,
+                    '[2021-04-02 21:51:07,123] {"levelname": "ERROR", "message": "Something failed"}',
+                ),
+                mock.call(logging.WARNING, "[2021-04-02 21:52:47,456] WARNING: disk space low"),
+                mock.call(logging.INFO, "[2021-04-02 21:54:27,789] Just a plain message"),
+            ]
+        )
+
+
+class TestParseLogLevel:
+    @pytest.mark.parametrize(
+        ("message", "expected_level"),
+        [
+            ('{"levelname": "ERROR", "message": "fail"}', logging.ERROR),
+            ('{"levelname": "WARNING", "message": "warn"}', logging.WARNING),
+            ('{"levelname": "DEBUG", "message": "dbg"}', logging.DEBUG),
+            ('{"levelname": "CRITICAL", "message": "crit"}', logging.CRITICAL),
+            ('{"levelname": "INFO", "message": "ok"}', logging.INFO),
+            ('{"level": "error", "msg": "fail"}', logging.ERROR),
+            ('{"level": "WARNING", "msg": "warn"}', logging.WARNING),
+        ],
+        ids=[
+            "json-error",
+            "json-warning",
+            "json-debug",
+            "json-critical",
+            "json-info",
+            "json-level-field-lowercase",
+            "json-level-field-uppercase",
+        ],
+    )
+    def test_json_structured_logs(self, message, expected_level):
+        assert _parse_log_level(message) == expected_level
+
+    @pytest.mark.parametrize(
+        ("message", "expected_level"),
+        [
+            ("ERROR: something broke", logging.ERROR),
+            ("WARNING: watch out", logging.WARNING),
+            ("WARN: also watch out", logging.WARNING),
+            ("DEBUG: details", logging.DEBUG),
+            ("CRITICAL: very bad", logging.CRITICAL),
+            ("FATAL: system down", logging.CRITICAL),
+            ("[ERROR] something broke", logging.ERROR),
+            ("[WARNING] watch out", logging.WARNING),
+            ("INFO - all good", logging.INFO),
+        ],
+        ids=[
+            "prefix-error",
+            "prefix-warning",
+            "prefix-warn",
+            "prefix-debug",
+            "prefix-critical",
+            "prefix-fatal",
+            "bracketed-error",
+            "bracketed-warning",
+            "prefix-info-dash",
+        ],
+    )
+    def test_plain_text_prefix(self, message, expected_level):
+        assert _parse_log_level(message) == expected_level
+
+    @pytest.mark.parametrize(
+        "message",
+        [
+            "Just a regular log message",
+            "This message mentions ERROR in the middle",
+            "",
+            "2026-05-18 08:06:04 some log",
+            "{invalid json",
+        ],
+        ids=[
+            "plain-text",
+            "error-in-middle",
+            "empty",
+            "timestamp-prefix",
+            "invalid-json",
+        ],
+    )
+    def test_defaults_to_info(self, message):
+        assert _parse_log_level(message) == logging.INFO

Reply via email to