ephraimbuddy commented on code in PR #68558:
URL: https://github.com/apache/airflow/pull/68558#discussion_r3411678612
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -84,27 +91,43 @@ def __init__(
self._lock: RLock | nullcontext = RLock() if self._use_cache else
nullcontext()
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
- """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
+ """Read and cache a SerializedDAG (with its ``dag_hash`` for staleness
detection)."""
serdag.load_op_links = self.load_op_links
dag = serdag.dag
if not dag:
return None
with self._lock:
- self._dags[serdag.dag_version_id] = dag
+ self._dags[serdag.dag_version_id] = _CacheEntry(dag,
serdag.dag_hash)
cache_size = len(self._dags)
if self._use_cache:
stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
return dag
+ @staticmethod
+ def _current_dag_hash(version_id: UUID | str, session: Session) -> str |
None:
+ """Return the current ``dag_hash`` of the serialized DAG for
``version_id``, or None."""
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ return session.scalar(
+
select(SerializedDagModel.dag_hash).where(SerializedDagModel.dag_version_id ==
version_id)
+ )
Review Comment:
This file already uses local imports of SerializedDagModel in other places to
avoid circular import issues, so the local import is valid here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]