nailo2c commented on code in PR #58543:
URL: https://github.com/apache/airflow/pull/58543#discussion_r3419753934
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2582,6 +2584,55 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
dag_run.run_id,
)
continue
+ # For AssetAndTimeSchedule, defer starting until all required assets are queued.
+ # Only gate scheduled runs; manual and backfill runs should start immediately.
+ if isinstance(dag.timetable, AssetAndTimeSchedule) and dag_run.run_type == DagRunType.SCHEDULED:
Review Comment:
+ Old approach: when the schedule time was reached, the scheduler created a
placeholder `QUEUED` DagRun first, then waited for asset events in a later
scheduler loop.
+ New approach: the scheduler creates a scheduled DagRun only when both
conditions are satisfied: the schedule time is due and the required asset
condition is ready. If assets are missing, no DagRun is created and
`next_dagrun_create_after` stays on the pending slot, so the same slot can be
retried later without placeholder DagRuns.
I think the new approach is much cleaner: it needs no hack to handle the
timeout issue, and the gating point moves from after DagRun creation to
before it.
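A minimal sketch of the "gate before creation" idea, for illustration only. `DagSchedulingState`, `maybe_create_scheduled_run`, and the fixed `interval` are hypothetical stand-ins, not Airflow's `DagModel` or scheduler API. The sketch covers scheduled runs only; per the diff above, manual and backfill runs are not gated.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable


@dataclass
class DagSchedulingState:
    """Hypothetical stand-in for DAG-level scheduling fields (not Airflow's DagModel)."""

    next_dagrun_create_after: datetime  # the pending schedule slot
    interval: timedelta  # hypothetical fixed interval between slots
    created_runs: list[datetime] = field(default_factory=list)


def maybe_create_scheduled_run(
    state: DagSchedulingState,
    now: datetime,
    assets_ready: Callable[[], bool],
) -> bool:
    """Create a scheduled run only if the slot is due AND the asset condition holds.

    Returns True if a run was created. If either condition fails, nothing is
    created and next_dagrun_create_after stays on the same pending slot, so a
    later scheduler loop retries it. No placeholder QUEUED run is ever made.
    """
    slot = state.next_dagrun_create_after
    if now < slot:
        return False  # schedule time not reached yet
    if not assets_ready():
        return False  # assets missing: keep the pending slot, create nothing
    state.created_runs.append(slot)
    # Advance to the next slot only after the run has actually been created.
    state.next_dagrun_create_after = slot + state.interval
    return True


if __name__ == "__main__":
    start = datetime(2025, 1, 1)
    state = DagSchedulingState(next_dagrun_create_after=start, interval=timedelta(days=1))
    # Loop 1: slot is due but assets are missing, so nothing is created.
    print(maybe_create_scheduled_run(state, start + timedelta(hours=1), lambda: False))
    # Loop 2: assets have arrived, so the same pending slot now produces a run.
    print(maybe_create_scheduled_run(state, start + timedelta(hours=2), lambda: True))
    print(state.created_runs, state.next_dagrun_create_after)
```

Because the pending slot is only advanced after a run is created, a slot whose assets never arrive simply stays pending; there is no queued placeholder run that would later need a timeout to clean up.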