Lee-W commented on code in PR #64322:
URL: https://github.com/apache/airflow/pull/64322#discussion_r3014691143


##########
airflow-core/newsfragments/64322.bugfix.rst:
##########
@@ -0,0 +1 @@
+Fix premature asset-triggered DagRuns when ``AssetDagRunQueue`` had rows but ``SerializedDagModel`` was not yet available; the scheduler now skips those DAGs until serialization exists.

Review Comment:
   ```suggestion
   Fix premature asset-triggered DagRuns when ``AssetDagRunQueue`` had rows but ``SerializedDagModel`` was not yet available; the scheduler now skips those Dags until serialization exists.
   ```
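A simplified model of the failure mode the newsfragment describes, assuming (as the diff context suggests) that the asset condition is only evaluated while looping over the serialized Dags that were found. Under that assumption, a queued Dag with no serialized row is never checked and never removed from the candidates, so it gets a run prematurely. Plain dicts and sets stand in for `AssetDagRunQueue` rows and `SerializedDagModel`, and `condition_met` stands in for `dag_ready`; `dags_to_trigger` and its `skip_unserialized` flag are hypothetical and not Airflow APIs.

```python
from typing import Callable


def dags_to_trigger(
    adrq_by_dag: dict[str, list[str]],
    serialized_dag_ids: set[str],
    condition_met: Callable[[str], bool],
    skip_unserialized: bool,
) -> set[str]:
    """Return the dag_ids that would get an asset-triggered run (hypothetical, simplified model)."""
    candidates = dict(adrq_by_dag)
    if skip_unserialized:
        # Post-fix behavior: drop queued Dags that have no serialized version yet.
        for dag_id in set(candidates) - serialized_dag_ids:
            del candidates[dag_id]
    # The asset condition is only evaluated for Dags that have a serialized version.
    for dag_id in serialized_dag_ids & set(candidates):
        if not condition_met(dag_id):
            del candidates[dag_id]
    return set(candidates)


queue = {"consumer_a": ["asset_x"], "consumer_b": ["asset_x"]}

# Pre-fix: consumer_b has no serialized version, so its condition is never checked.
print(dags_to_trigger(queue, {"consumer_a"}, lambda dag_id: False, skip_unserialized=False))  # {'consumer_b'}
# Post-fix: nothing is triggered until consumer_b is serialized.
print(dags_to_trigger(queue, {"consumer_a"}, lambda dag_id: False, skip_unserialized=True))  # set()
```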
   



##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -631,6 +631,10 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Any, dict[str, datetime
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
 
+        For asset-triggered scheduling, DAGs that have ``AssetDagRunQueue`` rows but no matching

Review Comment:
   ```suggestion
           For asset-triggered scheduling, Dags that have ``AssetDagRunQueue`` rows but no matching
   ```



##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -677,6 +681,16 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool]
             for dag_id, adrqs in adrq_by_dag.items()
         }
         ser_dags = SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), session=session)
+        ser_dag_ids = {ser_dag.dag_id for ser_dag in ser_dags}
+        missing_from_serialized = set(adrq_by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.debug(
+                "DAGs in ADRQ but missing SerializedDagModel (skipping — condition cannot be evaluated): %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del adrq_by_dag[dag_id]
+                del dag_statuses[dag_id]

Review Comment:
   ```suggestion
           if (missing_from_serialized := set(adrq_by_dag.keys()) - ser_dag_ids):
               log.debug(
                   "Dags have queued asset events (ADRQs), but are not found in the serialized_dag table: %s",
                   sorted(missing_from_serialized),
               )
               for dag_id in missing_from_serialized:
                   del adrq_by_dag[dag_id]
                   del dag_statuses[dag_id]
   ```
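The suggested refactor can be exercised in isolation with a small self-contained sketch. `drop_unserialized` is a hypothetical helper, and plain dicts stand in for the real `adrq_by_dag` and `dag_statuses` structures. The assignment expression (`:=`) computes the set difference once and tests it for emptiness in the same `if`, and `sorted(...)` is passed as a lazy `%s` argument to `log.debug`, so it is only interpolated if the record is emitted.

```python
import logging

log = logging.getLogger(__name__)


def drop_unserialized(
    adrq_by_dag: dict[str, list[str]],
    dag_statuses: dict[str, dict[str, bool]],
    ser_dag_ids: set[str],
) -> set[str]:
    """Remove Dags without a serialized version from both dicts, in place (hypothetical helper)."""
    if missing_from_serialized := set(adrq_by_dag) - ser_dag_ids:
        # The %s placeholder receives the sorted dag_ids; logging formats it lazily.
        log.debug(
            "Dags have queued asset events (ADRQs), but are not found in the serialized_dag table: %s",
            sorted(missing_from_serialized),
        )
        for dag_id in missing_from_serialized:
            del adrq_by_dag[dag_id]
            del dag_statuses[dag_id]
    # The walrus assigns even when the set is empty, so this is always defined.
    return missing_from_serialized
```

Returning the removed ids is only a convenience so a test can assert on them; in the diff the step runs inline inside `dags_needing_dagruns`.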



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -2047,6 +2047,128 @@ def test_dags_needing_dagruns_assets(self, dag_maker, session):
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing(
+        self, session, caplog, testing_dag_bundle
+    ):
+        """ADRQ rows for a dag_id without SerializedDagModel must be skipped (no triggered_date_by_dag).

Review Comment:
   ```suggestion
           """ADRQ rows for a Dag without SerializedDagModel must be skipped (no triggered_date_by_dag).
   ```



##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -631,6 +631,10 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Any, dict[str, datetime
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
 
+        For asset-triggered scheduling, DAGs that have ``AssetDagRunQueue`` rows but no matching
+        ``SerializedDagModel`` row are omitted from ``triggered_date_by_dag`` until serialization exists;
+        queue rows are **not** deleted here so the scheduler can re-evaluate on a later run.

Review Comment:
   ```suggestion
          ADRQs are **not** deleted here so the scheduler can re-evaluate on a later run.
   ```
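The docstring's point that ADRQs are kept can be illustrated with two simulated scheduler loops. This is a sketch under stated assumptions: `compute_triggered_dates` is a hypothetical function, a dict of timestamp lists stands in for the persisted `AssetDagRunQueue` rows, and taking the latest queued timestamp as the triggered date is an assumption of the sketch rather than something the diff shows. Because the filter works on a per-call copy, the queued entry survives the first loop and is picked up once serialization exists.

```python
from datetime import datetime, timezone


def compute_triggered_dates(
    queue: dict[str, list[datetime]], serialized_dag_ids: set[str]
) -> dict[str, datetime]:
    """Hypothetical stand-in for the asset branch of dags_needing_dagruns."""
    # Build a per-call view; the persisted queue (stand-in for ADRQ rows) is not mutated.
    adrq_by_dag = {dag_id: list(times) for dag_id, times in queue.items()}
    for dag_id in set(adrq_by_dag) - serialized_dag_ids:
        del adrq_by_dag[dag_id]
    # Assumption for this sketch: the triggered date is the latest queued timestamp.
    return {dag_id: max(times) for dag_id, times in adrq_by_dag.items()}


queued_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
queue = {"consumer": [queued_at]}

# Scheduler loop 1: not serialized yet, so no triggered date, but the queue entry survives.
first = compute_triggered_dates(queue, serialized_dag_ids=set())
# Scheduler loop 2: serialization now exists, so the same queued event is picked up.
second = compute_triggered_dates(queue, serialized_dag_ids={"consumer"})
```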



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
