Lee-W commented on code in PR #64322:
URL: https://github.com/apache/airflow/pull/64322#discussion_r3014691143
##########
airflow-core/newsfragments/64322.bugfix.rst:
##########
@@ -0,0 +1 @@
+Fix premature asset-triggered DagRuns when ``AssetDagRunQueue`` had rows but
``SerializedDagModel`` was not yet available; the scheduler now skips those
DAGs until serialization exists.
Review Comment:
```suggestion
Fix premature asset-triggered DagRuns when ``AssetDagRunQueue`` had rows but
``SerializedDagModel`` was not yet available; the scheduler now skips those
Dags until serialization exists.
```
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -631,6 +631,10 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Any, dict[str, datetime
you should ensure that any scheduling decisions are made in a single
transaction -- as soon as the
transaction is committed it will be unlocked.
+ For asset-triggered scheduling, DAGs that have ``AssetDagRunQueue``
rows but no matching
Review Comment:
```suggestion
For asset-triggered scheduling, Dags that have ``AssetDagRunQueue``
rows but no matching
```
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -677,6 +681,16 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool]
for dag_id, adrqs in adrq_by_dag.items()
}
ser_dags = SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), session=session)
+ ser_dag_ids = {ser_dag.dag_id for ser_dag in ser_dags}
+ missing_from_serialized = set(adrq_by_dag.keys()) - ser_dag_ids
+ if missing_from_serialized:
+ log.debug(
+ "DAGs in ADRQ but missing SerializedDagModel (skipping — condition cannot be evaluated): %s",
+ sorted(missing_from_serialized),
+ )
+ for dag_id in missing_from_serialized:
+ del adrq_by_dag[dag_id]
+ del dag_statuses[dag_id]
Review Comment:
```suggestion
if (missing_from_serialized := set(adrq_by_dag.keys()) - ser_dag_ids):
    log.debug(
        "Dags have queued asset events (ADRQs), but are not found in the serialized_dag table: %s",
        sorted(missing_from_serialized),
    )
    for dag_id in missing_from_serialized:
        del adrq_by_dag[dag_id]
        del dag_statuses[dag_id]
```
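For reference, a standalone sketch of what the suggested block does, assuming plain dicts stand in for `adrq_by_dag` and `dag_statuses` and a set of dag_ids stands in for the rows returned by `SerializedDagModel.get_latest_serialized_dags`. The helper name `drop_unserialized` is hypothetical, not part of the PR. Note that the assignment expression binds the set even when it is empty, so the name is always defined after the `if`.

```python
import logging

log = logging.getLogger(__name__)


def drop_unserialized(
    adrq_by_dag: dict[str, list[object]],
    dag_statuses: dict[str, dict[str, bool]],
    ser_dag_ids: set[str],
) -> set[str]:
    """Remove Dags without a serialized row from both in-memory maps.

    Hypothetical helper mirroring the suggested block: the dicts stand in for
    the ADRQ rows and per-Dag asset statuses built in dags_needing_dagruns.
    Returns the set of skipped dag_ids (empty if none were skipped).
    """
    # The walrus binds the difference first, then the `if` tests its truthiness.
    if missing_from_serialized := set(adrq_by_dag) - ser_dag_ids:
        # Lazy %-style args: the list is only formatted if DEBUG is enabled.
        log.debug(
            "Dags have queued asset events (ADRQs), but are not found in the serialized_dag table: %s",
            sorted(missing_from_serialized),
        )
        for dag_id in missing_from_serialized:
            # Only the in-memory maps are pruned; nothing is deleted from the queue table.
            del adrq_by_dag[dag_id]
            del dag_statuses[dag_id]
    return missing_from_serialized
```

Passing `{"a": [...], "b": [...]}` with `ser_dag_ids={"a"}` returns `{"b"}` and leaves only `"a"` in both dicts.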
##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -2047,6 +2047,128 @@ def test_dags_needing_dagruns_assets(self, dag_maker,
session):
dag_models = query.all()
assert dag_models == [dag_model]
+ def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing(
+ self, session, caplog, testing_dag_bundle
+ ):
+ """ADRQ rows for a dag_id without SerializedDagModel must be skipped
(no triggered_date_by_dag).
Review Comment:
```suggestion
"""ADRQ rows for a Dag without SerializedDagModel must be skipped
(no triggered_date_by_dag).
```
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -631,6 +631,10 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Any, dict[str, datetime
you should ensure that any scheduling decisions are made in a single
transaction -- as soon as the
transaction is committed it will be unlocked.
+ For asset-triggered scheduling, DAGs that have ``AssetDagRunQueue``
rows but no matching
+ ``SerializedDagModel`` row are omitted from ``triggered_date_by_dag``
until serialization exists;
+ queue rows are **not** deleted here so the scheduler can re-evaluate
on a later run.
Review Comment:
```suggestion
ADRQs are **not** deleted here so the scheduler can re-evaluate on a
later run.
```
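To illustrate why ADRQs must survive a skipped pass, a minimal sketch of two scheduler loops, assuming an in-memory dict plays the role of the `AssetDagRunQueue` table and a set plays the role of the `serialized_dag` rows. `dags_to_evaluate` is a hypothetical name, not Airflow's API: it filters on read and never mutates the queue, so a Dag skipped on one pass is picked up once its serialization appears.

```python
from collections.abc import Mapping


def dags_to_evaluate(
    queue: Mapping[str, list[str]], serialized_dag_ids: set[str]
) -> dict[str, list[str]]:
    """Return queued events only for Dags that have been serialized.

    Hypothetical helper: `queue` stands in for ADRQ rows grouped by dag_id,
    `serialized_dag_ids` for the dag_ids present in serialized_dag.
    The input mapping is never modified, so skipped entries stay queued.
    """
    return {
        dag_id: list(events)
        for dag_id, events in queue.items()
        if dag_id in serialized_dag_ids
    }


# Pass 1: the Dag has not been serialized yet, so nothing is evaluated...
queue = {"consumer": ["asset-event-1"]}
serialized: set[str] = set()
first_pass = dags_to_evaluate(queue, serialized)

# ...but the queued event is still there, so pass 2 picks it up.
serialized.add("consumer")
second_pass = dags_to_evaluate(queue, serialized)
```

Deleting the queue rows on the first pass would instead drop the asset event, and the Dag would never be triggered for it.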
--
This is an automated message from the Apache Git Service.