kaxil commented on code in PR #63939:
URL: https://github.com/apache/airflow/pull/63939#discussion_r2963332184


##########
airflow/datasets/manager.py:
##########
@@ -184,7 +185,7 @@ def _slow_path_queue_dagruns(
         cls, dataset_id: int, dags_to_queue: set[DagModel], session: Session
     ) -> None:
         def _queue_dagrun_if_needed(dag: DagModel) -> str | None:

Review Comment:
   The Postgres path has a WHERE guard (`created_at < excluded.created_at`) 
that prevents a slower transaction from overwriting a newer timestamp, but 
`session.merge()` here still overwrites unconditionally. Two concurrent tasks 
can race: if the slower one commits last, it writes an older `created_at`. 
Using `utcnow()` makes backward drift unlikely, but it's still possible under 
clock adjustments or NTP drift.
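
   A self-contained sketch of the guarded upsert the Postgres path uses (SQLite stands in for Postgres here, since both dialects support `on_conflict_do_update` with a `where` guard in SQLAlchemy, and the table below is a simplified stand-in for the real `DatasetDagRunQueue` model — not the actual schema):

   ```python
   import datetime
   import sqlalchemy as sa
   from sqlalchemy.dialects.sqlite import insert  # stand-in for the postgresql dialect

   engine = sa.create_engine("sqlite://")
   meta = sa.MetaData()
   # Simplified stand-in for the DatasetDagRunQueue table
   ddrq = sa.Table(
       "dataset_dag_run_queue",
       meta,
       sa.Column("dataset_id", sa.Integer, primary_key=True),
       sa.Column("target_dag_id", sa.String, primary_key=True),
       sa.Column("created_at", sa.DateTime, nullable=False),
   )
   meta.create_all(engine)

   def queue(conn, created_at):
       stmt = insert(ddrq).values(
           dataset_id=1, target_dag_id="dag_a", created_at=created_at
       )
       # The WHERE guard: only overwrite when the incoming row is newer,
       # so a slower transaction cannot clobber a fresher timestamp.
       stmt = stmt.on_conflict_do_update(
           index_elements=["dataset_id", "target_dag_id"],
           set_={"created_at": stmt.excluded.created_at},
           where=ddrq.c.created_at < stmt.excluded.created_at,
       )
       conn.execute(stmt)

   newer = datetime.datetime(2024, 1, 1, 12, 0, 5)
   older = datetime.datetime(2024, 1, 1, 12, 0, 0)
   with engine.begin() as conn:
       queue(conn, newer)   # first writer commits the newer timestamp
       queue(conn, older)   # slower writer arrives late; the guard rejects it
       row = conn.execute(sa.select(ddrq.c.created_at)).one()
       assert row.created_at == newer  # older value did not win
   ```

   `session.merge()` has no equivalent guard, which is why the non-Postgres path can still lose the race.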



##########
airflow/datasets/manager.py:
##########
@@ -202,8 +203,13 @@ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
    def _postgres_queue_dagruns(cls, dataset_id: int, dags_to_queue: set[DagModel], session: Session) -> None:
         from sqlalchemy.dialects.postgresql import insert
 
-        values = [{"target_dag_id": dag.dag_id} for dag in dags_to_queue]
-        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
+        values = [{"target_dag_id": dag.dag_id, "created_at": timezone.utcnow()} for dag in dags_to_queue]

Review Comment:
   Each element of the comprehension calls `timezone.utcnow()` separately, so each DAG in 
`dags_to_queue` gets a slightly different timestamp. For the common single-DAG 
case this doesn't matter, but with multiple DAGs the timestamps diverge within 
one logical queueing operation. Computing the timestamp once before the 
comprehension would be more consistent:
   
   ```python
   now = timezone.utcnow()
   values = [{"target_dag_id": dag.dag_id, "created_at": now} for dag in dags_to_queue]
   ```



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1489,21 +1489,25 @@ def _create_dag_runs_dataset_triggered(
                     events=dataset_events,
                 )
 
-                dag_run = dag.create_dagrun(
-                    run_id=run_id,
-                    run_type=DagRunType.DATASET_TRIGGERED,
-                    execution_date=exec_date,
-                    data_interval=data_interval,
-                    state=DagRunState.QUEUED,
-                    external_trigger=False,
-                    session=session,
-                    dag_hash=dag_hash,
-                    creating_job_id=self.job.id,
-                )
-                Stats.incr("dataset.triggered_dagruns")
-                dag_run.consumed_dataset_events.extend(dataset_events)
+                if dataset_events:
+                    dag_run = dag.create_dagrun(
+                        run_id=run_id,
+                        run_type=DagRunType.DATASET_TRIGGERED,
+                        execution_date=exec_date,
+                        data_interval=data_interval,
+                        state=DagRunState.QUEUED,
+                        external_trigger=False,
+                        session=session,
+                        dag_hash=dag_hash,
+                        creating_job_id=self.job.id,
+                    )
+                    Stats.incr("dataset.triggered_dagruns")
+                    dag_run.consumed_dataset_events.extend(dataset_events)
                 session.execute(
-                    delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id)
+                    delete(DatasetDagRunQueue).where(
+                        DatasetDagRunQueue.target_dag_id == dag.dag_id,
+                        DatasetDagRunQueue.created_at <= exec_date,

Review Comment:
   The `created_at <= exec_date` scoping correctly preserves newer DDRQ rows 
that arrive during processing. But when `dataset_events` is empty and DDRQ rows 
are deleted without creating a DagRun, there's no log message. Operators 
debugging "my dataset-triggered DAG didn't fire" would have zero visibility 
into this silent cleanup. A `self.log.warning(...)` here would help.
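
   A minimal sketch of the suggested shape (the helper and its parameters are hypothetical stand-ins for illustration, not the scheduler's actual structure):

   ```python
   import logging

   log = logging.getLogger("scheduler")

   # Hypothetical helper mirroring the diff's control flow: create a run when
   # events exist, otherwise warn before the queue rows are silently cleared.
   def process_dataset_triggered_dag(dag_id, dataset_events, exec_date,
                                     create_dagrun, delete_queue_rows):
       if dataset_events:
           create_dagrun(dag_id, dataset_events, exec_date)
       else:
           log.warning(
               "No dataset events for %s; clearing DatasetDagRunQueue rows with "
               "created_at <= %s without creating a dag run",
               dag_id,
               exec_date,
           )
       # The delete runs in both branches, matching the diff
       delete_queue_rows(dag_id, exec_date)
   ```

   With the warning in place, the empty-events path leaves a trace in the scheduler logs instead of deleting the queue rows silently.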



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
