dabla commented on code in PR #65480:
URL: https://github.com/apache/airflow/pull/65480#discussion_r3279996174


##########
providers/sftp/src/airflow/providers/sftp/triggers/sftp.py:
##########
@@ -139,3 +139,135 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
     def _get_async_hook(self) -> SFTPHookAsync:
         return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)
+
+
+class SFTPOperatorTrigger(BaseTrigger):
+    """
+    Trigger for SFTPOperator deferrable mode.
+
+    Fires when a file transfer (PUT, GET, or DELETE) completes
+    on the SFTP server, freeing the worker slot during the transfer.
+
+    :param ssh_conn_id: The SSH connection ID to use.
+    :param local_filepath: Local file path(s) to transfer.
+    :param remote_filepath: Remote file path(s) on the SFTP server.
+    :param operation: The SFTP operation - put, get, or delete.
+    :param confirm: Whether to confirm the file transfer.
+    :param create_intermediate_dirs: Whether to create intermediate dirs.
+    :param remote_host: Remote host to connect to (overrides connection).
+    :param concurrency: Number of threads for directory transfers.
+    :param prefetch: Whether to prefetch during file retrieval.
+    """
+
+    def __init__(
+        self,
+        ssh_conn_id: str | None = None,
+        local_filepath: str | list[str] | None = None,
+        remote_filepath: str | list[str] = "",
+        operation: str = "put",
+        confirm: bool = True,
+        create_intermediate_dirs: bool = False,
+        remote_host: str | None = None,
+        concurrency: int = 1,
+        prefetch: bool = True,
+    ) -> None:
+        super().__init__()
+        self.ssh_conn_id = ssh_conn_id
+        self.local_filepath = local_filepath
+        self.remote_filepath = remote_filepath
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.remote_host = remote_host
+        self.concurrency = concurrency
+        self.prefetch = prefetch
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize the trigger for storage in the database."""
+        return (
+            "airflow.providers.sftp.triggers.sftp.SFTPOperatorTrigger",
+            {
+                "ssh_conn_id": self.ssh_conn_id,
+                "local_filepath": self.local_filepath,
+                "remote_filepath": self.remote_filepath,
+                "operation": self.operation,
+                "confirm": self.confirm,
+                "create_intermediate_dirs": self.create_intermediate_dirs,
+                "remote_host": self.remote_host,
+                "concurrency": self.concurrency,
+                "prefetch": self.prefetch,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        """Run the file transfer asynchronously and yield a TriggerEvent when 
done."""
+        try:
+            loop = asyncio.get_running_loop()
+            await loop.run_in_executor(
+                None,
+                self._do_transfer,
+            )
+            yield TriggerEvent(
+                {
+                    "status": "success",
+                    "local_filepath": self.local_filepath,
+                }
+            )
+        except Exception as e:
+            yield TriggerEvent({"status": "error", "message": str(e)})
+
+    def _do_transfer(self) -> None:
+        """Run the actual synchronous SFTP transfer in a thread executor."""
+        import os
+        from pathlib import Path
+
+        from airflow.providers.sftp.constants import SFTPOperation
+        from airflow.providers.sftp.hooks.sftp import SFTPHook
+
+        sftp_hook = SFTPHook(

Review Comment:
   Also it doesn't make sense to use the `SFTPHook` which is synchronous in a 
trigger knowing we have a native `SFTPHookAsync` which has same capabilities of 
`SFTPHook`.  We should use the `SFTPHookAsync` instead in trigger.  Same remark 
regarding `_do_transfer`, both hooks should offer that functionality, so that 
trigger and operator only delegate the call to the hook.  I would call the 
method `transfer` in `SFTPHook` and `SFTPHookAsync`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to