morooshka commented on code in PR #46233:
URL: https://github.com/apache/airflow/pull/46233#discussion_r1939974467
##########
providers/sftp/src/airflow/providers/sftp/operators/sftp.py:
##########
@@ -144,29 +152,38 @@ def execute(self, context: Any) -> str | list[str] | None:
)
self.sftp_hook.remote_host = self.remote_host
- for _local_filepath, _remote_filepath in zip(local_filepath_array,
remote_filepath_array):
- if self.operation.lower() == SFTPOperation.GET:
- local_folder = os.path.dirname(_local_filepath)
- if self.create_intermediate_dirs:
- Path(local_folder).mkdir(parents=True, exist_ok=True)
- file_msg = f"from {_remote_filepath} to {_local_filepath}"
- self.log.info("Starting to transfer %s", file_msg)
+ if self.operation in (SFTPOperation.GET, SFTPOperation.PUT):
+ for _local_filepath, _remote_filepath in
zip(local_filepath_array, remote_filepath_array):
+ if self.operation.lower() == SFTPOperation.GET:
Review Comment:
The conditions in lines 155 and 157 are not equal, I think we need add
`.lower()` in the 155-th line also
##########
providers/sftp/src/airflow/providers/sftp/operators/sftp.py:
##########
@@ -144,29 +152,38 @@ def execute(self, context: Any) -> str | list[str] | None:
)
self.sftp_hook.remote_host = self.remote_host
- for _local_filepath, _remote_filepath in zip(local_filepath_array,
remote_filepath_array):
- if self.operation.lower() == SFTPOperation.GET:
- local_folder = os.path.dirname(_local_filepath)
- if self.create_intermediate_dirs:
- Path(local_folder).mkdir(parents=True, exist_ok=True)
- file_msg = f"from {_remote_filepath} to {_local_filepath}"
- self.log.info("Starting to transfer %s", file_msg)
+ if self.operation in (SFTPOperation.GET, SFTPOperation.PUT):
+ for _local_filepath, _remote_filepath in
zip(local_filepath_array, remote_filepath_array):
+ if self.operation.lower() == SFTPOperation.GET:
+ local_folder = os.path.dirname(_local_filepath)
+ if self.create_intermediate_dirs:
+ Path(local_folder).mkdir(parents=True,
exist_ok=True)
+ file_msg = f"from {_remote_filepath} to
{_local_filepath}"
+ self.log.info("Starting to transfer %s", file_msg)
+ if self.sftp_hook.isdir(_remote_filepath):
+
self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
+ else:
+ self.sftp_hook.retrieve_file(_remote_filepath,
_local_filepath)
+ elif self.operation.lower() == SFTPOperation.PUT:
+ remote_folder = os.path.dirname(_remote_filepath)
+ if self.create_intermediate_dirs:
+ self.sftp_hook.create_directory(remote_folder)
+ file_msg = f"from {_local_filepath} to
{_remote_filepath}"
+ self.log.info("Starting to transfer file %s", file_msg)
+ if os.path.isdir(_local_filepath):
+ self.sftp_hook.store_directory(
+ _remote_filepath, _local_filepath,
confirm=self.confirm
+ )
+ else:
+ self.sftp_hook.store_file(_remote_filepath,
_local_filepath, confirm=self.confirm)
+ else:
Review Comment:
Maybe it is better to use
~~~
elif self.operation.lower() == SFTPOperation.DELETE:
~~~
here for better readability?
##########
providers/sftp/src/airflow/providers/sftp/operators/sftp.py:
##########
@@ -144,29 +152,38 @@ def execute(self, context: Any) -> str | list[str] | None:
)
self.sftp_hook.remote_host = self.remote_host
- for _local_filepath, _remote_filepath in zip(local_filepath_array,
remote_filepath_array):
- if self.operation.lower() == SFTPOperation.GET:
- local_folder = os.path.dirname(_local_filepath)
- if self.create_intermediate_dirs:
- Path(local_folder).mkdir(parents=True, exist_ok=True)
- file_msg = f"from {_remote_filepath} to {_local_filepath}"
- self.log.info("Starting to transfer %s", file_msg)
+ if self.operation in (SFTPOperation.GET, SFTPOperation.PUT):
+ for _local_filepath, _remote_filepath in
zip(local_filepath_array, remote_filepath_array):
+ if self.operation.lower() == SFTPOperation.GET:
+ local_folder = os.path.dirname(_local_filepath)
+ if self.create_intermediate_dirs:
+ Path(local_folder).mkdir(parents=True,
exist_ok=True)
+ file_msg = f"from {_remote_filepath} to
{_local_filepath}"
+ self.log.info("Starting to transfer %s", file_msg)
+ if self.sftp_hook.isdir(_remote_filepath):
+
self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
+ else:
+ self.sftp_hook.retrieve_file(_remote_filepath,
_local_filepath)
+ elif self.operation.lower() == SFTPOperation.PUT:
+ remote_folder = os.path.dirname(_remote_filepath)
+ if self.create_intermediate_dirs:
+ self.sftp_hook.create_directory(remote_folder)
+ file_msg = f"from {_local_filepath} to
{_remote_filepath}"
+ self.log.info("Starting to transfer file %s", file_msg)
+ if os.path.isdir(_local_filepath):
+ self.sftp_hook.store_directory(
+ _remote_filepath, _local_filepath,
confirm=self.confirm
+ )
+ else:
+ self.sftp_hook.store_file(_remote_filepath,
_local_filepath, confirm=self.confirm)
+ else:
+ for _remote_filepath in remote_filepath_array:
+ file_msg = f"{_remote_filepath}"
+ self.log.info("Starting to delete %s", file_msg)
if self.sftp_hook.isdir(_remote_filepath):
- self.sftp_hook.retrieve_directory(_remote_filepath,
_local_filepath)
- else:
- self.sftp_hook.retrieve_file(_remote_filepath,
_local_filepath)
- else:
- remote_folder = os.path.dirname(_remote_filepath)
- if self.create_intermediate_dirs:
- self.sftp_hook.create_directory(remote_folder)
- file_msg = f"from {_local_filepath} to {_remote_filepath}"
- self.log.info("Starting to transfer file %s", file_msg)
- if os.path.isdir(_local_filepath):
- self.sftp_hook.store_directory(
- _remote_filepath, _local_filepath,
confirm=self.confirm
- )
+ self.sftp_hook.delete_directory(_remote_filepath,
include_files=True)
else:
- self.sftp_hook.store_file(_remote_filepath,
_local_filepath, confirm=self.confirm)
+ self.sftp_hook.delete_file(_remote_filepath)
except Exception as e:
raise AirflowException(f"Error while transferring {file_msg},
error: {e}")
Review Comment:
Please rephrase the errmsg here, it does not feet the delete case
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]