uranusjr commented on code in PR #68558:
URL: https://github.com/apache/airflow/pull/68558#discussion_r3411765499
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -84,27 +91,43 @@ def __init__(
self._lock: RLock | nullcontext = RLock() if self._use_cache else
nullcontext()
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
- """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
+ """Read and cache a SerializedDAG (with its ``dag_hash`` for staleness
detection)."""
serdag.load_op_links = self.load_op_links
dag = serdag.dag
if not dag:
return None
with self._lock:
- self._dags[serdag.dag_version_id] = dag
+ self._dags[serdag.dag_version_id] = _CacheEntry(dag,
serdag.dag_hash)
cache_size = len(self._dags)
if self._use_cache:
stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
return dag
+ @staticmethod
+ def _current_dag_hash(version_id: UUID | str, session: Session) -> str |
None:
+ """Return the current ``dag_hash`` of the serialized DAG for
``version_id``, or None."""
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ return session.scalar(
+
select(SerializedDagModel.dag_hash).where(SerializedDagModel.dag_version_id ==
version_id)
+ )
+
def _get_dag(self, version_id: UUID | str, session: Session) ->
SerializedDAG | None:
- # Check cache first
with self._lock:
- dag = self._dags.get(version_id)
-
- if dag:
- if self._use_cache:
- stats.incr("api_server.dag_bag.cache_hit")
- return dag
+ cached = self._dags.get(version_id)
+
+ if cached is not None:
+ # A version may have been updated in place (same dag_version_id,
new content + new
+ # dag_hash) by SerializedDagModel.write_dag, so validate the
cached copy against the
+ # current dag_hash before serving it. That validation is a
single-row lookup on the
+ # uniquely-indexed serialized_dag.dag_version_id column.
+ if self._current_dag_hash(version_id, session) == cached.dag_hash:
+ if self._use_cache:
+ stats.incr("api_server.dag_bag.cache_hit")
+ return cached.dag
Review Comment:
I think a TTL is actually not a bad idea, since DAG updates are inherently
infrequent — DAGs are only reparsed on an interval. I would consider using
`min_serialized_dag_update_interval` as the TTL here, since a DAG can't be
updated more frequently than that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]