kaxil commented on code in PR #63546:
URL: https://github.com/apache/airflow/pull/63546#discussion_r2984707207


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1480,6 +1487,26 @@ def _create_dag_runs_dataset_triggered(
                     .where(*dataset_event_filters)
                 ).all()
 
+                ddrq_records = session.scalars(
+                    select(DatasetDagRunQueue).where(
+                        DatasetDagRunQueue.target_dag_id == dag.dag_id
+                    )
+                ).all()
+                ddrq_uris = {r.dataset.uri for r in ddrq_records}
+                consumed_uris = {e.dataset.uri for e in dataset_events}
+                missing_uris = ddrq_uris - consumed_uris

Review Comment:
   `e.dataset.uri` fires a lazy SELECT per event since `DatasetEvent.dataset` 
uses default lazy loading. Same issue on lines 1541 and 1554. With many events 
per DAG, this multiplies into hundreds of queries per scheduler loop.
   
   If these log lines stay, add `joinedload(DatasetEvent.dataset)` to the 
`dataset_events` query above.



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1326,6 +1326,13 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio
         non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags)

         self._create_dag_runs(non_dataset_dags, session)
         if dataset_triggered_dags:
+            self.log.info(

Review Comment:
   All the log messages use a `[DEBUG DATASETS]` prefix but are emitted at INFO 
or WARNING level. If this is diagnostic logging for debugging a specific issue, 
it should be at DEBUG level. If it is meant for production, the prefix is 
misleading and the volume (one INFO per DAG per scheduler loop) is high.
   
   Airflow uses logger names for filtering, not string prefixes.
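
A minimal sketch of that suggestion (the logger name matches the module, but the handler setup is illustrative — real deployments configure this through Airflow's logging config):

```python
# Emit diagnostics at DEBUG and let operators opt in by logger name,
# rather than tagging messages with a "[DEBUG DATASETS]" prefix.
import logging

log = logging.getLogger("airflow.jobs.scheduler_job_runner")

# Suppressed under a default INFO/WARNING configuration:
log.debug("dataset-triggered dags this loop: %s", ["dag_a", "dag_b"])

# Operators who need the diagnostics raise just this logger's level:
log.setLevel(logging.DEBUG)
```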



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1480,6 +1487,26 @@ def _create_dag_runs_dataset_triggered(
                     .where(*dataset_event_filters)
                 ).all()
 
+                ddrq_records = session.scalars(

Review Comment:
   This re-queries `DatasetDagRunQueue` per DAG inside the loop, but 
`dags_needing_dagruns` already loaded all DDRQ records into `by_dag`. That data 
is available — it is the source of `dataset_triggered_dag_info`. Re-fetching it 
here adds one query per dataset-triggered DAG per scheduler loop.
   
   On top of that, `r.dataset.uri` triggers a lazy load per record since 
`DatasetDagRunQueue.dataset` is not eagerly loaded — so this is an N+1 query 
pattern.
   
   If this diagnostic is worth keeping, pass the URI sets from 
`dags_needing_dagruns` through `dataset_triggered_dag_info` instead of 
re-querying.
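
A hypothetical shape for that change (helper names are mine, not the PR's, and this assumes `by_dag` maps `dag_id -> {dataset_uri: seen_flag}`, the same dict passed to `dag_ready()` as `statuses`):

```python
# Derive the queued-URI sets once from the `by_dag` mapping that
# dags_needing_dagruns already builds, so the scheduler loop never
# re-queries DatasetDagRunQueue per DAG.
def queued_uris_by_dag(by_dag):
    """Assumed shape: by_dag[dag_id] is a dict keyed by dataset URI."""
    return {dag_id: set(statuses) for dag_id, statuses in by_dag.items()}


def missing_uris(queued, consumed):
    """The diagnostic reduces to a set difference, with no extra queries."""
    return queued - consumed
```

If that shape holds, `dataset_triggered_dag_info` could carry the per-DAG URI set alongside the timestamps it already returns.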



##########
airflow/models/dag.py:
##########
@@ -4094,13 +4094,35 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}

Review Comment:
   The guard against missing `SerializedDagModel` is a good fix — during DAG 
processor parse cycles there is a window where DDRQ records exist but the 
serialized DAG has not been written yet. Skipping these prevents a `KeyError` 
and preserves the DDRQ records for the next heartbeat.
   
   One thing the tests do not verify: that the DDRQ records are actually 
preserved after `dags_needing_dagruns` returns. The PR description says DDRQ 
entries are preserved so the DAG is re-evaluated on the next heartbeat, but no 
test asserts the DDRQ rows still exist in the DB. Worth adding that assertion.
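
A schematic version of the missing assertion (setup elided; model and method names follow the diff, but the fixture wiring is assumed, so treat this as a sketch rather than a runnable test):

```python
def test_ddrq_preserved_when_serialized_dag_missing(session):
    # Arrange: a DatasetDagRunQueue row for a dag_id that has no
    # SerializedDagModel row, simulating the parse-cycle window.
    ...

    DagModel.dags_needing_dagruns(session)

    # The skipped DAG's DDRQ rows must survive for the next heartbeat:
    remaining = (
        session.query(DatasetDagRunQueue)
        .filter_by(target_dag_id="dag_without_serialized_model")
        .count()
    )
    assert remaining == 1
```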



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1480,6 +1487,26 @@ def _create_dag_runs_dataset_triggered(
                     .where(*dataset_event_filters)
                 ).all()
 
+                ddrq_records = session.scalars(
+                    select(DatasetDagRunQueue).where(
+                        DatasetDagRunQueue.target_dag_id == dag.dag_id
+                    )
+                ).all()
+                ddrq_uris = {r.dataset.uri for r in ddrq_records}
+                consumed_uris = {e.dataset.uri for e in dataset_events}
+                missing_uris = ddrq_uris - consumed_uris
+                if missing_uris:
+                    self.log.warning(

Review Comment:
   This warning fires whenever a DDRQ URI has no matching `DatasetEvent` in the 
`(prev_exec, exec_date]` window. That is a normal situation when a DAG has 
multiple dataset dependencies that update at different times — the DDRQ records 
are deleted a few lines later anyway.
   
   This will produce false-positive warnings in production, training operators 
to ignore them.



##########
airflow/models/dag.py:
##########
@@ -4094,13 +4094,35 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.warning(
+                "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel "
+                "(skipping — condition cannot be evaluated): %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]

Review Comment:
   `del missing_from_serialized` is unnecessary — this is a local variable that 
goes out of scope when the function returns.



##########
airflow/models/dag.py:
##########
@@ -4094,13 +4094,35 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.warning(
+                "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel "
+                "(skipping — condition cannot be evaluated): %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]
+        del missing_from_serialized
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
+            dataset_condition = ser_dag.dag.timetable.dataset_condition
 
-            if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
+            if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
+            else:

Review Comment:
   Same logging concern as in `scheduler_job_runner.py` — this emits at INFO on 
every satisfied dataset condition, every scheduler heartbeat. For deployments 
with many dataset-triggered DAGs, this adds noise to production logs. Consider 
moving to DEBUG.
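
Once at DEBUG, deployments that want these diagnostics can enable just this logger through standard `logging` configuration (logger name illustrative; Airflow's own config layer would normally wrap this):

```python
# Sketch: enabling one logger at DEBUG via dictConfig, instead of relying on
# a "[DEBUG DATASETS]" string prefix for grep-based filtering.
import logging
import logging.config

logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "loggers": {"airflow.models.dag": {"level": "DEBUG"}},
    }
)
```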



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
