This is an automated email from the ASF dual-hosted git repository. pankajkoti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new f5c8059553 sftp_sensor: fixing resource management with sensor (#40022) f5c8059553 is described below commit f5c80595532299f31d911823a64c9730a838b4d7 Author: Ashish Patel <ashishpatel0...@gmail.com> AuthorDate: Fri Jun 7 18:24:32 2024 +0530 sftp_sensor: fixing resource management with sensor (#40022) closes: #39922 Summary: When a user runs the SFTPSensor operator with deferrable=True and path/newer_than set, the sensor opens an SFTP connection that is left open, because the get_mod_time method opens an SFTP connection but never closes it afterward. This change closes that connection once the modification time has been read. --- airflow/providers/sftp/hooks/sftp.py | 10 +++++++--- airflow/providers/sftp/sensors/sftp.py | 13 +++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/airflow/providers/sftp/hooks/sftp.py b/airflow/providers/sftp/hooks/sftp.py index 0907fba2eb..7a2e34a215 100644 --- a/airflow/providers/sftp/hooks/sftp.py +++ b/airflow/providers/sftp/hooks/sftp.py @@ -549,7 +549,7 @@ class SFTPHookAsync(BaseHook): matched_files = [file for file in files_list if fnmatch(str(file.filename), fnmatch_pattern)] return matched_files - async def get_mod_time(self, path: str) -> str: + async def get_mod_time(self, path: str) -> str: # type: ignore[return] """ Make SFTP async connection.
@@ -558,9 +558,10 @@ class SFTPHookAsync(BaseHook): :param path: full path to the remote file """ - ssh_conn = await self._get_conn() - sftp_client = await ssh_conn.start_sftp_client() + ssh_conn = None try: + ssh_conn = await self._get_conn() + sftp_client = await ssh_conn.start_sftp_client() ftp_mdtm = await sftp_client.stat(path) modified_time = ftp_mdtm.mtime mod_time = datetime.datetime.fromtimestamp(modified_time).strftime("%Y%m%d%H%M%S") # type: ignore[arg-type] @@ -568,3 +569,6 @@ class SFTPHookAsync(BaseHook): return mod_time except asyncssh.SFTPNoSuchFile: raise AirflowException("No files matching") + finally: + if ssh_conn: + ssh_conn.close() diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index de3870937d..f56ad93410 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -111,6 +111,19 @@ class SFTPSensor(BaseSensorOperator): _newer_than = convert_to_utc(self.newer_than) if _newer_than <= _mod_time: files_found.append(actual_file_to_check) + self.log.info( + "File %s has modification time: '%s', which is newer than: '%s'", + actual_file_to_check, + str(_mod_time), + str(_newer_than), + ) + else: + self.log.info( + "File %s has modification time: '%s', which is older than: '%s'", + actual_file_to_check, + str(_mod_time), + str(_newer_than), + ) else: files_found.append(actual_file_to_check)