This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4b510de4e8c Fix zip DAG import errors being cleared during bundle refresh (#63617)
4b510de4e8c is described below
commit 4b510de4e8c3293e81e70ea36370e8c4c0ea5e03
Author: GPK <[email protected]>
AuthorDate: Tue Mar 24 16:23:56 2026 +0000
Fix zip DAG import errors being cleared during bundle refresh (#63617)
* Fix zip DAG import errors being cleared during bundle refresh
* Fixup tests
* Resolve comments
* Update types
* Resolve comments
---
airflow-core/src/airflow/dag_processing/manager.py | 35 +++---
airflow-core/src/airflow/models/dag.py | 2 +-
.../tests/unit/dag_processing/test_manager.py | 124 ++++++++++++++++++++-
airflow-core/tests/unit/models/test_dag.py | 2 +-
4 files changed, 142 insertions(+), 21 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 10b29c0560e..7d4e20adfa8 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -690,10 +690,11 @@ class DagFileProcessorManager(LoggingMixin):
known_files[bundle.name] = found_files
- self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
+ observed_filelocs = self._get_observed_filelocs(found_files)
+ self.deactivate_deleted_dags(bundle_name=bundle.name, observed_filelocs=observed_filelocs)
self.clear_orphaned_import_errors(
bundle_name=bundle.name,
- observed_filelocs={str(x.rel_path) for x in found_files}, # todo: make relative
+ observed_filelocs=observed_filelocs,
)
if any_refreshed:
@@ -710,17 +711,17 @@ class DagFileProcessorManager(LoggingMixin):
return rel_paths
- def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
- """Deactivate DAGs that come from files that are no longer present in bundle."""
-
- def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
- """
- Find dag files in zip file located at abs_path.
+ def _get_observed_filelocs(self, present: set[DagFileInfo]) -> set[str]:
+ """
+ Return observed DAG source paths for bundle entries.
- We return the abs "paths" formed by joining the relative path inside the zip
- with the path to the zip.
+ For regular files this includes the relative file path.
+ For ZIP archives this includes DAG-like inner paths such as
+ ``archive.zip/dag.py``.
+ """
- """
+ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
+ """Yield absolute paths for DAG-like files inside a ZIP archive."""
try:
with zipfile.ZipFile(abs_path) as z:
for info in z.infolist():
@@ -729,22 +730,26 @@ class DagFileProcessorManager(LoggingMixin):
except zipfile.BadZipFile:
self.log.exception("There was an error accessing ZIP file %s", abs_path)
- rel_filelocs: list[str] = []
+ observed_filelocs: set[str] = set()
for info in present:
abs_path = str(info.absolute_path)
if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
- rel_filelocs.append(str(info.rel_path))
+ observed_filelocs.add(str(info.rel_path))
else:
if TYPE_CHECKING:
assert info.bundle_path
for abs_sub_path in find_zipped_dags(abs_path=info.absolute_path):
rel_sub_path = Path(abs_sub_path).relative_to(info.bundle_path)
- rel_filelocs.append(str(rel_sub_path))
+ observed_filelocs.add(str(rel_sub_path))
+ return observed_filelocs
+
+ def deactivate_deleted_dags(self, bundle_name: str, observed_filelocs: set[str]) -> None:
+ """Deactivate DAGs that come from files that are no longer present in bundle."""
with create_session() as session:
any_deactivated = DagModel.deactivate_deleted_dags(
bundle_name=bundle_name,
- rel_filelocs=rel_filelocs,
+ rel_filelocs=observed_filelocs,
session=session,
)
# Only run cleanup if we actually deactivated any DAGs
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 677fbc26048..d4a74b6cc24 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -588,7 +588,7 @@ class DagModel(Base):
def deactivate_deleted_dags(
cls,
bundle_name: str,
- rel_filelocs: list[str],
+ rel_filelocs: set[str],
session: Session = NEW_SESSION,
) -> bool:
"""
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 40a8c4f0b66..e728ce85524 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -26,6 +26,7 @@ import shutil
import signal
import textwrap
import time
+import zipfile
from collections import defaultdict, deque
from datetime import datetime, timedelta
from pathlib import Path
@@ -110,6 +111,39 @@ def encode_mtime_in_filename(val):
return out
+def _create_zip_bundle_with_valid_and_broken_dags(zip_path: Path) -> None:
+ with zipfile.ZipFile(zip_path, "w") as zf:
+ zf.writestr(
+ "valid_dag.py",
+ textwrap.dedent(
+ """
+ from datetime import datetime
+
+ from airflow.providers.standard.operators.empty import EmptyOperator
+ from airflow.sdk import DAG
+
+ with DAG(
+ dag_id="zip_valid_dag",
+ start_date=datetime(2024, 1, 1),
+ schedule=None,
+ catchup=False,
+ ):
+ EmptyOperator(task_id="task")
+ """
+ ),
+ )
+ zf.writestr(
+ "broken_dag.py",
+ textwrap.dedent(
+ """
+ from airflow.sdk import DAG
+
+ raise RuntimeError("broken zip dag")
+ """
+ ),
+ )
+
+
class TestDagFileProcessorManager:
@pytest.fixture(autouse=True)
def _disable_examples(self):
@@ -191,6 +225,88 @@ class TestDagFileProcessorManager:
assert len(import_errors) == 0
session.rollback()
+ @pytest.mark.usefixtures("clear_parse_import_errors")
+ def test_clear_orphaned_import_errors_keeps_zip_inner_file_errors(self, session, tmp_path):
+ zip_path = tmp_path / "test_zip.zip"
+ _create_zip_bundle_with_valid_and_broken_dags(zip_path)
+
+ session.add(
+ ParseImportError(
+ filename="test_zip.zip/broken_dag.py",
+ bundle_name="testing",
+ timestamp=timezone.utcnow(),
+ stacktrace="zip import error",
+ )
+ )
+ session.flush()
+
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.clear_orphaned_import_errors(
+ bundle_name="testing",
+ observed_filelocs=manager._get_observed_filelocs(
+ {
+ DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("test_zip.zip"),
+ bundle_path=tmp_path,
+ )
+ }
+ ),
+ session=session,
+ )
+ session.flush()
+
+ import_errors = session.scalars(select(ParseImportError)).all()
+ assert len(import_errors) == 1
+ assert import_errors[0].filename == "test_zip.zip/broken_dag.py"
+
+ def test_get_observed_filelocs_expands_zip_inner_paths(self, tmp_path):
+ zip_path = tmp_path / "test_zip.zip"
+ _create_zip_bundle_with_valid_and_broken_dags(zip_path)
+
+ manager = DagFileProcessorManager(max_runs=1)
+ observed_filelocs = manager._get_observed_filelocs(
+ {
+ DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("test_zip.zip"),
+ bundle_path=tmp_path,
+ )
+ }
+ )
+
+ assert observed_filelocs == {
+ "test_zip.zip/valid_dag.py",
+ "test_zip.zip/broken_dag.py",
+ }
+
+ @pytest.mark.usefixtures("clear_parse_import_errors")
+ def test_refresh_dag_bundles_keeps_zip_inner_file_errors(self, session, tmp_path, configure_dag_bundles):
+ bundle_path = tmp_path / "bundleone"
+ bundle_path.mkdir()
+ zip_path = bundle_path / "test_zip.zip"
+ _create_zip_bundle_with_valid_and_broken_dags(zip_path)
+
+ session.add(
+ ParseImportError(
+ filename="test_zip.zip/broken_dag.py",
+ bundle_name="bundleone",
+ timestamp=timezone.utcnow(),
+ stacktrace="zip import error",
+ )
+ )
+ session.flush()
+
+ with configure_dag_bundles({"bundleone": bundle_path}):
+ DagBundlesManager().sync_bundles_to_db()
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+ manager._refresh_dag_bundles({})
+
+ import_errors = session.scalars(select(ParseImportError)).all()
+ assert len(import_errors) == 1
+ assert import_errors[0].filename == "test_zip.zip/broken_dag.py"
+
@conf_vars({("core", "load_examples"): "False"})
def test_max_runs_when_no_files(self, tmp_path):
with conf_vars({("core", "dags_folder"): str(tmp_path)}):
@@ -881,7 +997,7 @@ class TestDagFileProcessorManager:
]
manager = DagFileProcessorManager(max_runs=1)
- manager.deactivate_deleted_dags("dag_maker", active_files)
+ manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
# The DAG from test_dag1.py is still active
assert session.get(DagModel, "test_dag1").is_stale is False
@@ -892,14 +1008,14 @@ class TestDagFileProcessorManager:
("rel_filelocs", "expected_return", "expected_dag1_stale", "expected_dag2_stale"),
[
pytest.param(
- ["test_dag1.py"], # Only dag1 present, dag2 deleted
+ {"test_dag1.py"}, # Only dag1 present, dag2 deleted
True, # Should return True
False, # dag1 should not be stale
True, # dag2 should be stale
id="dags_deactivated",
),
pytest.param(
- ["test_dag1.py", "test_dag2.py"], # Both files present
+ {"test_dag1.py", "test_dag2.py"}, # Both files present
False, # Should return False
False, # dag1 should not be stale
False, # dag2 should not be stale
@@ -972,7 +1088,7 @@ class TestDagFileProcessorManager:
dag_maker.sync_dagbag_to_db()
manager = DagFileProcessorManager(max_runs=1)
- manager.deactivate_deleted_dags("dag_maker", active_files)
+ manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
if should_call_cleanup:
mock_remove_references.assert_called_once()
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 046c85ea799..11a1c4818fc 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -842,7 +842,7 @@ class TestDag:
DagModel.deactivate_deleted_dags(
bundle_name=orm_dag.bundle_name,
- rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER),
+ rel_filelocs=set(list_py_file_paths(settings.DAGS_FOLDER)),
)
orm_dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))