This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 36e2e43def Remove double collection of dags in `airflow dags reserialize`  (#27030)
36e2e43def is described below

commit 36e2e43def6a27d9bf2cab4d27d104414bea3f7f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Oct 13 19:29:00 2022 +0100

    Remove double collection of dags in `airflow dags reserialize`  (#27030)
    
    The `airflow dags reserialize` code explicitly calls dagbag.collect_dags
    after instantiating DagBag.
    
    collect_dags is already called when a DagBag is instantiated, so calling
    it again means the same dags are processed twice.
    
    Here, we add a `collect_dags` flag to DagBag so that reserialization can skip
    the collection at instantiation and collect the dags only once.
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 airflow/models/dagbag.py    | 12 +++++++-----
 airflow/utils/db.py         |  4 ++--
 tests/models/test_dagbag.py | 13 +++++++++++++
 3 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index cabc142f31..849498c1e4 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -98,6 +98,7 @@ class DagBag(LoggingMixin):
         read_dags_from_db: bool = False,
         store_serialized_dags: bool | None = None,
         load_op_links: bool = True,
+        collect_dags: bool = True,
     ):
         # Avoid circular import
         from airflow.models.dag import DAG
@@ -137,11 +138,12 @@ class DagBag(LoggingMixin):
 
         self.dagbag_import_error_tracebacks = conf.getboolean('core', 'dagbag_import_error_tracebacks')
         self.dagbag_import_error_traceback_depth = conf.getint('core', 'dagbag_import_error_traceback_depth')
-        self.collect_dags(
-            dag_folder=dag_folder,
-            include_examples=include_examples,
-            safe_mode=safe_mode,
-        )
+        if collect_dags:
+            self.collect_dags(
+                dag_folder=dag_folder,
+                include_examples=include_examples,
+                safe_mode=safe_mode,
+            )
         # Should the extra operator link be loaded via plugins?
         # This flag is set to False in Scheduler so that Extra Operator links are not loaded
         self.load_op_links = load_op_links
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index f796156f8f..d581925791 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -831,8 +831,8 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None:
     from airflow.models.serialized_dag import SerializedDagModel
 
     session.query(SerializedDagModel).delete(synchronize_session=False)
-    dagbag = DagBag()
-    dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+    dagbag = DagBag(collect_dags=False)
+    dagbag.collect_dags(only_if_updated=False)
     dagbag.sync_to_db(session=session)
 
 
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index ed20d41ef0..f55a31400d 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -1060,3 +1060,16 @@ class TestDagBag:
         dagbag = DagBag(dag_folder=dag_file, include_examples=False)
         assert len(dagbag.dag_ids) == 0
         assert "has no tags" in dagbag.import_errors[dag_file]
+
+    def test_dagbag_dag_collection(self):
+
+        dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, collect_dags=False)
+        # since collect_dags is False, dagbag.dags should be empty
+        assert not dagbag.dags
+
+        dagbag.collect_dags()
+        assert dagbag.dags
+
+        # test that dagbag.dags is not empty if collect_dags is True
+        dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+        assert dagbag.dags

Reply via email to