Dawnpool commented on code in PR #46233: URL: https://github.com/apache/airflow/pull/46233#discussion_r1943886182
##########
providers/sftp/src/airflow/providers/sftp/operators/sftp.py:
##########
@@ -144,32 +152,46 @@ def execute(self, context: Any) -> str | list[str] | None:
                 )
                 self.sftp_hook.remote_host = self.remote_host
 
-            for _local_filepath, _remote_filepath in zip(local_filepath_array, remote_filepath_array):
-                if self.operation.lower() == SFTPOperation.GET:
-                    local_folder = os.path.dirname(_local_filepath)
-                    if self.create_intermediate_dirs:
-                        Path(local_folder).mkdir(parents=True, exist_ok=True)
-                    file_msg = f"from {_remote_filepath} to {_local_filepath}"
-                    self.log.info("Starting to transfer %s", file_msg)
+            if self.operation.lower() in (SFTPOperation.GET, SFTPOperation.PUT):
+                for _local_filepath, _remote_filepath in zip(local_filepath_array, remote_filepath_array):
+                    if self.operation.lower() == SFTPOperation.GET:
+                        local_folder = os.path.dirname(_local_filepath)
+                        if self.create_intermediate_dirs:
+                            Path(local_folder).mkdir(parents=True, exist_ok=True)
+                        file_msg = f"from {_remote_filepath} to {_local_filepath}"
+                        self.log.info("Starting to transfer %s", file_msg)
+                        if self.sftp_hook.isdir(_remote_filepath):
+                            self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
+                        else:
+                            self.sftp_hook.retrieve_file(_remote_filepath, _local_filepath)
+                    elif self.operation.lower() == SFTPOperation.PUT:
+                        remote_folder = os.path.dirname(_remote_filepath)
+                        if self.create_intermediate_dirs:
+                            self.sftp_hook.create_directory(remote_folder)
+                        file_msg = f"from {_local_filepath} to {_remote_filepath}"
+                        self.log.info("Starting to transfer file %s", file_msg)
+                        if os.path.isdir(_local_filepath):
+                            self.sftp_hook.store_directory(
+                                _remote_filepath, _local_filepath, confirm=self.confirm
+                            )
+                        else:
+                            self.sftp_hook.store_file(_remote_filepath, _local_filepath, confirm=self.confirm)
+            elif self.operation.lower() == SFTPOperation.DELETE:
+                for _remote_filepath in remote_filepath_array:
+                    file_msg = f"{_remote_filepath}"
+                    self.log.info("Starting to delete %s", file_msg)
                     if self.sftp_hook.isdir(_remote_filepath):
-                        self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
+                        self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
                     else:
-                        self.sftp_hook.retrieve_file(_remote_filepath, _local_filepath)
-                else:
-                    remote_folder = os.path.dirname(_remote_filepath)
-                    if self.create_intermediate_dirs:
-                        self.sftp_hook.create_directory(remote_folder)
-                    file_msg = f"from {_local_filepath} to {_remote_filepath}"
-                    self.log.info("Starting to transfer file %s", file_msg)
-                    if os.path.isdir(_local_filepath):
-                        self.sftp_hook.store_directory(
-                            _remote_filepath, _local_filepath, confirm=self.confirm
-                        )
-                    else:
-                        self.sftp_hook.store_file(_remote_filepath, _local_filepath, confirm=self.confirm)
+                        self.sftp_hook.delete_file(_remote_filepath)
 
         except Exception as e:
-            raise AirflowException(f"Error while transferring {file_msg}, error: {e}")
+            operation_msg = (
+                "transferring"
+                if self.operation.lower() in (SFTPOperation.GET, SFTPOperation.PUT)
+                else "deleting"
+            )
+            raise AirflowException(f"Error while {operation_msg} {file_msg}, error: {e}")

Review Comment:
   Pushed it! Thanks :)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org