This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 68b21aeec21 (return a warning instead of an error when file is already absent) (#62639)
68b21aeec21 is described below

commit 68b21aeec21cea0f4b1eb6f58072151729b51abe
Author: Eason09053360 <[email protected]>
AuthorDate: Thu Mar 12 08:46:27 2026 +0800

    (return a warning instead of an error when file is already absent) (#62639)
---
 .../src/airflow/providers/sftp/operators/sftp.py   | 27 ++++++++++++++---
 .../sftp/tests/unit/sftp/operators/test_sftp.py    | 35 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/providers/sftp/src/airflow/providers/sftp/operators/sftp.py b/providers/sftp/src/airflow/providers/sftp/operators/sftp.py
index 5fbc0eae3bb..0b47b5b7d5d 100644
--- a/providers/sftp/src/airflow/providers/sftp/operators/sftp.py
+++ b/providers/sftp/src/airflow/providers/sftp/operators/sftp.py
@@ -19,6 +19,7 @@
 
 from __future__ import annotations
 
+import errno
 import os
 import socket
 from collections.abc import Sequence
@@ -206,10 +207,18 @@ class SFTPOperator(BaseOperator):
                 for _remote_filepath in remote_filepath_array:
                     file_msg = f"{_remote_filepath}"
                     self.log.info("Starting to delete %s", file_msg)
-                    if self.sftp_hook.isdir(_remote_filepath):
-                        self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
-                    else:
-                        self.sftp_hook.delete_file(_remote_filepath)
+                    try:
+                        if self.sftp_hook.isdir(_remote_filepath):
+                            self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
+                        else:
+                            self.sftp_hook.delete_file(_remote_filepath)
+                    except OSError as exc:
+                        if self._is_missing_path_error(exc):
+                            self.log.warning(
+                                "Remote path %s does not exist. Skipping delete.", _remote_filepath
+                            )
+                            continue
+                        raise
 
         except Exception as e:
             raise AirflowException(
@@ -218,6 +227,16 @@ class SFTPOperator(BaseOperator):
 
         return self.local_filepath
 
+    @staticmethod
+    def _is_missing_path_error(exc: Exception) -> bool:
+        if isinstance(exc, FileNotFoundError):
+            return True
+        if isinstance(exc, OSError) and exc.errno == errno.ENOENT:
+            return True
+        if exc.args and isinstance(exc.args[0], int) and exc.args[0] == errno.ENOENT:
+            return True
+        return False
+
     def get_openlineage_facets_on_start(self):
         """
         Return OpenLineage datasets.
diff --git a/providers/sftp/tests/unit/sftp/operators/test_sftp.py b/providers/sftp/tests/unit/sftp/operators/test_sftp.py
index 3c70f84d4bd..815d9813201 100644
--- a/providers/sftp/tests/unit/sftp/operators/test_sftp.py
+++ b/providers/sftp/tests/unit/sftp/operators/test_sftp.py
@@ -18,6 +18,8 @@
 from __future__ import annotations
 
 import contextlib
+import errno
+import logging
 import os
 import socket
 from base64 import b64encode
@@ -566,6 +568,39 @@ class TestSFTPOperator:
         args, _ = mock_delete.call_args_list[0]
         assert args == (remote_filepath,)
 
+    @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
+    @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.isdir")
+    def test_delete_missing_file_warns(self, mock_isdir, mock_delete, caplog):
+        mock_isdir.return_value = False
+        mock_delete.side_effect = FileNotFoundError("missing")
+        remote_filepath = "/tmp/missing"
+        sftp_op = SFTPOperator(
+            task_id="test_missing_file_delete_warns",
+            sftp_hook=self.sftp_hook,
+            remote_filepath=remote_filepath,
+            operation=SFTPOperation.DELETE,
+        )
+
+        with caplog.at_level(logging.WARNING):
+            sftp_op.execute(None)
+
+        assert "does not exist. Skipping delete." in caplog.text
+
+    @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
+    @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.isdir")
+    def test_delete_permission_error_raises(self, mock_isdir, mock_delete):
+        mock_isdir.return_value = False
+        mock_delete.side_effect = PermissionError(errno.EACCES, "denied")
+        remote_filepath = "/tmp/protected"
+
+        with pytest.raises(AirflowException):
+            SFTPOperator(
+                task_id="test_permission_error_delete_raises",
+                sftp_hook=self.sftp_hook,
+                remote_filepath=remote_filepath,
+                operation=SFTPOperation.DELETE,
+            ).execute(None)
+
     @mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
     def test_local_filepath_exists_error_delete(self, mock_delete):
         local_filepath = "/tmp"

Reply via email to