jscheffl commented on code in PR #58543:
URL: https://github.com/apache/airflow/pull/58543#discussion_r3365426479


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2582,6 +2584,55 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
                         dag_run.run_id,
                     )
                     continue
+            # For AssetAndTimeSchedule, defer starting until all required assets are queued.
+            # Only gate scheduled runs; manual and backfill runs should start immediately.
+            if isinstance(dag.timetable, AssetAndTimeSchedule) and dag_run.run_type == DagRunType.SCHEDULED:

Review Comment:
   I think I understand the need, but I am not an expert in timetables or the scheduler. Looking at the code, I am not fully convinced that the proposed implementation strategy is the right one. It assumes the Dag run is created in the queued state at the scheduled time and then sits in "queued" waiting for asset events. I think this is the wrong state model. In my view, the run should only be triggered at the scheduled time if the events are available. In all other cases no Dag run should be created at all, and nothing should be marked as failed just because events are missing.
   
   Marking a queued Dag run as failed sends the wrong signal; it looks as if something is broken. Besides this, the code you are using does not update the tasks to skipped: they stay in their previous state ("none"). Usually, when a Dag run is set to failed, its tasks are set to skipped.
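
For comparison, a minimal sketch of the consistency the comment expects when a run is failed, assuming a toy model where task states are plain strings and `None` stands for the "none" state. `RunModel`, `FINISHED_STATES` and `fail_run` are hypothetical and are not Airflow's `DagRun`/`TaskInstance` API. As a simplification, every unfinished task is skipped; this assumes no task is running, which holds for a run that never left "queued".

```python
from __future__ import annotations

from dataclasses import dataclass, field

# Hypothetical set of terminal task states; anything else counts as unfinished.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


@dataclass
class RunModel:
    """Hypothetical Dag run holding task states (not Airflow's models)."""
    state: str
    task_states: dict[str, str | None] = field(default_factory=dict)


def fail_run(run: RunModel) -> None:
    """Mark the run failed and move every unfinished task to "skipped"."""
    run.state = "failed"
    for task_id, state in list(run.task_states.items()):
        # None mirrors the "none" state tasks were left in.
        if state not in FINISHED_STATES:
            run.task_states[task_id] = "skipped"


if __name__ == "__main__":
    run = RunModel(state="queued", task_states={"extract": None, "load": None})
    fail_run(run)
    print(run.state, run.task_states)
```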



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.