This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 34ef2503e1d Add purge-warnings and bundle-refresh override seams to
DagFileProcessorManager (#66107)
34ef2503e1d is described below
commit 34ef2503e1d698decb85971af79ad43260d04f6e
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu May 7 14:19:22 2026 +0100
Add purge-warnings and bundle-refresh override seams to
DagFileProcessorManager (#66107)
* Add purge-warnings and bundle-refresh override seams to
DagFileProcessorManager
The projected AIP-92 DB-to-API swap needs DagFileProcessorManager subclasses
that route operations through the API server instead of the metadata DB.
Two spots in the current implementation are awkward to override cleanly:
- _run_parsing_loop calls DagWarning.purge_inactive_dag_warnings() directly,
forcing subclasses to reimplement the parsing loop just to redirect or
skip the cleanup. Promote it to an overridable instance method,
purge_inactive_dag_warnings(), so subclasses can forward it to an API or
no-op it without touching _run_parsing_loop.
- Handlers for external events that trigger bundle refreshes (API callbacks,
coordinator messages) had to mutate the private _force_refresh_bundles set
directly. Expose request_bundle_refresh() as the public seam so event
handlers can mark one or more bundles for refresh without reaching into
private state.
* Make request_bundle_refresh accept multiple bundle names
* Harden DAG bundle refresh seam
---
airflow-core/src/airflow/dag_processing/manager.py | 26 ++++++++++++--
.../tests/unit/dag_processing/test_manager.py | 41 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 8d497ca7508..6e663da93a9 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -478,7 +478,7 @@ class DagFileProcessorManager(LoggingMixin):
self._add_callback_to_queue(callback)
self._scan_stale_dags()
self._cleanup_stale_bundle_versions()
- DagWarning.purge_inactive_dag_warnings()
+ self.purge_inactive_dag_warnings()
# Update number of loop iteration.
self._num_run += 1
@@ -525,7 +525,7 @@ class DagFileProcessorManager(LoggingMixin):
"""Queue any files requested for parsing as requested by users via
UI/API."""
files = self.claim_priority_files()
self._add_files_to_queue(files, mode="frontprio")
- self._force_refresh_bundles |= {file.bundle_name for file in files}
+ self.request_bundle_refresh(file.bundle_name for file in files)
if self._force_refresh_bundles:
self.log.info("Bundles being force refreshed: %s", ",
".join(self._force_refresh_bundles))
@@ -537,6 +537,19 @@ class DagFileProcessorManager(LoggingMixin):
"""
return self._claim_priority_files()
+ def request_bundle_refresh(self, bundle_names: str | Iterable[str]) ->
None:
+ """
+ Request that the given bundles be refreshed on the next refresh tick.
+
+ Use this from event handlers reacting to external signals to mark
+ bundles as needing refresh; the next call to
:meth:`_refresh_dag_bundles`
+ will not skip them via :meth:`should_skip_refresh`.
+ """
+ if isinstance(bundle_names, str):
+ self._force_refresh_bundles.add(bundle_names)
+ return
+ self._force_refresh_bundles.update(bundle_names)
+
def should_skip_refresh(
self,
*,
@@ -691,6 +704,15 @@ class DagFileProcessorManager(LoggingMixin):
values["version"] = version
session.execute(update(DagBundleModel).where(DagBundleModel.name ==
bundle_name).values(**values))
+ def purge_inactive_dag_warnings(self) -> None:
+ """
+ Purge warnings for inactive/stale DAGs.
+
+ Default implementation deletes records from the metadata DB; override
to
+ source warnings from an API or skip the cleanup entirely.
+ """
+ DagWarning.purge_inactive_dag_warnings()
+
def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
"""Refresh DAG bundles, if required."""
now = timezone.utcnow()
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index c10e73473bf..2296d1b428b 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -733,6 +733,24 @@ class TestDagFileProcessorManager:
assert manager._file_queue == deque([file1, file2])
assert manager._force_refresh_bundles == {"dags-folder"}
+ def test_request_bundle_refresh_marks_bundles_for_refresh(self):
+ """`request_bundle_refresh` adds the bundles to the force-refresh
set."""
+ manager = DagFileProcessorManager(max_runs=1)
+ assert manager._force_refresh_bundles == set()
+
+ manager.request_bundle_refresh(["bundleone", "bundletwo"])
+ manager.request_bundle_refresh(["bundleone"]) # idempotent
+
+ assert manager._force_refresh_bundles == {"bundleone", "bundletwo"}
+
+ def test_request_bundle_refresh_accepts_single_bundle_name(self):
+ """`request_bundle_refresh` treats a string as one bundle name, not an
iterable."""
+ manager = DagFileProcessorManager(max_runs=1)
+
+ manager.request_bundle_refresh("bundleone")
+
+ assert manager._force_refresh_bundles == {"bundleone"}
+
@pytest.mark.usefixtures("testing_dag_bundle")
def test_scan_stale_dags(self, session):
"""
@@ -2189,6 +2207,29 @@ class TestDagFileProcessorManager:
sync_mock.assert_not_called()
assert [b.name for b in manager._dag_bundles] == ["testing"]
+ def test_purge_inactive_dag_warnings_delegates_to_dagwarning(self):
+ """Default `purge_inactive_dag_warnings` calls
`DagWarning.purge_inactive_dag_warnings`."""
+ manager = DagFileProcessorManager(max_runs=1)
+ with mock.patch(
+
"airflow.dag_processing.manager.DagWarning.purge_inactive_dag_warnings"
+ ) as purge_mock:
+ manager.purge_inactive_dag_warnings()
+ purge_mock.assert_called_once_with()
+
+ def test_run_parsing_loop_uses_overridable_purge(self, tmp_path,
configure_testing_dag_bundle):
+ """`_run_parsing_loop` calls the overridable
`purge_inactive_dag_warnings` seam."""
+ with configure_testing_dag_bundle(tmp_path):
+ manager = DagFileProcessorManager(max_runs=1)
+ with (
+ mock.patch.object(manager, "purge_inactive_dag_warnings") as
purge_mock,
+ mock.patch(
+
"airflow.dag_processing.manager.DagWarning.purge_inactive_dag_warnings"
+ ) as direct_mock,
+ ):
+ manager.run()
+ purge_mock.assert_called()
+ direct_mock.assert_not_called()
+
@mock.patch("airflow.dag_processing.manager.stats.gauge")
def test_stats_total_parse_time(self, statsd_gauge_mock, tmp_path,
configure_testing_dag_bundle):
key = "dag_processing.total_parse_time"