This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 68b21aeec21 (return a warning instead of an error when file is already absent) (#62639)
68b21aeec21 is described below
commit 68b21aeec21cea0f4b1eb6f58072151729b51abe
Author: Eason09053360 <[email protected]>
AuthorDate: Thu Mar 12 08:46:27 2026 +0800
(return a warning instead of an error when file is already absent) (#62639)
---
.../src/airflow/providers/sftp/operators/sftp.py | 27 ++++++++++++++---
.../sftp/tests/unit/sftp/operators/test_sftp.py | 35 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 4 deletions(-)
diff --git a/providers/sftp/src/airflow/providers/sftp/operators/sftp.py b/providers/sftp/src/airflow/providers/sftp/operators/sftp.py
index 5fbc0eae3bb..0b47b5b7d5d 100644
--- a/providers/sftp/src/airflow/providers/sftp/operators/sftp.py
+++ b/providers/sftp/src/airflow/providers/sftp/operators/sftp.py
@@ -19,6 +19,7 @@
from __future__ import annotations
+import errno
import os
import socket
from collections.abc import Sequence
@@ -206,10 +207,18 @@ class SFTPOperator(BaseOperator):
for _remote_filepath in remote_filepath_array:
file_msg = f"{_remote_filepath}"
self.log.info("Starting to delete %s", file_msg)
- if self.sftp_hook.isdir(_remote_filepath):
- self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
- else:
- self.sftp_hook.delete_file(_remote_filepath)
+ try:
+ if self.sftp_hook.isdir(_remote_filepath):
+ self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
+ else:
+ self.sftp_hook.delete_file(_remote_filepath)
+ except OSError as exc:
+ if self._is_missing_path_error(exc):
+ self.log.warning(
+ "Remote path %s does not exist. Skipping delete.", _remote_filepath
+ )
+ continue
+ raise
except Exception as e:
raise AirflowException(
@@ -218,6 +227,16 @@ class SFTPOperator(BaseOperator):
return self.local_filepath
+ @staticmethod
+ def _is_missing_path_error(exc: Exception) -> bool:
+ if isinstance(exc, FileNotFoundError):
+ return True
+ if isinstance(exc, OSError) and exc.errno == errno.ENOENT:
+ return True
+ if exc.args and isinstance(exc.args[0], int) and exc.args[0] == errno.ENOENT:
+ return True
+ return False
+
def get_openlineage_facets_on_start(self):
"""
Return OpenLineage datasets.
diff --git a/providers/sftp/tests/unit/sftp/operators/test_sftp.py b/providers/sftp/tests/unit/sftp/operators/test_sftp.py
index 3c70f84d4bd..815d9813201 100644
--- a/providers/sftp/tests/unit/sftp/operators/test_sftp.py
+++ b/providers/sftp/tests/unit/sftp/operators/test_sftp.py
@@ -18,6 +18,8 @@
from __future__ import annotations
import contextlib
+import errno
+import logging
import os
import socket
from base64 import b64encode
@@ -566,6 +568,39 @@ class TestSFTPOperator:
args, _ = mock_delete.call_args_list[0]
assert args == (remote_filepath,)
+ @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
+ @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.isdir")
+ def test_delete_missing_file_warns(self, mock_isdir, mock_delete, caplog):
+ mock_isdir.return_value = False
+ mock_delete.side_effect = FileNotFoundError("missing")
+ remote_filepath = "/tmp/missing"
+ sftp_op = SFTPOperator(
+ task_id="test_missing_file_delete_warns",
+ sftp_hook=self.sftp_hook,
+ remote_filepath=remote_filepath,
+ operation=SFTPOperation.DELETE,
+ )
+
+ with caplog.at_level(logging.WARNING):
+ sftp_op.execute(None)
+
+ assert "does not exist. Skipping delete." in caplog.text
+
+ @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
+ @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.isdir")
+ def test_delete_permission_error_raises(self, mock_isdir, mock_delete):
+ mock_isdir.return_value = False
+ mock_delete.side_effect = PermissionError(errno.EACCES, "denied")
+ remote_filepath = "/tmp/protected"
+
+ with pytest.raises(AirflowException):
+ SFTPOperator(
+ task_id="test_permission_error_delete_raises",
+ sftp_hook=self.sftp_hook,
+ remote_filepath=remote_filepath,
+ operation=SFTPOperation.DELETE,
+ ).execute(None)
+
@mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
def test_local_filepath_exists_error_delete(self, mock_delete):
local_filepath = "/tmp"