nathadfield commented on code in PR #61448:
URL: https://github.com/apache/airflow/pull/61448#discussion_r2871591146


##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1113,13 +1225,27 @@ def _create_orm_dagrun(
     partition_key: str | None = None,
     session: Session = NEW_SESSION,
 ) -> DagRun:
-    bundle_version = None
-    if not dag.disable_bundle_versioning:
-        bundle_version = session.scalar(
-            select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id),
-        )
-    dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
+    bundle_version = _resolve_bundle_version(
+        dag=dag,
+        session=session,
+    )
+    dag_version = DagVersion.get_latest_version(dag.dag_id, bundle_version=bundle_version, session=session)
+
     if not dag_version:
+        if bundle_version:
+            # Bundle version exists but not yet serialized - this is a temporary race condition
+            log.warning(
+                "Bundle version %s for DAG %s is not yet available. "

Review Comment:
   I haven't reproduced this in a live system; the race condition was 
identified through code analysis. The window is narrow in practice, and it is 
more likely to surface when the DAG files are large or slow to parse, or when 
the scheduler is under heavy load.
   
   I validated the scenario with a unit test 
(`test_create_dagrun_race_condition_bundle_version_not_serialized_yet`) that 
sets up exactly this state (DagBundleModel at v2.0.0 but only a DagVersion for 
v1.0.0) and confirms that BundleVersionUnavailable is raised with a clear retry 
message.
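
For readers without the test at hand, the state it sets up can be roughly sketched as follows. This is a minimal, self-contained illustration and not the code from the PR: `FakeStore`, `create_dagrun`, the local `BundleVersionUnavailable` class, the `example_dag` id, and the wording of the error message are all hypothetical stand-ins, and no Airflow models or sessions are involved.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class BundleVersionUnavailable(Exception):
    """Local stand-in for the exception named above; not imported from Airflow."""


@dataclass
class FakeStore:
    """Hypothetical in-memory replacement for the metadata database."""

    # dag_id -> bundle version currently recorded for that DAG's bundle
    bundle_versions: dict[str, str] = field(default_factory=dict)
    # (dag_id, bundle_version) pairs that already have a serialized DAG version
    serialized: set[tuple[str, str]] = field(default_factory=set)


def create_dagrun(store: FakeStore, dag_id: str) -> tuple[str, str | None]:
    """Return (dag_id, bundle_version), or raise if that version is not serialized yet."""
    bundle_version = store.bundle_versions.get(dag_id)
    if bundle_version is not None and (dag_id, bundle_version) not in store.serialized:
        # The bundle has moved on, but serialization has not caught up yet.
        # Illustrative message only; the real wording lives in the PR.
        raise BundleVersionUnavailable(
            f"bundle version {bundle_version} of {dag_id} has no serialized DAG version yet; retry later"
        )
    return dag_id, bundle_version


# The racy state: bundle recorded at 2.0.0, only 1.0.0 serialized so far.
store = FakeStore(
    bundle_versions={"example_dag": "2.0.0"},
    serialized={("example_dag", "1.0.0")},
)
try:
    create_dagrun(store, "example_dag")
except BundleVersionUnavailable as exc:
    print(f"retryable: {exc}")

# Once serialization catches up, the same call succeeds.
store.serialized.add(("example_dag", "2.0.0"))
print(create_dagrun(store, "example_dag"))
```

The point of the sketch is only that the failure is transient: nothing about the DAG is wrong, and the same request succeeds once the newer version has been serialized, which is why a dedicated, retryable error is a better fit than a generic failure.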



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
