This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f843c121065 [FLINK-33391][runtime] Support tasks balancing at TM level for Adaptive Scheduler.
f843c121065 is described below

commit f843c121065579e0042d55c57938c163e17ad8e6
Author: Roc Marshal <[email protected]>
AuthorDate: Thu Nov 14 08:25:00 2024 +0800

    [FLINK-33391][runtime] Support tasks balancing at TM level for Adaptive Scheduler.
---
 .../adaptive/allocator/DefaultSlotAssigner.java    |   2 +-
 .../allocator/SlotSharingSlotAllocator.java        |  35 +++++-
 .../TasksBalancedSlotMatchingResolver.java         | 132 +++++++++++++++++++++
 .../AbstractSlotMatchingResolverTest.java          |  34 ++++++
 4 files changed, 199 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 9a5a13f0785..81293ec98a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -109,7 +109,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
      * @return The ordered task manager that orders by the number of free 
slots descending.
      */
     private Iterator<TaskManagerLocation> getSortedTaskExecutors(
-            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsPerTaskExecutor) {
+            Map<TaskManagerLocation, Set<PhysicalSlot>> slotsPerTaskExecutor) {
         final Comparator<TaskManagerLocation> taskExecutorComparator =
                 (leftTml, rightTml) ->
                         Integer.compare(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 992fe071038..9ac1ecfbedd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -30,6 +30,9 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
@@ -62,7 +65,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     private final boolean localRecoveryEnabled;
     private final @Nullable String executionTarget;
     private final boolean minimalTaskManagerPreferred;
-    private final SlotSharingResolver slotSharingResolver = 
DefaultSlotSharingResolver.INSTANCE;
+    private final SlotSharingResolver slotSharingResolver;
     private final SlotMatchingResolver slotMatchingResolver;
 
     private SlotSharingSlotAllocator(
@@ -79,6 +82,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         this.localRecoveryEnabled = localRecoveryEnabled;
         this.executionTarget = executionTarget;
         this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+        this.slotSharingResolver = 
getSlotSharingResolver(taskManagerLoadBalanceMode);
         this.slotMatchingResolver = 
getSlotMatchingResolver(taskManagerLoadBalanceMode);
     }
 
@@ -175,6 +179,23 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                         });
     }
 
+    private SlotSharingResolver getSlotSharingResolver(
+            TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
+        switch (taskManagerLoadBalanceMode) {
+            case NONE:
+            case MIN_RESOURCES:
+            case SLOTS:
+                return DefaultSlotSharingResolver.INSTANCE;
+            case TASKS:
+                return TaskBalancedSlotSharingResolver.INSTANCE;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported task manager load balance mode: 
%s when initializing slot sharing resolver.",
+                                taskManagerLoadBalanceMode));
+        }
+    }
+
     private SlotMatchingResolver getSlotMatchingResolver(
             TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
         switch (taskManagerLoadBalanceMode) {
@@ -183,10 +204,12 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                 return SimpleSlotMatchingResolver.INSTANCE;
             case SLOTS:
                 return SlotsBalancedSlotMatchingResolver.INSTANCE;
+            case TASKS:
+                return TasksBalancedSlotMatchingResolver.INSTANCE;
             default:
                 throw new UnsupportedOperationException(
                         String.format(
-                                "Unsupported task manager load mode: %s",
+                                "Unsupported task manager load balance mode: 
%s when initializing slot matching resolver",
                                 taskManagerLoadBalanceMode));
         }
     }
@@ -324,7 +347,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     }
 
     /** The execution slot sharing group for adaptive scheduler. */
-    public static class ExecutionSlotSharingGroup {
+    public static class ExecutionSlotSharingGroup implements WeightLoadable {
         private final String id;
         private final SlotSharingGroup slotSharingGroup;
         private final Set<ExecutionVertexID> containedExecutionVertices;
@@ -356,6 +379,12 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         public Collection<ExecutionVertexID> getContainedExecutionVertices() {
             return containedExecutionVertices;
         }
+
+        /**
+         * Returns the loading of this group, measured as the number of execution vertices it
+         * contains.
+         */
+        @Nonnull
+        @Override
+        public LoadingWeight getLoading() {
+            final int numberOfExecutionVertices = containedExecutionVertices.size();
+            return new DefaultLoadingWeight(numberOfExecutionVertices);
+        }
     }
 
     static class SlotSharingGroupMetaInfo {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
new file mode 100644
index 00000000000..d4ee374227e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.util.CollectionUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import static 
org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend;
+
+/** The tasks balanced request slot matching resolver implementation. */
+public enum TasksBalancedSlotMatchingResolver implements SlotMatchingResolver {
+    INSTANCE;
+
+    @Override
+    public Collection<JobSchedulingPlan.SlotAssignment> 
matchSlotSharingGroupWithSlots(
+            Collection<ExecutionSlotSharingGroup> requestGroups,
+            Collection<PhysicalSlot> freeSlots) {
+        final List<JobSchedulingPlan.SlotAssignment> slotAssignments =
+                new ArrayList<>(requestGroups.size());
+        final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
+                AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
+        final TreeMap<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap =
+                getLoadingSlotsMap(freeSlots);
+
+        SlotTaskExecutorWeight<LoadingWeight> best;
+        for (ExecutionSlotSharingGroup requestGroup : 
sortByLoadingDescend(requestGroups)) {
+            best = getTheBestSlotTaskExecutorLoading(loadingSlotsMap);
+            slotAssignments.add(new SlotAssignment(best.physicalSlot, 
requestGroup));
+
+            // Update the references
+            final LoadingWeight newLoading =
+                    best.taskExecutorWeight.merge(requestGroup.getLoading());
+            updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
+            Set<PhysicalSlot> physicalSlots = 
slotsPerTaskExecutor.get(best.getResourceID());
+            updateLoadingSlotsMap(loadingSlotsMap, best, physicalSlots, 
newLoading);
+        }
+        return slotAssignments;
+    }
+
+    private static void updateLoadingSlotsMap(
+            Map<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap,
+            SlotTaskExecutorWeight<LoadingWeight> best,
+            Set<PhysicalSlot> slotsToAdjust,
+            LoadingWeight newLoading) {
+        Set<PhysicalSlot> physicalSlots = 
loadingSlotsMap.get(best.taskExecutorWeight);
+        if (!CollectionUtil.isNullOrEmpty(physicalSlots)) {
+            physicalSlots.remove(best.physicalSlot);
+        }
+        if (!CollectionUtil.isNullOrEmpty(slotsToAdjust)
+                && !CollectionUtil.isNullOrEmpty(physicalSlots)) {
+            physicalSlots.removeAll(slotsToAdjust);
+        }
+        if (CollectionUtil.isNullOrEmpty(physicalSlots)) {
+            loadingSlotsMap.remove(best.taskExecutorWeight);
+        }
+        if (!CollectionUtil.isNullOrEmpty(slotsToAdjust)) {
+            Set<PhysicalSlot> slotsOfNewKey =
+                    loadingSlotsMap.computeIfAbsent(
+                            newLoading,
+                            ignored ->
+                                    CollectionUtil.newHashSetWithExpectedSize(
+                                            slotsToAdjust.size()));
+            slotsOfNewKey.addAll(slotsToAdjust);
+        }
+    }
+
+    private static void updateSlotsPerTaskExecutor(
+            Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor,
+            SlotTaskExecutorWeight<LoadingWeight> best) {
+        Set<PhysicalSlot> slots = 
slotsPerTaskExecutor.get(best.getResourceID());
+        if (Objects.nonNull(slots)) {
+            slots.remove(best.physicalSlot);
+        }
+        if (CollectionUtil.isNullOrEmpty(slots)) {
+            slotsPerTaskExecutor.remove(best.getResourceID());
+        }
+    }
+
+    private static TreeMap<LoadingWeight, Set<PhysicalSlot>> 
getLoadingSlotsMap(
+            Collection<PhysicalSlot> slots) {
+        return new TreeMap<>() {
+            {
+                HashSet<PhysicalSlot> slotsValue =
+                        
CollectionUtil.newHashSetWithExpectedSize(slots.size());
+                slotsValue.addAll(slots);
+                put(DefaultLoadingWeight.EMPTY, slotsValue);
+            }
+        };
+    }
+
+    private static SlotTaskExecutorWeight<LoadingWeight> 
getTheBestSlotTaskExecutorLoading(
+            TreeMap<LoadingWeight, Set<PhysicalSlot>> slotsByLoading) {
+        final Map.Entry<LoadingWeight, Set<PhysicalSlot>> firstEntry = 
slotsByLoading.firstEntry();
+        if (firstEntry == null
+                || firstEntry.getKey() == null
+                || CollectionUtil.isNullOrEmpty(firstEntry.getValue())) {
+            throw NO_SLOTS_EXCEPTION_GETTER.get();
+        }
+        return new SlotTaskExecutorWeight<>(
+                firstEntry.getKey(), firstEntry.getValue().iterator().next());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
index 5eadda12ea7..e2be038d345 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -134,6 +136,38 @@ abstract class AbstractSlotMatchingResolverTest {
     }
 }
 
+/** Test for {@link TasksBalancedSlotMatchingResolver}. */
+class TasksBalancedSlotMatchingResolverTest extends AbstractSlotMatchingResolverTest {
+
+    @Override
+    protected SlotMatchingResolver createSlotMatchingResolver() {
+        return TasksBalancedSlotMatchingResolver.INSTANCE;
+    }
+
+    /**
+     * Asserts that, on every task manager, the summed loading of the assigned slot sharing groups
+     * is at least 9.
+     */
+    @Override
+    protected void assertAssignments(Collection<SlotAssignment> assignments) {
+        final Map<TaskManagerLocation, Set<SlotAssignment>> assignmentsPerTm =
+                getAssignmentsPerTaskManager(assignments);
+        // NOTE(review): the threshold 9 depends on the fixture prepared by the base class (not
+        // visible here). Confirm it stays in sync with that setup.
+        assertThat(assignmentsPerTm)
+                .allSatisfy(
+                        (taskManagerLocation, slotAssignments) ->
+                                assertThat(totalLoadingOf(slotAssignments))
+                                        .isGreaterThanOrEqualTo(9f));
+    }
+
+    /** Folds the loading of all slot sharing groups targeted by the given assignments. */
+    private static float totalLoadingOf(Set<SlotAssignment> slotAssignments) {
+        LoadingWeight total = DefaultLoadingWeight.EMPTY;
+        for (SlotAssignment slotAssignment : slotAssignments) {
+            final LoadingWeight groupLoading =
+                    slotAssignment.getTargetAs(ExecutionSlotSharingGroup.class).getLoading();
+            total = total.merge(groupLoading);
+        }
+        return total.getLoading();
+    }
+}
+
 /** Test for {@link SlotsBalancedSlotMatchingResolver}. */
 class SlotsBalancedSlotMatchingResolverTest extends 
AbstractSlotMatchingResolverTest {
 

Reply via email to