This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7b7782d9a1ea51db598477dfc599c9df54866582
Author: Mikhail Ilchenko <pr0wler...@gmail.com>
AuthorDate: Tue Mar 1 12:18:34 2022 +0300

    Fix filesystem sensor for directories (#21729)

    Fix walking through wildcarded directory in `FileSensor.poke` method

    (cherry picked from commit 6b0ca646ec849af91fe8a10d3d5656cafa3ed4bd)
---
 airflow/sensors/filesystem.py    |  2 +-
 tests/sensors/test_filesystem.py | 36 ++++++++++++++++++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py
index 130be5c..1a4711c 100644
--- a/airflow/sensors/filesystem.py
+++ b/airflow/sensors/filesystem.py
@@ -65,7 +65,7 @@ class FileSensor(BaseSensorOperator):
                 self.log.info('Found File %s last modified: %s', str(path), str(mod_time))
                 return True
 
-            for _, _, files in os.walk(full_path):
+            for _, _, files in os.walk(path):
                 if len(files) > 0:
                     return True
         return False
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 4d23331..e696f1e 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import os.path
+import os
 import shutil
 import tempfile
 import unittest
@@ -131,6 +131,38 @@ class TestFileSensor(unittest.TestCase):
             task._hook = self.hook
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_wildcard_empty_directory(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir):
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # No files in dir
+                with pytest.raises(AirflowSensorTimeout):
+                    task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_wildcard_directory_with_files(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir) as subdir:
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # `touch` the file in subdir
+                open(os.path.join(subdir, 'file'), 'a').close()
+                task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
     def test_wildcared_directory(self):
         temp_dir = tempfile.mkdtemp()
         subdir = tempfile.mkdtemp(dir=temp_dir)
@@ -146,7 +178,7 @@ class TestFileSensor(unittest.TestCase):
         task._hook = self.hook
 
         try:
-            # `touch` the dir
+            # `touch` the file in subdir
             open(subdir + "/file", "a").close()
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         finally: