This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch py-client-sync
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 74e32aa8d9f7b2febb864399489f858f0f6d6be9
Author: GPK <[email protected]>
AuthorDate: Tue Mar 24 16:23:56 2026 +0000

    Fix zip DAG import errors being cleared during bundle refresh (#63617)
    
    * Fix zip DAG import errors being cleared during bundle refresh
    
    * Fixup tests
    
    * Resolve comments
    
    * Update types
    
    * Resolve comments
---
 airflow-core/src/airflow/dag_processing/manager.py |  35 +++---
 airflow-core/src/airflow/models/dag.py             |   2 +-
 .../tests/unit/dag_processing/test_manager.py      | 124 ++++++++++++++++++++-
 airflow-core/tests/unit/models/test_dag.py         |   2 +-
 4 files changed, 142 insertions(+), 21 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 10b29c0560e..7d4e20adfa8 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -690,10 +690,11 @@ class DagFileProcessorManager(LoggingMixin):
 
             known_files[bundle.name] = found_files
 
-            self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
+            observed_filelocs = self._get_observed_filelocs(found_files)
+            self.deactivate_deleted_dags(bundle_name=bundle.name, observed_filelocs=observed_filelocs)
             self.clear_orphaned_import_errors(
                 bundle_name=bundle.name,
-                observed_filelocs={str(x.rel_path) for x in found_files},  # todo: make relative
+                observed_filelocs=observed_filelocs,
             )
 
         if any_refreshed:
@@ -710,17 +711,17 @@ class DagFileProcessorManager(LoggingMixin):
 
         return rel_paths
 
-    def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
-        """Deactivate DAGs that come from files that are no longer present in bundle."""
-
-        def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
-            """
-            Find dag files in zip file located at abs_path.
+    def _get_observed_filelocs(self, present: set[DagFileInfo]) -> set[str]:
+        """
+        Return observed DAG source paths for bundle entries.
 
-            We return the abs "paths" formed by joining the relative path inside the zip
-            with the path to the zip.
+        For regular files this includes the relative file path.
+        For ZIP archives this includes DAG-like inner paths such as
+        ``archive.zip/dag.py``.
+        """
 
-            """
+        def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
+            """Yield absolute paths for DAG-like files inside a ZIP archive."""
             try:
                 with zipfile.ZipFile(abs_path) as z:
                     for info in z.infolist():
@@ -729,22 +730,26 @@ class DagFileProcessorManager(LoggingMixin):
             except zipfile.BadZipFile:
                 self.log.exception("There was an error accessing ZIP file %s", abs_path)
 
-        rel_filelocs: list[str] = []
+        observed_filelocs: set[str] = set()
         for info in present:
             abs_path = str(info.absolute_path)
             if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
-                rel_filelocs.append(str(info.rel_path))
+                observed_filelocs.add(str(info.rel_path))
             else:
                 if TYPE_CHECKING:
                     assert info.bundle_path
                 for abs_sub_path in find_zipped_dags(abs_path=info.absolute_path):
                     rel_sub_path = Path(abs_sub_path).relative_to(info.bundle_path)
-                    rel_filelocs.append(str(rel_sub_path))
+                    observed_filelocs.add(str(rel_sub_path))
 
+        return observed_filelocs
+
+    def deactivate_deleted_dags(self, bundle_name: str, observed_filelocs: set[str]) -> None:
+        """Deactivate DAGs that come from files that are no longer present in bundle."""
         with create_session() as session:
             any_deactivated = DagModel.deactivate_deleted_dags(
                 bundle_name=bundle_name,
-                rel_filelocs=rel_filelocs,
+                rel_filelocs=observed_filelocs,
                 session=session,
             )
             # Only run cleanup if we actually deactivated any DAGs
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 677fbc26048..d4a74b6cc24 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -588,7 +588,7 @@ class DagModel(Base):
     def deactivate_deleted_dags(
         cls,
         bundle_name: str,
-        rel_filelocs: list[str],
+        rel_filelocs: set[str],
         session: Session = NEW_SESSION,
     ) -> bool:
         """
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 40a8c4f0b66..e728ce85524 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -26,6 +26,7 @@ import shutil
 import signal
 import textwrap
 import time
+import zipfile
 from collections import defaultdict, deque
 from datetime import datetime, timedelta
 from pathlib import Path
@@ -110,6 +111,39 @@ def encode_mtime_in_filename(val):
     return out
 
 
+def _create_zip_bundle_with_valid_and_broken_dags(zip_path: Path) -> None:
+    with zipfile.ZipFile(zip_path, "w") as zf:
+        zf.writestr(
+            "valid_dag.py",
+            textwrap.dedent(
+                """
+                from datetime import datetime
+
+                from airflow.providers.standard.operators.empty import EmptyOperator
+                from airflow.sdk import DAG
+
+                with DAG(
+                    dag_id="zip_valid_dag",
+                    start_date=datetime(2024, 1, 1),
+                    schedule=None,
+                    catchup=False,
+                ):
+                    EmptyOperator(task_id="task")
+                """
+            ),
+        )
+        zf.writestr(
+            "broken_dag.py",
+            textwrap.dedent(
+                """
+                from airflow.sdk import DAG
+
+                raise RuntimeError("broken zip dag")
+                """
+            ),
+        )
+
+
 class TestDagFileProcessorManager:
     @pytest.fixture(autouse=True)
     def _disable_examples(self):
@@ -191,6 +225,88 @@ class TestDagFileProcessorManager:
                 assert len(import_errors) == 0
                 session.rollback()
 
+    @pytest.mark.usefixtures("clear_parse_import_errors")
+    def test_clear_orphaned_import_errors_keeps_zip_inner_file_errors(self, session, tmp_path):
+        zip_path = tmp_path / "test_zip.zip"
+        _create_zip_bundle_with_valid_and_broken_dags(zip_path)
+
+        session.add(
+            ParseImportError(
+                filename="test_zip.zip/broken_dag.py",
+                bundle_name="testing",
+                timestamp=timezone.utcnow(),
+                stacktrace="zip import error",
+            )
+        )
+        session.flush()
+
+        manager = DagFileProcessorManager(max_runs=1)
+        manager.clear_orphaned_import_errors(
+            bundle_name="testing",
+            observed_filelocs=manager._get_observed_filelocs(
+                {
+                    DagFileInfo(
+                        bundle_name="testing",
+                        rel_path=Path("test_zip.zip"),
+                        bundle_path=tmp_path,
+                    )
+                }
+            ),
+            session=session,
+        )
+        session.flush()
+
+        import_errors = session.scalars(select(ParseImportError)).all()
+        assert len(import_errors) == 1
+        assert import_errors[0].filename == "test_zip.zip/broken_dag.py"
+
+    def test_get_observed_filelocs_expands_zip_inner_paths(self, tmp_path):
+        zip_path = tmp_path / "test_zip.zip"
+        _create_zip_bundle_with_valid_and_broken_dags(zip_path)
+
+        manager = DagFileProcessorManager(max_runs=1)
+        observed_filelocs = manager._get_observed_filelocs(
+            {
+                DagFileInfo(
+                    bundle_name="testing",
+                    rel_path=Path("test_zip.zip"),
+                    bundle_path=tmp_path,
+                )
+            }
+        )
+
+        assert observed_filelocs == {
+            "test_zip.zip/valid_dag.py",
+            "test_zip.zip/broken_dag.py",
+        }
+
+    @pytest.mark.usefixtures("clear_parse_import_errors")
+    def test_refresh_dag_bundles_keeps_zip_inner_file_errors(self, session, tmp_path, configure_dag_bundles):
+        bundle_path = tmp_path / "bundleone"
+        bundle_path.mkdir()
+        zip_path = bundle_path / "test_zip.zip"
+        _create_zip_bundle_with_valid_and_broken_dags(zip_path)
+
+        session.add(
+            ParseImportError(
+                filename="test_zip.zip/broken_dag.py",
+                bundle_name="bundleone",
+                timestamp=timezone.utcnow(),
+                stacktrace="zip import error",
+            )
+        )
+        session.flush()
+
+        with configure_dag_bundles({"bundleone": bundle_path}):
+            DagBundlesManager().sync_bundles_to_db()
+            manager = DagFileProcessorManager(max_runs=1)
+            manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+            manager._refresh_dag_bundles({})
+
+        import_errors = session.scalars(select(ParseImportError)).all()
+        assert len(import_errors) == 1
+        assert import_errors[0].filename == "test_zip.zip/broken_dag.py"
+
     @conf_vars({("core", "load_examples"): "False"})
     def test_max_runs_when_no_files(self, tmp_path):
         with conf_vars({("core", "dags_folder"): str(tmp_path)}):
@@ -881,7 +997,7 @@ class TestDagFileProcessorManager:
         ]
 
         manager = DagFileProcessorManager(max_runs=1)
-        manager.deactivate_deleted_dags("dag_maker", active_files)
+        manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
 
         # The DAG from test_dag1.py is still active
         assert session.get(DagModel, "test_dag1").is_stale is False
@@ -892,14 +1008,14 @@ class TestDagFileProcessorManager:
         ("rel_filelocs", "expected_return", "expected_dag1_stale", "expected_dag2_stale"),
         [
             pytest.param(
-                ["test_dag1.py"],  # Only dag1 present, dag2 deleted
+                {"test_dag1.py"},  # Only dag1 present, dag2 deleted
                 True,  # Should return True
                 False,  # dag1 should not be stale
                 True,  # dag2 should be stale
                 id="dags_deactivated",
             ),
             pytest.param(
-                ["test_dag1.py", "test_dag2.py"],  # Both files present
+                {"test_dag1.py", "test_dag2.py"},  # Both files present
                 False,  # Should return False
                 False,  # dag1 should not be stale
                 False,  # dag2 should not be stale
@@ -972,7 +1088,7 @@ class TestDagFileProcessorManager:
         dag_maker.sync_dagbag_to_db()
 
         manager = DagFileProcessorManager(max_runs=1)
-        manager.deactivate_deleted_dags("dag_maker", active_files)
+        manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
 
         if should_call_cleanup:
             mock_remove_references.assert_called_once()
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 046c85ea799..11a1c4818fc 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -842,7 +842,7 @@ class TestDag:
 
         DagModel.deactivate_deleted_dags(
             bundle_name=orm_dag.bundle_name,
-            rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER),
+            rel_filelocs=set(list_py_file_paths(settings.DAGS_FOLDER)),
         )
 
         orm_dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))

Reply via email to