This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e55cffc4175 Fail closed in GCS log handler when existing-log read 
fails (#67511)
e55cffc4175 is described below

commit e55cffc4175567e2519341035c52b7672411ea49
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri May 29 15:32:42 2026 +0200

    Fail closed in GCS log handler when existing-log read fails (#67511)
    
    * google: fail closed in GCS log handler when existing-log read fails
    
    ``GCSRemoteLogIO.write`` reads the existing blob, appends the new log
    content, then uploads. When ``download_as_bytes()`` failed for a
    reason other than "object does not exist" (transient GCS outage, IAM
    glitch, network blip), the read exception was caught, a warning was
    logged, and execution fell through to upload **only the new content**
    — silently truncating prior log history.
    
    Distinguish the 404 case (safe: write the new content as a fresh
    blob) from non-404 read failures (likely transient: keep local logs,
    return False, and let the next heartbeat retry). The 404 path is unchanged.
    
    Updates ``test_write`` to reflect the new fail-closed contract: when
    ``download_as_bytes`` raises a non-404 error the handler now returns
    ``False`` and does not call ``upload_from_string``.
    
    * Update compat test for fail-closed GCS log handler contract
    
    The pre-existing 2.11-compat test asserted the old fail-open contract
    (download fails, upload with truncated content). Align it with the new
    fail-closed behavior: download attempted, no upload.
---
 .../providers/google/cloud/log/gcs_task_handler.py | 14 ++++++-
 .../unit/google/cloud/log/test_gcs_task_handler.py | 46 +++++++++++-----------
 2 files changed, 37 insertions(+), 23 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py 
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
index 24dcfd5f01c..f9a64b4929e 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -139,7 +139,19 @@ class GCSRemoteLogIO(LoggingMixin):  # noqa: D101
             log = f"{old_log}\n{log}" if old_log else log
         except Exception as e:
             if not self.no_log_found(e):
-                self.log.warning("Error checking for previous log: %s", e)
+                # Read failed for a reason other than "object does not exist" 
(e.g. transient
+                # GCS outage, IAM glitch, network blip). Falling through to the 
upload would
+                # overwrite the existing blob with only the new content and 
*truncate* the
+                # prior log history. Fail closed instead: keep local logs and 
let the next
+                # heartbeat retry. ``no_log_found`` covers the 404 case, where 
it is safe to
+                # write the new content as a fresh blob.
+                self.log.warning(
+                    "Refusing to overwrite remote log %s: could not read 
existing content (%s). "
+                    "Keeping local logs for later retry.",
+                    remote_log_location,
+                    e,
+                )
+                return False
         try:
             blob = storage.Blob.from_string(remote_log_location, self.client)
             blob.upload_from_string(log, content_type="text/plain")
diff --git 
a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py 
b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
index a592d2f29a6..aba65728472 100644
--- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
+++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
@@ -191,17 +191,23 @@ class TestGCSRemoteLogIO:
         result = gcs_remote_log_io.write(new_log_content, remote_log_location)
 
         # verify
-        assert result == upload_success
-
-        # verify the content that was uploaded
-        if upload_success:
-            call_args = 
mock_blob.from_string.return_value.upload_from_string.call_args
-            if call_args:
-                uploaded_content = call_args[0][0]
-                if old_log_exists and not old_log_read_error:
-                    assert uploaded_content == 
f"{old_log_content}\n{new_log_content}"
-                else:
-                    assert uploaded_content == new_log_content
+        # If reading the existing blob failed for a reason other than "not 
found", the
+        # handler now fails closed (returns False without uploading) rather 
than overwriting
+        # the existing blob with only the new content. The 404 case still 
proceeds to upload.
+        if old_log_read_error is not None:
+            assert result is False
+            
mock_blob.from_string.return_value.upload_from_string.assert_not_called()
+        else:
+            assert result == upload_success
+            # verify the content that was uploaded
+            if upload_success:
+                call_args = 
mock_blob.from_string.return_value.upload_from_string.call_args
+                if call_args:
+                    uploaded_content = call_args[0][0]
+                    if old_log_exists:
+                        assert uploaded_content == 
f"{old_log_content}\n{new_log_content}"
+                    else:
+                        assert uploaded_content == new_log_content
 
     @pytest.mark.parametrize(
         "is_stream_method",
@@ -541,18 +547,14 @@ class TestGCSTaskHandler:
         )
         self.gcs_task_handler.close()
 
-        mock_blob.from_string.assert_has_calls(
-            [
-                mock.call("gs://bucket/remote/log/location/1.log", 
mock_client.return_value),
-                mock.call().download_as_bytes(),
-                mock.call("gs://bucket/remote/log/location/1.log", 
mock_client.return_value),
-                mock.call().upload_from_string(
-                    "MESSAGE\n",
-                    content_type="text/plain",
-                ),
-            ],
-            any_order=False,
+        # Fail-closed contract: when reading the existing blob fails for a 
reason other than
+        # "object does not exist", the handler must not overwrite the remote 
log with only
+        # the new content. Expect the read attempt but no upload.
+        mock_blob.from_string.assert_called_once_with(
+            "gs://bucket/remote/log/location/1.log", mock_client.return_value
         )
+        
mock_blob.from_string.return_value.download_as_bytes.assert_called_once()
+        
mock_blob.from_string.return_value.upload_from_string.assert_not_called()
 
     @pytest.mark.parametrize(
         ("delete_local_copy", "expected_existence_of_local_copy"),

Reply via email to