This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e55cffc4175 Fail closed in GCS log handler when existing-log read
fails (#67511)
e55cffc4175 is described below
commit e55cffc4175567e2519341035c52b7672411ea49
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri May 29 15:32:42 2026 +0200
Fail closed in GCS log handler when existing-log read fails (#67511)
* google: fail closed in GCS log handler when existing-log read fails
``GCSRemoteLogIO.write`` reads the existing blob, appends the new log
content, then uploads. When ``download_as_bytes()`` failed for a
reason other than "object does not exist" (transient GCS outage, IAM
glitch, network blip), the read exception was caught, a warning was
logged, and execution fell through to upload **only the new content**
— silently truncating prior log history.
Distinguish the 404 case (safe: write the new content as a fresh
blob) from non-404 read failures (possibly transient: keep local logs,
return False, and let the next heartbeat retry). The 404 path is unchanged.
Update ``test_write`` to reflect the new fail-closed contract: when
``download_as_bytes`` raises a non-404 error the handler now returns
``False`` and does not call ``upload_from_string``.
* Update compat test for fail-closed GCS log handler contract
The pre-existing 2.11-compat test asserted the old fail-open contract
(download fails, upload with truncated content). Align it with the new
fail-closed behavior: download attempted, no upload.
---
.../providers/google/cloud/log/gcs_task_handler.py | 14 ++++++-
.../unit/google/cloud/log/test_gcs_task_handler.py | 46 +++++++++++-----------
2 files changed, 37 insertions(+), 23 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
index 24dcfd5f01c..f9a64b4929e 100644
---
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
+++
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -139,7 +139,19 @@ class GCSRemoteLogIO(LoggingMixin): # noqa: D101
log = f"{old_log}\n{log}" if old_log else log
except Exception as e:
if not self.no_log_found(e):
- self.log.warning("Error checking for previous log: %s", e)
+ # Read failed for a reason other than "object does not exist"
(e.g. transient
+ # GCS outage, IAM glitch, network blip). Fall through to the
upload would
+ # overwrite the existing blob with only the new content and
*truncate* the
+ # prior log history. Fail closed instead: keep local logs and
let the next
+ # heartbeat retry. ``no_log_found`` covers the 404 case, where
it is safe to
+ # write the new content as a fresh blob.
+ self.log.warning(
+ "Refusing to overwrite remote log %s: could not read
existing content (%s). "
+ "Keeping local logs for later retry.",
+ remote_log_location,
+ e,
+ )
+ return False
try:
blob = storage.Blob.from_string(remote_log_location, self.client)
blob.upload_from_string(log, content_type="text/plain")
diff --git
a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
index a592d2f29a6..aba65728472 100644
--- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
+++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
@@ -191,17 +191,23 @@ class TestGCSRemoteLogIO:
result = gcs_remote_log_io.write(new_log_content, remote_log_location)
# verify
- assert result == upload_success
-
- # verify the content that was uploaded
- if upload_success:
- call_args =
mock_blob.from_string.return_value.upload_from_string.call_args
- if call_args:
- uploaded_content = call_args[0][0]
- if old_log_exists and not old_log_read_error:
- assert uploaded_content ==
f"{old_log_content}\n{new_log_content}"
- else:
- assert uploaded_content == new_log_content
+ # If reading the existing blob failed for a reason other than "not
found", the
+ # handler now fails closed (returns False without uploading) rather
than overwriting
+ # the existing blob with only the new content. The 404 case still
proceeds to upload.
+ if old_log_read_error is not None:
+ assert result is False
+
mock_blob.from_string.return_value.upload_from_string.assert_not_called()
+ else:
+ assert result == upload_success
+ # verify the content that was uploaded
+ if upload_success:
+ call_args =
mock_blob.from_string.return_value.upload_from_string.call_args
+ if call_args:
+ uploaded_content = call_args[0][0]
+ if old_log_exists:
+ assert uploaded_content ==
f"{old_log_content}\n{new_log_content}"
+ else:
+ assert uploaded_content == new_log_content
@pytest.mark.parametrize(
"is_stream_method",
@@ -541,18 +547,14 @@ class TestGCSTaskHandler:
)
self.gcs_task_handler.close()
- mock_blob.from_string.assert_has_calls(
- [
- mock.call("gs://bucket/remote/log/location/1.log",
mock_client.return_value),
- mock.call().download_as_bytes(),
- mock.call("gs://bucket/remote/log/location/1.log",
mock_client.return_value),
- mock.call().upload_from_string(
- "MESSAGE\n",
- content_type="text/plain",
- ),
- ],
- any_order=False,
+ # Fail-closed contract: when reading the existing blob fails for a
reason other than
+ # "object does not exist", the handler must not overwrite the remote
log with only
+ # the new content. Expect the read attempt but no upload.
+ mock_blob.from_string.assert_called_once_with(
+ "gs://bucket/remote/log/location/1.log", mock_client.return_value
)
+
mock_blob.from_string.return_value.download_as_bytes.assert_called_once()
+
mock_blob.from_string.return_value.upload_from_string.assert_not_called()
@pytest.mark.parametrize(
("delete_local_copy", "expected_existence_of_local_copy"),