uranusjr commented on code in PR #68558:
URL: https://github.com/apache/airflow/pull/68558#discussion_r3411765499


##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -84,27 +91,43 @@ def __init__(
         self._lock: RLock | nullcontext = RLock() if self._use_cache else 
nullcontext()
 
     def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
-        """Read and optionally cache a SerializedDAG from a 
SerializedDagModel."""
+        """Read and cache a SerializedDAG (with its ``dag_hash`` for staleness 
detection)."""
         serdag.load_op_links = self.load_op_links
         dag = serdag.dag
         if not dag:
             return None
         with self._lock:
-            self._dags[serdag.dag_version_id] = dag
+            self._dags[serdag.dag_version_id] = _CacheEntry(dag, 
serdag.dag_hash)
             cache_size = len(self._dags)
         if self._use_cache:
             stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
         return dag
 
+    @staticmethod
+    def _current_dag_hash(version_id: UUID | str, session: Session) -> str | 
None:
+        """Return the current ``dag_hash`` of the serialized DAG for 
``version_id``, or None."""
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        return session.scalar(
+            
select(SerializedDagModel.dag_hash).where(SerializedDagModel.dag_version_id == 
version_id)
+        )
+
     def _get_dag(self, version_id: UUID | str, session: Session) -> 
SerializedDAG | None:
-        # Check cache first
         with self._lock:
-            dag = self._dags.get(version_id)
-
-        if dag:
-            if self._use_cache:
-                stats.incr("api_server.dag_bag.cache_hit")
-            return dag
+            cached = self._dags.get(version_id)
+
+        if cached is not None:
+            # A version may have been updated in place (same dag_version_id, 
new content + new
+            # dag_hash) by SerializedDagModel.write_dag, so validate the 
cached copy against the
+            # current dag_hash before serving it. That validation is a 
single-row lookup on the
+            # uniquely-indexed serialized_dag.dag_version_id column.
+            if self._current_dag_hash(version_id, session) == cached.dag_hash:
+                if self._use_cache:
+                    stats.incr("api_server.dag_bag.cache_hit")
+                return cached.dag

Review Comment:
   I think a TTL is actually not a bad idea, since dag updates are inherently 
infrequent—dags are only reparsed on an interval. I would consider using 
`min_serialized_dag_update_interval` here, since a dag can’t be updated more 
frequently than this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to