Nataneljpwd commented on code in PR #61769:
URL: https://github.com/apache/airflow/pull/61769#discussion_r2799094569


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
             self.task_concurrency_map[(dag_id, task_id)] += c
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
 
+        # Additionally count DEFERRED TIs for task-level concurrency limits.
+        # Deferred TIs are still in-flight and must count against
+        # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+        # count toward dag_run_active_tasks (max_active_tasks) because deferred
+        # tasks don't consume worker slots.

Review Comment:
   I think this comment can be shortened to one sentence.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -193,6 +193,20 @@ def load(self, session: Session) -> None:
             self.task_concurrency_map[(dag_id, task_id)] += c
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c
 
+        # Additionally count DEFERRED TIs for task-level concurrency limits.
+        # Deferred TIs are still in-flight and must count against
+        # max_active_tis_per_dag / max_active_tis_per_dagrun, but they do NOT
+        # count toward dag_run_active_tasks (max_active_tasks) because deferred
+        # tasks don't consume worker slots.
+        deferred_query = session.execute(
+            select(TI.dag_id, TI.task_id, TI.run_id, func.count("*"))
+            .where(TI.state == TaskInstanceState.DEFERRED)
+            .group_by(TI.task_id, TI.run_id, TI.dag_id)
+        )
+        for dag_id, task_id, run_id, c in deferred_query:
+            self.task_concurrency_map[(dag_id, task_id)] += c
+            self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += c

Review Comment:
   I think this can be folded into the query above instead of issuing a second query.
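
A rough sketch of what a single-query version could look like, using conditional aggregation so one pass yields both the task-level counts (which include DEFERRED) and the dag-run-level count (which excludes it). This is not the actual scheduler code: the existing query is not shown in this hunk, so the `EXECUTION_STATES` tuple, the `task_instance` table layout, the `load_concurrency` helper, and the `dag_run_active_tasks_map` name are all assumptions for illustration, and it uses the standard-library `sqlite3` module instead of the SQLAlchemy session so it runs standalone.

```python
import sqlite3
from collections import Counter

# Assumption: states already counted by the existing query (not visible in the diff).
EXECUTION_STATES = ("queued", "running")
DEFERRED = "deferred"


def load_concurrency(conn: sqlite3.Connection):
    """Hypothetical helper: build all three maps from one grouped query."""
    task_concurrency_map = Counter()
    task_dagrun_concurrency_map = Counter()
    dag_run_active_tasks_map = Counter()

    states = EXECUTION_STATES + (DEFERRED,)
    placeholders = ", ".join("?" for _ in states)
    rows = conn.execute(
        f"""
        SELECT dag_id, run_id, task_id,
               COUNT(*) AS in_flight,
               SUM(CASE WHEN state != ? THEN 1 ELSE 0 END) AS occupying_slots
        FROM task_instance
        WHERE state IN ({placeholders})
        GROUP BY dag_id, run_id, task_id
        """,
        (DEFERRED, *states),
    )
    for dag_id, run_id, task_id, in_flight, occupying_slots in rows:
        # Task-level limits see every in-flight TI, deferred ones included.
        task_concurrency_map[(dag_id, task_id)] += in_flight
        task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += in_flight
        # The dag-run-level count skips deferred TIs.
        dag_run_active_tasks_map[(dag_id, run_id)] += occupying_slots

    return task_concurrency_map, task_dagrun_concurrency_map, dag_run_active_tasks_map
```

In SQLAlchemy terms the same idea would presumably be a `func.sum(case(...))` column added to the existing `select`, with DEFERRED added to the state filter, but that depends on how the query above is written.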



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
