This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e26a618d77 Harden Stackdriver handler against Cloud Logging failures (#67513)
4e26a618d77 is described below

commit 4e26a618d77b426a3080a4ce69899d3b1a6d6ab2
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Jun 2 17:09:09 2026 +0200

    Harden Stackdriver handler against Cloud Logging failures (#67513)
    
    * google: harden Stackdriver handler against Cloud Logging failures
    
    Three failure modes in ``StackdriverTaskHandler`` exposed internal
    details or broke shutdown:
    
    1. ``read()`` did not wrap ``_read_logs()``. When Cloud Logging was
       unavailable, gRPC errors propagated as HTTP 500 from the log
       viewer instead of degrading gracefully (F-011).
    2. gRPC errors from ``list_log_entries`` carry project IDs, resource
       names, and service-account info in their ``__str__``, and were
       forwarded into the user-visible error response (F-010).
    3. ``close()`` called ``self._transport.flush()`` without exception
       handling. A failed flush during shutdown raised through stdlib's
       logging machinery, which does not handle exceptions from
       ``Handler.close()`` gracefully (F-013).
    
    Wrap ``_read_logs()`` in ``read()`` with a try/except that surfaces a
    short, generic user-facing message ("Cloud Logging is currently
    unavailable.") and writes the full traceback to the handler's own
    ``_logger``. The outer guard catches the gRPC exceptions before they
    reach the user, so F-010's leakage path is closed without adding a
    second swallow inside ``_read_single_logs_page``.
    
    Wrap ``_transport.flush()`` in ``close()`` with a try/except that
    prints to stderr (since logging itself may be shutting down) so the
    shutdown chain continues even when Cloud Logging is unreachable.
    
    * Fix transport mock injection in test_close_swallows_transport_flush_errors
    
    StackdriverRemoteLogIO is a slotted attrs class (@attrs.define), so the
    cached_property `transport` is stored in a slot, not in the instance
    __dict__. The test injected the broken transport via
    `handler.io.__dict__["transport"] = ...`, which a slotted class silently
    ignores — close() then built the real transport and hit
    DefaultCredentialsError, so the assertion on the expected message failed.
    
    Assign the attribute directly (`handler.io.transport = broken_transport`)
    to pre-seed the slot without building a real transport.
---
 .../google/cloud/log/stackdriver_task_handler.py   | 33 +++++++-
 .../cloud/log/test_stackdriver_task_handler.py     | 91 ++++++++++++++++++++++
 2 files changed, 122 insertions(+), 2 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index 145c2564539..262f141fabb 100644
--- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -23,6 +23,7 @@ import copy
 import logging
 import os
 import shutil
+import sys
 import warnings
 from collections.abc import Collection
 from datetime import datetime
@@ -64,6 +65,11 @@ if TYPE_CHECKING:
 DEFAULT_LOGGER_NAME = "airflow"
 _GLOBAL_RESOURCE = Resource(type="global", labels={})
 
+# Dedicated logger for handler-internal failures (Cloud Logging unavailable, gRPC errors).
+# Routed to the same ``airflow.providers.google.cloud.log.stackdriver_task_handler`` namespace
+# so operators see these alongside the rest of the handler's logs.
+_logger = logging.getLogger(__name__)
+
 _DEFAULT_SCOPESS = frozenset(
     ["https://www.googleapis.com/auth/logging.read", "https://www.googleapis.com/auth/logging.write"]
 )
@@ -422,7 +428,19 @@ class StackdriverTaskHandler(logging.Handler):
         next_page_token = metadata.get("next_page_token", None)
         all_pages = "download_logs" in metadata and metadata["download_logs"]
 
-        messages, end_of_log, next_page_token = self.io.read_logs(log_filter, next_page_token, all_pages)
+        try:
+            messages, end_of_log, next_page_token = self.io.read_logs(log_filter, next_page_token, all_pages)
+        except Exception:
+            # Cloud Logging unavailable / IAM glitch / gRPC error. Without a guard, the
+            # exception used to propagate up as HTTP 500 from the log viewer. Degrade
+            # gracefully instead: surface a short user-facing message, mark the read
+            # complete (no spinning retry), and log the full traceback to the handler's
+            # own logger for the operator.
+            _logger.exception("Failed to read logs from Cloud Logging for filter %s", log_filter)
+            return (
+                [((self.task_instance_hostname, "Cloud Logging is currently unavailable."),)],
+                [{"end_of_log": True}],
+            )
 
         new_metadata: dict[str, str | bool] = {"end_of_log": end_of_log}
 
@@ -483,4 +501,15 @@ class StackdriverTaskHandler(logging.Handler):
         return url
 
     def close(self) -> None:
-        self.io.transport.flush()
+        # ``flush()`` is best-effort during shutdown — if Cloud Logging is unavailable or
+        # the transport raises, that's not a reason to break the rest of the handler's
+        # shutdown chain (and the stdlib logging machinery does not handle exceptions
+        # from ``Handler.close()`` gracefully). Print to stderr as last resort since
+        # logging itself may be shutting down.
+        try:
+            self.io.transport.flush()
+        except Exception as exc:
+            print(
+                f"StackdriverTaskHandler.close: transport flush failed: {type(exc).__name__}: {exc}",
+                file=sys.stderr,
+            )
diff --git a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
index a55b62fa4f4..0eb6c209f8f 100644
--- a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
+++ b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
@@ -672,3 +672,94 @@ class TestStackdriverLoggingHandlerTask:
             f'labels.try_number="{self.ti.try_number}"',
         ]
         assert set(expected_filter) == set(filter_params)
+
+
+class TestStackdriverTaskHandlerExceptionHandling:
+    """Cloud Logging failures must degrade gracefully, not leak internals."""
+
+    @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
+    @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client")
+    def test_read_falls_back_when_cloud_logging_unavailable(
+        self, mock_client, mock_get_creds_and_project_id, caplog
+    ):
+        """``read()`` must surface a user-facing message when Cloud Logging raises.
+
+        Without a guard, a gRPC error from ``list_log_entries`` propagates as HTTP 500
+        on the log viewer. The fix degrades gracefully and logs the full traceback for
+        the operator.
+        """
+        from google.api_core import exceptions as gapi_exceptions
+
+        mock_get_creds_and_project_id.return_value = ("creds", "project_id")
+        mock_client.return_value.list_log_entries.side_effect = gapi_exceptions.ServiceUnavailable(
+            "Stackdriver returned an internal error for project secret-project-id"
+        )
+
+        handler = StackdriverTaskHandler()
+        ti = mock.MagicMock()
+        ti.task_id = "t"
+        ti.dag_id = "d"
+        ti.try_number = 1
+        ti.logical_date = mock.MagicMock(isoformat=lambda: "2020-01-01T00:00:00+00:00")
+        ti.execution_date = ti.logical_date
+
+        with caplog.at_level(logging.ERROR):
+            logs, metadata = handler.read(ti, try_number=1)
+
+        # The user-facing message must NOT include the project id / internal details.
+        message = logs[0][0][1]
+        assert "Cloud Logging is currently unavailable" in message
+        assert "secret-project-id" not in message
+        assert metadata == [{"end_of_log": True}]
+
+    @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
+    @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client")
+    def test_read_does_not_leak_internals_in_user_facing_message(
+        self, mock_client, mock_get_creds_and_project_id
+    ):
+        """``read()`` must not propagate gRPC error details into user-visible messages.
+
+        A ``PermissionDenied`` from ``list_log_entries`` typically carries the service
+        account email + the missing IAM permission. The outer guard in ``read()`` must
+        replace it with a generic message so an authenticated user sees no internal
+        identifiers.
+        """
+        from google.api_core import exceptions as gapi_exceptions
+
+        mock_get_creds_and_project_id.return_value = ("creds", "project_id")
+        mock_client.return_value.list_log_entries.side_effect = gapi_exceptions.PermissionDenied(
+            "service account '[email protected]' lacks logging.logEntries.list"
+        )
+
+        handler = StackdriverTaskHandler()
+        ti = mock.MagicMock()
+        ti.task_id = "t"
+        ti.dag_id = "d"
+        ti.try_number = 1
+        ti.logical_date = mock.MagicMock(isoformat=lambda: "2020-01-01T00:00:00+00:00")
+        ti.execution_date = ti.logical_date
+
+        logs, _ = handler.read(ti, try_number=1)
+
+        message = logs[0][0][1]
+        assert "Cloud Logging is currently unavailable" in message
+        assert "[email protected]" not in message
+        assert "logging.logEntries.list" not in message
+
+    def test_close_swallows_transport_flush_errors(self, capsys):
+        """``close()`` must never raise — even when transport ``flush()`` fails."""
+        handler = StackdriverTaskHandler()
+        broken_transport = mock.MagicMock()
+        broken_transport.flush.side_effect = RuntimeError("flush failed during shutdown")
+        # ``transport`` is a cached_property on the slotted attrs class
+        # ``StackdriverRemoteLogIO``; its value lives in a slot, not ``__dict__``, so
+        # assign the attribute directly to pre-seed it without building a real transport.
+        handler.io.transport = broken_transport
+
+        # Must not raise.
+        handler.close()
+
+        # The failure is surfaced on stderr because the logging machinery may be shutting down.
+        captured = capsys.readouterr()
+        assert "transport flush failed" in captured.err
+        assert "flush failed during shutdown" in captured.err

Reply via email to