This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6872e9bf3bb980779b429f984d9432dd3fc63106
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Sep 11 17:09:26 2025 +0100

    Make task stdout/stderr output filterable in the Sources list in the UI 
(#55508)
    
    Previously output on stdout or stderr was showing up with `chan="stdout"` 
etc,
    which was a leaky implementation detail (nowhere else do we refer to
    channels, and it was just an internal detail that we treat these std streams
    somewhat like a "channel")
    
    This changes it so that we record the information in the same "logger" key
    that the structured logs store the logger name in, meaning we can filter by
    them in the UI
    
    (cherry picked from commit 1455f5af8e51aed44254596d7eea878200c13cb4)
---
 airflow-core/newsfragments/55508.feature.rst       |  1 +
 .../src/airflow/sdk/execution_time/supervisor.py   | 12 +++++------
 .../task_sdk/execution_time/test_supervisor.py     | 24 ++++++++--------------
 3 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/airflow-core/newsfragments/55508.feature.rst 
b/airflow-core/newsfragments/55508.feature.rst
new file mode 100644
index 00000000000..47285553ba2
--- /dev/null
+++ b/airflow-core/newsfragments/55508.feature.rst
@@ -0,0 +1 @@
+Output on stdout/stderr from within tasks is now filterable in the Sources 
list in the UI log view
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index af5ce034e5f..e3e4a1cbe83 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -542,12 +542,12 @@ class WatchedSubprocess:
         if self.subprocess_logs_to_stdout:
             target_loggers += (log,)
         self.selector.register(
-            stdout, selectors.EVENT_READ, 
self._create_log_forwarder(target_loggers, channel="stdout")
+            stdout, selectors.EVENT_READ, 
self._create_log_forwarder(target_loggers, "task.stdout")
         )
         self.selector.register(
             stderr,
             selectors.EVENT_READ,
-            self._create_log_forwarder(target_loggers, channel="stderr", 
log_level=logging.ERROR),
+            self._create_log_forwarder(target_loggers, "task.stderr", 
log_level=logging.ERROR),
         )
         self.selector.register(
             logs,
@@ -562,10 +562,10 @@ class WatchedSubprocess:
             length_prefixed_frame_reader(self.handle_requests(log), 
on_close=self._on_socket_closed),
         )
 
-    def _create_log_forwarder(self, loggers, channel, log_level=logging.INFO) 
-> Callable[[socket], bool]:
+    def _create_log_forwarder(self, loggers, name, log_level=logging.INFO) -> 
Callable[[socket], bool]:
         """Create a socket handler that forwards logs to a logger."""
         return make_buffered_socket_reader(
-            forward_to_log(loggers, chan=channel, level=log_level), 
on_close=self._on_socket_closed
+            forward_to_log(loggers, logger=name, level=log_level), 
on_close=self._on_socket_closed
         )
 
     def _on_socket_closed(self, sock: socket):
@@ -1733,7 +1733,7 @@ def process_log_messages_from_subprocess(
 
 
 def forward_to_log(
-    target_loggers: tuple[FilteringBoundLogger, ...], chan: str, level: int
+    target_loggers: tuple[FilteringBoundLogger, ...], logger: str, level: int
 ) -> Generator[None, bytes | bytearray, None]:
     while True:
         line = yield
@@ -1744,7 +1744,7 @@ def forward_to_log(
         except UnicodeDecodeError:
             msg = line.decode("ascii", errors="replace")
         for log in target_loggers:
-            log.log(level, msg, chan=chan)
+            log.log(level, msg, logger=logger)
 
 
 def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 2277e74b8d2..e6b0a3dccd9 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -272,24 +272,21 @@ class TestWatchedSubprocess:
         assert captured_logs == unordered(
             [
                 {
-                    "chan": "stdout",
+                    "logger": "task.stdout",
                     "event": "I'm a short message",
                     "level": "info",
-                    "logger": "task",
                     "timestamp": "2024-11-07T12:34:56.078901Z",
                 },
                 {
-                    "chan": "stderr",
+                    "logger": "task.stderr",
                     "event": "stderr message",
                     "level": "error",
-                    "logger": "task",
                     "timestamp": "2024-11-07T12:34:56.078901Z",
                 },
                 {
-                    "chan": "stdout",
+                    "logger": "task.stdout",
                     "event": "Message split across two writes",
                     "level": "info",
-                    "logger": "task",
                     "timestamp": "2024-11-07T12:34:56.078901Z",
                 },
                 {
@@ -578,10 +575,9 @@ class TestWatchedSubprocess:
 
         # We should have a log from the task!
         assert {
-            "chan": "stdout",
+            "logger": "task.stdout",
             "event": "Hello World hello!",
             "level": "info",
-            "logger": "task",
             "timestamp": "2024-11-07T12:34:56.078901Z",
         } in captured_logs
 
@@ -1144,36 +1140,34 @@ class TestWatchedSubprocessKill:
         assert proc.wait() == exit_after or -signal.SIGKILL
         exit_after = exit_after or signal.SIGKILL
 
-        logs = [{"event": m["event"], "chan": m.get("chan"), "logger": 
m["logger"]} for m in captured_logs]
+        logs = [{"event": m["event"], "logger": m["logger"]} for m in 
captured_logs]
         expected_logs = [
-            {"chan": "stdout", "event": "Ready", "logger": "task"},
+            {"logger": "task.stdout", "event": "Ready"},
         ]
         # Work out what logs we expect to see
         if signal_to_send == signal.SIGINT:
-            expected_logs.append({"chan": "stderr", "event": "Signal 2 
received", "logger": "task"})
+            expected_logs.append({"logger": "task.stderr", "event": "Signal 2 
received"})
         if signal_to_send == signal.SIGTERM or (
             signal_to_send == signal.SIGINT and exit_after != signal.SIGINT
         ):
             if signal_to_send == signal.SIGINT:
                 expected_logs.append(
                     {
-                        "chan": None,
                         "event": "Process did not terminate in time; 
escalating",
                         "logger": "supervisor",
                     }
                 )
-            expected_logs.append({"chan": "stderr", "event": "Signal 15 
received", "logger": "task"})
+            expected_logs.append({"logger": "task.stderr", "event": "Signal 15 
received"})
         if exit_after == signal.SIGKILL:
             if signal_to_send in {signal.SIGINT, signal.SIGTERM}:
                 expected_logs.append(
                     {
-                        "chan": None,
                         "event": "Process did not terminate in time; 
escalating",
                         "logger": "supervisor",
                     }
                 )
 
-        expected_logs.extend(({"chan": None, "event": "Process exited", 
"logger": "supervisor"},))
+        expected_logs.extend(({"event": "Process exited", "logger": 
"supervisor"},))
         assert logs == expected_logs
 
     def test_service_subprocess(self, watched_subprocess, mock_process, 
mocker):

Reply via email to