Lee-W commented on code in PR #36545:
URL: https://github.com/apache/airflow/pull/36545#discussion_r1441262932


##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -85,13 +106,18 @@ def __init__(
         source_path: str,
         destination_bucket: str,
         destination_path: str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
         sftp_conn_id: str = "ssh_default",
+        gcp_conn_id: str = "google_cloud_default",

Review Comment:
   Not sure whether we should change the order here; it might break users who pass this value positionally instead of with the `gcp_conn_id` keyword when initializing this operator
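   For illustration, a minimal sketch of the kind of call that could break (the connection ID and paths below are hypothetical, and this assumes the constructor still accepts these parameters positionally):

   ```python
   from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

   # Hypothetical caller relying on positional order rather than keywords.
   # Before the reorder, the 4th positional value is bound to gcp_conn_id;
   # after the reorder it would be bound to sftp_conn_id instead.
   transfer = SFTPToGCSOperator(
       "/remote/data/report.csv",  # source_path
       "my-bucket",                # destination_bucket
       "data/report.csv",          # destination_path
       "my_gcp_conn",              # gcp_conn_id before the reorder, sftp_conn_id after
       task_id="sftp_to_gcs_example",
   )
   ```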



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -85,13 +106,18 @@ def __init__(
         source_path: str,
         destination_bucket: str,
         destination_path: str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
         sftp_conn_id: str = "ssh_default",
+        gcp_conn_id: str = "google_cloud_default",
         mime_type: str = "application/octet-stream",
         gzip: bool = False,
         move_object: bool = False,
         impersonation_chain: str | Sequence[str] | None = None,
         sftp_prefetch: bool = True,
+        use_stream: bool = False,
+        log_stream_progress: bool = False,
+        stream_chunk_size = _DEFAULT_CHUNKSIZE, # 1024 * 1024 B * 100 = 100 MB

Review Comment:
   ```suggestion
            stream_chunk_size: int = _DEFAULT_CHUNKSIZE, # 1024 * 1024 B * 100 = 100 MB
   ```



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -70,6 +70,27 @@ class SFTPToGCSOperator(BaseOperator):
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
     :param sftp_prefetch: Whether to enable SFTP prefetch, the default is True.
+    :param use_stream: Determines the method of file transfer between SFTP and GCS.
+    - If set to False (default), the file is downloaded to the worker's local storage and 
+      then uploaded to GCS. This may require significant disk space on the worker for large files.
+    - If set to True, the file is streamed directly from SFTP to GCS, which does not consume 
+      local disk space on the worker. 

Review Comment:
   ```suggestion
            - If set to False (default), the file is downloaded to the worker's local storage and 
              then uploaded to GCS. This may require significant disk space on the worker for large files.
            - If set to True, the file is streamed directly from SFTP to GCS, which does not consume 
              local disk space on the worker. 
   ```
   
   Should we add additional indentation here so these lines are rendered as part of the `:param use_stream:` description?
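   For context, a generic sketch (not taken from this PR) of why the indentation matters: in the reST docstrings Sphinx renders, continuation lines only belong to a `:param:` field's description when they are indented under it; otherwise they fall out of the field:

   ```python
   def example(use_stream: bool = False) -> None:
       """Hypothetical docstring illustrating :param: continuation lines.

       :param use_stream: Determines the method of file transfer.
           - If set to False (default), download to local storage, then upload to GCS.
           - If set to True, stream directly without using local disk space.
       """
   ```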



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -168,6 +204,68 @@ def _copy_single_object(
             self.log.info("Executing delete of %s", source_path)
             sftp_hook.delete_file(source_path)
 
+    def _stream_single_object(
+        self, 
+        sftp_hook: SFTPHook, 
+        gcs_hook: GCSHook, 
+        source_path: str, 
+        destination_object: str
+    ) -> None:
+        """Helper function to stream a single object with robust handling and 
logging."""
+        self.log.info(
+            "Starting stream of %s to gs://%s/%s",
+            source_path,
+            self.destination_bucket,
+            destination_object,
+        )
+
+        client = gcs_hook.get_conn()
+        dest_bucket = client.bucket(self.destination_bucket)
+        dest_blob = dest_bucket.blob(destination_object)
+        temp_destination_object = f"{destination_object}.tmp"
+        temp_dest_blob = dest_bucket.blob(temp_destination_object)
+
+        # Check and delete any existing temp file from previous failed attempts
+        if temp_dest_blob.exists():
+            self.log.warning(f"Temporary file {temp_destination_object} found, 
deleting for fresh upload.")
+            temp_dest_blob.delete()
+
+        with sftp_hook.get_conn().file(source_path, 'rb') as source_stream:

Review Comment:
   ```suggestion
           with sftp_hook.get_conn().file(source_path, "rb") as source_stream:
   ```
   
   you might want to run `breeze static-checks` locally to catch/fix some of the style issues



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -85,13 +106,18 @@ def __init__(
         source_path: str,
         destination_bucket: str,
         destination_path: str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
         sftp_conn_id: str = "ssh_default",
+        gcp_conn_id: str = "google_cloud_default",
         mime_type: str = "application/octet-stream",
         gzip: bool = False,
         move_object: bool = False,
         impersonation_chain: str | Sequence[str] | None = None,
         sftp_prefetch: bool = True,
+        use_stream: bool = False,
+        log_stream_progress: bool = False,
+        stream_chunk_size = _DEFAULT_CHUNKSIZE, # 1024 * 1024 B * 100 = 100 MB
+        source_stream_wrapper = None, 
+        log_interval = None,

Review Comment:
   ```suggestion
           log_interval: int | None = None,
   ```



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -168,6 +204,68 @@ def _copy_single_object(
             self.log.info("Executing delete of %s", source_path)
             sftp_hook.delete_file(source_path)
 
+    def _stream_single_object(
+        self, 
+        sftp_hook: SFTPHook, 
+        gcs_hook: GCSHook, 
+        source_path: str, 
+        destination_object: str
+    ) -> None:
+        """Helper function to stream a single object with robust handling and 
logging."""
+        self.log.info(
+            "Starting stream of %s to gs://%s/%s",
+            source_path,
+            self.destination_bucket,
+            destination_object,
+        )
+
+        client = gcs_hook.get_conn()
+        dest_bucket = client.bucket(self.destination_bucket)
+        dest_blob = dest_bucket.blob(destination_object)
+        temp_destination_object = f"{destination_object}.tmp"
+        temp_dest_blob = dest_bucket.blob(temp_destination_object)
+
+        # Check and delete any existing temp file from previous failed attempts
+        if temp_dest_blob.exists():
+            self.log.warning(f"Temporary file {temp_destination_object} found, 
deleting for fresh upload.")
+            temp_dest_blob.delete()
+
+        with sftp_hook.get_conn().file(source_path, 'rb') as source_stream:

Review Comment:
   https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -168,6 +204,68 @@ def _copy_single_object(
             self.log.info("Executing delete of %s", source_path)
             sftp_hook.delete_file(source_path)
 
+    def _stream_single_object(
+        self, 
+        sftp_hook: SFTPHook, 
+        gcs_hook: GCSHook, 
+        source_path: str, 
+        destination_object: str
+    ) -> None:
+        """Helper function to stream a single object with robust handling and 
logging."""
+        self.log.info(
+            "Starting stream of %s to gs://%s/%s",
+            source_path,
+            self.destination_bucket,
+            destination_object,
+        )
+
+        client = gcs_hook.get_conn()
+        dest_bucket = client.bucket(self.destination_bucket)
+        dest_blob = dest_bucket.blob(destination_object)
+        temp_destination_object = f"{destination_object}.tmp"
+        temp_dest_blob = dest_bucket.blob(temp_destination_object)
+
+        # Check and delete any existing temp file from previous failed attempts
+        if temp_dest_blob.exists():
+            self.log.warning(f"Temporary file {temp_destination_object} found, 
deleting for fresh upload.")

Review Comment:
   AFAIK, Airflow prefers lazy `%s` formatting over f-strings when logging
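   For illustration, a small standalone sketch of the difference (the value below is hypothetical):

   ```python
   import logging

   log = logging.getLogger(__name__)
   temp_destination_object = "data/report.csv.tmp"  # hypothetical value

   # f-string: the message is formatted eagerly, even if WARNING is filtered out
   log.warning(f"Temporary file {temp_destination_object} found, deleting for fresh upload.")

   # %s style: interpolation is deferred until the record is actually emitted
   log.warning("Temporary file %s found, deleting for fresh upload.", temp_destination_object)
   ```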



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -130,18 +160,24 @@ def execute(self, context: Context):
 
             for file in files:
                 destination_path = file.replace(base_path, self.destination_path, 1)
-                self._copy_single_object(gcs_hook, sftp_hook, file, destination_path)
+                if self.use_stream:
+                    self._stream_single_object(sftp_hook, gcs_hook, file, destination_path)
+                else:
+                    self._copy_single_object(sftp_hook, gcs_hook, file, destination_path)

Review Comment:
   ```suggestion
                transfer_single_object = self._stream_single_object if self.use_stream else self._copy_single_object
                transfer_single_object(sftp_hook, gcs_hook, file, destination_path)
   ```
   
   not sure whether it's better, but I might try something like this to reduce the mental work of comparing the arguments



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -85,13 +106,18 @@ def __init__(
         source_path: str,
         destination_bucket: str,
         destination_path: str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
         sftp_conn_id: str = "ssh_default",
+        gcp_conn_id: str = "google_cloud_default",
         mime_type: str = "application/octet-stream",
         gzip: bool = False,
         move_object: bool = False,
         impersonation_chain: str | Sequence[str] | None = None,
         sftp_prefetch: bool = True,
+        use_stream: bool = False,
+        log_stream_progress: bool = False,
+        stream_chunk_size = _DEFAULT_CHUNKSIZE, # 1024 * 1024 B * 100 = 100 MB
+        source_stream_wrapper = None, 

Review Comment:
   ```suggestion
           source_stream_wrapper: Callable | None = None, 
   ```
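   One detail worth flagging (an assumption, since the diff does not show the module's imports): this annotation needs `Callable` in scope, e.g.

   ```python
   from collections.abc import Callable  # or: from typing import Callable
   ```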


