[AIRFLOW-813] Fix unterminated unit tests in SchedulerJobTest. Closes #2030 from fenglu-g/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9d9e56dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9d9e56dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9d9e56dc Branch: refs/heads/v1-8-test Commit: 9d9e56dc3cbec256422249d382aedd29d25c46a3 Parents: cf6d50c Author: Feng Lu <fen...@fengcloud.hot.corp.google.com> Authored: Sat Jan 28 13:39:59 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Sat Jan 28 13:40:03 2017 +0100 ---------------------------------------------------------------------- airflow/utils/dag_processing.py | 9 +++++++-- tests/jobs.py | 6 ++---- 2 files changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d9e56dc/airflow/utils/dag_processing.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index ef1c1ed..6ed5db7 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -354,6 +354,8 @@ class DagFileProcessorManager(LoggingMixin): self._last_finish_time = {} # Map from file path to the number of runs self._run_count = defaultdict(int) + # Scheduler heartbeat key. + self._heart_beat_key = 'heart-beat' @property def file_paths(self): @@ -628,17 +630,20 @@ class DagFileProcessorManager(LoggingMixin): self.symlink_latest_log_directory() + # Update scheduler heartbeat count. + self._run_count[self._heart_beat_key] += 1 + return simple_dags def max_runs_reached(self): """ :return: whether all file paths have been processed max_runs times """ - if not self._file_paths: # No dag file is present. 
- return False for file_path in self._file_paths: if self._run_count[file_path] != self._max_runs: return False + if self._run_count[self._heart_beat_key] < self._max_runs: + return False return True def terminate(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d9e56dc/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 1872266..b674bcd 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -267,8 +267,7 @@ class SchedulerJobTest(unittest.TestCase): Utility function that runs a single scheduler loop without actually changing/scheduling any dags. This is useful to simulate the other side effects of running a scheduler loop, e.g. to see what parse errors there are in the - dags_folder. The run_duration is limited to 20 seconds as the scheduler - will run forever as num_runs is ignored when there is no dag file. + dags_folder. :param dags_folder: the directory to traverse :type directory: str @@ -276,8 +275,7 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob( dag_id='this_dag_doesnt_exist', # We don't want to actually run anything num_runs=1, - subdir=os.path.join(dags_folder), - run_duration=20) + subdir=os.path.join(dags_folder)) scheduler.heartrate = 0 scheduler.run()