nailo2c commented on code in PR #58543:
URL: https://github.com/apache/airflow/pull/58543#discussion_r3007091158
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2227,6 +2228,50 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
dag_run.run_id,
)
continue
+ # For AssetAndTimeSchedule, defer starting until all required assets are queued.
+ if isinstance(dag.timetable, AssetAndTimeSchedule):
+ # Count required assets for this DAG's schedule
+ required_count = (
Review Comment:
Good point. `AssetEvaluator` was introduced after this feature was
originally implemented, and it looks applicable here as well. Let me refactor
this part to use it.
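
For illustration only, here is a minimal sketch of why evaluating the asset condition can give a different answer than counting queued assets. It does not use Airflow's `AssetEvaluator` or reflect its API. The classes `Asset`, `AssetAll`, `AssetAny` and the function `is_ready` are hypothetical stand-ins, and the asset names are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Asset:
    """Hypothetical leaf condition: one asset, identified by name."""
    name: str


@dataclass(frozen=True)
class AssetAll:
    """Hypothetical condition: satisfied only if every child is satisfied."""
    children: tuple


@dataclass(frozen=True)
class AssetAny:
    """Hypothetical condition: satisfied if at least one child is satisfied."""
    children: tuple


def is_ready(condition, queued: set) -> bool:
    """Recursively evaluate a condition tree against the queued asset names."""
    if isinstance(condition, Asset):
        return condition.name in queued
    if isinstance(condition, AssetAll):
        return all(is_ready(child, queued) for child in condition.children)
    if isinstance(condition, AssetAny):
        return any(is_ready(child, queued) for child in condition.children)
    raise TypeError(f"unsupported condition: {condition!r}")


# Schedule condition: a AND (b OR c)
condition = AssetAll((Asset("a"), AssetAny((Asset("b"), Asset("c")))))

print(is_ready(condition, {"a", "c"}))  # True: "a" is queued and "c" satisfies the OR
print(is_ready(condition, {"b", "c"}))  # False: "a" is missing
```

In both calls two of the three assets are queued, so a plain count cannot tell the two cases apart, while evaluating the condition tree can.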