This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6235cf8975 Revert "Handle Example dags case when checking for missing files (#41856)" (#42193)
6235cf8975 is described below

commit 6235cf897598b03ace2e52bb677b478a5612ccad
Author: Utkarsh Sharma <utkarshar...@gmail.com>
AuthorDate: Thu Sep 12 19:03:24 2024 +0530

    Revert "Handle Example dags case when checking for missing files (#41856)" 
(#42193)
    
    This reverts commit 435e9687b0c56499bc29c21d3cada8ae9e0a8c53.
---
 airflow/dag_processing/manager.py       | 11 ++--
 tests/dag_processing/test_job_runner.py | 89 +++++++++++++++++----------------
 2 files changed, 48 insertions(+), 52 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 7e404307dc..fee515dc07 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -41,7 +41,6 @@ from sqlalchemy import delete, select, update
 from tabulate import tabulate
 
 import airflow.models
-from airflow import example_dags
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
@@ -70,8 +69,6 @@ from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
 
-example_dag_folder = next(iter(example_dags.__path__))
-
 if TYPE_CHECKING:
     from multiprocessing.connection import Connection as MultiprocessingConnection
 
@@ -530,11 +527,9 @@ class DagFileProcessorManager(LoggingMixin):
 
         for dag in dags_parsed:
             # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
-            # DAGs from the previous DAGs folder will be marked as stale. We also need to handle example dags
-            # differently. Note that this change has no impact on standalone DAG processors.
-            dag_not_in_current_dag_folder = (
-                not os.path.commonpath([dag.fileloc, example_dag_folder]) == example_dag_folder
-            ) and (os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory)
+            # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
+            # on standalone DAG processors.
+            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
             # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
             # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold configured to prevent a
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 0e15a2d1f6..9b8437d77d 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -773,57 +773,58 @@ class TestDagProcessorJobRunner:
         def get_dag_string(filename) -> str:
             return open(TEST_DAG_FOLDER / filename).read()
 
-        def add_dag_to_db(file_path, dag_id, processor_subdir):
-            dagbag = DagBag(file_path, read_dags_from_db=False)
-            dag = dagbag.get_dag(dag_id)
-            dag.fileloc = file_path
-            dag.last_parsed_time = timezone.utcnow()
-            dag.sync_to_db(processor_subdir=processor_subdir)
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
+            old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+            old_dag_file.flush()
+            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
+            new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+            new_dag_file.flush()
+
+            manager = DagProcessorJobRunner(
+                job=Job(),
+                processor=DagFileProcessorManager(
+                    dag_directory=new_dag_home,
+                    max_runs=1,
+                    processor_timeout=timedelta(minutes=10),
+                    signal_conn=MagicMock(),
+                    dag_ids=[],
+                    pickle_dags=False,
+                    async_mode=True,
+                ),
+            )
 
-        def create_dag_folder(dag_id):
-            dag_home = tempfile.mkdtemp(dir=tmpdir)
-            dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py")
-            dag_file.write(get_dag_string(dag_id).encode())
-            dag_file.flush()
-            return dag_home, dag_file
+            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
 
-        with tempfile.TemporaryDirectory() as tmpdir:
-            old_dag_home, old_dag_file = create_dag_folder("test_example_bash_operator.py")
-            new_dag_home, new_dag_file = create_dag_folder("test_scheduler_dags.py")
-            example_dag_home, example_dag_file = create_dag_folder("test_dag_warnings.py")
-
-            with mock.patch("airflow.dag_processing.manager.example_dag_folder", example_dag_home):
-                manager = DagProcessorJobRunner(
-                    job=Job(),
-                    processor=DagFileProcessorManager(
-                        dag_directory=new_dag_home,
-                        max_runs=1,
-                        processor_timeout=timedelta(minutes=10),
-                        signal_conn=MagicMock(),
-                        dag_ids=[],
-                        pickle_dags=False,
-                        async_mode=True,
-                    ),
-                )
+            with create_session() as session:
+                # Add DAG from old DAG home to the DB
+                dag = dagbag.get_dag("test_example_bash_operator")
+                dag.fileloc = old_dag_file.name
+                dag.last_parsed_time = timezone.utcnow()
+                dag.sync_to_db(processor_subdir=old_dag_home)
 
-                with create_session() as session:
-                    add_dag_to_db(old_dag_file.name, "test_example_bash_operator", old_dag_home)
-                    add_dag_to_db(new_dag_file.name, "test_start_date_scheduling", new_dag_home)
-                    add_dag_to_db(example_dag_file.name, "test_dag_warnings", example_dag_home)
+                # Add DAG from new DAG home to the DB
+                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+                other_dag.fileloc = new_dag_file.name
+                other_dag.last_parsed_time = timezone.utcnow()
+                other_dag.sync_to_db(processor_subdir=new_dag_home)
 
-                    manager.processor._file_paths = [new_dag_file, example_dag_file]
+                manager.processor._file_paths = [new_dag_file]
 
-                    active_dag_count = (
-                        session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                    )
-                    assert active_dag_count == 3
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 2
 
-                    manager.processor._scan_stale_dags()
+                manager.processor._scan_stale_dags()
 
-                    active_dag_count = (
-                        session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                    )
-                    assert active_dag_count == 2
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 1
 
     @mock.patch(
         "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
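
The rewritten test above fabricates its DAG folders with the standard library alone. A rough sketch of that setup pattern in isolation (file contents here are illustrative, not the commit's):

    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        # Each "DAG home" is a separate folder inside the temp dir.
        dag_home = tempfile.mkdtemp(dir=tmpdir)
        # NamedTemporaryFile keeps a named file on disk while the handle
        # is open; .flush() makes the bytes visible to other readers.
        dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py")
        dag_file.write(b"# hypothetical DAG module contents\n")
        dag_file.flush()
        print(dag_file.name)  # absolute path, usable as a DAG fileloc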
