mis98zb commented on code in PR #32122:
URL: https://github.com/apache/airflow/pull/32122#discussion_r1276436829


##########
tests/jobs/test_scheduler_job.py:
##########
@@ -1137,6 +1138,142 @@ def 
test_find_executable_task_instances_max_active_tis_per_dag(self, dag_maker):
         assert 1 == len(res)
         session.rollback()
 
+    @pytest.mark.parametrize(
+        
"concurrency_limit,mapped_tis_state,schedule_count,schedule_tids,queued_count,queued_tids",
+        [
+            [
+                1,
+                dict(),
+                5,
+                {"tg.dummy1"},
+                1,
+                {"tg.dummy1"},
+            ],
+            [
+                1,
+                {0: (State.RUNNING, 30, None)},
+                4,
+                {"tg.dummy1"},
+                0,
+                set(),
+            ],
+            [
+                1,
+                {0: (State.SUCCESS, 30, 15)},
+                5,
+                {"tg.dummy1", "tg.dummy2"},
+                1,
+                {"tg.dummy2"},
+            ],
+            [
+                2,
+                dict(),
+                5,
+                {"tg.dummy1"},
+                2,
+                {"tg.dummy1"},
+            ],
+            [
+                2,
+                {0: (State.RUNNING, 30, None), 1: (State.RUNNING, 30, None)},
+                3,
+                {"tg.dummy1"},
+                0,
+                set(),
+            ],
+            [
+                2,
+                {0: (State.SUCCESS, 30, 15), 1: (State.RUNNING, 30, None)},
+                4,
+                {"tg.dummy1", "tg.dummy2"},
+                1,
+                {"tg.dummy2"},
+            ],
+            [
+                2,
+                {0: (State.RUNNING, 30, None), 1: (State.SUCCESS, 30, 15)},
+                4,
+                {"tg.dummy1", "tg.dummy2"},
+                1,
+                {"tg.dummy2"},
+            ],
+            [
+                2,
+                {0: (State.SUCCESS, 30, 15), 1: (State.SUCCESS, 30, 15)},
+                5,
+                {"tg.dummy1", "tg.dummy2"},
+                2,
+                {"tg.dummy2"},
+            ],
+        ],
+    )
+    def test_find_executable_task_instances_with_task_group_concurrency_limit(
+        self,
+        dag_maker,
+        concurrency_limit: int,
+        mapped_tis_state: dict,
+        schedule_count: int,
+        schedule_tids: set,
+        queued_count: int,
+        queued_tids: set,
+    ):
+        """
+        Test if _executable_task_instances_to_queued puts the right task 
instances into the
+        mock_list.
+        """
+        with 
dag_maker(dag_id="test_find_executable_task_instances_with_task_group_concurrency_limit"):
+
+            t1 = BashOperator(task_id="dummy", bash_command="sleep 300")
+
+            @task_group(max_active_groups_per_dagrun=concurrency_limit)

Review Comment:
   Got it, I'll add the test case.
   
   Since the Operator and task group expansion in an expanded task group is not 
supported yet (I know this from 
[here](https://github.com/apache/airflow/discussions/29618#discussioncomment-5027948)),
 I'll add test cases for 2 scenarios:
   1. the expansion on outer group
   2. the expansion on inner group



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to