mis98zb commented on code in PR #32122:
URL: https://github.com/apache/airflow/pull/32122#discussion_r1262265208


##########
airflow/models/taskinstance.py:
##########
@@ -2580,6 +2580,31 @@ def get_num_running_task_instances(self, session: Session, same_dagrun=False) ->
             )
         return num_running_task_instances_query.scalar()
 
+    @provide_session
+    def get_valid_map_index(self, session: Session, limit: int) -> (set[int]):
+        """Return running map index of task group from the DB."""

Review Comment:
   You mean the code that builds concurrency_map in scheduler_job_runner.py, right?
   It would be good to unify the shared logic, but the two places differ.
   
   The code here is for task_concurrency_dep.py, which checks a single TI before it is queued.
   The code that builds concurrency_map covers all TIs after they have been queued.
   So the two pieces of logic are not very similar.
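
   A rough sketch of the difference, using plain Python rather than the real Airflow models. Everything here (the `TI` dataclass, `build_concurrency_map`, `single_ti_dep_met`, and the choice of which states count as active) is a hypothetical simplification for illustration, not the code in this PR or in the scheduler:

```python
from collections import Counter
from dataclasses import dataclass

# Assumption: states treated as "occupying a slot" in the batch view.
ACTIVE_STATES = {"queued", "running"}


@dataclass(frozen=True)
class TI:
    """Hypothetical stand-in for a task instance row."""
    dag_id: str
    task_id: str
    map_index: int
    state: str


def build_concurrency_map(tis):
    """Batch view: one pass over all TIs, counting active ones per task."""
    return Counter(
        (ti.dag_id, ti.task_id) for ti in tis if ti.state in ACTIVE_STATES
    )


def single_ti_dep_met(ti, all_tis, limit):
    """Per-TI view: may this one TI be queued, given what is running now?"""
    running = sum(
        1
        for other in all_tis
        if other.dag_id == ti.dag_id
        and other.task_id == ti.task_id
        and other.state == "running"
    )
    return running < limit


tis = [
    TI("d", "t", 0, "running"),
    TI("d", "t", 1, "queued"),
    TI("d", "t", 2, "scheduled"),
]
concurrency_map = build_concurrency_map(tis)
can_queue_with_limit_1 = single_ti_dep_met(tis[2], tis, limit=1)
can_queue_with_limit_2 = single_ti_dep_met(tis[2], tis, limit=2)
```

   The batch function answers "how many slots are taken per task" for every TI at once, while the dep-style check answers a yes/no question for one candidate TI, which is why the two do not share code easily.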



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

