This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c60edf762aa Skip S3 folder-marker keys in S3ToGCSOperator (#65724)
c60edf762aa is described below

commit c60edf762aabcfc2fd831e67819ab50982b2b23b
Author: Yuseok Jo <[email protected]>
AuthorDate: Tue May 12 13:50:57 2026 +0900

    Skip S3 folder-marker keys in S3ToGCSOperator (#65724)
---
 .../providers/google/cloud/transfers/s3_to_gcs.py  |  32 +++++
 .../unit/google/cloud/transfers/test_s3_to_gcs.py  | 157 +++++++++++++++++++++
 2 files changed, 189 insertions(+)

diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index 7b44556c044..861769d5768 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -206,10 +206,42 @@ class S3ToGCSOperator(S3ListOperator):
         gcs_bucket, _ = _parse_gcs_url(self.dest_gcs)
         return [f"gs://{gcs_bucket}/{self.s3_to_gcs_object(s3_object=k)}" for k in s3_keys]
 
+    @staticmethod
+    def _strip_overlapping_folder_markers(keys: list[str]) -> tuple[list[str], list[str]]:
+        """
+        Drop trailing-slash keys that are strict prefixes of other listed keys.
+
+        Treated as S3 directory markers. A lone trailing-slash key with no overlap
+        (e.g. ``lonely/``) is preserved, and a non-slash key that happens to be a
+        strict prefix of another (e.g. ``abc`` of ``abcdef``) is also preserved.
+        Returns ``(kept, dropped)``.
+        """
+        if not keys:
+            return [], []
+        ordered = sorted(set(keys))
+        kept: list[str] = []
+        dropped: list[str] = []
+        for current, nxt in zip(ordered, ordered[1:]):
+            if current.endswith("/") and nxt.startswith(current):
+                dropped.append(current)
+            else:
+                kept.append(current)
+        kept.append(ordered[-1])
+        return kept, dropped
+
     def _get_files(self, context: Context, gcs_hook: GCSHook) -> list[str]:
         # use the super method to list all the files in an S3 bucket/key
         s3_objects = super().execute(context)
 
+        s3_objects, dropped_overlap_keys = self._strip_overlapping_folder_markers(s3_objects)
+        if dropped_overlap_keys:
+            self.log.info(
+                "Skipping %s S3 folder-marker key(s) overlapping listed objects "
+                "(omitted from transfer and XCom output): %s",
+                len(dropped_overlap_keys),
+                dropped_overlap_keys,
+            )
+
         if not self.replace:
             s3_objects = self.exclude_existing_objects(s3_objects=s3_objects, gcs_hook=gcs_hook)
 
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py
index c53a18b881b..19b3c111728 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_s3_to_gcs.py
@@ -551,6 +551,163 @@ class TestS3ToGoogleCloudStorageOperatorDeferrable:
         mock_create_transfer_job.assert_called()
         assert job_names == expected_job_names
 
+    @pytest.mark.parametrize(
+        ("keys", "expected_kept", "expected_dropped"),
+        [
+            # Empty / single / no-overlap listings keep everything as is.
+            ([], [], []),
+            (["a"], ["a"], []),
+            (["a", "b"], ["a", "b"], []),
+            # Non-slash prefix overlaps must NOT be treated as folder markers.
+            (["a", "ax"], ["a", "ax"], []),
+            (["a", "ax", "ay"], ["a", "ax", "ay"], []),
+            (["abc", "abcdef"], ["abc", "abcdef"], []),
+            # Slash-suffixed keys overlapping a sibling are folder markers and dropped.
+            (["foo/", "foo/bar.txt"], ["foo/bar.txt"], ["foo/"]),
+            (
+                ["data/", "data/sub/", "data/sub/file.txt"],
+                ["data/sub/file.txt"],
+                ["data/", "data/sub/"],
+            ),
+            # A lone trailing-slash key with no overlap is a real object and stays.
+            (["lonely/"], ["lonely/"], []),
+            (["lonely/", "report.csv"], ["lonely/", "report.csv"], []),
+        ],
+    )
+    def test_strip_overlapping_folder_markers(self, keys, expected_kept, expected_dropped):
+        """Folder-marker detection: requires both strict-prefix overlap AND trailing slash."""
+        kept, dropped = S3ToGCSOperator._strip_overlapping_folder_markers(keys)
+        assert kept == expected_kept
+        assert dropped == expected_dropped
+
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
+    def test_execute_skips_overlapping_folder_markers_in_deferrable_xcom(self, mock_gcs_hook, mock_s3_hook):
+        """Deferrable: overlapping folder markers are dropped from both STS transfer and XCom output."""
+        mock_gcs_hook.project_id = PROJECT_ID
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX,
+            deferrable=True,
+            replace=True,
+            return_gcs_uris=True,
+        )
+        operator.hook = mock.MagicMock()
+        operator.hook.list_keys.return_value = ["foo/", "foo/bar.txt", "lonely/"]
+
+        with mock.patch.object(operator, "submit_transfer_jobs") as mock_submit_transfer_jobs:
+            mock_submit_transfer_jobs.return_value = [TRANSFER_JOB_ID_0]
+            with pytest.raises(TaskDeferred) as exception_info:
+                operator.execute(context={})
+
+        # ``foo/`` is a strict prefix of ``foo/bar.txt`` and is treated as a folder marker, so it
+        # is omitted from the STS transfer, the trigger payload, and the XCom return. ``lonely/``
+        # has no overlap and is preserved everywhere.
+        expected_keys = ["foo/bar.txt", "lonely/"]
+        mock_submit_transfer_jobs.assert_called_once()
+        assert mock_submit_transfer_jobs.call_args.kwargs["files"] == expected_keys
+        assert exception_info.value.trigger.files == expected_keys
+
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
+    def test_execute_skips_overlapping_folder_markers_in_non_deferrable_xcom(
+        self, mock_gcs_hook, mock_s3_hook
+    ):
+        """Non-deferrable: overlapping folder markers are dropped from per-object transfer and XCom."""
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX,
+            replace=True,
+            return_gcs_uris=True,
+        )
+        operator.hook = mock.MagicMock()
+        operator.hook.list_keys.return_value = ["data/snapshot/", "data/snapshot/file.txt"]
+
+        with mock.patch.object(operator, "transfer_files") as mock_transfer_files:
+            result = operator.execute(context={})
+
+        # ``data/snapshot/`` is a strict prefix of ``data/snapshot/file.txt`` so it's treated as a
+        # folder marker and skipped. Only the real object is transferred and reported via XCom.
+        mock_transfer_files.assert_called_once()
+        assert mock_transfer_files.call_args.args[0] == ["data/snapshot/file.txt"]
+        assert result == [f"gs://{GCS_BUCKET}/{GCS_PREFIX}data/snapshot/file.txt"]
+
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
+    def test_execute_keeps_real_slash_suffixed_keys(self, mock_gcs_hook, mock_s3_hook):
+        """Real S3 objects whose keys end with ``/`` and have no sibling overlap survive.
+
+        Regression for the reviewer's concern: S3 permits trailing-slash object names, so a lone
+        ``key/`` with no other listed key under that prefix is *not* a folder marker and must keep
+        its place in both the transfer call and the XCom payload.
+        """
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX,
+            replace=True,
+            return_gcs_uris=True,
+        )
+        operator.hook = mock.MagicMock()
+        operator.hook.list_keys.return_value = ["lonely/", "report.csv"]
+
+        with mock.patch.object(operator, "transfer_files") as mock_transfer_files:
+            result = operator.execute(context={})
+
+        mock_transfer_files.assert_called_once()
+        assert sorted(mock_transfer_files.call_args.args[0]) == ["lonely/", "report.csv"]
+        assert sorted(result) == sorted(
+            [
+                f"gs://{GCS_BUCKET}/{GCS_PREFIX}lonely/",
+                f"gs://{GCS_BUCKET}/{GCS_PREFIX}report.csv",
+            ]
+        )
+
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook")
+    @mock.patch("airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook")
+    def test_execute_keeps_non_slash_prefix_overlaps(self, mock_gcs_hook, mock_s3_hook):
+        """Non-slash keys that happen to be a strict prefix of another key are NOT folder markers.
+
+        Folder-marker detection requires both a strict-prefix overlap AND a trailing slash. Random
+        prefix relationships between regular object keys (e.g. opaque keys sharing a binary prefix)
+        must be left alone in both the transfer path and the XCom output.
+        """
+        operator = S3ToGCSOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            gcp_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX,
+            replace=True,
+            return_gcs_uris=True,
+        )
+        operator.hook = mock.MagicMock()
+        operator.hook.list_keys.return_value = ["abc", "abcdef"]
+
+        with mock.patch.object(operator, "transfer_files") as mock_transfer_files:
+            result = operator.execute(context={})
+
+        mock_transfer_files.assert_called_once()
+        assert sorted(mock_transfer_files.call_args.args[0]) == ["abc", "abcdef"]
+        assert sorted(result) == sorted(
+            [
+                f"gs://{GCS_BUCKET}/{GCS_PREFIX}abc",
+                f"gs://{GCS_BUCKET}/{GCS_PREFIX}abcdef",
+            ]
+        )
+
     @mock.patch(
         "airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator.log", new_callable=PropertyMock
     )

Reply via email to