[AIRFLOW-780] Fix DAG import errors no longer working

Import errors stopped being recorded after the multiprocessor update,
because they are cleared after each DAG directory is parsed. This change
fixes the recording and adds tests to prevent future regressions.
Also fix a couple of linter errors. Note that there are a few
inefficiencies (e.g. sometimes we delete and then re-add import errors in
the same place instead of just doing an update), but this is equivalent to
the old behavior.

Testing Done:
- Added missing unit tests for DAG imports. Note that some of them
  strangely fail under Python 3 on Travis, and debugging became too
  time-consuming since I don't have a copy of the Travis environment; I
  even ran with the same version of Python locally and couldn't reproduce
  the failures. I have skipped those 3 tests under Python 3 for now.

Closes #2018 from aoen/fix_parse_errors_not_displa

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67cbb966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67cbb966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67cbb966

Branch: refs/heads/v1-8-test
Commit: 67cbb966410226c1489bb730af3af45330fc51b9
Parents: dc97bcd
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Fri Jan 27 01:29:00 2017 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Jan 27 01:29:04 2017 -0800

----------------------------------------------------------------------
 airflow/configuration.py        |   1 +
 airflow/jobs.py                 |  48 +++++----
 airflow/models.py               |   3 +
 airflow/settings.py             |   3 +-
 airflow/utils/dag_processing.py |   4 +-
 tests/jobs.py                   | 193 +++++++++++++++++++++++++++++++++--
 6 files changed, 219 insertions(+), 33 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 979b071..011f764 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -503,6 +503,7 @@ authenticate = true
 max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
+dag_dir_list_interval = 0
 """
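
The dag_dir_list_interval setting added to the test configuration above
controls how often (in seconds) the scheduler re-lists the contents of the
DAGs folder; 0 means the folder is re-listed on every scheduler loop, which
the new tests below rely on to pick up DAG files created and removed at
runtime. A minimal sketch of how scheduler-side code would read it
(assuming the key lives in the [scheduler] section of the test config
shown above):

    from airflow import configuration as conf

    # 0 = re-list the DAGs folder on every loop; larger values trade
    # freshness of the file list for less filesystem churn.
    dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')
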
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 201d87f..1a581e9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -657,34 +657,44 @@ class SchedulerJob(BaseJob):
         session.close()

     @staticmethod
-    def record_import_errors(session, dagbag):
+    @provide_session
+    def clear_nonexistent_import_errors(session, known_file_paths):
         """
-        For the DAGs in the given DagBag, record any associated import errors.
-        These are usually displayed through the Airflow UI so that users know
-        that there are issues parsing DAGs.
+        Clears import errors for files that no longer exist.

         :param session: session for ORM operations
         :type session: sqlalchemy.orm.session.Session
-        :param dagbag: DagBag containing DAGs with import errors
-        :type dagbag: models.DagBag
+        :param known_file_paths: The list of existing files that are parsed for DAGs
+        :type known_file_paths: list[unicode]
         """
-        for filename, stacktrace in list(dagbag.import_errors.items()):
-            session.query(models.ImportError).filter(
-                models.ImportError.filename == filename
-            ).delete()
-            session.add(models.ImportError(
-                filename=filename, stacktrace=stacktrace))
+        session.query(models.ImportError).filter(
+            ~models.ImportError.filename.in_(known_file_paths)
+        ).delete(synchronize_session='fetch')
         session.commit()

     @staticmethod
-    def clear_import_errors(session):
+    def update_import_errors(session, dagbag):
         """
-        Remove all the known import errors from the DB.
+        For the DAGs in the given DagBag, records any associated import errors and
+        clears errors for files that no longer have them. These are usually displayed
+        through the Airflow UI so that users know that there are issues parsing DAGs.

         :param session: session for ORM operations
         :type session: sqlalchemy.orm.session.Session
+        :param dagbag: DagBag containing DAGs with import errors
+        :type dagbag: models.DagBag
         """
-        session.query(models.ImportError).delete()
+        # Clear the errors of the processed files
+        for dagbag_file in dagbag.file_last_changed:
+            session.query(models.ImportError).filter(
+                models.ImportError.filename == dagbag_file
+            ).delete()
+
+        # Add the errors of the processed files
+        for filename, stacktrace in dagbag.import_errors.iteritems():
+            session.add(models.ImportError(
+                filename=filename,
+                stacktrace=stacktrace))
         session.commit()

     @provide_session
@@ -1352,8 +1362,6 @@ class SchedulerJob(BaseJob):
                               dr.execution_date))
                 self._reset_state_for_orphaned_tasks(dr, session=session)

-        self.logger.info("Removing old import errors")
-        self.clear_import_errors(session)
         session.close()

         execute_start_time = datetime.now()
@@ -1388,6 +1396,9 @@ class SchedulerJob(BaseJob):
                          .format(len(known_file_paths), self.subdir))
         processor_manager.set_file_paths(known_file_paths)

+        self.logger.debug("Removing old import errors")
+        self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
+
         # Kick off new processes and collect results from finished ones
         self.logger.info("Heartbeating the process manager")
         simple_dags = processor_manager.heartbeat()
@@ -1523,6 +1534,7 @@ class SchedulerJob(BaseJob):
                                           file_path))
         else:
             self.logger.warn("No viable dags retrieved from {}".format(file_path))
+            self.update_import_errors(session, dagbag)
             return []

         # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
@@ -1598,7 +1610,7 @@ class SchedulerJob(BaseJob):

         # Record import errors into the ORM
         try:
-            self.record_import_errors(session, dagbag)
+            self.update_import_errors(session, dagbag)
         except Exception:
             self.logger.exception("Error logging import errors!")
         try:
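
One caveat in the update_import_errors hunk above: dict.iteritems() exists
only in Python 2, so that loop raises AttributeError under Python 3, which
may well be the source of the unexplained Python 3 Travis failures noted in
the commit message. A minimal Python 3-compatible sketch of the same method
(identical logic, only the dict iteration changed):

    @staticmethod
    def update_import_errors(session, dagbag):
        # Drop stored errors for every file this DagBag just processed...
        for dagbag_file in dagbag.file_last_changed:
            session.query(models.ImportError).filter(
                models.ImportError.filename == dagbag_file
            ).delete()
        # ...then record the current errors. items() works on both
        # Python 2 and Python 3, unlike iteritems().
        for filename, stacktrace in dagbag.import_errors.items():
            session.add(models.ImportError(
                filename=filename,
                stacktrace=stacktrace))
        session.commit()
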
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 95c40a9..b9015eb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -247,6 +247,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             with open(filepath, 'rb') as f:
                 content = f.read()
                 if not all([s in content for s in (b'DAG', b'airflow')]):
+                    self.file_last_changed[filepath] = file_last_changed_on_disk
                     return found_dags

         self.logger.debug("Importing {}".format(filepath))
@@ -283,6 +284,8 @@ class DagBag(BaseDagBag, LoggingMixin):
                             format(mod.filename, filepath))
                     content = zf.read()
                     if not all([s in content for s in (b'DAG', b'airflow')]):
+                        self.file_last_changed[filepath] = (
+                            file_last_changed_on_disk)
                         # todo: create ignore list
                         return found_dags

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 4882875..45f7fba 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -143,7 +143,6 @@ def configure_orm(disable_connection_pool=False):
         engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
         engine_args['pool_recycle'] = conf.getint('core', 'SQL_ALCHEMY_POOL_RECYCLE')
-        # engine_args['echo'] = True

     engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
     Session = scoped_session(
@@ -163,4 +162,4 @@ configure_orm()
 KILOBYTE = 1024
 MEGABYTE = KILOBYTE * KILOBYTE

-WEB_COLORS = {'LIGHTBLUE': '#4d9de0'}
\ No newline at end of file
+WEB_COLORS = {'LIGHTBLUE': '#4d9de0'}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index aa502fb..6209946 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -198,8 +198,8 @@ def list_py_file_paths(directory, safe_mode=True):
                 if safe_mode:
                     with open(file_path, 'rb') as f:
                         content = f.read()
-                        might_contain_dag = all([s in content
-                                                 for s in (b'DAG', b'airflow')])
+                        might_contain_dag = all(
+                            [s in content for s in (b'DAG', b'airflow')])
                         if not might_contain_dag:
                             continue
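
The might_contain_dag check reformatted above is the safe-mode heuristic:
a file is only handed to the DAG parser if its raw bytes contain both
b'DAG' and b'airflow'. The PARSEABLE/UNPARSEABLE constants introduced in
the tests below are built to pass exactly this check. A self-contained
sketch of the heuristic (the standalone function name and file path are
hypothetical):

    def might_contain_dag(file_path):
        # Safe-mode pre-filter from list_py_file_paths: skip files that
        # cannot possibly define a DAG before paying the cost of parsing.
        with open(file_path, 'rb') as f:
            content = f.read()
        return all(s in content for s in (b'DAG', b'airflow'))

    # '"airflow DAG"' is a bare string expression (valid Python, no DAG);
    # 'airflow DAG' is a syntax error; both contain the two marker strings.
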
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 53626ee..b674bcd 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -20,13 +20,15 @@ from __future__ import unicode_literals
 import datetime
 import logging
 import os
+import shutil
 import unittest
 import six
+import sys
+from tempfile import mkdtemp

 from airflow import AirflowException, settings
 from airflow import models
 from airflow.bin import cli
-from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
@@ -49,9 +51,20 @@ except ImportError:
     except ImportError:
         mock = None

+IS_PYTHON_3_TRAVIS = sys.version_info >= (3, 0) and "TRAVIS" in os.environ
+
 DEV_NULL = '/dev/null'
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)

+# Include the words "airflow" and "DAG" in the file contents, tricking Airflow into
+# thinking these files contain a DAG (otherwise Airflow will skip them)
+PARSEABLE_DAG_FILE_CONTENTS = '"airflow DAG"'
+UNPARSEABLE_DAG_FILE_CONTENTS = 'airflow DAG'
+
+# Filename to be used for DAGs that are created in an ad-hoc manner and can be
+# removed/created at runtime
+TEMP_DAG_FILENAME = "temp_dag.py"
+

 class BackfillJobTest(unittest.TestCase):
@@ -97,7 +110,7 @@ class BackfillJobTest(unittest.TestCase):
         job = BackfillJob(
             dag=dag,
             start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE+datetime.timedelta(days=1),
+            end_date=DEFAULT_DATE + datetime.timedelta(days=1),
             ignore_first_depends_on_past=True
         )
         job.run()
@@ -109,7 +122,8 @@ class BackfillJobTest(unittest.TestCase):
         self.assertTrue(drs[0].execution_date == DEFAULT_DATE)
         self.assertTrue(drs[0].state == State.SUCCESS)
-        self.assertTrue(drs[1].execution_date == DEFAULT_DATE+datetime.timedelta(days=1))
+        self.assertTrue(drs[1].execution_date ==
+                        DEFAULT_DATE + datetime.timedelta(days=1))
         self.assertTrue(drs[1].state == State.SUCCESS)

         dag.clear()
@@ -134,9 +148,8 @@ class BackfillJobTest(unittest.TestCase):
         logger = logging.getLogger('BackfillJobTest.test_backfill_examples')
         dags = [
             dag for dag in self.dagbag.dags.values()
-            if 'example_dags' in dag.full_filepath
-            and dag.dag_id not in skip_dags
-        ]
+            if 'example_dags' in dag.full_filepath and dag.dag_id not in skip_dags
+        ]

         for dag in dags:
             dag.clear(
@@ -244,6 +257,27 @@ class SchedulerJobTest(unittest.TestCase):

     def setUp(self):
         self.dagbag = DagBag()
+        session = settings.Session()
+        session.query(models.ImportError).delete()
+        session.commit()
+
+    @staticmethod
+    def run_single_scheduler_loop_with_no_dags(dags_folder):
+        """
+        Utility function that runs a single scheduler loop without actually
+        changing/scheduling any dags. This is useful to simulate the other side
+        effects of running a scheduler loop, e.g. to see what parse errors there
+        are in the dags_folder.
+
+        :param dags_folder: the directory to traverse
+        :type dags_folder: str
+        """
+        scheduler = SchedulerJob(
+            dag_id='this_dag_doesnt_exist',  # We don't want to actually run anything
+            num_runs=1,
+            subdir=os.path.join(dags_folder))
+        scheduler.heartrate = 0
+        scheduler.run()

     @provide_session
     def evaluate_dagrun(
@@ -1177,11 +1211,11 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertLess(dr.execution_date, datetime.datetime.now())

         dag3 = DAG(DAG_NAME3,
-            schedule_interval='@hourly',
-            max_active_runs=1,
-            catchup=False,
-            default_args=default_args
-            )
+                   schedule_interval='@hourly',
+                   max_active_runs=1,
+                   catchup=False,
+                   default_args=default_args
+                   )

         run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3)
         run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3)
@@ -1209,3 +1243,140 @@ class SchedulerJobTest(unittest.TestCase):

         # The DR should be scheduled BEFORE now
         self.assertLess(dr.execution_date, datetime.datetime.now())
+
+    @unittest.skipIf(IS_PYTHON_3_TRAVIS,
+                     "Fails in Python 3 on Travis but not reproducible locally")
+    def test_add_unparseable_file_before_sched_start_creates_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename,
+                         unparseable_filename)
+        self.assertEqual(import_error.stacktrace,
+                         "invalid syntax ({}, line 1)".format(TEMP_DAG_FILENAME))
+
+    @unittest.skipIf(IS_PYTHON_3_TRAVIS,
+                     "Fails in Python 3 on Travis but not reproducible locally")
+    def test_add_unparseable_file_after_sched_start_creates_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename,
+                         unparseable_filename)
+        self.assertEqual(import_error.stacktrace,
+                         "invalid syntax ({}, line 1)".format(TEMP_DAG_FILENAME))
+
+    def test_no_import_errors_with_parseable_dag(self):
+        try:
+            dags_folder = mkdtemp()
+            parseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            with open(parseable_filename, 'w') as parseable_file:
+                parseable_file.writelines(PARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 0)
+
+    @unittest.skipIf(IS_PYTHON_3_TRAVIS,
+                     "Fails in Python 3 on Travis but not reproducible locally")
+    def test_new_import_error_replaces_old(self):
+        try:
+            dags_folder = mkdtemp()
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            # Generate original import error
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+            # Generate replacement import error (the error will be on the second line now)
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(
+                    PARSEABLE_DAG_FILE_CONTENTS +
+                    os.linesep +
+                    UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename,
+                         unparseable_filename)
+        self.assertEqual(import_error.stacktrace,
+                         "invalid syntax ({}, line 2)".format(TEMP_DAG_FILENAME))
+
+    def test_remove_error_clears_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            filename_to_parse = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            # Generate original import error
+            with open(filename_to_parse, 'w') as file_to_parse:
+                file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+            # Remove the import error from the file
+            with open(filename_to_parse, 'w') as file_to_parse:
+                file_to_parse.writelines(
+                    PARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 0)
+
+    def test_remove_file_clears_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            filename_to_parse = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            # Generate original import error
+            with open(filename_to_parse, 'w') as file_to_parse:
+                file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        # Rerun the scheduler once the DAG file has been removed
+        self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 0)
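
The last two tests above pin down the new clear_nonexistent_import_errors
behavior: a stored import error must disappear once its file is fixed or
deleted. A minimal sketch of the underlying ORM delete, assuming a plain
Airflow session and a hypothetical list of surviving file paths:

    from airflow import models, settings

    session = settings.Session()
    known_file_paths = ['/dags/still_here.py']  # hypothetical example

    # Delete errors whose filename is no longer among the parsed files.
    # synchronize_session='fetch' is used because the default 'evaluate'
    # strategy cannot apply a NOT IN filter to in-session objects.
    session.query(models.ImportError).filter(
        ~models.ImportError.filename.in_(known_file_paths)
    ).delete(synchronize_session='fetch')
    session.commit()
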