Dawnpool commented on code in PR #46233:
URL: https://github.com/apache/airflow/pull/46233#discussion_r1943886182


##########
providers/sftp/src/airflow/providers/sftp/operators/sftp.py:
##########
@@ -144,32 +152,46 @@ def execute(self, context: Any) -> str | list[str] | None:
                 )
                 self.sftp_hook.remote_host = self.remote_host
 
-            for _local_filepath, _remote_filepath in zip(local_filepath_array, 
remote_filepath_array):
-                if self.operation.lower() == SFTPOperation.GET:
-                    local_folder = os.path.dirname(_local_filepath)
-                    if self.create_intermediate_dirs:
-                        Path(local_folder).mkdir(parents=True, exist_ok=True)
-                    file_msg = f"from {_remote_filepath} to {_local_filepath}"
-                    self.log.info("Starting to transfer %s", file_msg)
+            if self.operation.lower() in (SFTPOperation.GET, 
SFTPOperation.PUT):
+                for _local_filepath, _remote_filepath in 
zip(local_filepath_array, remote_filepath_array):
+                    if self.operation.lower() == SFTPOperation.GET:
+                        local_folder = os.path.dirname(_local_filepath)
+                        if self.create_intermediate_dirs:
+                            Path(local_folder).mkdir(parents=True, 
exist_ok=True)
+                        file_msg = f"from {_remote_filepath} to 
{_local_filepath}"
+                        self.log.info("Starting to transfer %s", file_msg)
+                        if self.sftp_hook.isdir(_remote_filepath):
+                            
self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
+                        else:
+                            self.sftp_hook.retrieve_file(_remote_filepath, 
_local_filepath)
+                    elif self.operation.lower() == SFTPOperation.PUT:
+                        remote_folder = os.path.dirname(_remote_filepath)
+                        if self.create_intermediate_dirs:
+                            self.sftp_hook.create_directory(remote_folder)
+                        file_msg = f"from {_local_filepath} to 
{_remote_filepath}"
+                        self.log.info("Starting to transfer file %s", file_msg)
+                        if os.path.isdir(_local_filepath):
+                            self.sftp_hook.store_directory(
+                                _remote_filepath, _local_filepath, 
confirm=self.confirm
+                            )
+                        else:
+                            self.sftp_hook.store_file(_remote_filepath, 
_local_filepath, confirm=self.confirm)
+            elif self.operation.lower() == SFTPOperation.DELETE:
+                for _remote_filepath in remote_filepath_array:
+                    file_msg = f"{_remote_filepath}"
+                    self.log.info("Starting to delete %s", file_msg)
                     if self.sftp_hook.isdir(_remote_filepath):
-                        self.sftp_hook.retrieve_directory(_remote_filepath, 
_local_filepath)
+                        self.sftp_hook.delete_directory(_remote_filepath, 
include_files=True)
                     else:
-                        self.sftp_hook.retrieve_file(_remote_filepath, 
_local_filepath)
-                else:
-                    remote_folder = os.path.dirname(_remote_filepath)
-                    if self.create_intermediate_dirs:
-                        self.sftp_hook.create_directory(remote_folder)
-                    file_msg = f"from {_local_filepath} to {_remote_filepath}"
-                    self.log.info("Starting to transfer file %s", file_msg)
-                    if os.path.isdir(_local_filepath):
-                        self.sftp_hook.store_directory(
-                            _remote_filepath, _local_filepath, 
confirm=self.confirm
-                        )
-                    else:
-                        self.sftp_hook.store_file(_remote_filepath, 
_local_filepath, confirm=self.confirm)
+                        self.sftp_hook.delete_file(_remote_filepath)
 
         except Exception as e:
-            raise AirflowException(f"Error while transferring {file_msg}, 
error: {e}")
+            operation_msg = (
+                "transferring"
+                if self.operation.lower() in (SFTPOperation.GET, 
SFTPOperation.PUT)
+                else "deleting"
+            )
+            raise AirflowException(f"Error while {operation_msg} {file_msg}, 
error: {e}")

Review Comment:
   Pushed it! Thanks :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to