Repository: incubator-airflow
Updated Branches:
  refs/heads/master 410736dbc -> 1c4cff056
[AIRFLOW-715] A more efficient HDFS Sensor

The HDFS Sensor is now capable of triggering true based on a file size,
a directory status (empty or not), or a regex to match files in a
directory, and it can also discard files that are still being copied.
With the base HDFS Sensor, it was not possible to watch a directory for
files with an unknown name.

The HDFS Sensor is now extended with (contrib):
- HdfsSensorRegex: for matching files with a regex (re)
- HdfsSensorFolder: for matching on a directory

The HDFS Sensor now has two built-in filters:
- filter_for_filesize: to filter the listing result by file size
- filter_for_ignored_ext: to discard files that are still being copied

Unit tests added with a new FakeSnakeBiteClient and a FakeHDFSHook.

Closes #1957 from vfoucault/feature/AIRFLOW-715

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1c4cff05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1c4cff05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1c4cff05

Branch: refs/heads/master
Commit: 1c4cff056488623cfd3a6ec411e680e3e5198b21
Parents: 410736d
Author: vfoucault <[email protected]>
Authored: Sat Dec 31 14:02:07 2016 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Sat Dec 31 14:02:07 2016 +0100

----------------------------------------------------------------------
 airflow/contrib/sensors/hdfs_sensors.py |  67 +++++++
 airflow/operators/sensors.py            |  65 ++++++-
 airflow/settings.py                     |   6 +
 tests/contrib/__init__.py               |   1 +
 tests/contrib/sensors/hdfs_sensors.py   | 251 +++++++++++++++++++++++
 tests/core.py                           |  73 ++++++++
 tests/operators/sensors.py              |  73 +++++++-
 7 files changed, 528 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
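For illustration (not part of this commit), a minimal sketch of how the new
contrib sensors might be wired into a DAG. The DAG id, schedule, paths and
task ids below are hypothetical; the sensor classes and parameters are the
ones added in the diff that follows:

    import re
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex

    dag = DAG('hdfs_watch_example', start_date=datetime(2016, 12, 1))  # hypothetical DAG

    # Succeed once a part-* file lands in the directory, ignoring in-flight
    # *._COPYING_ files and anything smaller than 1 MB.
    wait_for_files = HdfsSensorRegex(
        task_id='wait_for_part_files',
        filepath='/data/incoming',
        regex=re.compile(r'part-\d+'),
        ignore_copying=True,
        file_size=1,
        poke_interval=60,
        dag=dag)

    # Succeed once the staging directory is empty.
    wait_for_drain = HdfsSensorFolder(
        task_id='wait_for_staging_drain',
        filepath='/data/staging',
        be_empty=True,
        poke_interval=60,
        dag=dag)

Note that regex takes a pre-compiled pattern, just as in the unit tests below.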
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
new file mode 100644
index 0000000..a12e295
--- /dev/null
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.operators.sensors import HdfsSensor
+import logging
+
+
+class HdfsSensorRegex(HdfsSensor):
+    def __init__(
+            self,
+            regex,
+            *args, **kwargs):
+        super(HdfsSensorRegex, self).__init__(*args, **kwargs)
+        self.regex = regex
+
+    def poke(self, context):
+        """
+        Poke for files in a directory matching self.regex
+        :return: Bool depending on the search criteria
+        """
+        sb = self.hook(self.hdfs_conn_id).get_conn()
+        logging.getLogger("snakebite").setLevel(logging.WARNING)
+        logging.info(
+            'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals()))
+        # Keep plain files whose basename matches the regex.
+        result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
+                  f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
+        result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
+        result = self.filter_for_filesize(result, self.file_size)
+        return bool(result)
+
+
+class HdfsSensorFolder(HdfsSensor):
+    def __init__(
+            self,
+            be_empty=False,
+            *args, **kwargs):
+        super(HdfsSensorFolder, self).__init__(*args, **kwargs)
+        self.be_empty = be_empty
+
+    def poke(self, context):
+        """
+        Poke for an empty or a non-empty directory, depending on self.be_empty
+        :return: Bool depending on the search criteria
+        """
+        sb = self.hook(self.hdfs_conn_id).get_conn()
+        logging.getLogger("snakebite").setLevel(logging.WARNING)
+        result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
+        result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
+        result = self.filter_for_filesize(result, self.file_size)
+        if self.be_empty:
+            logging.info('Poking for filepath {self.filepath} to be an empty directory'.format(**locals()))
+            # An empty directory yields exactly one entry: the directory itself.
+            return len(result) == 1 and result[0]['path'] == self.filepath
+        else:
+            logging.info('Poking for filepath {self.filepath} to be a non-empty directory'.format(**locals()))
+            return bool(result) and result[0]['file_type'] == 'f'
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 8dd1b71..da01483 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -22,12 +22,15 @@ from datetime import datetime
 import logging
 from urllib.parse import urlparse
 from time import sleep
+import re
+import sys

 import airflow
 from airflow import hooks, settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.hdfs_hook import HDFSHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults

@@ -371,29 +374,77 @@ class HdfsSensor(BaseSensorOperator):
     Waits for a file or folder to land in HDFS
     """
     template_fields = ('filepath',)
-    ui_color = '#4d9de0'
+    ui_color = settings.WEB_COLORS['LIGHTBLUE']

     @apply_defaults
     def __init__(
             self,
             filepath,
             hdfs_conn_id='hdfs_default',
+            ignored_ext=['_COPYING_'],
+            ignore_copying=True,
+            file_size=None,
+            hook=HDFSHook,
             *args, **kwargs):
         super(HdfsSensor, self).__init__(*args, **kwargs)
         self.filepath = filepath
         self.hdfs_conn_id = hdfs_conn_id
+        self.file_size = file_size
+        self.ignored_ext = ignored_ext
+        self.ignore_copying = ignore_copying
+        self.hook = hook
+
+    @staticmethod
+    def filter_for_filesize(result, size=None):
+        """
+        Will filter the result list, keeping only files whose size is at least `size`
+        :param result: a list of dicts returned by Snakebite ls
+        :param size: the file size in MB a file should be at least to trigger True
+        :return: (list) of dicts matching the criteria
+        """
+        if size:
+            logging.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
+            size *= settings.MEGABYTE
+            result = [x for x in result if x['length'] >= size]
+            logging.debug('HdfsSensor.poke: after size filter result is %s', result)
+        return result
+
+    @staticmethod
+    def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
+        """
+        Will filter the result list, if instructed to do so, removing entries matching the ignored extensions
+        :param result: (list) of dicts returned by Snakebite ls
+        :param ignored_ext: (list) of ignored extensions
+        :param ignore_copying: (bool) shall we ignore files being copied?
+        :return: (list) of dicts matching the criteria
+        """
+        if ignore_copying:
+            regex_builder = r"^.*\.(%s$)$" % '$|'.join(ignored_ext)
+            ignored_extensions_regex = re.compile(regex_builder)
+            logging.debug('Filtering result for ignored extensions: %s in files %s', ignored_extensions_regex.pattern,
+                          map(lambda x: x['path'], result))
+            result = [x for x in result if not ignored_extensions_regex.match(x['path'])]
+            logging.debug('HdfsSensor.poke: after ext filter result is %s', result)
+        return result
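To make the two filters concrete, a small sketch of calling them directly;
the listing below mimics the dict shape snakebite's ls() returns (see
FakeSnakeBiteClient further down), and the paths and sizes are made up:

    from airflow.operators.sensors import HdfsSensor

    listing = [
        {'path': '/data/in/part-0001', 'length': 15 * 1024 * 1024, 'file_type': 'f'},
        {'path': '/data/in/part-0002._COPYING_', 'length': 2 * 1024 * 1024, 'file_type': 'f'},
    ]

    # Drop in-flight copies, then keep only files of at least 10 MB.
    result = HdfsSensor.filter_for_ignored_ext(listing, ['_COPYING_'], True)
    result = HdfsSensor.filter_for_filesize(result, size=10)
    print([x['path'] for x in result])  # ['/data/in/part-0001']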
+
     def poke(self, context):
-        import airflow.hooks.hdfs_hook
-        sb = airflow.hooks.hdfs_hook.HDFSHook(self.hdfs_conn_id).get_conn()
+        sb = self.hook(self.hdfs_conn_id).get_conn()
         logging.getLogger("snakebite").setLevel(logging.WARNING)
-        logging.info(
-            'Poking for file {self.filepath} '.format(**locals()))
+        logging.info('Poking for file {self.filepath} '.format(**locals()))
         try:
-            files = [f for f in sb.ls([self.filepath])]
+            # IMO this is not quite right, as nothing is raised here:
+            # if the filepath is, say, '/data/mydirectory', it is correct, but if it is
+            # '/data/mydirectory/*', it is not, since the directory exists and sb does
+            # not raise any error. Here is a quick fix.
+            result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
+            logging.debug('HdfsSensor.poke: result is %s', result)
+            result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
+            result = self.filter_for_filesize(result, self.file_size)
+            return bool(result)
         except:
+            e = sys.exc_info()
+            logging.debug("Caught an exception: %s", str(e))
             return False
-        return True


 class WebHdfsSensor(BaseSensorOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index e8da674..ce2ca92 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -147,3 +147,9 @@ except:

 configure_logging()
 configure_orm()
+
+# Constants
+
+KILOBYTE = 1024
+MEGABYTE = KILOBYTE * KILOBYTE
+WEB_COLORS = {'LIGHTBLUE': '#4d9de0'}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/contrib/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/__init__.py b/tests/contrib/__init__.py
index 180273b..ff6f9e2 100644
--- a/tests/contrib/__init__.py
+++ b/tests/contrib/__init__.py
@@ -14,3 +14,4 @@ from __future__ import absolute_import

 from .operators import *
+from .sensors import *
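For reference, the new settings constants as the sensor consumes them (the
values shown are exactly those defined above; the snippet is illustrative):

    from airflow import settings

    # file_size is expressed in MB; filter_for_filesize converts it to bytes:
    print(settings.MEGABYTE)                 # 1048576 (1024 * 1024)
    print(10 * settings.MEGABYTE)            # 10485760, i.e. a 10 MB threshold
    print(settings.WEB_COLORS['LIGHTBLUE'])  # '#4d9de0', HdfsSensor's ui_color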
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/hdfs_sensors.py b/tests/contrib/sensors/hdfs_sensors.py
new file mode 100644
index 0000000..cabe349
--- /dev/null
+++ b/tests/contrib/sensors/hdfs_sensors.py
@@ -0,0 +1,251 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+import unittest
+import re
+from datetime import timedelta
+from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
+from airflow.exceptions import AirflowSensorTimeout
+
+
+class HdfsSensorFolderTests(unittest.TestCase):
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_should_be_empty_directory(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory',
+                                filepath='/datadirectory/empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens; nothing is raised and the execution is ok
+
+    def test_should_be_empty_directory_fail(self):
+        """
+        test the empty directory behaviour on a non-empty directory
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
+                                filepath='/datadirectory/not_empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_be_a_non_empty_directory(self):
+        """
+        test the non-empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
+                                filepath='/datadirectory/not_empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens; nothing is raised and the execution is ok
+
+    def test_should_be_non_empty_directory_fail(self):
+        """
+        test the non-empty directory behaviour on an empty directory
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory_fail',
+                                filepath='/datadirectory/empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+
+class HdfsSensorRegexTests(unittest.TestCase):
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_should_match_regex(self):
+        """
+        test the regex matching behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens; nothing is raised and the execution is ok
+
+    def test_should_not_match_regex(self):
+        """
+        test the regex matching behaviour with a non-matching regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("^IDoNotExist")
+        task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_and_filesize(self):
+        """
+        test the file size behaviour with a matching regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignore_copying=True,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=10,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens; nothing is raised and the execution is ok
+
+    def test_should_match_regex_but_filesize(self):
+        """
+        test the file size behaviour with a matching regex but files that are too small
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_but_copyingext(self):
+        """
+        test the ignored extensions behaviour with a matching regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile(r"copying_file_\d+.txt")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_copyingext',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 85e7fa1..c85be2d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1773,6 +1773,7 @@ class HttpOpSensorTest(unittest.TestCase):
             dag=self.dag)
         sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

+
 class FakeWebHDFSHook(object):
     def __init__(self, conn_id):
         self.conn_id = conn_id
@@ -1783,6 +1784,78 @@ class FakeWebHDFSHook(object):
     def check_for_path(self, hdfs_path):
         return hdfs_path

+
+class FakeSnakeBiteClientException(Exception):
+    pass
+
+
+class FakeSnakeBiteClient(object):
+
+    def __init__(self):
+        self.started = True
+
+    def ls(self, path, include_toplevel=False):
+        """
+        the fake snakebite client
+        :param path: the array of paths to test
+        :param include_toplevel: whether to return the top-level directory info
+        :return: a list of entries matching the query
+        """
+        if path[0] == '/datadirectory/empty_directory' and not include_toplevel:
+            return []
+        elif path[0] == '/datadirectory/datafile':
+            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/datafile'}]
+        elif path[0] == '/datadirectory/empty_directory' and include_toplevel:
+            return [
+                {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
+                 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
+                 'path': '/datadirectory/empty_directory'}]
+        elif path[0] == '/datadirectory/not_empty_directory':
+            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+        elif path[0] == '/datadirectory/not_empty_directory' and include_toplevel:
+            return [
+                {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
+                 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
+                 'path': '/datadirectory/empty_directory'},
+                {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                 'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
+                 'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+        elif path[0] == '/datadirectory/not_existing_file_or_directory':
+            raise FakeSnakeBiteClientException
+        elif path[0] == '/datadirectory/regex_dir':
+            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912,
+                     'blocksize': 134217728, 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test1file'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912,
+                     'blocksize': 134217728, 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test2file'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912,
+                     'blocksize': 134217728, 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test3file'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912,
+                     'blocksize': 134217728, 'owner': u'hdfs',
+                     'path': '/datadirectory/regex_dir/copying_file_1.txt._COPYING_'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912,
+                     'blocksize': 134217728, 'owner': u'hdfs',
+                     'path': '/datadirectory/regex_dir/copying_file_3.txt.sftp'}
+                    ]
+        else:
+            raise FakeSnakeBiteClientException
+
+
+class FakeHDFSHook(object):
+    def __init__(self, conn_id=None):
+        self.conn_id = conn_id
+
+    def get_conn(self):
+        client = FakeSnakeBiteClient()
+        return client
+
+
 class ConnectionTest(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()
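A design note on testability: HdfsSensor takes the hook class rather than an
instance, and poke() instantiates it with the connection id on each call.
That indirection is what lets the tests below inject FakeHDFSHook. A minimal
sketch (the conn id is arbitrary here, since the fake ignores it):

    from tests.core import FakeHDFSHook

    client = FakeHDFSHook('hdfs_default').get_conn()  # -> FakeSnakeBiteClient
    print(client.ls(['/datadirectory/datafile'])[0]['path'])
    # /datadirectory/datafile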
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index ebce1dc..e8b272b 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -15,13 +15,14 @@

 import logging
 import os
+import sys
 import time
 import unittest

 from datetime import datetime, timedelta

 from airflow import DAG, configuration
-from airflow.operators.sensors import HttpSensor, BaseSensorOperator
+from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor
 from airflow.utils.decorators import apply_defaults

 from airflow.exceptions import (AirflowException,
                                 AirflowSensorTimeout,
@@ -110,3 +111,73 @@ class HttpSensorTests(unittest.TestCase):
             poke_interval=5)
         with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
             task.execute(None)
+
+
+class HdfsSensorTests(unittest.TestCase):
+
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_legacy_file_exist(self):
+        """
+        Test the legacy behaviour
+        :return:
+        """
+        # Given
+        self.logger.info("Test for an existing file with the legacy behaviour")
+
+        # When
+        task = HdfsSensor(task_id='Should_be_file_legacy',
+                          filepath='/datadirectory/datafile',
+                          timeout=1,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+        task.execute(None)
+
+        # Then
+        # Nothing happens; nothing is raised and the execution is ok
+
+    def test_legacy_file_exist_but_filesize(self):
+        """
+        Test the legacy behaviour with the file size filter
+        :return:
+        """
+        # Given
+        self.logger.info("Test for an existing file that is too small, with the legacy behaviour")
+        task = HdfsSensor(task_id='Should_be_file_legacy_but_filesize',
+                          filepath='/datadirectory/datafile',
+                          timeout=1,
+                          file_size=20,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_legacy_file_does_not_exists(self):
+        """
+        Test the legacy behaviour
+        :return:
+        """
+        # Given
+        self.logger.info("Test for a non existing file with the legacy behaviour")
+        task = HdfsSensor(task_id='Should_not_be_file_legacy',
+                          filepath='/datadirectory/not_existing_file_or_directory',
+                          timeout=1,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
\ No newline at end of file
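Finally, a hedged sketch of running just the new sensor tests with the stdlib
runner, using the module paths added in this commit (the project's own test
wrapper may differ):

    import unittest

    suite = unittest.defaultTestLoader.loadTestsFromNames([
        'tests.operators.sensors.HdfsSensorTests',
        'tests.contrib.sensors.hdfs_sensors.HdfsSensorFolderTests',
        'tests.contrib.sensors.hdfs_sensors.HdfsSensorRegexTests',
    ])
    unittest.TextTestRunner(verbosity=2).run(suite)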
