This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f5c8059553 sftp_sensor: fixing resource management with sensor (#40022)
f5c8059553 is described below

commit f5c80595532299f31d911823a64c9730a838b4d7
Author: Ashish Patel <ashishpatel0...@gmail.com>
AuthorDate: Fri Jun 7 18:24:32 2024 +0530

    sftp_sensor: fixing resource management with sensor (#40022)
    
    closes: #39922
    
    Summary
    When a user runs the SFTPSensor operator with deferrable=True and sets
    path/newer_than, an SFTP connection is opened and left open. The cause is
    the get_mod_time method, which opens an SFTP connection but never closes it.
    
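    This change closes the connection in a `finally` block, so it is released
    even when get_mod_time raises. It also logs whether each checked file's
    modification time is newer or older than newer_than.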
---
 airflow/providers/sftp/hooks/sftp.py   | 10 +++++++---
 airflow/providers/sftp/sensors/sftp.py | 13 +++++++++++++
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/sftp/hooks/sftp.py b/airflow/providers/sftp/hooks/sftp.py
index 0907fba2eb..7a2e34a215 100644
--- a/airflow/providers/sftp/hooks/sftp.py
+++ b/airflow/providers/sftp/hooks/sftp.py
@@ -549,7 +549,7 @@ class SFTPHookAsync(BaseHook):
         matched_files = [file for file in files_list if 
fnmatch(str(file.filename), fnmatch_pattern)]
         return matched_files
 
-    async def get_mod_time(self, path: str) -> str:
+    async def get_mod_time(self, path: str) -> str:  # type: ignore[return]
         """
         Make SFTP async connection.
 
@@ -558,9 +558,10 @@ class SFTPHookAsync(BaseHook):
 
         :param path: full path to the remote file
         """
-        ssh_conn = await self._get_conn()
-        sftp_client = await ssh_conn.start_sftp_client()
+        ssh_conn = None
         try:
+            ssh_conn = await self._get_conn()
+            sftp_client = await ssh_conn.start_sftp_client()
             ftp_mdtm = await sftp_client.stat(path)
             modified_time = ftp_mdtm.mtime
             mod_time = 
datetime.datetime.fromtimestamp(modified_time).strftime("%Y%m%d%H%M%S")  # 
type: ignore[arg-type]
@@ -568,3 +569,6 @@ class SFTPHookAsync(BaseHook):
             return mod_time
         except asyncssh.SFTPNoSuchFile:
             raise AirflowException("No files matching")
+        finally:
+            if ssh_conn:
+                ssh_conn.close()
diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py
index de3870937d..f56ad93410 100644
--- a/airflow/providers/sftp/sensors/sftp.py
+++ b/airflow/providers/sftp/sensors/sftp.py
@@ -111,6 +111,19 @@ class SFTPSensor(BaseSensorOperator):
                 _newer_than = convert_to_utc(self.newer_than)
                 if _newer_than <= _mod_time:
                     files_found.append(actual_file_to_check)
+                    self.log.info(
+                        "File %s has modification time: '%s', which is newer 
than: '%s'",
+                        actual_file_to_check,
+                        str(_mod_time),
+                        str(_newer_than),
+                    )
+                else:
+                    self.log.info(
+                        "File %s has modification time: '%s', which is older 
than: '%s'",
+                        actual_file_to_check,
+                        str(_mod_time),
+                        str(_newer_than),
+                    )
             else:
                 files_found.append(actual_file_to_check)
 

Reply via email to