uranusjr commented on code in PR #29094:
URL: https://github.com/apache/airflow/pull/29094#discussion_r1166296913


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -231,28 +255,21 @@ def is_alive(self, grace_multiplier: float | None = None) -> bool:
             < scheduler_health_check_threshold
         )
 
-    def __get_concurrency_maps(
-        self, states: list[TaskInstanceState], session: Session
-    ) -> tuple[DefaultDict[str, int], DefaultDict[tuple[str, str], int]]:
+    def __get_concurrency_maps(self, states: list[TaskInstanceState], session: Session) -> ConcurrencyMap:
         """
         Get the concurrency maps.
 
         :param states: List of states to query for
-        :return: A map from (dag_id, task_id) to # of task instances and
-         a map from (dag_id, task_id) to # of task instances in the given state list
+        :return: Concurrency map
         """
-        ti_concurrency_query: list[tuple[str, str, int]] = (
-            session.query(TI.task_id, TI.dag_id, func.count("*"))
+        ti_concurrency_query: list[tuple[str, str, str, int]] = (
+            session.query(TI.task_id, TI.run_id, TI.dag_id, func.count("*"))
             .filter(TI.state.in_(states))
-            .group_by(TI.task_id, TI.dag_id)
+            .group_by(TI.task_id, TI.run_id, TI.dag_id)
         ).all()

Review Comment:
   I think this `all()` is not needed since we only iterate through the query once. Removing it should improve performance a bit.
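   To illustrate the suggestion, here is a minimal sketch that uses the standard-library `sqlite3` module as a stand-in for the SQLAlchemy session. The `task_instance` schema, the sample rows, the helper names `make_demo_db` and `get_concurrency_maps`, and the three dict layouts are all hypothetical and do not reproduce the PR's actual `ConcurrencyMap`. The grouped rows are consumed in a single pass by iterating the cursor directly instead of calling `fetchall()`, which is the `sqlite3` analogue of `.all()`.

```python
from __future__ import annotations

import sqlite3
from collections import defaultdict


def make_demo_db() -> sqlite3.Connection:
    """Build an in-memory table loosely modelled on task_instance (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance ("
        "dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, state TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
        [
            ("dag_a", "run_1", "t1", 0, "running"),
            ("dag_a", "run_1", "t1", 1, "queued"),
            ("dag_a", "run_1", "t2", -1, "running"),
            ("dag_a", "run_2", "t1", 0, "running"),
            ("dag_b", "run_1", "t1", -1, "success"),
        ],
    )
    return conn


def get_concurrency_maps(
    conn: sqlite3.Connection, states: list[str]
) -> tuple[dict, dict, dict]:
    """Hypothetical equivalent of the grouped query in the diff, built in one pass."""
    placeholders = ", ".join("?" for _ in states)
    cursor = conn.execute(
        "SELECT task_id, run_id, dag_id, COUNT(*) FROM task_instance "
        f"WHERE state IN ({placeholders}) "
        "GROUP BY task_id, run_id, dag_id",
        states,
    )
    dag_run_map: dict = defaultdict(int)   # (dag_id, run_id) -> count
    task_map: dict = defaultdict(int)      # (dag_id, task_id) -> count across runs
    task_run_map: dict = defaultdict(int)  # (dag_id, run_id, task_id) -> count
    # Iterate the cursor directly: rows are read as the loop advances instead of
    # first being collected into an intermediate list (no fetchall()).
    for task_id, run_id, dag_id, count in cursor:
        dag_run_map[(dag_id, run_id)] += count
        task_map[(dag_id, task_id)] += count
        task_run_map[(dag_id, run_id, task_id)] = count
    return dag_run_map, task_map, task_run_map
```

   Calling `get_concurrency_maps(make_demo_db(), ["running", "queued"])` gives `{("dag_a", "run_1"): 3, ("dag_a", "run_2"): 1}` for the per-run map; the `dag_b` row is filtered out by its `success` state.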


