Dawnpool commented on code in PR #46233:
URL: https://github.com/apache/airflow/pull/46233#discussion_r1942106732


##########
providers/sftp/src/airflow/providers/sftp/operators/sftp.py:
##########
@@ -144,32 +152,46 @@ def execute(self, context: Any) -> str | list[str] | None:
                 )
                 self.sftp_hook.remote_host = self.remote_host
 
-            for _local_filepath, _remote_filepath in zip(local_filepath_array, 
remote_filepath_array):
-                if self.operation.lower() == SFTPOperation.GET:
-                    local_folder = os.path.dirname(_local_filepath)
-                    if self.create_intermediate_dirs:
-                        Path(local_folder).mkdir(parents=True, exist_ok=True)
-                    file_msg = f"from {_remote_filepath} to {_local_filepath}"
-                    self.log.info("Starting to transfer %s", file_msg)
+            if self.operation.lower() in (SFTPOperation.GET, 
SFTPOperation.PUT):
+                for _local_filepath, _remote_filepath in 
zip(local_filepath_array, remote_filepath_array):
+                    if self.operation.lower() == SFTPOperation.GET:
+                        local_folder = os.path.dirname(_local_filepath)
+                        if self.create_intermediate_dirs:
+                            Path(local_folder).mkdir(parents=True, 
exist_ok=True)
+                        file_msg = f"from {_remote_filepath} to 
{_local_filepath}"
+                        self.log.info("Starting to transfer %s", file_msg)
+                        if self.sftp_hook.isdir(_remote_filepath):
+                            
self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
+                        else:
+                            self.sftp_hook.retrieve_file(_remote_filepath, 
_local_filepath)
+                    elif self.operation.lower() == SFTPOperation.PUT:
+                        remote_folder = os.path.dirname(_remote_filepath)
+                        if self.create_intermediate_dirs:
+                            self.sftp_hook.create_directory(remote_folder)
+                        file_msg = f"from {_local_filepath} to 
{_remote_filepath}"
+                        self.log.info("Starting to transfer file %s", file_msg)
+                        if os.path.isdir(_local_filepath):
+                            self.sftp_hook.store_directory(
+                                _remote_filepath, _local_filepath, 
confirm=self.confirm
+                            )
+                        else:
+                            self.sftp_hook.store_file(_remote_filepath, 
_local_filepath, confirm=self.confirm)
+            elif self.operation.lower() == SFTPOperation.DELETE:
+                for _remote_filepath in remote_filepath_array:
+                    file_msg = f"{_remote_filepath}"
+                    self.log.info("Starting to delete %s", file_msg)
                     if self.sftp_hook.isdir(_remote_filepath):
-                        self.sftp_hook.retrieve_directory(_remote_filepath, 
_local_filepath)
+                        self.sftp_hook.delete_directory(_remote_filepath, 
include_files=True)
                     else:
-                        self.sftp_hook.retrieve_file(_remote_filepath, 
_local_filepath)
-                else:
-                    remote_folder = os.path.dirname(_remote_filepath)
-                    if self.create_intermediate_dirs:
-                        self.sftp_hook.create_directory(remote_folder)
-                    file_msg = f"from {_local_filepath} to {_remote_filepath}"
-                    self.log.info("Starting to transfer file %s", file_msg)
-                    if os.path.isdir(_local_filepath):
-                        self.sftp_hook.store_directory(
-                            _remote_filepath, _local_filepath, 
confirm=self.confirm
-                        )
-                    else:
-                        self.sftp_hook.store_file(_remote_filepath, 
_local_filepath, confirm=self.confirm)
+                        self.sftp_hook.delete_file(_remote_filepath)
 
         except Exception as e:
-            raise AirflowException(f"Error while transferring {file_msg}, 
error: {e}")
+            operation_msg = (
+                "transferring"
+                if self.operation.lower() in (SFTPOperation.GET, 
SFTPOperation.PUT)
+                else "deleting"
+            )
+            raise AirflowException(f"Error while {operation_msg} {file_msg}, 
error: {e}")

Review Comment:
   Then the error message for GET and PUT would look like `Error while processing 
from local_filepath to remote_filepath`, which reads a little awkwardly to me.
   
   How about
   ```
   raise AirflowException(f"Error while processing {self.operation.lower()} operation {file_msg}, error: {e}")
   ```
   so the message would look like `Error while processing get operation from 
local_filepath to remote_filepath`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to