ephraimbuddy commented on code in PR #68558:
URL: https://github.com/apache/airflow/pull/68558#discussion_r3411726357
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -84,27 +91,43 @@ def __init__(
self._lock: RLock | nullcontext = RLock() if self._use_cache else
nullcontext()
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
- """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
+ """Read and cache a SerializedDAG (with its ``dag_hash`` for staleness
detection)."""
serdag.load_op_links = self.load_op_links
dag = serdag.dag
if not dag:
return None
with self._lock:
- self._dags[serdag.dag_version_id] = dag
+ self._dags[serdag.dag_version_id] = _CacheEntry(dag,
serdag.dag_hash)
cache_size = len(self._dags)
if self._use_cache:
stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
return dag
+ @staticmethod
+ def _current_dag_hash(version_id: UUID | str, session: Session) -> str |
None:
+ """Return the current ``dag_hash`` of the serialized DAG for
``version_id``, or None."""
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ return session.scalar(
+
select(SerializedDagModel.dag_hash).where(SerializedDagModel.dag_version_id ==
version_id)
+ )
+
def _get_dag(self, version_id: UUID | str, session: Session) ->
SerializedDAG | None:
- # Check cache first
with self._lock:
- dag = self._dags.get(version_id)
-
- if dag:
- if self._use_cache:
- stats.incr("api_server.dag_bag.cache_hit")
- return dag
+ cached = self._dags.get(version_id)
+
+ if cached is not None:
+ # A version may have been updated in place (same dag_version_id,
new content + new
+ # dag_hash) by SerializedDagModel.write_dag, so validate the
cached copy against the
+ # current dag_hash before serving it. That validation is a
single-row lookup on the
+ # uniquely-indexed serialized_dag.dag_version_id column.
+ if self._current_dag_hash(version_id, session) == cached.dag_hash:
+ if self._use_cache:
+ stats.incr("api_server.dag_bag.cache_hit")
+ return cached.dag
Review Comment:
The per-hit cost is intentional. A TTL or last_checked interval would
reintroduce a staleness window. That window is exactly the bug being fixed for
the scheduler's long-lived DAG bag, which has no TTL, so the check has to run on
every cache hit. It is also cheap: a single-row lookup on the unique index on
serialized_dag.dag_version_id. In the common unversioned-bundle case,
get_dag_for_run already issues a get_latest_version query on every call, so this
adds a second cheap query rather than introducing the first one.

A shortcut that treats a version as immutable once task instances (TIs) exist
isn't safe either. TI existence isn't monotonic: deleting a DAG's runs lets
write_dag update the still-latest version in place again.
---
Drafted-by: Claude Code (Opus 4.8); reviewed by @ephraimbuddy before posting
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]