This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-2-test by this push:
     new 55fa5d8  Reduce DB load incurred by Stale DAG deactivation (#21399)
55fa5d8 is described below

commit 55fa5d8d047e343619015b24b3acc81828963694
Author: Sam Wheating <sam.wheat...@shopify.com>
AuthorDate: Sun Mar 20 00:17:42 2022 -0700

    Reduce DB load incurred by Stale DAG deactivation (#21399)
    
    Deactivating stale DAGs periodically in bulk
    
    By moving this logic into the DagFileProcessorManager and running it across 
all processed files periodically, we can prevent the use of un-indexed queries.
    
    The basic logic is that we can look at the last processed time of a file 
(for a given processor) and compare that to the last_parsed_time of an entry in 
the dag table. If the file has been processed significantly more recently than 
the DAG has been updated, then it's safe to assume that the DAG is missing and 
can be marked inactive.
    
    (cherry picked from commit f309ea78f7d8b62383bc41eac217681a0916382b)
---
 airflow/config_templates/config.yml          |  8 +++++
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/dag_processing/manager.py            | 43 ++++++++++++++++++++++++-
 airflow/dag_processing/processor.py          | 11 -------
 tests/dag_processing/test_manager.py         | 48 +++++++++++++++++++++++++++-
 tests/dag_processing/test_processor.py       | 25 ---------------
 6 files changed, 101 insertions(+), 38 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 1e77041..cc5817a 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1767,6 +1767,14 @@
       type: string
       example: ~
       default: "30"
+    - name: deactivate_stale_dags_interval
+      description: |
+        How often (in seconds) to check for stale DAGs (DAGs which are no 
longer present in
+        the expected files) which should be deactivated.
+      version_added: 2.3.0
+      type: integer
+      example: ~
+      default: "60"
     - name: dag_dir_list_interval
       description: |
         How often (in seconds) to scan the DAGs directory for new files. 
Default to 5 minutes.
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index 826eaf4..8bd20ed 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -886,6 +886,10 @@ scheduler_idle_sleep_time = 1
 # this interval. Keeping this number low will increase CPU usage.
 min_file_process_interval = 30
 
+# How often (in seconds) to check for stale DAGs (DAGs which are no longer 
present in
+# the expected files) which should be deactivated.
+deactivate_stale_dags_interval = 60
+
 # How often (in seconds) to scan the DAGs directory for new files. Default to 
5 minutes.
 dag_dir_list_interval = 300
 
diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 6c78aa6..315a3a5 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -468,9 +468,12 @@ class DagFileProcessorManager(LoggingMixin):
         self.last_stat_print_time = 0
         # TODO: Remove magic number
         self._zombie_query_interval = 10
+        # Last time we cleaned up DAGs which are no longer in files
+        self.last_deactivate_stale_dags_time = 
timezone.make_aware(datetime.fromtimestamp(0))
+        # How often to check for DAGs which are no longer in files
+        self.deactivate_stale_dags_interval = conf.getint('scheduler', 
'deactivate_stale_dags_interval')
         # How long to wait before timing out a process to parse a DAG file
         self._processor_timeout = processor_timeout
-
         # How often to scan the DAGs directory for new files. Default to 5 
minutes.
         self.dag_dir_list_interval = conf.getint('scheduler', 
'dag_dir_list_interval')
 
@@ -519,6 +522,43 @@ class DagFileProcessorManager(LoggingMixin):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        """Detects DAGs which are no longer present in files and deactivate 
them."""
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - 
self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if 
self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, 
DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                # The largest valid difference between a DagFileStat's 
last_finished_time and a DAG's
+                # last_parsed_time is _processor_timeout. Longer than that 
indicates that the DAG is
+                # no longer present in the file.
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + self._processor_timeout) < 
last_parsed[dag.fileloc]
+                ):
+                    self.log.info(f"DAG {dag.dag_id} is missing and will be 
deactivated.")
+                    to_deactivate.add(dag.dag_id)
+
+            if to_deactivate:
+                deactivated = (
+                    session.query(DagModel)
+                    .filter(DagModel.dag_id.in_(to_deactivate))
+                    .update({DagModel.is_active: False}, 
synchronize_session="fetch")
+                )
+                if deactivated:
+                    self.log.info("Deactivated %i DAGs which are no longer 
present in file.", deactivated)
+
+            self.last_deactivate_stale_dags_time = timezone.utcnow()
+
     def _run_parsing_loop(self):
 
         # In sync mode we want timeout=None -- wait forever until a message is 
received
@@ -581,6 +621,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self.waitables.pop(sentinel)
                 self._processors.pop(processor.file_path)
 
+            self._deactivate_stale_dags()
             self._refresh_dag_dir()
             self._find_zombies()
 
diff --git a/airflow/dag_processing/processor.py 
b/airflow/dag_processing/processor.py
index e59c818..a6cf372 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -648,8 +648,6 @@ class DagFileProcessor(LoggingMixin):
             Stats.incr('dag_file_refresh_error', 1, 1)
             return 0, 0
 
-        self._deactivate_missing_dags(session, dagbag, file_path)
-
         if len(dagbag.dags) > 0:
             self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), 
file_path)
         else:
@@ -679,12 +677,3 @@ class DagFileProcessor(LoggingMixin):
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
-
-    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, 
file_path: str) -> None:
-        deactivated = (
-            session.query(DagModel)
-            .filter(DagModel.fileloc == file_path, DagModel.is_active, 
~DagModel.dag_id.in_(dagbag.dag_ids))
-            .update({DagModel.is_active: False}, synchronize_session="fetch")
-        )
-        if deactivated:
-            self.log.info("Deactivated %i DAGs which are no longer present in 
%s", deactivated, file_path)
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index b549b2b..abba415 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -465,7 +465,6 @@ class TestDagFileProcessorManager:
             pickle_dags=False,
             async_mode=True,
         )
-
         dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
         with create_session() as session:
             session.query(LJ).delete()
@@ -596,6 +595,53 @@ class TestDagFileProcessorManager:
             child_pipe.close()
             parent_pipe.close()
 
+    def test_deactivate_stale_dags(self):
+        """
+        Ensure that DAGs are marked inactive when the file is parsed but the
+        DagModel.last_parsed_time is not updated.
+        """
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_timeout=timedelta(minutes=10),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+        test_dag_path = str(TEST_DAG_FOLDER / 'test_example_bash_operator.py')
+        dagbag = DagBag(test_dag_path, read_dags_from_db=False)
+
+        with create_session() as session:
+            # Add stale DAG to the DB
+            dag = dagbag.get_dag('test_example_bash_operator')
+            dag.last_parsed_time = timezone.utcnow()
+            dag.sync_to_db()
+
+            # Add DAG to the file_parsing_stats
+            stat = DagFileStat(
+                num_dags=1,
+                import_errors=0,
+                last_finish_time=timezone.utcnow() + timedelta(hours=1),
+                last_duration=1,
+                run_count=1,
+            )
+            manager._file_paths = [test_dag_path]
+            manager._file_stats[test_dag_path] = stat
+
+            active_dags = (
+                session.query(DagModel).filter(DagModel.is_active, 
DagModel.fileloc == test_dag_path).all()
+            )
+            assert len(active_dags) == 1
+
+            manager._file_stats[test_dag_path] = stat
+            manager._deactivate_stale_dags()
+            active_dags = (
+                session.query(DagModel).filter(DagModel.is_active, 
DagModel.fileloc == test_dag_path).all()
+            )
+
+            assert len(active_dags) == 0
+
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", 
new_callable=PropertyMock)
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
diff --git a/tests/dag_processing/test_processor.py 
b/tests/dag_processing/test_processor.py
index c9ecfb0..c0d2267 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -700,31 +700,6 @@ class TestDagFileProcessor:
             assert import_error.stacktrace == 
expected_stacktrace.format(invalid_dag_filename)
             session.rollback()
 
-    def test_process_file_should_deactivate_missing_dags(self):
-
-        dag_file = os.path.join(
-            os.path.dirname(os.path.realpath(__file__)), 
'../dags/test_only_dummy_tasks.py'
-        )
-
-        # write a DAG into the DB which is not present in its specified file
-        with create_session() as session:
-            orm_dag = DagModel(dag_id='missing_dag', is_active=True, 
fileloc=dag_file)
-            session.merge(orm_dag)
-            session.commit()
-
-        session = settings.Session()
-
-        dags = session.query(DagModel).all()
-        assert [dag.dag_id for dag in dags if dag.is_active] == ['missing_dag']
-
-        # re-parse the file and see that the DAG is no longer there
-        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
-        dag_file_processor.process_file(dag_file, [])
-
-        dags = session.query(DagModel).all()
-        assert [dag.dag_id for dag in dags if dag.is_active] == 
['test_only_dummy_tasks']
-        assert [dag.dag_id for dag in dags if not dag.is_active] == 
['missing_dag']
-
 
 class TestProcessorAgent:
     @pytest.fixture(autouse=True)

Reply via email to