pankajkoti commented on code in PR #37117:
URL: https://github.com/apache/airflow/pull/37117#discussion_r1474844227


##########
airflow/providers/sftp/sensors/sftp.py:
##########
@@ -119,3 +124,50 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
                 xcom_value={"files_found": files_found, 
"decorator_return_value": callable_return},
             )
         return True
+
+    def execute(self, context: Context) -> Any:
+        # Unlike other async sensors, we do not follow the pattern of calling 
the synchronous self.poke()
+        # method before deferring here. This is due to the current limitations 
we have in the synchronous
+        # SFTPHook methods. The limitations are discovered while being worked 
upon the ticket
+        # https://github.com/astronomer/astronomer-providers/issues/1021. They 
are as follows:

Review Comment:
   ```suggestion
           # SFTPHook methods. They are as follows:
   ```



##########
airflow/providers/sftp/sensors/sftp.py:
##########
@@ -119,3 +124,50 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
                 xcom_value={"files_found": files_found, 
"decorator_return_value": callable_return},
             )
         return True
+
+    def execute(self, context: Context) -> Any:
+        # Unlike other async sensors, we do not follow the pattern of calling 
the synchronous self.poke()
+        # method before deferring here. This is due to the current limitations 
we have in the synchronous
+        # SFTPHook methods. The limitations are discovered while being worked 
upon the ticket
+        # https://github.com/astronomer/astronomer-providers/issues/1021. They 
are as follows:
+        #
+        # For file_pattern sensing, the hook implements list_directory() 
method which returns a list of
+        # filenames only without the attributes like modified time which is 
required for the file_pattern
+        # sensing when newer_than is supplied. This leads to intermittent 
failures potentially due to
+        # throttling by the SFTP server as the hook makes multiple calls to 
the server to get the
+        # attributes for each of the files in the directory.This limitation is 
resolved here by instead
+        # calling the read_directory() method which returns a list of files 
along with their attributes
+        # in a single call. We can add back the call to self.poke() before 
deferring once the above
+        # limitations are resolved in the

Review Comment:
   ```suggestion
           # limitations are resolved.
   ```



##########
airflow/providers/sftp/hooks/sftp.py:
##########
@@ -400,3 +406,177 @@ def get_files_by_pattern(self, path, fnmatch_pattern) -> 
list[str]:
                 matched_files.append(file)
 
         return matched_files
+
+
+class SFTPHookAsync(BaseHook):
+    """
+    Interact with an SFTP server via asyncssh package.
+
+    :param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP 
server
+    :param host: hostname of the SFTP server
+    :param port: port of the SFTP server
+    :param username: username used when authenticating to the SFTP server
+    :param password: password used when authenticating to the SFTP server.
+        Can be left blank if using a key file
+    :param known_hosts: path to the known_hosts file on the local file system. 
Defaults to ``~/.ssh/known_hosts``.
+    :param key_file: path to the client key file used for authentication to 
SFTP server
+    :param passphrase: passphrase used with the key_file for authentication to 
SFTP server
+    """
+
+    conn_name_attr = "ssh_conn_id"
+    default_conn_name = "sftp_default"
+    conn_type = "sftp"
+    hook_name = "SFTP"
+    default_known_hosts = "~/.ssh/known_hosts"
+
+    def __init__(  # nosec: B107
+        self,
+        sftp_conn_id: str = default_conn_name,
+        host: str = "",
+        port: int = 22,
+        username: str = "",
+        password: str = "",
+        known_hosts: str = default_known_hosts,
+        key_file: str = "",
+        passphrase: str = "",
+        private_key: str = "",
+    ) -> None:
+        self.sftp_conn_id = sftp_conn_id
+        self.host = host
+        self.port = port
+        self.username = username
+        self.password = password
+        self.known_hosts: bytes | str = os.path.expanduser(known_hosts)
+        self.key_file = key_file
+        self.passphrase = passphrase
+        self.private_key = private_key
+
+    def _parse_extras(self, conn: Connection) -> None:
+        """Parse extra fields from the connection into instance fields."""
+        extra_options = conn.extra_dejson
+        if "key_file" in extra_options and self.key_file == "":
+            self.key_file = extra_options["key_file"]
+        if "known_hosts" in extra_options and self.known_hosts != 
self.default_known_hosts:
+            self.known_hosts = extra_options["known_hosts"]
+        if ("passphrase" or "private_key_passphrase") in extra_options:
+            self.passphrase = extra_options["passphrase"]
+        if "private_key" in extra_options:
+            self.private_key = extra_options["private_key"]
+
+        host_key = extra_options.get("host_key")
+        no_host_key_check = extra_options.get("no_host_key_check")
+
+        if no_host_key_check is not None:
+            no_host_key_check = str(no_host_key_check).lower() == "true"
+            if host_key is not None and no_host_key_check:
+                raise ValueError("Host key check was skipped, but `host_key` 
value was given")
+            if no_host_key_check:
+                self.log.warning(
+                    "No Host Key Verification. This won't protect against 
Man-In-The-Middle attacks"
+                )
+                self.known_hosts = "none"
+
+        if host_key is not None:
+            self.known_hosts = f"{conn.host} {host_key}".encode()
+
+    async def _get_conn(self) -> asyncssh.SSHClientConnection:
+        """
+        Asynchronously connect to the SFTP server as an SSH client.
+
+        The following parameters are provided either in the extra json object 
in
+        the SFTP connection definition
+
+        - key_file
+        - known_hosts
+        - passphrase
+        """
+        conn = await sync_to_async(self.get_connection)(self.sftp_conn_id)
+        if conn.extra is not None:
+            self._parse_extras(conn)
+
+        conn_config = {
+            "host": conn.host,
+            "port": conn.port,
+            "username": conn.login,
+            "password": conn.password,
+        }
+        if self.key_file:
+            conn_config.update(client_keys=self.key_file)
+        if self.known_hosts:
+            if self.known_hosts.lower() == "none":
+                conn_config.update(known_hosts=None)
+            else:
+                conn_config.update(known_hosts=self.known_hosts)
+        if self.private_key:
+            _private_key = asyncssh.import_private_key(self.private_key, 
self.passphrase)
+            conn_config.update(client_keys=[_private_key])
+        if self.passphrase:
+            conn_config.update(passphrase=self.passphrase)
+        ssh_client_conn = await asyncssh.connect(**conn_config)
+        return ssh_client_conn
+
+    async def list_directory(self, path: str = "") -> list[str] | None:
+        """Returns a list of files on the SFTP server at the provided path."""
+        ssh_conn = await self._get_conn()
+        sftp_client = await ssh_conn.start_sftp_client()
+        try:
+            files = await sftp_client.listdir(path)
+            return sorted(files)
+        except asyncssh.SFTPNoSuchFile:
+            return None
+
+    async def read_directory(self, path: str = "") -> 
Sequence[asyncssh.sftp.SFTPName] | None:
+        """Returns a list of files along with their attributes on the SFTP 
server at the provided path."""
+        ssh_conn = await self._get_conn()
+        sftp_client = await ssh_conn.start_sftp_client()
+        try:
+            files = await sftp_client.readdir(path)
+            return files
+        except asyncssh.SFTPNoSuchFile:
+            return None
+
+    async def get_files_and_attrs_by_pattern(
+        self, path: str = "", fnmatch_pattern: str = ""
+    ) -> Sequence[asyncssh.sftp.SFTPName]:
+        """
+        Get the files along with their attributes matching the pattern (e.g. 
``*.pdf``) at the provided path.
+
+        if one exists. Otherwise, raises an AirflowException to be handled 
upstream for deferring
+        """
+        files_list = await self.read_directory(path)
+        if files_list is None:
+            raise FileNotFoundError(f"No files at path {path!r} found...")

Review Comment:
   I am wondering whether we should catch this exception in the Trigger and 
put it to sleep and retry, so that it keeps sensing for the availability of this 
file.



##########
airflow/providers/sftp/hooks/sftp.py:
##########
@@ -400,3 +406,177 @@ def get_files_by_pattern(self, path, fnmatch_pattern) -> 
list[str]:
                 matched_files.append(file)
 
         return matched_files
+
+
+class SFTPHookAsync(BaseHook):
+    """
+    Interact with an SFTP server via asyncssh package.
+
+    :param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP 
server
+    :param host: hostname of the SFTP server
+    :param port: port of the SFTP server
+    :param username: username used when authenticating to the SFTP server
+    :param password: password used when authenticating to the SFTP server.
+        Can be left blank if using a key file
+    :param known_hosts: path to the known_hosts file on the local file system. 
Defaults to ``~/.ssh/known_hosts``.
+    :param key_file: path to the client key file used for authentication to 
SFTP server
+    :param passphrase: passphrase used with the key_file for authentication to 
SFTP server
+    """
+
+    conn_name_attr = "ssh_conn_id"
+    default_conn_name = "sftp_default"
+    conn_type = "sftp"
+    hook_name = "SFTP"
+    default_known_hosts = "~/.ssh/known_hosts"
+
+    def __init__(  # nosec: B107
+        self,
+        sftp_conn_id: str = default_conn_name,
+        host: str = "",
+        port: int = 22,
+        username: str = "",
+        password: str = "",
+        known_hosts: str = default_known_hosts,
+        key_file: str = "",
+        passphrase: str = "",
+        private_key: str = "",
+    ) -> None:
+        self.sftp_conn_id = sftp_conn_id
+        self.host = host
+        self.port = port
+        self.username = username
+        self.password = password
+        self.known_hosts: bytes | str = os.path.expanduser(known_hosts)
+        self.key_file = key_file
+        self.passphrase = passphrase
+        self.private_key = private_key
+
+    def _parse_extras(self, conn: Connection) -> None:
+        """Parse extra fields from the connection into instance fields."""
+        extra_options = conn.extra_dejson
+        if "key_file" in extra_options and self.key_file == "":
+            self.key_file = extra_options["key_file"]
+        if "known_hosts" in extra_options and self.known_hosts != 
self.default_known_hosts:
+            self.known_hosts = extra_options["known_hosts"]
+        if ("passphrase" or "private_key_passphrase") in extra_options:
+            self.passphrase = extra_options["passphrase"]
+        if "private_key" in extra_options:
+            self.private_key = extra_options["private_key"]
+
+        host_key = extra_options.get("host_key")
+        no_host_key_check = extra_options.get("no_host_key_check")
+
+        if no_host_key_check is not None:
+            no_host_key_check = str(no_host_key_check).lower() == "true"
+            if host_key is not None and no_host_key_check:
+                raise ValueError("Host key check was skipped, but `host_key` 
value was given")
+            if no_host_key_check:
+                self.log.warning(
+                    "No Host Key Verification. This won't protect against 
Man-In-The-Middle attacks"
+                )
+                self.known_hosts = "none"
+
+        if host_key is not None:
+            self.known_hosts = f"{conn.host} {host_key}".encode()
+
+    async def _get_conn(self) -> asyncssh.SSHClientConnection:
+        """
+        Asynchronously connect to the SFTP server as an SSH client.
+
+        The following parameters are provided either in the extra json object 
in
+        the SFTP connection definition
+
+        - key_file
+        - known_hosts
+        - passphrase
+        """
+        conn = await sync_to_async(self.get_connection)(self.sftp_conn_id)
+        if conn.extra is not None:
+            self._parse_extras(conn)
+
+        conn_config = {
+            "host": conn.host,
+            "port": conn.port,
+            "username": conn.login,
+            "password": conn.password,
+        }
+        if self.key_file:
+            conn_config.update(client_keys=self.key_file)
+        if self.known_hosts:
+            if self.known_hosts.lower() == "none":
+                conn_config.update(known_hosts=None)
+            else:
+                conn_config.update(known_hosts=self.known_hosts)
+        if self.private_key:
+            _private_key = asyncssh.import_private_key(self.private_key, 
self.passphrase)
+            conn_config.update(client_keys=[_private_key])
+        if self.passphrase:
+            conn_config.update(passphrase=self.passphrase)
+        ssh_client_conn = await asyncssh.connect(**conn_config)
+        return ssh_client_conn
+
+    async def list_directory(self, path: str = "") -> list[str] | None:
+        """Returns a list of files on the SFTP server at the provided path."""
+        ssh_conn = await self._get_conn()
+        sftp_client = await ssh_conn.start_sftp_client()
+        try:
+            files = await sftp_client.listdir(path)
+            return sorted(files)
+        except asyncssh.SFTPNoSuchFile:
+            return None
+
+    async def read_directory(self, path: str = "") -> 
Sequence[asyncssh.sftp.SFTPName] | None:
+        """Returns a list of files along with their attributes on the SFTP 
server at the provided path."""
+        ssh_conn = await self._get_conn()
+        sftp_client = await ssh_conn.start_sftp_client()
+        try:
+            files = await sftp_client.readdir(path)
+            return files
+        except asyncssh.SFTPNoSuchFile:
+            return None
+
+    async def get_files_and_attrs_by_pattern(
+        self, path: str = "", fnmatch_pattern: str = ""
+    ) -> Sequence[asyncssh.sftp.SFTPName]:
+        """
+        Get the files along with their attributes matching the pattern (e.g. 
``*.pdf``) at the provided path.
+
+        if one exists. Otherwise, raises an AirflowException to be handled 
upstream for deferring
+        """
+        files_list = await self.read_directory(path)
+        if files_list is None:
+            raise FileNotFoundError(f"No files at path {path!r} found...")
+        matched_files = [file for file in files_list if 
fnmatch(str(file.filename), fnmatch_pattern)]
+        return matched_files
+
+    async def get_files_by_pattern(self, path: str = "", fnmatch_pattern: str 
= "") -> list[str]:
+        """
+        Returns the name of a file matching the file pattern at the provided 
path.
+
+        If one exists Otherwise, raises an AirflowException to be handled 
upstream for deferring.
+        """
+        files_list = await self.list_directory(path)
+        if files_list is None:
+            raise AirflowException(f"No files at path {path} found...")
+        matched_files = [file for file in files_list if fnmatch(file, 
fnmatch_pattern)]
+        return matched_files

Review Comment:
   I think we can remove this method, as it's not used here, and also clean up 
the unit tests for this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to