dabla commented on code in PR #65480:
URL: https://github.com/apache/airflow/pull/65480#discussion_r3279964839


##########
providers/sftp/src/airflow/providers/sftp/triggers/sftp.py:
##########
@@ -139,3 +139,135 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
     def _get_async_hook(self) -> SFTPHookAsync:
         return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)
+
+
+class SFTPOperatorTrigger(BaseTrigger):
+    """
+    Trigger for SFTPOperator deferrable mode.
+
+    Fires when a file transfer (PUT, GET, or DELETE) completes
+    on the SFTP server, freeing the worker slot during the transfer.
+
+    :param ssh_conn_id: The SSH connection ID to use.
+    :param local_filepath: Local file path(s) to transfer.
+    :param remote_filepath: Remote file path(s) on the SFTP server.
+    :param operation: The SFTP operation - put, get, or delete.
+    :param confirm: Whether to confirm the file transfer.
+    :param create_intermediate_dirs: Whether to create intermediate dirs.
+    :param remote_host: Remote host to connect to (overrides connection).
+    :param concurrency: Number of threads for directory transfers.
+    :param prefetch: Whether to prefetch during file retrieval.
+    """
+
+    def __init__(
+        self,
+        ssh_conn_id: str | None = None,
+        local_filepath: str | list[str] | None = None,
+        remote_filepath: str | list[str] = "",
+        operation: str = "put",
+        confirm: bool = True,
+        create_intermediate_dirs: bool = False,
+        remote_host: str | None = None,
+        concurrency: int = 1,
+        prefetch: bool = True,
+    ) -> None:
+        super().__init__()
+        self.ssh_conn_id = ssh_conn_id
+        self.local_filepath = local_filepath
+        self.remote_filepath = remote_filepath
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.remote_host = remote_host
+        self.concurrency = concurrency
+        self.prefetch = prefetch
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize the trigger for storage in the database."""
+        return (
+            "airflow.providers.sftp.triggers.sftp.SFTPOperatorTrigger",
+            {
+                "ssh_conn_id": self.ssh_conn_id,
+                "local_filepath": self.local_filepath,
+                "remote_filepath": self.remote_filepath,
+                "operation": self.operation,
+                "confirm": self.confirm,
+                "create_intermediate_dirs": self.create_intermediate_dirs,
+                "remote_host": self.remote_host,
+                "concurrency": self.concurrency,
+                "prefetch": self.prefetch,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        """Run the file transfer asynchronously and yield a TriggerEvent when 
done."""
+        try:
+            loop = asyncio.get_running_loop()
+            await loop.run_in_executor(
+                None,
+                self._do_transfer,
+            )
+            yield TriggerEvent(
+                {
+                    "status": "success",
+                    "local_filepath": self.local_filepath,
+                }
+            )
+        except Exception as e:
+            yield TriggerEvent({"status": "error", "message": str(e)})
+
+    def _do_transfer(self) -> None:
+        """Run the actual synchronous SFTP transfer in a thread executor."""
+        import os
+        from pathlib import Path
+
+        from airflow.providers.sftp.constants import SFTPOperation
+        from airflow.providers.sftp.hooks.sftp import SFTPHook
+
+        sftp_hook = SFTPHook(
+            ssh_conn_id=self.ssh_conn_id,
+            remote_host=self.remote_host or "",
+        )
+
+        if isinstance(self.local_filepath, str):
+            local_filepath_array = [self.local_filepath] if 
self.local_filepath else []
+        else:
+            local_filepath_array = self.local_filepath or []
+
+        if isinstance(self.remote_filepath, str):
+            remote_filepath_array = [self.remote_filepath]
+        else:
+            remote_filepath_array = list(self.remote_filepath)
+
+        if self.operation.lower() == SFTPOperation.GET:

Review Comment:
   Would it make sense to move the `_do_transfer` logic into the hook? We 
currently have duplicated logic in both the trigger and the operator (in its 
`execute` method).
   
   Ideally, the `SFTPOperator` should delegate this responsibility to the hook 
so the logic lives in a single, reusable place, in line with the DRY principle.
   
   So far this hasn’t caused problems because the logic was only implemented 
in the operator’s `execute` method. Even that was not ideal, though: 
operators should primarily act as lightweight wrappers for DAG usage, while the 
actual implementation logic should reside in hooks. This ensures that users 
relying directly on the hook have access to the same functionality without 
depending on the operator, and avoids divergence between execution paths.



##########
providers/sftp/src/airflow/providers/sftp/triggers/sftp.py:
##########
@@ -139,3 +139,135 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
     def _get_async_hook(self) -> SFTPHookAsync:
         return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)
+
+
+class SFTPOperatorTrigger(BaseTrigger):
+    """
+    Trigger for SFTPOperator deferrable mode.
+
+    Fires when a file transfer (PUT, GET, or DELETE) completes
+    on the SFTP server, freeing the worker slot during the transfer.
+
+    :param ssh_conn_id: The SSH connection ID to use.
+    :param local_filepath: Local file path(s) to transfer.
+    :param remote_filepath: Remote file path(s) on the SFTP server.
+    :param operation: The SFTP operation - put, get, or delete.
+    :param confirm: Whether to confirm the file transfer.
+    :param create_intermediate_dirs: Whether to create intermediate dirs.
+    :param remote_host: Remote host to connect to (overrides connection).
+    :param concurrency: Number of threads for directory transfers.
+    :param prefetch: Whether to prefetch during file retrieval.
+    """
+
+    def __init__(
+        self,
+        ssh_conn_id: str | None = None,
+        local_filepath: str | list[str] | None = None,
+        remote_filepath: str | list[str] = "",
+        operation: str = "put",
+        confirm: bool = True,
+        create_intermediate_dirs: bool = False,
+        remote_host: str | None = None,
+        concurrency: int = 1,
+        prefetch: bool = True,
+    ) -> None:
+        super().__init__()
+        self.ssh_conn_id = ssh_conn_id
+        self.local_filepath = local_filepath
+        self.remote_filepath = remote_filepath
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.remote_host = remote_host
+        self.concurrency = concurrency
+        self.prefetch = prefetch
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize the trigger for storage in the database."""
+        return (
+            "airflow.providers.sftp.triggers.sftp.SFTPOperatorTrigger",

Review Comment:
   I would generate the trigger's fully qualified class name dynamically, so it 
stays correct if the class or its module is renamed in a future refactor.
   
   So instead of:
   
   `"airflow.providers.sftp.triggers.sftp.SFTPOperatorTrigger",`
   
   I would write it like:
   
   `f"{self.__class__.__module__}.{self.__class__.__name__}",`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to