moiseenkov commented on code in PR #29462:
URL: https://github.com/apache/airflow/pull/29462#discussion_r1117191833


##########
airflow/providers/google/cloud/transfers/s3_to_gcs.py:
##########
@@ -184,34 +212,127 @@ def execute(self, context: Context):
             else:
                 self.log.info("There are no new files to sync. Have a nice 
day!")
 
-        if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-
-            for file in files:
-                # GCS hook builds its own in-memory file so we have to create
-                # and pass the path
-                file_object = hook.get_key(file, self.bucket)
-                with NamedTemporaryFile(mode="wb", delete=True) as f:
-                    file_object.download_fileobj(f)
-                    f.flush()
-
-                    dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
-                    # There will always be a '/' before file because it is
-                    # enforced at instantiation time
-                    dest_gcs_object = dest_gcs_object_prefix + file
-
-                    # Sync is sequential and the hook already logs too much
-                    # so skip this for now
-                    # self.log.info(
-                    #     'Saving file {0} from S3 bucket {1} in GCS bucket 
{2}'
-                    #     ' as object {3}'.format(file, self.bucket,
-                    #                             dest_gcs_bucket,
-                    #                             dest_gcs_object))
-
-                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
-
-            self.log.info("All done, uploaded %d files to Google Cloud 
Storage", len(files))
-        else:
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        if not files:
             self.log.info("In sync, no files needed to be uploaded to Google 
Cloud Storage")
+        elif self.deferrable:
+            self.transfer_files_async(files, gcs_hook, s3_hook)
+        else:
+            self.transfer_files(files, gcs_hook, s3_hook)
 
         return files
+
+    def transfer_files(self, files: list[str], gcs_hook: GCSHook, s3_hook: 
S3Hook):
+        """Copies files from AWS S3 bucket to GCS bucket"""
+        for file in files:
+            # GCS hook builds its own in-memory file, so we have to create
+            # and pass the path
+            file_object = s3_hook.get_key(file, self.bucket)
+            with NamedTemporaryFile(mode="wb", delete=True) as f:
+                file_object.download_fileobj(f)
+                f.flush()
+
+                dest_gcs_bucket, dest_gcs_object_prefix = 
_parse_gcs_url(self.dest_gcs)
+                # There will always be a '/' before file because it is
+                # enforced at instantiation time
+                dest_gcs_object = dest_gcs_object_prefix + file
+
+                # Sync is sequential and the hook already logs too much
+                # so skip this for now
+                # self.log.info(
+                #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                #     ' as object {3}'.format(file, self.bucket,
+                #                             dest_gcs_bucket,
+                #                             dest_gcs_object))
+
+                gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name, 
gzip=self.gzip)
+
+        self.log.info("All done, uploaded %d files to Google Cloud Storage", 
len(files))
+
+    def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook):
+        """Submits Google Cloud Storage Transfer Service job to copy files 
from AWS S3 to GCS"""
+        if self.poll_interval <= 0:
+            raise ValueError("Invalid value for poll_interval. Expected value 
greater than 0")
+        if len(files) <= 0:
+            raise ValueError("List of transferring files cannot be empty")
+        job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, 
s3_hook=s3_hook)
+
+        self.defer(
+            trigger=CloudStorageTransferServiceCreateJobsTrigger(
+                project_id=gcs_hook.project_id,
+                job_names=job_names,
+                poll_interval=self.poll_interval,
+            ),
+            method_name="execute_complete",
+        )
+
+    def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook) -> list[str]:
+        now = datetime.utcnow()
+        one_time_schedule = {"day": now.day, "month": now.month, "year": 
now.year}
+
+        gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
+        config = s3_hook.conn_config
+
+        body: dict[str, Any] = {
+            PROJECT_ID: gcs_hook.project_id,
+            STATUS: GcpTransferJobsStatus.ENABLED,
+            SCHEDULE: {
+                "schedule_start_date": one_time_schedule,
+                "schedule_end_date": one_time_schedule,
+            },
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {
+                    BUCKET_NAME: self.bucket,
+                    AWS_ACCESS_KEY: {
+                        ACCESS_KEY_ID: config.aws_access_key_id,
+                        "secret_access_key": config.aws_secret_access_key,
+                    },
+                },
+                OBJECT_CONDITIONS: {
+                    "include_prefixes": [],
+                },
+                GCS_DATA_SINK: {BUCKET_NAME: gcs_bucket, PATH: gcs_prefix},
+                TRANSFER_OPTIONS: {
+                    "overwrite_objects_already_existing_in_sink": self.replace,
+                },
+            },
+        }
+
+        # max size of the field 
'transfer_job.transfer_spec.object_conditions.include_prefixes' is 1000,
+        # that's why we submit multiple jobs transferring 1000 files each.
+        # See documentation below
+        # 
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
+        chunk_size = self.transfer_job_max_files_number
+        job_names = []
+        transfer_hook = self.get_transfer_hook()
+        for i in range(0, len(files), chunk_size):
+            files_chunk = files[i : i + chunk_size]
+            body[TRANSFER_SPEC][OBJECT_CONDITIONS]["include_prefixes"] = 
files_chunk
+            job = transfer_hook.create_transfer_job(body=body)

Review Comment:
   As I see it, it's better not to perform cleanup or undo operations if 
the job fails, because the current implementation is transparent to users: they 
can see where the transfer was interrupted, which files were copied, and 
which files are still waiting, and they can find the failed job and examine it 
to get a better understanding of what happened.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to