This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 567ad643c95 issue-57891: Adding sftp_remote_host to S3 transfer Operators (#63147)
567ad643c95 is described below

commit 567ad643c9525e0cd08d1e9a912436e50b568027
Author: Jake Roach <[email protected]>
AuthorDate: Mon Mar 9 12:37:17 2026 -0400

    issue-57891: Adding sftp_remote_host to S3 transfer Operators (#63147)
---
 .../providers/amazon/aws/transfers/s3_to_sftp.py   |  8 +++-
 .../providers/amazon/aws/transfers/sftp_to_s3.py   |  8 +++-
 .../unit/amazon/aws/transfers/test_s3_to_sftp.py   | 55 ++++++++++++++++++++++
 .../unit/amazon/aws/transfers/test_sftp_to_s3.py   | 51 ++++++++++++++++++++
 4 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
index 0b3b5e1cb84..87d8454af96 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
@@ -42,6 +42,8 @@ class S3ToSFTPOperator(BaseOperator):
         establishing a connection to the SFTP server.
     :param sftp_path: The sftp remote path. This is the specified file path for
         uploading file to the SFTP server.
+    :param sftp_remote_host: The remote host of the SFTP server. Overrides host in
+        Connection.
     :param aws_conn_id: The Airflow connection used for AWS credentials.
         If this is None or empty then the default boto3 behaviour is used. If
         running Airflow in a distributed manner and aws_conn_id is None or
@@ -65,6 +67,7 @@ class S3ToSFTPOperator(BaseOperator):
         s3_key: str,
         sftp_path: str,
         sftp_conn_id: str = "ssh_default",
+        sftp_remote_host: str = "",
         aws_conn_id: str | None = "aws_default",
         confirm: bool = True,
         **kwargs,
@@ -74,6 +77,7 @@ class S3ToSFTPOperator(BaseOperator):
         self.sftp_path = sftp_path
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
+        self.sftp_remote_host = sftp_remote_host
         self.aws_conn_id = aws_conn_id
         self.confirm = confirm
 
@@ -85,7 +89,9 @@ class S3ToSFTPOperator(BaseOperator):
 
     def execute(self, context: Context) -> None:
         self.s3_key = self.get_s3_key(self.s3_key)
-        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
+
+        # SSHHook will handle a None/"" sftp_remote_host
+        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
         s3_hook = S3Hook(self.aws_conn_id)
 
         s3_client = s3_hook.get_conn()
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
index 0b3aada1950..4897ccca25c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
@@ -40,6 +40,8 @@ class SFTPToS3Operator(BaseOperator):
 
     :param sftp_conn_id: The sftp connection id. The name or identifier for
         establishing a connection to the SFTP server.
+    :param sftp_remote_host: The remote host of the SFTP server. Overrides host in
+        Connection.
     :param sftp_path: The sftp remote path. This is the specified file path
         for downloading the file from the SFTP server.
     :param s3_conn_id: The s3 connection id. The name or identifier for
@@ -63,6 +65,7 @@ class SFTPToS3Operator(BaseOperator):
         s3_key: str,
         sftp_path: str,
         sftp_conn_id: str = "ssh_default",
+        sftp_remote_host: str = "",
         s3_conn_id: str = "aws_default",
         use_temp_file: bool = True,
         fail_on_file_not_exist: bool = True,
@@ -71,6 +74,7 @@ class SFTPToS3Operator(BaseOperator):
         super().__init__(**kwargs)
         self.sftp_conn_id = sftp_conn_id
         self.sftp_path = sftp_path
+        self.sftp_remote_host = sftp_remote_host
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
         self.s3_conn_id = s3_conn_id
@@ -85,7 +89,9 @@ class SFTPToS3Operator(BaseOperator):
 
     def execute(self, context: Context) -> None:
         self.s3_key = self.get_s3_key(self.s3_key)
-        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
+
+        # SSHHook will handle a None/"" sftp_remote_host
+        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
         s3_hook = S3Hook(self.s3_conn_id)
 
         sftp_client = ssh_hook.get_conn().open_sftp()
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
index fecf207c6f4..257b898922c 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
@@ -256,5 +256,60 @@ class TestS3ToSFTPOperator:
         conn.delete_bucket(Bucket=self.s3_bucket)
         assert not s3_hook.check_for_bucket(self.s3_bucket)
 
+    @mock_aws
+    @conf_vars({("core", "enable_xcom_pickling"): "True"})
+    def test_s3_to_sftp_operator_sftp_remote_host(self):
+        """Test that sftp_remote_host overrides the connection host when provided."""
+        s3_hook = S3Hook(aws_conn_id=None)
+        test_remote_file_content = (
+            "This is remote file content for sftp_remote_host test \n which is also multiline "
+            "another line here \n this is last line. EOF"
+        )
+
+        # Test for creation of s3 bucket
+        conn = boto3.client("s3")
+        conn.create_bucket(Bucket=self.s3_bucket)
+        assert s3_hook.check_for_bucket(self.s3_bucket)
+
+        with open(LOCAL_FILE_PATH, "w") as file:
+            file.write(test_remote_file_content)
+        s3_hook.load_file(LOCAL_FILE_PATH, self.s3_key, bucket_name=BUCKET)
+
+        # Check if object was created in s3
+        objects_in_dest_bucket = conn.list_objects(Bucket=self.s3_bucket, Prefix=self.s3_key)
+        assert len(objects_in_dest_bucket["Contents"]) == 1
+        assert objects_in_dest_bucket["Contents"][0]["Key"] == self.s3_key
+
+        # Execute with sftp_remote_host overriding the connection host to the same localhost
+        run_task = S3ToSFTPOperator(
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            sftp_path=SFTP_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            sftp_remote_host="localhost",
+            task_id=TASK_ID + "_remote_host",
+            dag=self.dag,
+        )
+        assert run_task is not None
+
+        run_task.execute(None)
+
+        # Check that the file is created remotely with correct content
+        check_file_task = SSHOperator(
+            task_id="test_check_file_remote_host",
+            ssh_hook=self.hook,
+            command=f"cat {self.sftp_path}",
+            do_xcom_push=True,
+            dag=self.dag,
+        )
+        assert check_file_task is not None
+        result = check_file_task.execute(None)
+        assert result.strip() == test_remote_file_content.encode("utf-8")
+
+        # Clean up after finishing with test
+        conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
+        conn.delete_bucket(Bucket=self.s3_bucket)
+        assert not s3_hook.check_for_bucket(self.s3_bucket)
+
     def teardown_method(self):
         self.delete_remote_resource()
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
index e8fd3be4905..feb85e33a3c 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
@@ -157,3 +157,54 @@ class TestSFTPToS3Operator:
         conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
         conn.delete_bucket(Bucket=self.s3_bucket)
         assert not s3_hook.check_for_bucket(self.s3_bucket)
+
+    @mock_aws
+    @conf_vars({("core", "enable_xcom_pickling"): "True"})
+    def test_sftp_to_s3_sftp_remote_host(self):
+        """Test that sftp_remote_host overrides the connection host when provided."""
+        test_remote_file_content = (
+            "This is remote file content for sftp_remote_host test \n which is also multiline "
+            "another line here \n this is last line. EOF"
+        )
+
+        # Create a test file remotely
+        create_file_task = SSHOperator(
+            task_id="test_create_file_remote_host",
+            ssh_hook=self.hook,
+            command=f"echo '{test_remote_file_content}' > {self.sftp_path}",
+            do_xcom_push=True,
+            dag=self.dag,
+        )
+        assert create_file_task is not None
+        create_file_task.execute(None)
+
+        # Test for creation of s3 bucket
+        s3_hook = S3Hook(aws_conn_id=None)
+        conn = boto3.client("s3")
+        conn.create_bucket(Bucket=self.s3_bucket)
+        assert s3_hook.check_for_bucket(self.s3_bucket)
+
+        # Execute with sftp_remote_host overriding the connection host to the same localhost
+        run_task = SFTPToS3Operator(
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            sftp_path=SFTP_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            sftp_remote_host="localhost",
+            s3_conn_id=S3_CONN_ID,
+            task_id="test_sftp_to_s3_remote_host",
+            dag=self.dag,
+        )
+        assert run_task is not None
+
+        run_task.execute(None)
+
+        # Check if object was created in s3
+        objects_in_dest_bucket = conn.list_objects(Bucket=self.s3_bucket, Prefix=self.s3_key)
+        assert len(objects_in_dest_bucket["Contents"]) == 1
+        assert objects_in_dest_bucket["Contents"][0]["Key"] == self.s3_key
+
+        # Clean up after finishing with test
+        conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
+        conn.delete_bucket(Bucket=self.s3_bucket)
+        assert not s3_hook.check_for_bucket(self.s3_bucket)

Reply via email to