This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c60edf762aa Skip S3 folder-marker keys in S3ToGCSOperator (#65724)
c60edf762aa is described below
commit c60edf762aabcfc2fd831e67819ab50982b2b23b
Author: Yuseok Jo <[email protected]>
AuthorDate: Tue May 12 13:50:57 2026 +0900
Skip S3 folder-marker keys in S3ToGCSOperator (#65724)
---
.../providers/google/cloud/transfers/s3_to_gcs.py | 32 +++++
.../unit/google/cloud/transfers/test_s3_to_gcs.py | 157 +++++++++++++++++++++
2 files changed, 189 insertions(+)
diff --git
a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py
b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index 7b44556c044..861769d5768 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -206,10 +206,42 @@ class S3ToGCSOperator(S3ListOperator):
gcs_bucket, _ = _parse_gcs_url(self.dest_gcs)
return [f"gs://{gcs_bucket}/{self.s3_to_gcs_object(s3_object=k)}" for
k in s3_keys]
@staticmethod
def _strip_overlapping_folder_markers(keys: list[str]) -> tuple[list[str], list[str]]:
    """
    Partition listed S3 keys into ``(kept, dropped)``, removing overlapping folder markers.

    A key is dropped only when it ends with ``/`` AND is a strict prefix of some other
    listed key (e.g. ``foo/`` listed alongside ``foo/bar.txt``). A trailing-slash key with
    nothing listed beneath it (e.g. ``lonely/``) is kept, and so is a key without a
    trailing slash that merely prefixes another one (e.g. ``abc`` next to ``abcdef``).

    Both returned lists are de-duplicated and in ascending lexicographic order. A falsy
    ``keys`` (empty list or ``None``) yields ``([], [])``.
    """
    if not keys:
        return [], []
    # Once sorted, every key that starts with ``key`` forms a contiguous run right after
    # ``key``, so comparing each key with its immediate successor detects any overlap.
    unique_keys = sorted(set(keys))
    last_index = len(unique_keys) - 1
    kept: list[str] = []
    dropped: list[str] = []
    for index, key in enumerate(unique_keys):
        successor = unique_keys[index + 1] if index < last_index else None
        # ``successor != key`` after de-duplication, so startswith means a strict prefix.
        prefixes_successor = successor is not None and successor.startswith(key)
        if key.endswith("/") and prefixes_successor:
            dropped.append(key)
        else:
            kept.append(key)
    return kept, dropped
+
def _get_files(self, context: Context, gcs_hook: GCSHook) -> list[str]:
# use the super method to list all the files in an S3 bucket/key
s3_objects = super().execute(context)
+ s3_objects, dropped_overlap_keys =
self._strip_overlapping_folder_markers(s3_objects)
+ if dropped_overlap_keys:
+ self.log.info(
+ "Skipping %s S3 folder-marker key(s) overlapping listed
objects "
+ "(omitted from transfer and XCom output): %s",
+ len(dropped_overlap_keys),
+ dropped_overlap_keys,
+ )
+
if not self.replace:
s3_objects = self.exclude_existing_objects(s3_objects=s3_objects,
gcs_hook=gcs_hook)
diff --git
a/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py
b/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py
index c53a18b881b..19b3c111728 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py
@@ -551,6 +551,163 @@ class TestS3ToGoogleCloudStorageOperatorDeferrable:
mock_create_transfer_job.assert_called()
assert job_names == expected_job_names
@pytest.mark.parametrize(
    ("keys", "expected_kept", "expected_dropped"),
    [
        # Empty / single / no-overlap listings keep everything as is.
        ([], [], []),
        (["a"], ["a"], []),
        (["a", "b"], ["a", "b"], []),
        # Non-slash prefix overlaps must NOT be treated as folder markers.
        (["a", "ax"], ["a", "ax"], []),
        (["a", "ax", "ay"], ["a", "ax", "ay"], []),
        (["abc", "abcdef"], ["abc", "abcdef"], []),
        # Slash-suffixed keys overlapping a sibling are folder markers and dropped.
        (["foo/", "foo/bar.txt"], ["foo/bar.txt"], ["foo/"]),
        (
            ["data/", "data/sub/", "data/sub/file.txt"],
            ["data/sub/file.txt"],
            ["data/", "data/sub/"],
        ),
        # A lone trailing-slash key with no overlap is a real object and stays.
        (["lonely/"], ["lonely/"], []),
        (["lonely/", "report.csv"], ["lonely/", "report.csv"], []),
        # Input order and duplicates do not matter: results are de-duplicated and sorted.
        (["b", "a"], ["a", "b"], []),
        (["foo/bar.txt", "foo/"], ["foo/bar.txt"], ["foo/"]),
        (["foo/", "foo/bar.txt", "foo/"], ["foo/bar.txt"], ["foo/"]),
        # ``a0`` shares the characters ``a`` with ``a/`` but is not beneath it, so it
        # neither triggers nor hides marker detection.
        (["a/", "a0"], ["a/", "a0"], []),
        (["a/", "a0", "a/x"], ["a/x", "a0"], ["a/"]),
    ],
)
def test_strip_overlapping_folder_markers(self, keys, expected_kept, expected_dropped):
    """Folder-marker detection requires both a strict-prefix overlap AND a trailing slash."""
    kept, dropped = S3ToGCSOperator._strip_overlapping_folder_markers(keys)
    assert kept == expected_kept
    assert dropped == expected_dropped
+
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
def test_execute_skips_overlapping_folder_markers_in_deferrable_xcom(self, mock_gcs_hook, mock_s3_hook):
    """Deferrable: overlapping folder markers are dropped from the STS transfer and trigger payload.

    This test asserts the keys handed to ``submit_transfer_jobs`` and the ``files`` carried by
    the deferral trigger. The value later returned through XCom once the trigger fires is not
    exercised here.
    """
    # ``mock_gcs_hook`` replaces the ``GCSHook`` class. An instance created with ``GCSHook(...)``
    # is ``mock_gcs_hook.return_value``, so the project id is configured there. Setting it on the
    # class mock itself would not be visible through such an instance (assumes the operator
    # instantiates the hook rather than using the class directly).
    mock_gcs_hook.return_value.project_id = PROJECT_ID
    operator = S3ToGCSOperator(
        task_id=TASK_ID,
        bucket=S3_BUCKET,
        prefix=S3_PREFIX,
        delimiter=S3_DELIMITER,
        gcp_conn_id=GCS_CONN_ID,
        dest_gcs=GCS_PATH_PREFIX,
        deferrable=True,
        replace=True,
        return_gcs_uris=True,
    )
    operator.hook = mock.MagicMock()
    operator.hook.list_keys.return_value = ["foo/", "foo/bar.txt", "lonely/"]

    with mock.patch.object(operator, "submit_transfer_jobs") as mock_submit_transfer_jobs:
        mock_submit_transfer_jobs.return_value = [TRANSFER_JOB_ID_0]
        with pytest.raises(TaskDeferred) as exception_info:
            operator.execute(context={})

    # ``foo/`` is a strict prefix of ``foo/bar.txt`` and ends with ``/``, so it is a folder
    # marker and must be absent from both the STS submission and the trigger payload.
    # ``lonely/`` has no overlap and is preserved.
    expected_keys = ["foo/bar.txt", "lonely/"]
    mock_submit_transfer_jobs.assert_called_once()
    assert mock_submit_transfer_jobs.call_args.kwargs["files"] == expected_keys
    assert exception_info.value.trigger.files == expected_keys
+
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
def test_execute_skips_overlapping_folder_markers_in_non_deferrable_xcom(
    self, mock_gcs_hook, mock_s3_hook
):
    """Non-deferrable: a folder marker overlapping a listed object is neither copied nor returned."""
    marker_key = "data/snapshot/"
    object_key = f"{marker_key}file.txt"

    operator = S3ToGCSOperator(
        task_id=TASK_ID,
        bucket=S3_BUCKET,
        prefix=S3_PREFIX,
        delimiter=S3_DELIMITER,
        gcp_conn_id=GCS_CONN_ID,
        dest_gcs=GCS_PATH_PREFIX,
        replace=True,
        return_gcs_uris=True,
    )
    operator.hook = mock.MagicMock()
    operator.hook.list_keys.return_value = [marker_key, object_key]

    with mock.patch.object(operator, "transfer_files") as mock_transfer_files:
        result = operator.execute(context={})

    # The marker prefixes the real object and ends with ``/``, so only the object itself
    # reaches ``transfer_files`` and appears in the returned GCS URIs.
    mock_transfer_files.assert_called_once()
    transferred_keys = mock_transfer_files.call_args.args[0]
    assert transferred_keys == [object_key]
    assert result == [f"gs://{GCS_BUCKET}/{GCS_PREFIX}{object_key}"]
+
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
def test_execute_keeps_real_slash_suffixed_keys(self, mock_gcs_hook, mock_s3_hook):
    """A trailing-slash key with nothing listed beneath it is kept as a real object.

    S3 permits object keys ending in ``/``. Such a key is treated as a folder marker only
    when it is a strict prefix of another listed key. A lone ``lonely/`` must therefore
    appear both in the ``transfer_files`` call and in the returned GCS URIs.
    """
    expected_keys = ["lonely/", "report.csv"]

    operator = S3ToGCSOperator(
        task_id=TASK_ID,
        bucket=S3_BUCKET,
        prefix=S3_PREFIX,
        delimiter=S3_DELIMITER,
        gcp_conn_id=GCS_CONN_ID,
        dest_gcs=GCS_PATH_PREFIX,
        replace=True,
        return_gcs_uris=True,
    )
    operator.hook = mock.MagicMock()
    # Hand the operator its own copy so the expectation list cannot be affected by it.
    operator.hook.list_keys.return_value = list(expected_keys)

    with mock.patch.object(operator, "transfer_files") as mock_transfer_files:
        result = operator.execute(context={})

    mock_transfer_files.assert_called_once()
    transferred_keys = mock_transfer_files.call_args.args[0]
    assert sorted(transferred_keys) == ["lonely/", "report.csv"]
    expected_uris = sorted(f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}" for key in expected_keys)
    assert sorted(result) == expected_uris
+
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
@mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
def test_execute_keeps_non_slash_prefix_overlaps(self, mock_gcs_hook, mock_s3_hook):
    """Keys without a trailing slash are never folder markers, even when one prefixes another.

    Marker detection needs a strict-prefix overlap AND a trailing ``/``. Ordinary keys that
    happen to share leading characters (``abc`` and ``abcdef``) must both be transferred and
    both be reported in the returned GCS URIs.
    """
    expected_keys = ["abc", "abcdef"]

    operator = S3ToGCSOperator(
        task_id=TASK_ID,
        bucket=S3_BUCKET,
        prefix=S3_PREFIX,
        delimiter=S3_DELIMITER,
        gcp_conn_id=GCS_CONN_ID,
        dest_gcs=GCS_PATH_PREFIX,
        replace=True,
        return_gcs_uris=True,
    )
    operator.hook = mock.MagicMock()
    # Hand the operator its own copy so the expectation list cannot be affected by it.
    operator.hook.list_keys.return_value = list(expected_keys)

    with mock.patch.object(operator, "transfer_files") as mock_transfer_files:
        result = operator.execute(context={})

    mock_transfer_files.assert_called_once()
    transferred_keys = mock_transfer_files.call_args.args[0]
    assert sorted(transferred_keys) == ["abc", "abcdef"]
    expected_uris = sorted(f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}" for key in expected_keys)
    assert sorted(result) == expected_uris
+
@mock.patch(
"airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator.log",
new_callable=PropertyMock
)