elaye-canopy commented on code in PR #36545:
URL: https://github.com/apache/airflow/pull/36545#discussion_r1446051467


##########
airflow/providers/google/cloud/transfers/sftp_to_gcs.py:
##########
@@ -168,6 +204,68 @@ def _copy_single_object(
             self.log.info("Executing delete of %s", source_path)
             sftp_hook.delete_file(source_path)
 
+    def _stream_single_object(
+        self,
+        sftp_hook: SFTPHook,
+        gcs_hook: GCSHook,
+        source_path: str,
+        destination_object: str,
+    ) -> None:
+        """Helper function to stream a single object with robust handling and 
logging."""
+        self.log.info(
+            "Starting stream of %s to gs://%s/%s",
+            source_path,
+            self.destination_bucket,
+            destination_object,
+        )
+
+        client = gcs_hook.get_conn()
+        dest_bucket = client.bucket(self.destination_bucket)
+        dest_blob = dest_bucket.blob(destination_object)
+        temp_destination_object = f"{destination_object}.tmp"
+        temp_dest_blob = dest_bucket.blob(temp_destination_object)
+
+        # Check and delete any existing temp file from previous failed attempts
+        if temp_dest_blob.exists():
+            self.log.warning(f"Temporary file {temp_destination_object} found, 
deleting for fresh upload.")
+            temp_dest_blob.delete()
+
+        with sftp_hook.get_conn().file(source_path, "rb") as source_stream:
+            if self.source_stream_wrapper:
+                source_stream = self.source_stream_wrapper(source_stream)
+            total_bytes_uploaded = 0
+            interval_bytes_uploaded = 0
+            if self.stream_chunk_size and self.stream_chunk_size < _DEFAULT_CHUNKSIZE:
+                # Use manual stream transfer
+                with temp_dest_blob.open("wb") as write_stream:
+                    while True:
+                        chunk = source_stream.read(self.stream_chunk_size)
+                        if not chunk:
+                            break
+                        write_stream.write(chunk)
+                        total_bytes_uploaded += len(chunk)
+                        interval_bytes_uploaded += len(chunk)
+
+                        # Log upload progress at intervals
+                        if self.log_interval and interval_bytes_uploaded >= self.log_interval:
+                            self.log.info("Uploaded %s bytes so far.", total_bytes_uploaded)
+                            interval_bytes_uploaded %= self.log_interval
+            else:
+                # Use the upload_from_file method
+                temp_dest_blob.upload_from_file(source_stream)
+
+        # Copy from temp blob to final destination
+        if temp_dest_blob.exists():
+            self.log.info("Copying from temporary location to final 
destination.")
+            dest_bucket.copy_blob(temp_dest_blob, dest_bucket, 
destination_object)
+            temp_dest_blob.delete()  # Clean up the temp file
+        else:

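For context, here is the pattern the hunk implements (upload to a temporary object, then copy it over the final name) reduced to a standalone `google-cloud-storage` sketch; the bucket, object, and file names are hypothetical and not part of the PR:

```python
# Illustrative sketch only: the temp-object upload pattern from the hunk above,
# outside of Airflow. All names below are hypothetical.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-bucket")
temp_blob = bucket.blob("exports/data.csv.tmp")

# Remove any leftover temp object from a previous failed attempt.
if temp_blob.exists():
    temp_blob.delete()

# Stream the payload into the temp object in fixed-size chunks.
with open("data.csv", "rb") as source, temp_blob.open("wb") as sink:
    while chunk := source.read(1024 * 1024):
        sink.write(chunk)

# Promote the temp object to its final name, then clean up.
bucket.copy_blob(temp_blob, bucket, "exports/data.csv")
temp_blob.delete()
```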
Review Comment:
   Happy to help.
   Just make sure that, if you make the changes this way, the dependencies are also updated to require a minimal version of Paramiko.
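
   For illustration, here is roughly how a DAG could opt into the new streaming path, assuming the PR exposes `stream_chunk_size` and `log_interval` as constructor arguments on `SFTPToGCSOperator` (the names come from the diff above; the constructor wiring is not shown in this hunk, so treat it as an assumption):

   ```python
   # Illustrative sketch only: assumes the PR adds stream_chunk_size and
   # log_interval to SFTPToGCSOperator; the other arguments exist today.
   from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

   stream_file = SFTPToGCSOperator(
       task_id="stream_sftp_to_gcs",
       sftp_conn_id="sftp_default",
       gcp_conn_id="google_cloud_default",
       source_path="/data/large_export.csv",
       destination_bucket="my-bucket",
       destination_path="exports/large_export.csv",
       stream_chunk_size=1024 * 1024,  # assumed: manual 1 MiB chunks, below _DEFAULT_CHUNKSIZE
       log_interval=100 * 1024 * 1024,  # assumed: progress logged roughly every 100 MiB
   )
   ```

   On the Paramiko point: the lower bound would go into the provider dependency metadata (the provider.yaml files of the affected providers); the exact minimal version that supports the required SFTPFile streaming behavior should be confirmed before pinning.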


