Copilot commented on code in PR #68558:
URL: https://github.com/apache/airflow/pull/68558#discussion_r3411760440


##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -100,6 +146,61 @@ def test_get_dag_returns_none_when_not_found(self):
 
         assert result is None
 
+    def test_get_dag_reflects_in_place_version_update_end_to_end(self):
+        """End-to-end regression: an in-place version update must be re-read, 
not served stale.
+
+        When a DagVersion has no task instances, 
``SerializedDagModel.write_dag`` updates the
+        serialized DAG in place (same ``dag_version_id``, new content). A 
long-lived DagBag (e.g.
+        the scheduler's) must reflect the new content instead of serving the 
cached old code.
+
+        Each step uses its own session, matching the real deployment where the 
dag processor
+        writes and the scheduler reads in separate processes/sessions.
+        """
+        dag_id = "stale_cache_dag"
+        bundle_name = "testing"
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+
+        def make_lazy(task_ids):
+            with DAG(dag_id, schedule=None) as dag:
+                for task_id in task_ids:
+                    EmptyOperator(task_id=task_id)
+            return LazyDeserializedDAG.from_dag(dag)
+
+        # Long-lived bag, like the scheduler's process-lived scheduler_dag_bag.
+        dag_bag = DBDagBag()
+
+        with create_session() as session:
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+            session.add(DagModel(dag_id=dag_id, bundle_name=bundle_name))
+            session.flush()
+            # Version 1: a single task, no task instances yet.
+            SerializedDagModel.write_dag(make_lazy(["a"]), 
bundle_name=bundle_name, session=session)
+            session.commit()
+            version_id = DagVersion.get_latest_version(dag_id, 
session=session).id
+
+        # The scheduler loads and caches the DAG.
+        with create_session() as session:
+            assert set(dag_bag.get_dag(version_id, session=session).task_ids) 
== {"a"}
+
+        # The dag processor adds a task and re-writes. With no task instances 
on the version,
+        # write_dag updates it in place (same dag_version_id, new content + 
hash).
+        with create_session() as session:
+            did_write = SerializedDagModel.write_dag(
+                make_lazy(["a", "b"]), bundle_name=bundle_name, session=session
+            )
+            session.commit()
+            assert did_write is True
+            assert DagVersion.get_latest_version(dag_id, session=session).id 
== version_id
+
+        # The scheduler reads again: it must serve the updated DAG, not the 
stale cached one.
+        with create_session() as session:
+            assert set(dag_bag.get_dag(version_id, session=session).task_ids) 
== {"a", "b"}
+
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+

Review Comment:
   This test clears DAGs and serialized DAGs at the end, but leaves behind the
`DagBundleModel` row it created. Add `clear_db_dag_bundles()` to the teardown
cleanup to avoid cross-test contamination and primary-key conflicts when other
tests create a bundle with the same name.



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -100,6 +146,61 @@ def test_get_dag_returns_none_when_not_found(self):
 
         assert result is None
 
+    def test_get_dag_reflects_in_place_version_update_end_to_end(self):
+        """End-to-end regression: an in-place version update must be re-read, 
not served stale.
+
+        When a DagVersion has no task instances, 
``SerializedDagModel.write_dag`` updates the
+        serialized DAG in place (same ``dag_version_id``, new content). A 
long-lived DagBag (e.g.
+        the scheduler's) must reflect the new content instead of serving the 
cached old code.
+
+        Each step uses its own session, matching the real deployment where the 
dag processor
+        writes and the scheduler reads in separate processes/sessions.
+        """
+        dag_id = "stale_cache_dag"
+        bundle_name = "testing"
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+

Review Comment:
   This test creates a `DagBundleModel(name="testing")` but only clears DAGs 
and serialized DAGs. Since `clear_db_dags()` doesn’t remove dag bundles, the 
bundle row can leak into other db_test cases and potentially cause primary-key 
collisions. Clear dag bundles as part of the setup cleanup to keep the test 
isolated.



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -117,9 +140,9 @@ def _get_dag(self, version_id: UUID | str, session: 
Session) -> SerializedDAG |
         # counting a single lookup as both a miss and a hit.
         if self._use_cache:
             with self._lock:
-                if dag := self._dags.get(version_id):
+                if (cached := self._dags.get(version_id)) is not None:
                     stats.incr("api_server.dag_bag.cache_hit")
-                    return dag
+                    return cached.dag
             stats.incr("api_server.dag_bag.cache_miss")

Review Comment:
   In the double-checked locking path, `_get_dag()` returns an entry another 
thread may have cached while we were querying the DB, but it doesn’t revalidate 
that cached entry’s `dag_hash`. With in-place updates this can reintroduce 
staleness (e.g. thread A reads fresh from DB but thread B cached an older hash 
first; A then returns B’s stale cache entry). Validate the cached entry’s hash 
(and evict if mismatched) before returning it, to preserve the “validate on 
every cache hit” guarantee and avoid serving stale DAGs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to