[AIRFLOW-780] Fix DAG import errors no longer being recorded

Import errors stopped being recorded after the
multiprocessing update to the scheduler, since
they are cleared after each DAG directory is
parsed. This change fixes the recording logic and
adds tests to prevent future regressions.

Also fix a couple of linter errors.

Note that there are a few inefficiencies (e.g.
sometimes we delete and then re-add import errors
in the same place instead of doing a single
update), but this is equivalent to the old
behavior; a sketch of the update-in-place
alternative follows below.
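
For illustration, a minimal sketch of the update-in-place
alternative (a hypothetical helper, not part of this change;
it assumes a SQLAlchemy session and the models.ImportError
model touched by this diff):

    def upsert_import_error(session, filename, stacktrace):
        # Reuse an existing row when there is one, instead of
        # delete-then-add.
        existing = (session.query(models.ImportError)
                    .filter(models.ImportError.filename == filename)
                    .first())
        if existing:
            existing.stacktrace = stacktrace
        else:
            session.add(models.ImportError(filename=filename,
                                           stacktrace=stacktrace))
        session.commit()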

Testing Done:
- Added missing unit tests for DAG imports. Note
that some of them inexplicably fail under Python 3
on Travis, and debugging became too time-consuming
without a copy of the Travis environment (running
the same Python version locally did not reproduce
the failures). Those three tests are skipped under
Python 3 for now.

Closes #2018 from aoen/fix_parse_errors_not_displa


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67cbb966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67cbb966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67cbb966

Branch: refs/heads/v1-8-test
Commit: 67cbb966410226c1489bb730af3af45330fc51b9
Parents: dc97bcd
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Fri Jan 27 01:29:00 2017 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Jan 27 01:29:04 2017 -0800

----------------------------------------------------------------------
 airflow/configuration.py        |   1 +
 airflow/jobs.py                 |  48 +++++----
 airflow/models.py               |   3 +
 airflow/settings.py             |   3 +-
 airflow/utils/dag_processing.py |   4 +-
 tests/jobs.py                   | 193 +++++++++++++++++++++++++++++++++--
 6 files changed, 219 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 979b071..011f764 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -503,6 +503,7 @@ authenticate = true
 max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
+dag_dir_list_interval = 0
 """
 
 
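Setting dag_dir_list_interval = 0 in the test config makes the scheduler
re-list the dags_folder on every loop, so tests that add or remove DAG
files mid-run see the change immediately. A sketch of reading the value
(assuming the option lives in the scheduler section, as the surrounding
test config suggests):

    from airflow import configuration as conf

    # 0 means re-scan the dags_folder for files on every scheduler loop
    dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')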

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 201d87f..1a581e9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -657,34 +657,44 @@ class SchedulerJob(BaseJob):
             session.close()
 
     @staticmethod
-    def record_import_errors(session, dagbag):
+    @provide_session
+    def clear_nonexistent_import_errors(session, known_file_paths):
         """
-        For the DAGs in the given DagBag, record any associated import errors.
-        These are usually displayed through the Airflow UI so that users know
-        that there are issues parsing DAGs.
+        Clears import errors for files that no longer exist.
 
         :param session: session for ORM operations
         :type session: sqlalchemy.orm.session.Session
-        :param dagbag: DagBag containing DAGs with import errors
-        :type dagbag: models.Dagbag
+        :param known_file_paths: The list of existing files that are parsed for DAGs
+        :type known_file_paths: list[unicode]
         """
-        for filename, stacktrace in list(dagbag.import_errors.items()):
-            session.query(models.ImportError).filter(
-                models.ImportError.filename == filename
-            ).delete()
-            session.add(models.ImportError(
-                filename=filename, stacktrace=stacktrace))
+        session.query(models.ImportError).filter(
+            ~models.ImportError.filename.in_(known_file_paths)
+        ).delete(synchronize_session='fetch')
         session.commit()
 
     @staticmethod
-    def clear_import_errors(session):
+    def update_import_errors(session, dagbag):
         """
-        Remove all the known import errors from the DB.
+        For the DAGs in the given DagBag, records any associated import errors
+        and clears errors for files that no longer have them. These are usually
+        displayed through the Airflow UI so that users know that there are
+        issues parsing DAGs.
 
         :param session: session for ORM operations
         :type session: sqlalchemy.orm.session.Session
+        :param dagbag: DagBag containing DAGs with import errors
+        :type dagbag: models.DagBag
         """
-        session.query(models.ImportError).delete()
+        # Clear the errors of the processed files
+        for dagbag_file in dagbag.file_last_changed:
+            session.query(models.ImportError).filter(
+                models.ImportError.filename == dagbag_file
+            ).delete()
+
+        # Add the errors of the processed files
+        for filename, stacktrace in dagbag.import_errors.items():
+            session.add(models.ImportError(
+                filename=filename,
+                stacktrace=stacktrace))
         session.commit()
 
     @provide_session
@@ -1352,8 +1362,6 @@ class SchedulerJob(BaseJob):
                                                       dr.execution_date))
             self._reset_state_for_orphaned_tasks(dr, session=session)
 
-        self.logger.info("Removing old import errors")
-        self.clear_import_errors(session)
         session.close()
 
         execute_start_time = datetime.now()
@@ -1388,6 +1396,9 @@ class SchedulerJob(BaseJob):
                                  .format(len(known_file_paths), self.subdir))
                 processor_manager.set_file_paths(known_file_paths)
 
+                self.logger.debug("Removing old import errors")
+                self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
+
             # Kick of new processes and collect results from finished ones
             self.logger.info("Heartbeating the process manager")
             simple_dags = processor_manager.heartbeat()
@@ -1523,6 +1534,7 @@ class SchedulerJob(BaseJob):
                                      file_path))
         else:
             self.logger.warn("No viable dags retrieved from 
{}".format(file_path))
+            self.update_import_errors(session, dagbag)
             return []
 
         # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
@@ -1598,7 +1610,7 @@ class SchedulerJob(BaseJob):
 
         # Record import errors into the ORM
         try:
-            self.record_import_errors(session, dagbag)
+            self.update_import_errors(session, dagbag)
         except Exception:
             self.logger.exception("Error logging import errors!")
         try:

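As a standalone illustration of the stale-row cleanup in
clear_nonexistent_import_errors above, the sketch below (illustrative
model and in-memory SQLite, not Airflow code) shows the NOT IN delete
with synchronize_session='fetch':

    from sqlalchemy import Column, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class ImportErrorRow(Base):  # stand-in for models.ImportError
        __tablename__ = 'import_error'
        filename = Column(String, primary_key=True)
        stacktrace = Column(String)

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    session.add_all([ImportErrorRow(filename='a.py', stacktrace='boom'),
                     ImportErrorRow(filename='gone.py', stacktrace='boom')])
    session.commit()

    # Delete every error whose file is no longer parsed; 'fetch' selects the
    # matched rows first so in-memory session state stays consistent.
    known_file_paths = ['a.py']
    session.query(ImportErrorRow).filter(
        ~ImportErrorRow.filename.in_(known_file_paths)
    ).delete(synchronize_session='fetch')
    session.commit()

    assert [e.filename for e in session.query(ImportErrorRow)] == ['a.py']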
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 95c40a9..b9015eb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -247,6 +247,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                 with open(filepath, 'rb') as f:
                     content = f.read()
                     if not all([s in content for s in (b'DAG', b'airflow')]):
+                        self.file_last_changed[filepath] = file_last_changed_on_disk
                         return found_dags
 
             self.logger.debug("Importing {}".format(filepath))
@@ -283,6 +284,8 @@ class DagBag(BaseDagBag, LoggingMixin):
                                               format(mod.filename, filepath))
                             content = zf.read()
                             if not all([s in content for s in (b'DAG', b'airflow')]):
+                                self.file_last_changed[filepath] = (
+                                    file_last_changed_on_disk)
                                 # todo: create ignore list
                                 return found_dags
 
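The safe-mode check that gates both of these early returns is plain
substring matching on the raw file bytes; in isolation it amounts to the
following (the function name here is illustrative, not a DagBag API):

    def might_contain_dag(file_path):
        # Heuristic only: a file is parsed for DAGs iff its raw bytes
        # mention both b'DAG' and b'airflow'.
        with open(file_path, 'rb') as f:
            content = f.read()
        return all(s in content for s in (b'DAG', b'airflow'))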

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 4882875..45f7fba 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -143,7 +143,6 @@ def configure_orm(disable_connection_pool=False):
         engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
         engine_args['pool_recycle'] = conf.getint('core',
                                                   'SQL_ALCHEMY_POOL_RECYCLE')
-        # engine_args['echo'] = True
 
     engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
     Session = scoped_session(
@@ -163,4 +162,4 @@ configure_orm()
 
 KILOBYTE = 1024
 MEGABYTE = KILOBYTE * KILOBYTE
-WEB_COLORS = {'LIGHTBLUE': '#4d9de0'}
\ No newline at end of file
+WEB_COLORS = {'LIGHTBLUE': '#4d9de0'}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index aa502fb..6209946 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -198,8 +198,8 @@ def list_py_file_paths(directory, safe_mode=True):
                     if safe_mode:
                         with open(file_path, 'rb') as f:
                             content = f.read()
-                            might_contain_dag = all([s in content
-                                                     for s in (b'DAG', b'airflow')])
+                            might_contain_dag = all(
+                                [s in content for s in (b'DAG', b'airflow')])
 
                     if not might_contain_dag:
                         continue

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 53626ee..b674bcd 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -20,13 +20,15 @@ from __future__ import unicode_literals
 import datetime
 import logging
 import os
+import shutil
 import unittest
 import six
+import sys
+from tempfile import mkdtemp
 
 from airflow import AirflowException, settings
 from airflow import models
 from airflow.bin import cli
-from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
@@ -49,9 +51,20 @@ except ImportError:
     except ImportError:
         mock = None
 
+IS_PYTHON_3_TRAVIS = sys.version_info >= (3, 0) and "TRAVIS" in os.environ
+
 DEV_NULL = '/dev/null'
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 
+# Include the words "airflow" and "dag" in the file contents, tricking airflow
+# into thinking these files contain a DAG (otherwise Airflow will skip them)
+PARSEABLE_DAG_FILE_CONTENTS = '"airflow DAG"'
+UNPARSEABLE_DAG_FILE_CONTENTS = 'airflow DAG'
+
+# Filename to be used for dags that are created in an ad-hoc manner and can be
+# removed/created at runtime
+TEMP_DAG_FILENAME = "temp_dag.py"
+
 
 class BackfillJobTest(unittest.TestCase):
 
@@ -97,7 +110,7 @@ class BackfillJobTest(unittest.TestCase):
         job = BackfillJob(
             dag=dag,
             start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE+datetime.timedelta(days=1),
+            end_date=DEFAULT_DATE + datetime.timedelta(days=1),
             ignore_first_depends_on_past=True
         )
         job.run()
@@ -109,7 +122,8 @@ class BackfillJobTest(unittest.TestCase):
 
         self.assertTrue(drs[0].execution_date == DEFAULT_DATE)
         self.assertTrue(drs[0].state == State.SUCCESS)
-        self.assertTrue(drs[1].execution_date == DEFAULT_DATE+datetime.timedelta(days=1))
+        self.assertTrue(drs[1].execution_date ==
+                        DEFAULT_DATE + datetime.timedelta(days=1))
         self.assertTrue(drs[1].state == State.SUCCESS)
 
         dag.clear()
@@ -134,9 +148,8 @@ class BackfillJobTest(unittest.TestCase):
         logger = logging.getLogger('BackfillJobTest.test_backfill_examples')
         dags = [
             dag for dag in self.dagbag.dags.values()
-            if 'example_dags' in dag.full_filepath
-            and dag.dag_id not in skip_dags
-            ]
+            if 'example_dags' in dag.full_filepath and dag.dag_id not in skip_dags
+        ]
 
         for dag in dags:
             dag.clear(
@@ -244,6 +257,27 @@ class SchedulerJobTest(unittest.TestCase):
 
     def setUp(self):
         self.dagbag = DagBag()
+        session = settings.Session()
+        session.query(models.ImportError).delete()
+        session.commit()
+
+    @staticmethod
+    def run_single_scheduler_loop_with_no_dags(dags_folder):
+        """
+        Utility function that runs a single scheduler loop without actually
+        changing/scheduling any dags. This is useful to simulate the other
+        side effects of running a scheduler loop, e.g. to see what parse
+        errors there are in the dags_folder.
+
+        :param dags_folder: the directory to traverse
+        :type dags_folder: str
+        """
+        scheduler = SchedulerJob(
+            dag_id='this_dag_doesnt_exist',  # We don't want to actually run anything
+            num_runs=1,
+            subdir=os.path.join(dags_folder))
+        scheduler.heartrate = 0
+        scheduler.run()
 
     @provide_session
     def evaluate_dagrun(
@@ -1177,11 +1211,11 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertLess(dr.execution_date, datetime.datetime.now())
 
         dag3 = DAG(DAG_NAME3,
-                  schedule_interval='@hourly',
-                  max_active_runs=1,
-                  catchup=False,
-                  default_args=default_args
-              )
+                   schedule_interval='@hourly',
+                   max_active_runs=1,
+                   catchup=False,
+                   default_args=default_args
+                   )
 
         run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3)
         run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3)
@@ -1209,3 +1243,140 @@ class SchedulerJobTest(unittest.TestCase):
 
         # The DR should be scheduled BEFORE now
         self.assertLess(dr.execution_date, datetime.datetime.now())
+
+    @unittest.skipIf(IS_PYTHON_3_TRAVIS,
+                     "Fails in Python 3 on Travis but not reproducible locally")
+    def test_add_unparseable_file_before_sched_start_creates_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename,
+                         unparseable_filename)
+        self.assertEqual(import_error.stacktrace,
+                         "invalid syntax ({}, line 1)".format(TEMP_DAG_FILENAME))
+
+    @unittest.skipIf(IS_PYTHON_3_TRAVIS,
+                     "Fails in Python 3 on Travis but not reproducible locally")
+    def test_add_unparseable_file_after_sched_start_creates_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename,
+                         unparseable_filename)
+        self.assertEqual(import_error.stacktrace,
+                         "invalid syntax ({}, line 1)".format(TEMP_DAG_FILENAME))
+
+    def test_no_import_errors_with_parseable_dag(self):
+        try:
+            dags_folder = mkdtemp()
+            parseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            with open(parseable_filename, 'w') as parseable_file:
+                parseable_file.writelines(PARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 0)
+
+    @unittest.skipIf(IS_PYTHON_3_TRAVIS,
+                     "Fails in Python 3 on Travis but not reproducible locally")
+    def test_new_import_error_replaces_old(self):
+        try:
+            dags_folder = mkdtemp()
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            # Generate original import error
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+            # Generate replacement import error (the error will be on the
+            # second line now)
+            with open(unparseable_filename, 'w') as unparseable_file:
+                unparseable_file.writelines(
+                    PARSEABLE_DAG_FILE_CONTENTS +
+                    os.linesep +
+                    UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename,
+                         unparseable_filename)
+        self.assertEqual(import_error.stacktrace,
+                         "invalid syntax ({}, line 2)".format(TEMP_DAG_FILENAME))
+
+    def test_remove_error_clears_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            filename_to_parse = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            # Generate original import error
+            with open(filename_to_parse, 'w') as file_to_parse:
+                file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+            # Remove the import error from the file
+            with open(filename_to_parse, 'w') as file_to_parse:
+                file_to_parse.writelines(
+                    PARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 0)
+
+    def test_remove_file_clears_import_error(self):
+        try:
+            dags_folder = mkdtemp()
+            filename_to_parse = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+
+            # Generate original import error
+            with open(filename_to_parse, 'w') as file_to_parse:
+                file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        # Rerun the scheduler once the dag file has been removed
+        self.run_single_scheduler_loop_with_no_dags(dags_folder)
+
+        session = settings.Session()
+        import_errors = session.query(models.ImportError).all()
+
+        self.assertEqual(len(import_errors), 0)
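
For reference, one of the new tests can be run on its own with the stock
unittest runner (assuming the repository root is on PYTHONPATH and an
initialized Airflow metadata database):

    python -m unittest tests.jobs.SchedulerJobTest.test_remove_file_clears_import_error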
