This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f843c121065 [FLINK-33391][runtime] Support tasks balancing at TM level
for Adaptive Scheduler.
f843c121065 is described below
commit f843c121065579e0042d55c57938c163e17ad8e6
Author: Roc Marshal <[email protected]>
AuthorDate: Thu Nov 14 08:25:00 2024 +0800
[FLINK-33391][runtime] Support tasks balancing at TM level for Adaptive
Scheduler.
---
.../adaptive/allocator/DefaultSlotAssigner.java | 2 +-
.../allocator/SlotSharingSlotAllocator.java | 35 +++++-
.../TasksBalancedSlotMatchingResolver.java | 132 +++++++++++++++++++++
.../AbstractSlotMatchingResolverTest.java | 34 ++++++
4 files changed, 199 insertions(+), 4 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 9a5a13f0785..81293ec98a4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -109,7 +109,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
* @return The ordered task manager that orders by the number of free
slots descending.
*/
private Iterator<TaskManagerLocation> getSortedTaskExecutors(
- Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor) {
+ Map<TaskManagerLocation, Set<PhysicalSlot>> slotsPerTaskExecutor) {
final Comparator<TaskManagerLocation> taskExecutorComparator =
(leftTml, rightTml) ->
Integer.compare(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 992fe071038..9ac1ecfbedd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -30,6 +30,9 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;
@@ -62,7 +65,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
private final boolean localRecoveryEnabled;
private final @Nullable String executionTarget;
private final boolean minimalTaskManagerPreferred;
- private final SlotSharingResolver slotSharingResolver =
DefaultSlotSharingResolver.INSTANCE;
+ private final SlotSharingResolver slotSharingResolver;
private final SlotMatchingResolver slotMatchingResolver;
private SlotSharingSlotAllocator(
@@ -79,6 +82,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
this.localRecoveryEnabled = localRecoveryEnabled;
this.executionTarget = executionTarget;
this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+ this.slotSharingResolver =
getSlotSharingResolver(taskManagerLoadBalanceMode);
this.slotMatchingResolver =
getSlotMatchingResolver(taskManagerLoadBalanceMode);
}
@@ -175,6 +179,23 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
});
}
+ private SlotSharingResolver getSlotSharingResolver(
+ TaskManagerOptions.TaskManagerLoadBalanceMode
taskManagerLoadBalanceMode) {
+ switch (taskManagerLoadBalanceMode) {
+ case NONE:
+ case MIN_RESOURCES:
+ case SLOTS:
+ return DefaultSlotSharingResolver.INSTANCE;
+ case TASKS:
+ return TaskBalancedSlotSharingResolver.INSTANCE;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported task manager load balance mode:
%s when initializing slot sharing resolver.",
+ taskManagerLoadBalanceMode));
+ }
+ }
+
private SlotMatchingResolver getSlotMatchingResolver(
TaskManagerOptions.TaskManagerLoadBalanceMode
taskManagerLoadBalanceMode) {
switch (taskManagerLoadBalanceMode) {
@@ -183,10 +204,12 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
return SimpleSlotMatchingResolver.INSTANCE;
case SLOTS:
return SlotsBalancedSlotMatchingResolver.INSTANCE;
+ case TASKS:
+ return TasksBalancedSlotMatchingResolver.INSTANCE;
default:
throw new UnsupportedOperationException(
String.format(
- "Unsupported task manager load mode: %s",
+ "Unsupported task manager load balance mode:
%s when initializing slot matching resolver",
taskManagerLoadBalanceMode));
}
}
@@ -324,7 +347,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
}
/** The execution slot sharing group for adaptive scheduler. */
- public static class ExecutionSlotSharingGroup {
+ public static class ExecutionSlotSharingGroup implements WeightLoadable {
private final String id;
private final SlotSharingGroup slotSharingGroup;
private final Set<ExecutionVertexID> containedExecutionVertices;
@@ -356,6 +379,12 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
public Collection<ExecutionVertexID> getContainedExecutionVertices() {
return containedExecutionVertices;
}
+
+        /**
+         * Returns the loading of this execution slot sharing group, measured as the number of
+         * execution vertices it contains.
+         */
+        @Nonnull
+        @Override
+        public LoadingWeight getLoading() {
+            final int numberOfContainedVertices = containedExecutionVertices.size();
+            return new DefaultLoadingWeight(numberOfContainedVertices);
+        }
}
static class SlotSharingGroupMetaInfo {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
new file mode 100644
index 00000000000..d4ee374227e
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.util.CollectionUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import static
org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend;
+
+/** The tasks balanced request slot matching resolver implementation. */
+public enum TasksBalancedSlotMatchingResolver implements SlotMatchingResolver {
+ INSTANCE;
+
+ @Override
+ public Collection<JobSchedulingPlan.SlotAssignment>
matchSlotSharingGroupWithSlots(
+ Collection<ExecutionSlotSharingGroup> requestGroups,
+ Collection<PhysicalSlot> freeSlots) {
+ final List<JobSchedulingPlan.SlotAssignment> slotAssignments =
+ new ArrayList<>(requestGroups.size());
+ final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
+ AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
+ final TreeMap<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap =
+ getLoadingSlotsMap(freeSlots);
+
+ SlotTaskExecutorWeight<LoadingWeight> best;
+ for (ExecutionSlotSharingGroup requestGroup :
sortByLoadingDescend(requestGroups)) {
+ best = getTheBestSlotTaskExecutorLoading(loadingSlotsMap);
+ slotAssignments.add(new SlotAssignment(best.physicalSlot,
requestGroup));
+
+ // Update the references
+ final LoadingWeight newLoading =
+ best.taskExecutorWeight.merge(requestGroup.getLoading());
+ updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
+ Set<PhysicalSlot> physicalSlots =
slotsPerTaskExecutor.get(best.getResourceID());
+ updateLoadingSlotsMap(loadingSlotsMap, best, physicalSlots,
newLoading);
+ }
+ return slotAssignments;
+ }
+
+ private static void updateLoadingSlotsMap(
+ Map<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap,
+ SlotTaskExecutorWeight<LoadingWeight> best,
+ Set<PhysicalSlot> slotsToAdjust,
+ LoadingWeight newLoading) {
+ Set<PhysicalSlot> physicalSlots =
loadingSlotsMap.get(best.taskExecutorWeight);
+ if (!CollectionUtil.isNullOrEmpty(physicalSlots)) {
+ physicalSlots.remove(best.physicalSlot);
+ }
+ if (!CollectionUtil.isNullOrEmpty(slotsToAdjust)
+ && !CollectionUtil.isNullOrEmpty(physicalSlots)) {
+ physicalSlots.removeAll(slotsToAdjust);
+ }
+ if (CollectionUtil.isNullOrEmpty(physicalSlots)) {
+ loadingSlotsMap.remove(best.taskExecutorWeight);
+ }
+ if (!CollectionUtil.isNullOrEmpty(slotsToAdjust)) {
+ Set<PhysicalSlot> slotsOfNewKey =
+ loadingSlotsMap.computeIfAbsent(
+ newLoading,
+ ignored ->
+ CollectionUtil.newHashSetWithExpectedSize(
+ slotsToAdjust.size()));
+ slotsOfNewKey.addAll(slotsToAdjust);
+ }
+ }
+
+ private static void updateSlotsPerTaskExecutor(
+ Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor,
+ SlotTaskExecutorWeight<LoadingWeight> best) {
+ Set<PhysicalSlot> slots =
slotsPerTaskExecutor.get(best.getResourceID());
+ if (Objects.nonNull(slots)) {
+ slots.remove(best.physicalSlot);
+ }
+ if (CollectionUtil.isNullOrEmpty(slots)) {
+ slotsPerTaskExecutor.remove(best.getResourceID());
+ }
+ }
+
+ private static TreeMap<LoadingWeight, Set<PhysicalSlot>>
getLoadingSlotsMap(
+ Collection<PhysicalSlot> slots) {
+ return new TreeMap<>() {
+ {
+ HashSet<PhysicalSlot> slotsValue =
+
CollectionUtil.newHashSetWithExpectedSize(slots.size());
+ slotsValue.addAll(slots);
+ put(DefaultLoadingWeight.EMPTY, slotsValue);
+ }
+ };
+ }
+
+ private static SlotTaskExecutorWeight<LoadingWeight>
getTheBestSlotTaskExecutorLoading(
+ TreeMap<LoadingWeight, Set<PhysicalSlot>> slotsByLoading) {
+ final Map.Entry<LoadingWeight, Set<PhysicalSlot>> firstEntry =
slotsByLoading.firstEntry();
+ if (firstEntry == null
+ || firstEntry.getKey() == null
+ || CollectionUtil.isNullOrEmpty(firstEntry.getValue())) {
+ throw NO_SLOTS_EXCEPTION_GETTER.get();
+ }
+ return new SlotTaskExecutorWeight<>(
+ firstEntry.getKey(), firstEntry.getValue().iterator().next());
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
index 5eadda12ea7..e2be038d345 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
@@ -22,6 +22,8 @@ import
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -134,6 +136,38 @@ abstract class AbstractSlotMatchingResolverTest {
}
}
+/**
+ * Tests for {@link TasksBalancedSlotMatchingResolver}. After matching, the test checks the
+ * accumulated task loading of every task manager.
+ */
+class TasksBalancedSlotMatchingResolverTest extends AbstractSlotMatchingResolverTest {
+
+    @Override
+    protected SlotMatchingResolver createSlotMatchingResolver() {
+        return TasksBalancedSlotMatchingResolver.INSTANCE;
+    }
+
+    @Override
+    protected void assertAssignments(Collection<SlotAssignment> assignments) {
+        final Map<TaskManagerLocation, Set<SlotAssignment>> assignmentsPerTm =
+                getAssignmentsPerTaskManager(assignments);
+        // NOTE(review): the lower bound 9 depends on the fixture built by the abstract base
+        // test. Confirm it whenever that fixture changes.
+        assertThat(assignmentsPerTm)
+                .allSatisfy(
+                        (taskManagerLocation, slotAssignments) -> {
+                            final float accumulatedLoading = sumLoadings(slotAssignments);
+                            assertThat(accumulatedLoading).isGreaterThanOrEqualTo(9f);
+                        });
+    }
+
+    /**
+     * Merges, starting from {@link DefaultLoadingWeight#EMPTY}, the loadings of the execution
+     * slot sharing groups targeted by the given assignments.
+     */
+    private static float sumLoadings(Set<SlotAssignment> slotAssignments) {
+        LoadingWeight accumulated = DefaultLoadingWeight.EMPTY;
+        for (SlotAssignment slotAssignment : slotAssignments) {
+            final LoadingWeight groupLoading =
+                    slotAssignment.getTargetAs(ExecutionSlotSharingGroup.class).getLoading();
+            accumulated = accumulated.merge(groupLoading);
+        }
+        return accumulated.getLoading();
+    }
+}
+
/** Test for {@link SlotsBalancedSlotMatchingResolver}. */
class SlotsBalancedSlotMatchingResolverTest extends
AbstractSlotMatchingResolverTest {