This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca20e1d8a189e8589eb3d66ce6c5b91f1725f123
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Dec 3 00:04:06 2024 +0800

    [FLINK-33389][runtime] Support tasks balancing at slot level for Adaptive Scheduler
---
 .../scheduler/ExecutionSlotSharingGroup.java       |   4 +-
 .../allocator/TaskBalancedSlotSharingResolver.java |  76 ++++++
 .../TaskBalancedSlotSharingResolverTest.java       | 254 +++++++++++++++++++++
 3 files changed, 331 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
index c52ceb8aa45..7f3dc35d5ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
@@ -49,9 +48,8 @@ public class ExecutionSlotSharingGroup implements 
WeightLoadable {
         executionVertexIds.add(executionVertexId);
     }
 
-    @VisibleForTesting
+    /**
+     * Returns the {@link SlotSharingGroup} held by this execution slot sharing group.
+     *
+     * <p>Public so that code outside this package, for example {@code
+     * TaskBalancedSlotSharingResolver} in the adaptive scheduler, can read it.
+     */
     @Nonnull
-    SlotSharingGroup getSlotSharingGroup() {
+    public SlotSharingGroup getSlotSharingGroup() {
         return slotSharingGroup;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TaskBalancedSlotSharingResolver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TaskBalancedSlotSharingResolver.java
new file mode 100644
index 00000000000..7b489f05511
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TaskBalancedSlotSharingResolver.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.TaskBalancedExecutionSlotSharingGroupBuilder;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SlotSharingResolver} that groups execution vertices into slots with balanced task
+ * counts, as computed by {@link TaskBalancedExecutionSlotSharingGroupBuilder}.
+ *
+ * <p>This resolver only enumerates the execution vertices of the job and converts the builder's
+ * groups into the allocator-side {@link ExecutionSlotSharingGroup} representation.
+ */
+@Internal
+public enum TaskBalancedSlotSharingResolver implements SlotSharingResolver {
+    INSTANCE;
+
+    /**
+     * Resolves the execution slot sharing groups of the given job for the given parallelism.
+     *
+     * @param jobInformation the job providing the slot sharing groups and co-location groups
+     * @param vertexParallelism the number of subtasks to create for every job vertex
+     * @return the distinct execution slot sharing groups produced by the task-balanced builder
+     */
+    @Override
+    public List<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups(
+            JobInformation jobInformation, VertexParallelism vertexParallelism) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        getAllVertices(jobInformation, vertexParallelism),
+                        jobInformation.getSlotSharingGroups(),
+                        jobInformation.getCoLocationGroups())
+                .build().values().stream()
+                        // The values of the built map can repeat the same group (presumably one
+                        // entry per execution vertex - confirm), so collapse duplicates first.
+                        .distinct()
+                        .map(
+                                fromGroup ->
+                                        new ExecutionSlotSharingGroup(
+                                                fromGroup.getSlotSharingGroup(),
+                                                fromGroup.getExecutionVertexIds()))
+                        .collect(Collectors.toList());
+    }
+
+    /**
+     * Enumerates the execution vertices of every job vertex contained in the job's slot sharing
+     * groups.
+     *
+     * @param jobInformation provides the slot sharing groups and their job vertex ids
+     * @param vertexParallelism provides the number of subtasks of each job vertex
+     * @return for each job vertex with a positive parallelism, its execution vertices in subtask
+     *     order; job vertices without subtasks get no entry
+     */
+    static Map<JobVertexID, List<ExecutionVertexID>> getAllVertices(
+            JobInformation jobInformation, VertexParallelism vertexParallelism) {
+        final Map<JobVertexID, List<ExecutionVertexID>> jobVertexToExecutionVertices =
+                new HashMap<>();
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
+                final int parallelism = vertexParallelism.getParallelism(jobVertexId);
+                if (parallelism <= 0) {
+                    // Vertices without subtasks do not get an (empty) entry in the map.
+                    continue;
+                }
+                // Look the target list up once per job vertex instead of once per subtask.
+                final List<ExecutionVertexID> executionVertices =
+                        jobVertexToExecutionVertices.computeIfAbsent(
+                                jobVertexId, ignored -> new ArrayList<>(parallelism));
+                for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
+                    executionVertices.add(new ExecutionVertexID(jobVertexId, subtaskIdx));
+                }
+            }
+        }
+        return jobVertexToExecutionVertices;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
new file mode 100644
index 00000000000..3cd18490176
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.FreeSlotFunction;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.IsSlotAvailableAndFreeFunction;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.ReserveSlotFunction;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TaskBalancedSlotSharingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestJobInformation;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestVertexInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot.getSlots;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedSlotSharingResolver}. */
+class TaskBalancedSlotSharingResolverTest {
+
+    private static final IsSlotAvailableAndFreeFunction IS_SLOT_FREE_FUNCTION = ignored -> true;
+    private static final FreeSlotFunction FREE_SLOT_FUNCTION = (a, c, t) -> {};
+    private static final ReserveSlotFunction RESERVE_SLOT_FUNCTION =
+            (allocationId, resourceProfile) ->
+                    TestingPhysicalSlot.builder()
+                            .withAllocationID(allocationId)
+                            .withResourceProfile(resourceProfile)
+                            .build();
+    private static final boolean DISABLE_LOCAL_RECOVERY = false;
+    private static final String NULL_EXECUTION_TARGET = null;
+    private static final SlotSharingResolver SLOT_SHARING_RESOLVER =
+            TaskBalancedSlotSharingResolver.INSTANCE;
+
+    /** Allocator used only to derive a {@link VertexParallelism} for a given set of slots. */
+    private static final SlotSharingSlotAllocator SLOT_ALLOCATOR =
+            SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
+                    RESERVE_SLOT_FUNCTION,
+                    FREE_SLOT_FUNCTION,
+                    IS_SLOT_FREE_FUNCTION,
+                    DISABLE_LOCAL_RECOVERY,
+                    NULL_EXECUTION_TARGET,
+                    // NOTE(review): unnamed flag kept as in the original; give it a named
+                    // constant once its meaning in the factory signature is confirmed.
+                    false);
+
+    private SlotSharingGroup slotSharingGroup1;
+    private SlotSharingGroup slotSharingGroup2;
+    private TestVertexInformation.TestingCoLocationGroup coLocationGroup1;
+    private TestVertexInformation.TestingCoLocationGroup coLocationGroup2;
+
+    @BeforeEach
+    void setup() {
+        slotSharingGroup1 = new SlotSharingGroup();
+        slotSharingGroup2 = new SlotSharingGroup();
+        coLocationGroup1 = new TestVertexInformation.TestingCoLocationGroup();
+        coLocationGroup2 = new TestVertexInformation.TestingCoLocationGroup();
+    }
+
+    @Test
+    void testGetExecutionSlotSharingGroupsInOneSlotSharingGroup() {
+        final JobInformation.VertexInformation vertex1 =
+                new TestVertexInformation(1, slotSharingGroup1);
+        final JobInformation.VertexInformation vertex2 =
+                new TestVertexInformation(2, slotSharingGroup1);
+        final JobInformation.VertexInformation vertex3 =
+                new TestVertexInformation(3, slotSharingGroup1);
+
+        final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+                executionSlotSharingGroups =
+                        resolveExecutionSlotSharingGroups(vertex1, vertex2, vertex3);
+
+        // 1 + 2 + 3 = 6 subtasks are expected to be spread evenly over 3 groups.
+        assertThat(executionSlotSharingGroups).hasSize(3);
+        assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
+                executionSlotSharingGroups, slotSharingGroup1);
+        assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
+                slotSharingGroup1, executionSlotSharingGroups, 2, 2, 2);
+    }
+
+    @Test
+    void testGetExecutionSlotSharingGroupsInOneSlotSharingGroupWithCoLocationGroup() {
+        final JobInformation.VertexInformation vertex1 =
+                new TestVertexInformation(1, slotSharingGroup1, coLocationGroup1);
+        final JobInformation.VertexInformation vertex2 =
+                new TestVertexInformation(2, slotSharingGroup1, coLocationGroup1);
+        final JobInformation.VertexInformation vertex3 =
+                new TestVertexInformation(3, slotSharingGroup1);
+
+        final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+                executionSlotSharingGroups =
+                        resolveExecutionSlotSharingGroups(vertex1, vertex2, vertex3);
+
+        assertThat(executionSlotSharingGroups).hasSize(3);
+        assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
+                executionSlotSharingGroups, slotSharingGroup1);
+        assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
+                slotSharingGroup1, executionSlotSharingGroups, 1, 2, 3);
+    }
+
+    @Test
+    void testGetExecutionSlotSharingGroupsInMultiSlotSharingGroups() {
+        final JobInformation.VertexInformation vertex1 =
+                new TestVertexInformation(1, slotSharingGroup1);
+        final JobInformation.VertexInformation vertex2 =
+                new TestVertexInformation(2, slotSharingGroup1);
+        final JobInformation.VertexInformation vertex3 =
+                new TestVertexInformation(3, slotSharingGroup1);
+
+        final JobInformation.VertexInformation vertex4 =
+                new TestVertexInformation(1, slotSharingGroup2);
+        final JobInformation.VertexInformation vertex5 =
+                new TestVertexInformation(2, slotSharingGroup2);
+
+        final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+                executionSlotSharingGroups =
+                        resolveExecutionSlotSharingGroups(
+                                vertex1, vertex2, vertex3, vertex4, vertex5);
+
+        assertThat(executionSlotSharingGroups).hasSize(5);
+        assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
+                executionSlotSharingGroups, slotSharingGroup1, slotSharingGroup2);
+        assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
+                slotSharingGroup1, executionSlotSharingGroups, 2, 2, 2);
+        assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
+                slotSharingGroup2, executionSlotSharingGroups, 1, 2);
+    }
+
+    @Test
+    void testGetExecutionSlotSharingGroupsInMultiSlotSharingGroupsWithCoLocationGroups() {
+        final JobInformation.VertexInformation vertex1 =
+                new TestVertexInformation(1, slotSharingGroup1, coLocationGroup1);
+        final JobInformation.VertexInformation vertex2 =
+                new TestVertexInformation(2, slotSharingGroup1, coLocationGroup1);
+        final JobInformation.VertexInformation vertex3 =
+                new TestVertexInformation(3, slotSharingGroup1);
+
+        final JobInformation.VertexInformation vertex4 =
+                new TestVertexInformation(1, slotSharingGroup2, coLocationGroup2);
+        final JobInformation.VertexInformation vertex5 =
+                new TestVertexInformation(3, slotSharingGroup2);
+        final JobInformation.VertexInformation vertex6 =
+                new TestVertexInformation(2, slotSharingGroup2, coLocationGroup2);
+        final JobInformation.VertexInformation vertex7 =
+                new TestVertexInformation(1, slotSharingGroup2);
+
+        final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+                executionSlotSharingGroups =
+                        resolveExecutionSlotSharingGroups(
+                                vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7);
+
+        assertThat(executionSlotSharingGroups).hasSize(6);
+        assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
+                executionSlotSharingGroups, slotSharingGroup1, slotSharingGroup2);
+        assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
+                slotSharingGroup1, executionSlotSharingGroups, 1, 2, 3);
+        assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
+                slotSharingGroup2, executionSlotSharingGroups, 2, 2, 3);
+    }
+
+    /**
+     * Builds a job from the given vertices, derives its parallelism from as many slots as the
+     * vertices' summed parallelism, and resolves the job's execution slot sharing groups.
+     */
+    private static Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+            resolveExecutionSlotSharingGroups(JobInformation.VertexInformation... vertices) {
+        final TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertices));
+        final VertexParallelism vertexParallelism =
+                getVertexParallelism(jobInformation, getSlotsFor(vertices));
+        return SLOT_SHARING_RESOLVER.getExecutionSlotSharingGroups(
+                jobInformation, vertexParallelism);
+    }
+
+    /** Returns as many slots as the summed parallelism of the given vertices. */
+    private static Collection<PhysicalSlot> getSlotsFor(
+            JobInformation.VertexInformation... verticesInformation) {
+        if (verticesInformation == null || verticesInformation.length == 0) {
+            return Collections.emptyList();
+        }
+        final int slots =
+                Arrays.stream(verticesInformation)
+                        .mapToInt(JobInformation.VertexInformation::getParallelism)
+                        .sum();
+        return getSlots(slots);
+    }
+
+    /**
+     * Determines the vertex parallelism for the given slots, failing the test immediately instead
+     * of silently continuing with an empty parallelism.
+     */
+    private static VertexParallelism getVertexParallelism(
+            JobInformation jobInformation, Collection<PhysicalSlot> slots) {
+        return SLOT_ALLOCATOR
+                .determineParallelism(jobInformation, slots)
+                .orElseThrow(
+                        () ->
+                                new AssertionError(
+                                        "Could not determine the vertex parallelism for "
+                                                + slots.size()
+                                                + " slots."));
+    }
+
+    /** Asserts that the groups belong to exactly the given slot sharing groups, in any order. */
+    private static void assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
+            Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+                    executionSlotSharingGroups,
+            SlotSharingGroup... slotSharingGroups) {
+        assertThat(
+                        executionSlotSharingGroups.stream()
+                                .map(
+                                        SlotSharingSlotAllocator.ExecutionSlotSharingGroup
+                                                ::getSlotSharingGroup)
+                                .collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(slotSharingGroups);
+    }
+
+    /**
+     * Asserts that the groups of the given slot sharing group contain, sorted ascending, exactly
+     * the given numbers of execution vertices.
+     */
+    private static void assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
+            SlotSharingGroup slotSharingGroup,
+            Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
+                    executionSlotSharingGroups,
+            Integer... numbers) {
+        assertThat(
+                        executionSlotSharingGroups.stream()
+                                .filter(essg -> essg.getSlotSharingGroup().equals(slotSharingGroup))
+                                .map(essg -> essg.getContainedExecutionVertices().size())
+                                .sorted()
+                                .collect(Collectors.toList()))
+                .containsExactly(numbers);
+    }
+}

Reply via email to