This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e1422be30e Add fail_on_file_not_exist to SFTPToGCSOperator (#56528)
2e1422be30e is described below

commit 2e1422be30e54a02f5e9af9ce02f0bb686b2fb0f
Author: John Nguyen <[email protected]>
AuthorDate: Mon Oct 20 03:39:07 2025 +0700

    Add fail_on_file_not_exist to SFTPToGCSOperator (#56528)
    
    * Add fail_on_file_not_exist to SFTPToGCSOperator
    
    * working with local DAG
    
    * working with local DAG
    
    * Trigger Build
    
    * fix test
    
    * fix unit test case
    
    ---------
    
    Co-authored-by: John Nguyen <[email protected]>
---
 .../google/cloud/transfers/sftp_to_gcs.py          | 13 ++++++++++--
 .../google/cloud/transfers/test_sftp_to_gcs.py     | 24 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py
index 9e53d16f943..8bd6af7f991 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py
@@ -78,6 +78,8 @@ class SFTPToGCSOperator(BaseOperator):
         then uploads (may require significant disk space).
         When ``True``, the file streams directly without using local disk.
         Defaults to ``False``.
+    :param fail_on_file_not_exist: If True, operator fails when file does not exist,
+        if False, operator will not fail and skips transfer. Default is True.
     """
 
     template_fields: Sequence[str] = (
@@ -101,6 +103,7 @@ class SFTPToGCSOperator(BaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         sftp_prefetch: bool = True,
         use_stream: bool = False,
+        fail_on_file_not_exist: bool = True,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -116,6 +119,7 @@ class SFTPToGCSOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         self.sftp_prefetch = sftp_prefetch
         self.use_stream = use_stream
+        self.fail_on_file_not_exist = fail_on_file_not_exist
 
     @cached_property
     def sftp_hook(self):
@@ -156,7 +160,13 @@ class SFTPToGCSOperator(BaseOperator):
             destination_object = (
                 self.destination_path if self.destination_path else self.source_path.rsplit("/", 1)[1]
             )
-            self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
+            try:
+                self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
+            except FileNotFoundError as e:
+                if self.fail_on_file_not_exist:
+                    raise e
+                self.log.info("File %s not found on SFTP server. Skipping transfer.", self.source_path)
+                return
 
     def _copy_single_object(
         self,
@@ -172,7 +182,6 @@ class SFTPToGCSOperator(BaseOperator):
             self.destination_bucket,
             destination_object,
         )
-
         if self.use_stream:
             dest_bucket = gcs_hook.get_bucket(self.destination_bucket)
             dest_blob = dest_bucket.blob(destination_object)
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py
index f15d1639744..d84f889b1e8 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import os
 from unittest import mock
+from unittest.mock import patch
 
 import pytest
 
@@ -377,3 +378,26 @@ class TestSFTPToGCSOperator:
         assert result.inputs[0].name == expected_source
         assert result.outputs[0].namespace == f"gs://{TEST_BUCKET}"
         assert result.outputs[0].name == expected_destination
+
+    @pytest.mark.parametrize("fail_on_file_not_exist", [False, True])
+    @mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.GCSHook")
+    @mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPHook")
+    def test_sftp_to_gcs_fail_on_file_not_exist(self, sftp_hook, gcs_hook, fail_on_file_not_exist):
+        invalid_file_name = "main_dir/invalid-object.json"
+        task = SFTPToGCSOperator(
+            task_id=TASK_ID,
+            source_path=invalid_file_name,
+            destination_bucket=TEST_BUCKET,
+            destination_path=DESTINATION_PATH_FILE,
+            move_object=False,
+            gcp_conn_id=GCP_CONN_ID,
+            sftp_conn_id=SFTP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            fail_on_file_not_exist=fail_on_file_not_exist,
+        )
+        with patch.object(sftp_hook.return_value, "retrieve_file", side_effect=FileNotFoundError):
+            if fail_on_file_not_exist:
+                with pytest.raises(FileNotFoundError):
+                    task.execute(None)
+            else:
+                task.execute(None)

Reply via email to