jscheffl commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2821164333
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
self.task_concurrency_map[(dag_id, task_id)] += c
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
+ # Additionally count DEFERRED TIs for task-level concurrency limits.
+ # Deferred TIs are still in-flight and must count against
+ # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+ # count toward dag_run_active_tasks (max_active_tasks) because deferred
+ # tasks don't consume worker slots.
+ deferred_query = session.execute(
+ select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+ .where(TI.state == TaskInstanceState.DEFERRED)
+ .group_by(TI.task_id, TI.run_id, TI.dag_id)
+ )
Review Comment:
Instead of firing a second query, can you please add DEFERRED to the
condition in line 188? An additional query would be expensive, and the
separation does not make the code any better.
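
A rough sketch of the idea, not the actual scheduler code: one grouped query that also selects the state, so deferred rows can be skipped for the slot-based counter while still counting toward the task-level maps. It uses the stdlib `sqlite3` module in place of the SQLAlchemy session so that it runs on its own. The table layout, the `EXECUTION_STATES` tuple, the `load_concurrency` helper, and the `(dag_id, run_id)` key of `dag_run_active_tasks` are all assumptions made for illustration; the real condition in line 188 may look different.

```python
import sqlite3
from collections import Counter

# Hypothetical stand-in for the task instance table; columns mirror the diff.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("d1", "t1", "r1", "running"),
        ("d1", "t1", "r1", "deferred"),
        ("d1", "t1", "r2", "queued"),
        ("d1", "t2", "r1", "deferred"),
        ("d1", "t2", "r1", "success"),  # finished, not selected by the query
    ],
)

# Assumption: the existing query filters on these states; the real set may differ.
EXECUTION_STATES = ("running", "queued")
DEFERRED = "deferred"


def load_concurrency(conn):
    """Build all three counters from a single grouped query."""
    dag_run_active_tasks = Counter()
    task_concurrency_map = Counter()
    task_dagrun_concurrency_map = Counter()

    states = EXECUTION_STATES + (DEFERRED,)
    placeholders = ", ".join("?" for _ in states)
    rows = conn.execute(
        "SELECT dag_id, task_id, run_id, state, COUNT(*) "
        "FROM task_instance "
        f"WHERE state IN ({placeholders}) "
        "GROUP BY dag_id, task_id, run_id, state",
        states,
    )
    for dag_id, task_id, run_id, state, c in rows:
        # Deferred tasks hold no worker slot, so they are left out of the
        # per-run active task count (max_active_tasks).
        if state != DEFERRED:
            dag_run_active_tasks[(dag_id, run_id)] += c
        # They are still in flight, so they count toward the task-level limits.
        task_concurrency_map[(dag_id, task_id)] += c
        task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
    return dag_run_active_tasks, task_concurrency_map, task_dagrun_concurrency_map


active, per_task, per_task_run = load_concurrency(conn)
```

Grouping by state adds a few rows to the result set, but it avoids a second round trip to the database and keeps the counting logic in one loop.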