elaye-canopy commented on code in PR #36545:
URL: https://github.com/apache/airflow/pull/36545#discussion_r1443975480


##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -168,6 +204,68 @@ def _copy_single_object(
             self.log.info("Executing delete of %s", source_path)
             sftp_hook.delete_file(source_path)
 
+    def _stream_single_object(
+        self, 
+        sftp_hook: SFTPHook, 
+        gcs_hook: GCSHook, 
+        source_path: str, 
+        destination_object: str
+    ) -> None:
+        """Helper function to stream a single object with robust handling and 
logging."""
+        self.log.info(
+            "Starting stream of %s to gs://%s/%s",
+            source_path,
+            self.destination_bucket,
+            destination_object,
+        )
+
+        client = gcs_hook.get_conn()
+        dest_bucket = client.bucket(self.destination_bucket)
+        dest_blob = dest_bucket.blob(destination_object)
+        temp_destination_object = f"{destination_object}.tmp"
+        temp_dest_blob = dest_bucket.blob(temp_destination_object)
+
+        # Check and delete any existing temp file from previous failed attempts
+        if temp_dest_blob.exists():
+            self.log.warning(f"Temporary file {temp_destination_object} found, 
deleting for fresh upload.")
+            temp_dest_blob.delete()
+
+        with sftp_hook.get_conn().file(source_path, 'rb') as source_stream:
+            if self.source_stream_wrapper:
+                source_stream = self.source_stream_wrapper(source_stream)
+            total_bytes_uploaded = 0
+            interval_bytes_uploaded = 0
+            if self.stream_chunk_size and self.stream_chunk_size < 
_DEFAULT_CHUNKSIZE:
+                # Use manual stream transfer
+                with temp_dest_blob.open("wb") as write_stream:
+                    while True:
+                        chunk = source_stream.read(self.stream_chunk_size)
+                        if not chunk:
+                            break
+                        write_stream.write(chunk)
+                        total_bytes_uploaded += len(chunk)
+                        interval_bytes_uploaded += len(chunk)
+
+                        # Log upload progress at intervals
+                        if self.log_interval and interval_bytes_uploaded >= 
self.log_interval:
+                            self.log.info(f"Uploaded {total_bytes_uploaded} 
bytes so far.")
+                            interval_bytes_uploaded %= self.log_interval
+            else:
+                # Use the upload_from_file method
+                temp_dest_blob.upload_from_file(source_stream)
+
+        # Copy from temp blob to final destination
+        if temp_dest_blob.exists():
+            self.log.info("Copying from temporary location to final 
destination.")
+            dest_bucket.copy_blob(temp_dest_blob, dest_bucket, 
destination_object)
+            temp_dest_blob.delete()  # Clean up the temp file
+        else:

Review Comment:
   This basically encapuslates your entire codeblock into a single external 
paramiko maintained function as far as I can see.



##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -168,6 +204,68 @@ def _copy_single_object(
             self.log.info("Executing delete of %s", source_path)
             sftp_hook.delete_file(source_path)
 
+    def _stream_single_object(
+        self, 
+        sftp_hook: SFTPHook, 
+        gcs_hook: GCSHook, 
+        source_path: str, 
+        destination_object: str
+    ) -> None:
+        """Helper function to stream a single object with robust handling and 
logging."""
+        self.log.info(
+            "Starting stream of %s to gs://%s/%s",
+            source_path,
+            self.destination_bucket,
+            destination_object,
+        )
+
+        client = gcs_hook.get_conn()
+        dest_bucket = client.bucket(self.destination_bucket)
+        dest_blob = dest_bucket.blob(destination_object)
+        temp_destination_object = f"{destination_object}.tmp"
+        temp_dest_blob = dest_bucket.blob(temp_destination_object)
+
+        # Check and delete any existing temp file from previous failed attempts
+        if temp_dest_blob.exists():
+            self.log.warning(f"Temporary file {temp_destination_object} found, 
deleting for fresh upload.")
+            temp_dest_blob.delete()
+
+        with sftp_hook.get_conn().file(source_path, 'rb') as source_stream:
+            if self.source_stream_wrapper:
+                source_stream = self.source_stream_wrapper(source_stream)
+            total_bytes_uploaded = 0
+            interval_bytes_uploaded = 0
+            if self.stream_chunk_size and self.stream_chunk_size < 
_DEFAULT_CHUNKSIZE:
+                # Use manual stream transfer
+                with temp_dest_blob.open("wb") as write_stream:
+                    while True:
+                        chunk = source_stream.read(self.stream_chunk_size)
+                        if not chunk:
+                            break
+                        write_stream.write(chunk)
+                        total_bytes_uploaded += len(chunk)
+                        interval_bytes_uploaded += len(chunk)
+
+                        # Log upload progress at intervals
+                        if self.log_interval and interval_bytes_uploaded >= 
self.log_interval:
+                            self.log.info(f"Uploaded {total_bytes_uploaded} 
bytes so far.")
+                            interval_bytes_uploaded %= self.log_interval
+            else:
+                # Use the upload_from_file method
+                temp_dest_blob.upload_from_file(source_stream)
+
+        # Copy from temp blob to final destination
+        if temp_dest_blob.exists():
+            self.log.info("Copying from temporary location to final 
destination.")
+            dest_bucket.copy_blob(temp_dest_blob, dest_bucket, 
destination_object)
+            temp_dest_blob.delete()  # Clean up the temp file
+        else:

Review Comment:
   Based on my testing, this will lead to performance degradation since the 
non-streamed transfer uses paramiko's prefetch default feature to speed things 
up whereas you go back and forth per chunk.
   We used Paramiko's, which also allows passing of a logging function that 
takes (size, file_size) as parameters to report progress.
   
   Paramiko 3.3.0 or higher is required for the following snippet, prior to 
3.3.0 getfo doesn't have the concurrency limit parameter and defaults to 
unlimited
   
   `    def _stream_single_object(self, sftp_hook: SFTPHook, gcs_hook: GCSHook, 
source_path: str, destination_object: str):
           """Helper function to stream single object."""
           self.log.info(
               "Executing stream of %s to gs://%s/%s",
               source_path,
               self.destination_bucket,
               destination_object,
           )
   
           client = gcs_hook.get_conn()
           dest_blob = 
client.bucket(self.destination_bucket).blob(destination_object)
   
           with dest_blob.open("wb") as write_stream:
               sftp_hook.get_conn().getfo(
                   source_path,
                   write_stream,
                   callback=self.log_stream_progress,
                   prefetch=self.sftp_prefetch, 
max_concurrent_prefetch_requests=self.max_concurrent_prefetch_requests)` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to