jscheffl commented on code in PR #58543:
URL: https://github.com/apache/airflow/pull/58543#discussion_r3365426479
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2582,6 +2584,55 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
dag_run.run_id,
)
continue
+ # For AssetAndTimeSchedule, defer starting until all required assets are queued.
+ # Only gate scheduled runs; manual and backfill runs should start immediately.
+ if isinstance(dag.timetable, AssetAndTimeSchedule) and dag_run.run_type == DagRunType.SCHEDULED:
Review Comment:
I think I understand the demand, but I am not an expert in timetables and the
scheduler, and looking at the code I am not fully convinced that the proposed
implementation strategy is "good". It assumes the Dag run is created in the
queued state at the point of schedule and then sits "queued" waiting for
events. I think this is the wrong state model. In my view, the trigger should
only happen at the point of schedule if events are available. In all other
cases it should not generate a Dag run, and it also should not mark the Dag
run as failed just because events are missing.
Marking a queued Dag run as failed is the wrong signal; it feels like something
is broken. Besides this, the code you are using does not update the tasks to
skipped; they stay in the state they had before ("none"). Usually, if a Dag run
is set to failed, its tasks are set to skipped.
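
To make the state model I have in mind concrete, here is a minimal toy sketch. It is plain Python, not Airflow code: `ToyDagRun`, `maybe_create_run`, and `fail_run` are hypothetical names invented for illustration and do not correspond to scheduler APIs. It only shows the two points above: no run is created unless all required asset events are present, and failing a run also moves its untouched tasks to skipped.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, Optional


class RunState(Enum):
    QUEUED = "queued"
    FAILED = "failed"


@dataclass
class ToyDagRun:
    """Hypothetical stand-in for a Dag run; not the Airflow model."""

    run_id: str
    state: RunState = RunState.QUEUED
    # Task states keyed by task id; None mirrors the "none" state.
    task_states: Dict[str, Optional[str]] = field(default_factory=dict)


def maybe_create_run(
    run_id: str,
    required_assets: Iterable[str],
    queued_assets: Iterable[str],
    task_ids: Iterable[str],
) -> Optional[ToyDagRun]:
    """Create a run at the schedule point only if all required events exist."""
    if not set(required_assets) <= set(queued_assets):
        # Events are missing: create nothing, so there is nothing to fail later.
        return None
    return ToyDagRun(run_id=run_id, task_states={t: None for t in task_ids})


def fail_run(run: ToyDagRun) -> None:
    """Fail a run and move tasks that never started to "skipped"."""
    run.state = RunState.FAILED
    for task_id, state in run.task_states.items():
        if state is None:
            run.task_states[task_id] = "skipped"
```

With this shape, a schedule tick with missing events leaves no queued or failed run behind, so nothing looks broken in the UI.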