pankajastro commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1116213336


##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -490,3 +494,54 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):
+        """
+        Returns async connection to the Storage Transfer Service
+
+        :return: Google Storage Transfer asynchronous client.
+        """
+        if not self._client:
+            try:
+                self._client = 
storage_transfer_v1.StorageTransferServiceAsyncClient()
+            except GoogleAuthError as ex:
+                raise AirflowException(ex)
+        return self._client
+
+    def get_jobs(self, job_names: list[str]):

Review Comment:
   Can we make this an async function?



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""

Review Comment:
   ```suggestion
           """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
           
   ```



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service 
import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud 
Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, 
poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            
"airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."
+            "CloudStorageTransferServiceCreateJobsTrigger",
+            {
+                "project_id": self.project_id,
+                "job_names": self.job_names,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: 
ignore[override]
+        """Gets current data storage transfer jobs and yields a TriggerEvent"""
+        async_hook = self.get_async_hook()
+
+        while True:
+            self.log.info("Attempting to request jobs statuses")

Review Comment:
   Adding a more meaningful log message here would help, for example how 
many jobs are done so far.



##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service 
import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+
+TEST_PROJECT_ID = "project-id"
+
+
+@pytest.fixture
+def hook_async():
+    with mock.patch(
+        
"airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook.__init__",
+        new=mock_base_gcp_hook_default_project_id,
+    ):
+        yield CloudDataTransferServiceAsyncHook()
+
+
+if sys.version_info < (3, 8):

Review Comment:
   Let's reuse it from 
https://github.com/apache/airflow/blob/main/tests/providers/google/cloud/utils/compat.py



##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service 
import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):

Review Comment:
   reuse it from 
https://github.com/apache/airflow/blob/main/tests/providers/google/cloud/utils/compat.py



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value 
greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, 
s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": 
now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Keep these keys consistent, i.e. either use imported constants for both 
keys or hardcoded strings for both (like "secret_access_key").



##########
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service_async.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+
+import pytest
+from google.auth.exceptions import GoogleAuthError
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service 
import (
+    CloudDataTransferServiceAsyncHook,
+)
+from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+
+TEST_PROJECT_ID = "project-id"
+
+
+@pytest.fixture
+def hook_async():
+    with mock.patch(
+        
"airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook.__init__",
+        new=mock_base_gcp_hook_default_project_id,
+    ):
+        yield CloudDataTransferServiceAsyncHook()
+
+
+if sys.version_info < (3, 8):
+
+    class AsyncMock(mock.MagicMock):
+        async def __call__(self, *args, **kwargs):
+            return super(AsyncMock, self).__call__(*args, **kwargs)
+
+else:
+    from unittest.mock import AsyncMock
+
+
+class TestCloudDataTransferServiceAsyncHook:
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_storage_transfer_service"

Review Comment:
   This patch path is really long; maybe you can keep it in a constant:
   ```
   base_path = "airflow.providers.google.cloud.hooks.cloud_storage_transfer_service"


   @mock.patch(f"{base_path}.storage_transfer_v1.StorageTransferServiceAsyncClient")
   def test_(...):
       ...
   ```



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value 
greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, 
s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": 
now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   What if a roleArn is given?



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value 
greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, 
s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": 
now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Shall we also pass the `path` param? WDYT?



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:

Review Comment:
   Shall we check this in the transfer operator's `__init__` instead? That way 
it would fail early.



##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value 
greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, 
s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": 
now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,
+                        "secret_access_key": config.aws_secret_access_key,
+                    },
+                },
+                OBJECT_CONDITIONS: {
+                    "include_prefixes": [],
+                },
+                GCS_DATA_SINK: {BUCKET_NAME: gcs_bucket, PATH: gcs_prefix},
+                TRANSFER_OPTIONS: {
+                    "overwrite_objects_already_existing_in_sink": self.replace,
+                },
+            },
+        }
+
+        # max size of the field 
'transfer_job.transfer_spec.object_conditions.include_prefixes' is 1000,
+        # that's why we submit multiple jobs transferring 1000 files each.
+        # See documentation below
+        # 
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
+        chunk_size = self.transfer_job_max_files_number
+        job_names = []
+        transfer_hook = self.get_transfer_hook()
+        for i in range(0, len(files), chunk_size):
+            files_chunk = files[i : i + chunk_size]
+            body[TRANSFER_SPEC][OBJECT_CONDITIONS]["include_prefixes"] = 
files_chunk
+            job = transfer_hook.create_transfer_job(body=body)

Review Comment:
   If a job fails in the middle, do we need to clean up?



##########
airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator
+
+from google.api_core.exceptions import GoogleAPIError
+from google.cloud.storage_transfer_v1.types import TransferOperation
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service 
import (
+    CloudDataTransferServiceAsyncHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
+    """
+    StorageTransferJobTrigger run on the trigger worker to perform Cloud 
Storage Transfer job
+
+    :param job_names: List of transfer jobs names
+    :param project_id: GCP project id
+    """
+
+    def __init__(self, job_names: list[str], project_id: str | None = None, 
poll_interval: int = 10):
+        super().__init__()
+        self.project_id = project_id
+        self.job_names = job_names
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes StorageTransferJobsTrigger arguments and classpath."""
+        return (
+            
"airflow.providers.google.cloud.triggers.cloud_storage_transfer_service."
+            "CloudStorageTransferServiceCreateJobsTrigger",
+            {
+                "project_id": self.project_id,
+                "job_names": self.job_names,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: 
ignore[override]
+        """Gets current data storage transfer jobs and yields a TriggerEvent"""
+        async_hook = self.get_async_hook()
+
+        while True:
+            self.log.info("Attempting to request jobs statuses")
+            jobs_completed_successful = 0
+            try:
+                jobs_pager = await 
async_hook.get_jobs(job_names=self.job_names)
+                jobs, awaitable_operations = [], []
+                async for job in jobs_pager:
+                    operation = async_hook.get_latest_operation(job)
+                    jobs.append(job)
+                    awaitable_operations.append(operation)
+
+                operations: list[TransferOperation] = await 
asyncio.gather(*awaitable_operations)
+
+                for job, operation in zip(jobs, operations):
+                    if operation is None:
+                        yield TriggerEvent(

Review Comment:
   Once it fails, what will happen to the job that you created in 
`create_transfer_job`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to