kaxil commented on code in PR #63546:
URL: https://github.com/apache/airflow/pull/63546#discussion_r2984707207
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1480,6 +1487,26 @@ def _create_dag_runs_dataset_triggered(
.where(*dataset_event_filters)
).all()
+ ddrq_records = session.scalars(
+ select(DatasetDagRunQueue).where(
+ DatasetDagRunQueue.target_dag_id == dag.dag_id
+ )
+ ).all()
+ ddrq_uris = {r.dataset.uri for r in ddrq_records}
+ consumed_uris = {e.dataset.uri for e in dataset_events}
+ missing_uris = ddrq_uris - consumed_uris
Review Comment:
`e.dataset.uri` fires a lazy SELECT per event since `DatasetEvent.dataset`
uses default lazy loading. Same issue on lines 1541 and 1554. With many events
per DAG, this multiplies into hundreds of queries per scheduler loop.
If these log lines stay, add `joinedload(DatasetEvent.dataset)` to the
`dataset_events` query above.
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1326,6 +1326,13 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio
non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags)
self._create_dag_runs(non_dataset_dags, session)
if dataset_triggered_dags:
+ self.log.info(
Review Comment:
All the log messages use a `[DEBUG DATASETS]` prefix but are emitted at INFO
or WARNING level. If this is diagnostic logging for debugging a specific issue,
it should be at DEBUG level. If it is meant for production, the prefix is
misleading and the volume (one INFO per DAG per scheduler loop) is high.
Airflow uses logger names for filtering, not string prefixes.
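For illustration, a small self-contained sketch of the logger-name approach (the logger name below is a stand-in for whatever `self.log` resolves to; the point is that operators opt in per logger, not per string prefix):

```python
import logging

# Stand-in for the scheduler's module-named logger.
log = logging.getLogger("airflow.jobs.scheduler_job_runner")
captured = []

class ListHandler(logging.Handler):
    """Collects formatted messages so we can see what was emitted."""
    def emit(self, record):
        captured.append(record.getMessage())

log.addHandler(ListHandler())
log.setLevel(logging.INFO)

log.debug("found %d dataset-triggered DAGs", 3)  # dropped at INFO level
log.setLevel(logging.DEBUG)                      # opt in, per logger name
log.debug("found %d dataset-triggered DAGs", 3)  # now emitted

# captured == ["found 3 dataset-triggered DAGs"]
```

In a deployment this toggle lives in the logging config, so diagnostic detail costs nothing until someone turns it on.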
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1480,6 +1487,26 @@ def _create_dag_runs_dataset_triggered(
.where(*dataset_event_filters)
).all()
+ ddrq_records = session.scalars(
Review Comment:
This re-queries `DatasetDagRunQueue` per DAG inside the loop, but
`dags_needing_dagruns` already loaded all DDRQ records into `by_dag`. That data
is available — it is the source of `dataset_triggered_dag_info`. Re-fetching it
here adds one query per dataset-triggered DAG per scheduler loop.
On top of that, `r.dataset.uri` triggers a lazy load per record since
`DatasetDagRunQueue.dataset` is not eagerly loaded — so this is an N+1 query
pattern.
If this diagnostic is worth keeping, pass the URI sets from
`dags_needing_dagruns` through `dataset_triggered_dag_info` instead of
re-querying.
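A sketch of the shape this could take, with illustrative stand-in structures (these are not the real return types of `dags_needing_dagruns`; the point is that the set difference needs no extra queries once the URIs ride along):

```python
# Hypothetical: dags_needing_dagruns already saw every DDRQ record, so it
# could hand the queued URIs per DAG through dataset_triggered_dag_info.
queued_uris_by_dag = {
    "consumer_dag": {"s3://bucket/a", "s3://bucket/b"},
}

# URIs covered by the dataset_events fetched for this DAG's window.
consumed_uris = {"s3://bucket/a"}

# Pure set arithmetic on already-loaded data -- zero additional queries.
missing_uris = queued_uris_by_dag["consumer_dag"] - consumed_uris
# missing_uris == {"s3://bucket/b"}
```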
##########
airflow/models/dag.py:
##########
@@ -4094,13 +4094,35 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
ser_dags = session.scalars(
select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
).all()
+ ser_dag_ids = {s.dag_id for s in ser_dags}
Review Comment:
The guard against missing `SerializedDagModel` is a good fix — during DAG
processor parse cycles there is a window where DDRQ records exist but the
serialized DAG has not been written yet. Skipping these prevents a `KeyError`
and preserves the DDRQ records for the next heartbeat.
One thing the tests do not verify: that the DDRQ records are actually
preserved after `dags_needing_dagruns` returns. The PR description says the
DDRQ entries are preserved so the DAG is re-evaluated on the next heartbeat,
but no test asserts that the DDRQ rows still exist in the DB. Worth adding
that assertion.
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1480,6 +1487,26 @@ def _create_dag_runs_dataset_triggered(
.where(*dataset_event_filters)
).all()
+ ddrq_records = session.scalars(
+ select(DatasetDagRunQueue).where(
+ DatasetDagRunQueue.target_dag_id == dag.dag_id
+ )
+ ).all()
+ ddrq_uris = {r.dataset.uri for r in ddrq_records}
+ consumed_uris = {e.dataset.uri for e in dataset_events}
+ missing_uris = ddrq_uris - consumed_uris
+ if missing_uris:
+ self.log.warning(
Review Comment:
This warning fires whenever a DDRQ URI has no matching `DatasetEvent` in the
`(prev_exec, exec_date]` window. That is a normal situation when a DAG has
multiple dataset dependencies that update at different times — the DDRQ records
are deleted a few lines later anyway.
This will produce false-positive warnings in production, training operators
to ignore them.
##########
airflow/models/dag.py:
##########
@@ -4094,13 +4094,35 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
ser_dags = session.scalars(
select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
).all()
+ ser_dag_ids = {s.dag_id for s in ser_dags}
+ missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+ if missing_from_serialized:
+ log.warning(
+ "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel "
+ "(skipping — condition cannot be evaluated): %s",
+ sorted(missing_from_serialized),
+ )
+ for dag_id in missing_from_serialized:
+ del by_dag[dag_id]
+ del dag_statuses[dag_id]
Review Comment:
`del missing_from_serialized` is unnecessary — this is a local variable that
goes out of scope when the function returns.
##########
airflow/models/dag.py:
##########
@@ -4094,13 +4094,35 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
ser_dags = session.scalars(
select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
).all()
+ ser_dag_ids = {s.dag_id for s in ser_dags}
+ missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+ if missing_from_serialized:
+ log.warning(
+ "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel "
+ "(skipping — condition cannot be evaluated): %s",
+ sorted(missing_from_serialized),
+ )
+ for dag_id in missing_from_serialized:
+ del by_dag[dag_id]
+ del dag_statuses[dag_id]
+ del missing_from_serialized
for ser_dag in ser_dags:
dag_id = ser_dag.dag_id
statuses = dag_statuses[dag_id]
+ dataset_condition = ser_dag.dag.timetable.dataset_condition
- if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
+ if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
del by_dag[dag_id]
del dag_statuses[dag_id]
+ else:
Review Comment:
Same logging concern as in `scheduler_job_runner.py` — this emits at INFO on
every satisfied dataset condition, every scheduler heartbeat. For deployments
with many dataset-triggered DAGs, this adds noise to production logs. Consider
moving to DEBUG.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]