moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117162070


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value 
greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, 
s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": 
now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,

Review Comment:
   Speaking about consistency, agree with you, thanks for noticing - I fixed 
that.
   Regarding other questions I'm not sure that I understand what exactly you 
mean. Could you please describe it in more detailed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to