This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fd950357d6b Fix KubernetesPodOperator emitting orphan timestamps for empty container writes (#67652)
fd950357d6b is described below

commit fd950357d6b0485a962b66707be88d947c81bd06
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu May 28 20:00:43 2026 +0100

    Fix KubernetesPodOperator emitting orphan timestamps for empty container writes (#67652)
    
    When a container running under `KubernetesPodOperator` writes an empty line, kubelet streams it back (with `timestamps=True`) as `"<rfc3339-ts> \n"`: a timestamp followed by a separator space and an empty message. `parse_log_line` in `pod_manager.py` called `line.strip().partition(" ")`, which removed the trailing separator space before partitioning, so the function returned `timestamp=None` and the caller treated the line as a continuation of the previous buffered log record. The b [...]
    
    ```
    [2026-05-28T13:07:50.160+0000] {pod_manager.py:520} INFO - [base] first test line
    2026-05-28T13:07:57.030578889Z
    2026-05-28T13:07:57.030581518Z
    2026-05-28T13:07:57.030642740Z
    [2026-05-28T13:07:57.034+0000] {pod_manager.py:520} INFO - [base] last test line
    ```
    
    Downstream, these orphan rows break [`airflow.utils.log.file_task_handler._parse_timestamp`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/log/file_task_handler.py#L201-L203), which feeds each line to `pendulum.parse` after stripping `[]`. The malformed fragments can raise `ValueError: month must be in 1..12` and fail the task entirely.
    
    Closes #36571.
    
    ## Root cause and history
    
    Regressed in [#33675](https://github.com/apache/airflow/pull/33675) (merged 2023-08-24, shipped in cncf-kubernetes **7.5.0**), which replaced the original `line.find(" ")` split with a `line.strip().partition(" ")` pattern as part of a refactor:
    
    ```diff
    - split_at = line.find(" ")
    - if split_at == -1:
    -     ...
    - timestamp = line[:split_at]
    - message = line[split_at + 1 :].rstrip()
    + timestamp, sep, message = line.strip().partition(" ")
    + if not sep:
    +     ...
    ```
    
    The pre-refactor implementation correctly handled `<ts> \n` because `find(" ")` matched the separator space directly and the message-side `.rstrip()` produced an empty string. The new code strips the separator off before partitioning, so the function loses its only signal that the line is well-formed.
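A minimal sketch of the two splitting strategies, reduced to plain string operations so it runs without pendulum. `split_pre_refactor` and `split_post_refactor` are hypothetical names, the `split_at == -1` branch (elided as `...` in the diff above) is simplified here to "no timestamp", and the timestamp-parsing step is left out entirely. Fed the same empty container write, the pre-refactor shape keeps the timestamp while the refactored shape reports none:

```python
from __future__ import annotations


def split_pre_refactor(line: str) -> tuple[str | None, str]:
    # Pre-#33675 shape: look for the first space in the raw, unstripped line.
    split_at = line.find(" ")
    if split_at == -1:
        # Simplification (assumption): the real branch is elided in the diff.
        return None, line
    return line[:split_at], line[split_at + 1 :].rstrip()


def split_post_refactor(line: str) -> tuple[str | None, str]:
    # Post-#33675 shape: strip() runs first, so a trailing separator space is
    # already gone by the time partition() looks for it.
    timestamp, sep, message = line.strip().partition(" ")
    if not sep:
        return None, line  # the caller treats this as a continuation line
    return timestamp, message


empty_write = "2026-05-28T13:07:57.030578889Z \n"
print(split_pre_refactor(empty_write))   # ('2026-05-28T13:07:57.030578889Z', '')
print(split_post_refactor(empty_write))  # (None, '2026-05-28T13:07:57.030578889Z \n')
```

The only difference is the order of operations: `strip()` consumes the separator that `partition(" ")` needs, so `sep` comes back empty and the raw line is handed back as if it carried no timestamp.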
    
    This matches the regression window the original reporter described in [#36571](https://github.com/apache/airflow/issues/36571): the bug appeared after upgrading cncf-kubernetes from 7.4.2 (pre-refactor) to 7.12.0+ (post-refactor), and it is still reproducible on current `main` (10.17.x).
    
    ## Fix
    
    * `parse_log_line` no longer pre-strips the line; it calls `rstrip("\n")` only and partitions on the original separator, so empty container writes are recognised as `(timestamp, "")` rather than as continuations. If the partition yields no separator, the whole line is tried as a bare timestamp (some kubelet versions emit `<ts>\n` with no trailing space), and parse failures fall through to the original return-the-raw-line path. It also catches `ValueError`, not just `ParserError`, so a [...]
    * The sync (`PodManager.fetch_container_logs.consume_logs`) and async (`AsyncPodManager.fetch_container_logs_before_current_sec`) log consumer loops skip emitting empty messages. In the sync path the resume marker still advances, so reconnect-since-time stays correct, but no noisy `[base] ` row is written.
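The two changes above can be sketched together in plain Python. This is not the provider code: a regex that accepts only RFC 3339 UTC timestamps stands in for `pendulum.parse` (so it raises only `ValueError`, never `ParserError`), `consume` and `resume_marker` are hypothetical simplifications of the sync consumer loop and `container_log_times`, and continuation lines are assumed to be joined with a newline. Fed the log sequence from #36571, it yields only the two real records:

```python
from __future__ import annotations

import re

# Stand-in for pendulum.parse (assumption): accept only RFC 3339 UTC
# timestamps such as "2026-05-28T13:07:57.030578889Z"; raise ValueError otherwise.
_RFC3339_UTC = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z")


def parse_ts(text: str) -> str:
    if not _RFC3339_UTC.fullmatch(text):
        raise ValueError(f"not an RFC 3339 timestamp: {text!r}")
    return text


def parse_log_line(line: str) -> tuple[str | None, str]:
    # Remove only the trailing newline so "<ts> \n" keeps its separator space.
    stripped = line.rstrip("\n")
    # With no space at all, partition() returns the whole line as `timestamp`
    # and "" as `message`, so a bare "<ts>\n" is still tried as a timestamp.
    timestamp, _, message = stripped.partition(" ")
    try:
        return parse_ts(timestamp), message
    except ValueError:
        return None, line  # not timestamped: the caller treats it as a continuation


def consume(lines: list[str]) -> tuple[list[str], str | None]:
    """Group raw lines into records; skip empty writes but advance the marker."""
    records: list[str] = []
    message_to_log: str | None = None
    resume_marker: str | None = None  # hypothetical stand-in for container_log_times
    for line in lines:
        ts, message = parse_log_line(line)
        if ts:
            # Simplification: advance the marker on every timestamped line.
            resume_marker = ts
            if not message:
                continue  # empty container write: nothing to emit
            if message_to_log is not None:
                records.append(message_to_log)  # previous record is complete
            message_to_log = message
        elif message_to_log is not None:
            # Continuation line: append it to the buffered record. A continuation
            # with no record before it is simply dropped in this sketch.
            message_to_log += "\n" + line.rstrip("\n")
    if message_to_log is not None:
        records.append(message_to_log)
    return records, resume_marker


log = [
    "2026-05-28T13:07:50.160Z first test line\n",
    "2026-05-28T13:07:57.030578889Z \n",
    "2026-05-28T13:07:57.030581518Z\n",
    "2026-05-28T13:07:57.030642740Z \n",
    "2026-05-28T13:07:57.034Z last test line\n",
]
print(consume(log))      # (['first test line', 'last test line'], '2026-05-28T13:07:57.034Z')
print(consume(log[:4]))  # (['first test line'], '2026-05-28T13:07:57.030642740Z')
```

The early `continue` is what keeps the three empty writes from being glued onto `first test line`; the second call shows the marker still moving past trailing empty writes, which is the property the sync path relies on for reconnect-since-time.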
    
    ## Tests
    
    * Parametrized `test_parse_log_line_handles_empty_container_writes` covers `<ts> \n`, `<ts>\n`, and `<ts> ` (no newline). Verified RED on `main`, GREEN with the fix.
    * End-to-end `test_empty_container_lines_do_not_pollute_previous_message` drives `fetch_container_logs` with the exact log sequence from the issue and asserts no orphan timestamps land in `caplog`. Also RED on `main`, GREEN with the fix.
    
    ## Gotchas
    
    * Truly empty container output (just `\n`) is no longer surfaced as a `[base]` row. That output carries no information for the task log reader and previously triggered the downstream pendulum failures, so dropping it is a net improvement; if a future use case needs to count blank container lines, that is separable work.
---
 .../providers/cncf/kubernetes/utils/pod_manager.py | 27 +++++++--
 .../unit/cncf/kubernetes/utils/test_pod_manager.py | 69 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 4 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index e4bb7ae9115..835d239f209 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -566,6 +566,15 @@ class PodManager(LoggingMixin):
                         line = raw_line.decode("utf-8", errors="backslashreplace")
                         line_timestamp, message = parse_log_line(line)
                         if line_timestamp:  # detect new log line
+                            if not message:
+                                # Empty container write: advance the resume
+                                # marker but do not emit a noisy ``[base] ``
+                                # row or break the previous buffered message
+                                # with a stray continuation (#36571).
+                                self.container_log_times[
+                                    (pod.metadata.namespace, pod.metadata.name, container_name)
+                                ] = line_timestamp
+                                continue
                             if message_to_log is None:  # first line in the log
                                 message_to_log = message
                                 message_timestamp = line_timestamp
@@ -1108,12 +1117,17 @@ def parse_log_line(line: str) -> tuple[DateTime | None, str]:
     :param line: k8s log line
     :return: timestamp and log message
     """
-    timestamp, sep, message = line.strip().partition(" ")
-    if not sep:
-        return None, line
+    # Strip only the trailing newline so an empty container write (which
+    # kubelet streams back as "<rfc3339-ts> \n" under ``timestamps=True``)
+    # keeps the separator space and is recognised as a real log line, not a
+    # continuation of the previous one (#36571). When kubelet emits "<ts>\n"
+    # with no trailing space, ``partition`` returns the whole line as
+    # ``timestamp`` and ``message`` as ``""`` -- the parse below handles both.
+    stripped = line.rstrip("\n")
+    timestamp, _, message = stripped.partition(" ")
     try:
         last_log_time = cast("DateTime", pendulum.parse(timestamp))
-    except ParserError:
+    except (ParserError, ValueError):
         return None, line
     return last_log_time, message
 
@@ -1220,6 +1234,11 @@ class AsyncPodManager(LoggingMixin):
                 if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
                     break
                 if line_timestamp:  # detect new log line
+                    if not message:
+                        # Empty container write -- drop it instead of letting
+                        # it overwrite the buffered message with "" or be
+                        # emitted as a noisy ``[base] `` row (#36571).
+                        continue
                     if message_to_log is None:  # first line in the log
                         message_to_log = message
                     else:  # previous log line is complete
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 27cfd87fc6d..e2390f90dc3 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -88,6 +88,40 @@ def test_parse_log_line():
     assert line == log_message
 
 
+@pytest.mark.parametrize(
+    ("raw_line", "expected_ts"),
+    [
+        pytest.param(
+            "2026-05-28T13:07:57.030578889Z \n",
+            "2026-05-28T13:07:57.030578889Z",
+            id="trailing-space-and-newline",
+        ),
+        pytest.param(
+            "2026-05-28T13:07:57.030581518Z\n",
+            "2026-05-28T13:07:57.030581518Z",
+            id="newline-only",
+        ),
+        pytest.param(
+            "2026-05-28T13:07:57.030642740Z ",
+            "2026-05-28T13:07:57.030642740Z",
+            id="trailing-space-no-newline",
+        ),
+    ],
+)
+def test_parse_log_line_handles_empty_container_writes(raw_line, expected_ts):
+    """
+    Regression for #36571: an empty container write (just ``\\n``) is streamed
+    back by kubelet as ``"<rfc3339-ts> \\n"`` when ``timestamps=True``. The
+    parser must recognise it as a real (empty) log line rather than as a
+    continuation of the previous one, otherwise the bare timestamp is appended
+    onto the previous buffered message and emitted unformatted into task logs.
+    """
+    timestamp, message = parse_log_line(raw_line)
+
+    assert timestamp == pendulum.parse(expected_ts)
+    assert message == ""
+
+
 def test_log_pod_event():
     """Test logging a pod event."""
     mock_pod_manager = mock.Mock()
@@ -782,6 +816,41 @@ class TestPodManager:
         assert "message3 line1" in caplog.text
         assert "ERROR" not in caplog.text
 
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+    def test_empty_container_lines_do_not_pollute_previous_message(
+        self, mock_read_pod_logs, mock_container_is_running, caplog
+    ):
+        """
+        Regression for #36571: when a container writes empty lines, kubelet
+        returns them as ``"<ts> \\n"`` rows. Previously these slipped through
+        ``parse_log_line`` as "no timestamp" and were appended as continuations
+        onto the previous buffered message, which then emitted multi-line
+        records where only the first line carried the Airflow log prefix --
+        leaving bare ``<ts>`` rows in task logs that downstream pendulum-based
+        parsers ``(file_task_handler._parse_timestamp)`` then choked on.
+        """
+        log = (
+            "2026-05-28T13:07:50.160Z first test line\n"
+            "2026-05-28T13:07:57.030578889Z \n"
+            "2026-05-28T13:07:57.030581518Z\n"
+            "2026-05-28T13:07:57.030642740Z \n"
+            "2026-05-28T13:07:57.034Z last test line\n"
+        )
+        mock_read_pod_logs.return_value = [bytes(line, "utf-8") for line in log.split("\n")]
+        mock_container_is_running.return_value = False
+
+        with caplog.at_level(logging.INFO):
+            self.pod_manager.fetch_container_logs(mock.MagicMock(), "base", follow=True)
+
+        assert "first test line" in caplog.text
+        assert "last test line" in caplog.text
+        # The empty-line timestamps must not leak into the previous message and
+        # must not be emitted as orphan rows.
+        assert "2026-05-28T13:07:57.030578889Z" not in caplog.text
+        assert "2026-05-28T13:07:57.030581518Z" not in caplog.text
+        assert "2026-05-28T13:07:57.030642740Z" not in caplog.text
+
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
     def test_container_log_times_tracks_last_timestamp(self, mock_read_pod_logs, mock_container_is_running):
