moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117151252


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:

Review Comment:
   Good point! Fixed.



##########
airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py:
##########
@@ -490,3 +494,54 @@ def operations_contain_expected_statuses(
                 f"Expected: {', '.join(expected_statuses_set)}"
             )
         return False
+
+
+class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook):
+    """Asynchronous hook for Google Storage Transfer Service."""
+
+    def __init__(self, project_id: str | None = None, **kwargs: Any):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self._client: StorageTransferServiceAsyncClient | None = None
+
+    def get_conn(self):
+        """
+        Returns async connection to the Storage Transfer Service
+
+        :return: Google Storage Transfer asynchronous client.
+        """
+        if not self._client:
+            try:
+                self._client = 
storage_transfer_v1.StorageTransferServiceAsyncClient()
+            except GoogleAuthError as ex:
+                raise AirflowException(ex)
+        return self._client
+
+    def get_jobs(self, job_names: list[str]):

Review Comment:
   Sure, I also updated unit tests for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to