This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 874b837da48 [FLINK-33390][runtime] Support slot balancing at TM level
for Adaptive Scheduler
874b837da48 is described below
commit 874b837da48a17e4968b6ec557eb0037dcecc91f
Author: Roc Marshal <[email protected]>
AuthorDate: Mon Oct 14 13:47:26 2024 +0800
[FLINK-33390][runtime] Support slot balancing at TM level for Adaptive
Scheduler
---
.../adaptive/AdaptiveSchedulerFactory.java | 11 +-
.../adaptive/allocator/AllocatorUtil.java | 13 ++
.../adaptive/allocator/DefaultSlotAssigner.java | 39 +++---
...signer.java => SimpleSlotMatchingResolver.java} | 29 ++--
.../adaptive/allocator/SlotAllocator.java | 3 +-
.../scheduler/adaptive/allocator/SlotAssigner.java | 4 +-
...SlotAssigner.java => SlotMatchingResolver.java} | 29 ++--
.../allocator/SlotSharingSlotAllocator.java | 33 ++++-
...otAssigner.java => SlotTaskExecutorWeight.java} | 30 ++--
.../SlotsBalancedSlotMatchingResolver.java | 147 ++++++++++++++++++++
.../adaptive/allocator/SlotsUtilization.java | 69 ++++++++++
.../allocator/StateLocalitySlotAssigner.java | 3 +-
.../adaptive/AdaptiveSchedulerBuilder.java | 5 +-
.../scheduler/adaptive/LocalRecoveryTest.java | 4 +-
.../TaskBalancedSlotSharingResolverTest.java | 4 +-
.../AbstractSlotMatchingResolverTest.java | 153 +++++++++++++++++++++
.../allocator/DefaultSlotAssignerTest.java | 6 +-
.../allocator/SlotSharingSlotAllocatorTest.java | 47 ++++---
.../scheduler/adaptive/allocator/TestingSlot.java | 4 +-
.../adaptive/allocator/TestingSlotAllocator.java | 10 +-
20 files changed, 553 insertions(+), 90 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 9f182bf3587..a6e06aa5ac2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
@@ -123,7 +124,9 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
jobMasterConfiguration.get(DeploymentOptions.TARGET),
jobMasterConfiguration.get(
-
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
+
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED),
+ jobMasterConfiguration.get(
+
TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE));
final ExecutionGraphFactory executionGraphFactory =
new DefaultExecutionGraphFactory(
@@ -169,13 +172,15 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
DeclarativeSlotPool declarativeSlotPool,
boolean localRecoveryEnabled,
@Nullable String executionTarget,
- boolean minimalTaskManagerPreferred) {
+ boolean minimalTaskManagerPreferred,
+ TaskManagerOptions.TaskManagerLoadBalanceMode
taskManagerLoadBalanceMode) {
return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
declarativeSlotPool::reserveFreeSlot,
declarativeSlotPool::freeReservedSlot,
declarativeSlotPool::containsFreeSlot,
localRecoveryEnabled,
executionTarget,
- minimalTaskManagerPreferred);
+ minimalTaskManagerPreferred,
+ taskManagerLoadBalanceMode);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 24c81ab0fb8..8fbe44c3a1d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -18,11 +18,15 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkState;
@@ -54,4 +58,13 @@ class AllocatorUtil {
freeSlots.size(),
minimumRequiredSlots);
}
+
+ static Map<ResourceID, Set<PhysicalSlot>> getSlotsPerTaskExecutor(
+ Collection<PhysicalSlot> physicalSlots) {
+ return physicalSlots.stream()
+ .collect(
+ Collectors.groupingBy(
+ slot ->
slot.getTaskManagerLocation().getResourceID(),
+ Collectors.toSet()));
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 4adcd4a090b..9a5a13f0785 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -52,20 +53,23 @@ public class DefaultSlotAssigner implements SlotAssigner {
private final @Nullable String executionTarget;
private final boolean minimalTaskManagerPreferred;
private final SlotSharingResolver slotSharingResolver;
+ private final SlotMatchingResolver slotMatchingResolver;
DefaultSlotAssigner(
@Nullable String executionTarget,
boolean minimalTaskManagerPreferred,
- SlotSharingResolver slotSharingResolver) {
+ SlotSharingResolver slotSharingResolver,
+ SlotMatchingResolver slotMatchingResolver) {
this.executionTarget = executionTarget;
this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
this.slotSharingResolver = slotSharingResolver;
+ this.slotMatchingResolver = slotMatchingResolver;
}
@Override
public Collection<SlotAssignment> assignSlots(
JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
+ Collection<PhysicalSlot> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
checkMinimumRequiredSlots(jobInformation, freeSlots);
@@ -74,26 +78,20 @@ public class DefaultSlotAssigner implements SlotAssigner {
slotSharingResolver.getExecutionSlotSharingGroups(
jobInformation, vertexParallelism);
- final Collection<? extends SlotInfo> pickedSlots =
- pickSlotsIfNeeded(allGroups.size(), freeSlots);
+ final Collection<PhysicalSlot> pickedSlots =
pickSlotsIfNeeded(allGroups.size(), freeSlots);
- Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
- Collection<SlotAssignment> assignments = new ArrayList<>();
- for (ExecutionSlotSharingGroup group : allGroups) {
- assignments.add(new SlotAssignment(iterator.next(), group));
- }
- return assignments;
+ return slotMatchingResolver.matchSlotSharingGroupWithSlots(allGroups,
pickedSlots);
}
@VisibleForTesting
- Collection<? extends SlotInfo> pickSlotsIfNeeded(
- int requestExecutionSlotSharingGroups, Collection<? extends
SlotInfo> freeSlots) {
- Collection<? extends SlotInfo> pickedSlots = freeSlots;
+ Collection<PhysicalSlot> pickSlotsIfNeeded(
+ int requestExecutionSlotSharingGroups, Collection<PhysicalSlot>
freeSlots) {
+ Collection<PhysicalSlot> pickedSlots = freeSlots;
if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
&& minimalTaskManagerPreferred
// To avoid the sort-work loading.
&& freeSlots.size() > requestExecutionSlotSharingGroups) {
- final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor =
+ final Map<TaskManagerLocation, Set<PhysicalSlot>>
slotsPerTaskExecutor =
getSlotsPerTaskExecutor(freeSlots);
pickedSlots =
pickSlotsInMinimalTaskExecutors(
@@ -127,21 +125,20 @@ public class DefaultSlotAssigner implements SlotAssigner {
* @param requestedGroups the number of the request execution slot sharing
groups.
* @return the target slots that are distributed on the minimal task
executors.
*/
- private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
- Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsByTaskExecutor,
- int requestedGroups) {
- final List<SlotInfo> pickedSlots = new ArrayList<>();
+ private Collection<PhysicalSlot> pickSlotsInMinimalTaskExecutors(
+ Map<TaskManagerLocation, Set<PhysicalSlot>> slotsByTaskExecutor,
int requestedGroups) {
+ final List<PhysicalSlot> pickedSlots = new ArrayList<>();
final Iterator<TaskManagerLocation> sortedTaskExecutors =
getSortedTaskExecutors(slotsByTaskExecutor);
while (pickedSlots.size() < requestedGroups) {
- Set<? extends SlotInfo> slotInfos =
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+ Set<PhysicalSlot> slotInfos =
slotsByTaskExecutor.get(sortedTaskExecutors.next());
pickedSlots.addAll(slotInfos);
}
return pickedSlots;
}
- private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
getSlotsPerTaskExecutor(
- Collection<? extends SlotInfo> slots) {
+ private Map<TaskManagerLocation, Set<PhysicalSlot>>
getSlotsPerTaskExecutor(
+ Collection<PhysicalSlot> slots) {
return slots.stream()
.collect(
Collectors.groupingBy(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SimpleSlotMatchingResolver.java
similarity index 50%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SimpleSlotMatchingResolver.java
index cb264e28d88..127e1bb38d7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SimpleSlotMatchingResolver.java
@@ -17,19 +17,28 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
-import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobmaster.SlotInfo;
-import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
-/** Interface for assigning slots to slot sharing groups. */
-@Internal
-public interface SlotAssigner {
+/** The simple slot matching resolver implementation. */
+public enum SimpleSlotMatchingResolver implements SlotMatchingResolver {
+ INSTANCE;
- Collection<SlotAssignment> assignSlots(
- JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
- VertexParallelism vertexParallelism,
- JobAllocationsInformation previousAllocations);
+ @Override
+ public Collection<JobSchedulingPlan.SlotAssignment>
matchSlotSharingGroupWithSlots(
+ Collection<ExecutionSlotSharingGroup> requestGroups,
+ Collection<PhysicalSlot> freeSlots) {
+ Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+ Collection<JobSchedulingPlan.SlotAssignment> assignments = new
ArrayList<>();
+ for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup group :
requestGroups) {
+ assignments.add(new
JobSchedulingPlan.SlotAssignment(iterator.next(), group));
+ }
+ return assignments;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
index 629a57f5122..e5dd04eb00e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.util.ResourceCounter;
@@ -61,7 +62,7 @@ public interface SlotAllocator {
*/
Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
JobInformation jobInformation,
- Collection<? extends SlotInfo> slots,
+ Collection<PhysicalSlot> slots,
JobAllocationsInformation jobAllocationsInformation);
/**
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
index cb264e28d88..7a549569606 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import java.util.Collection;
@@ -29,7 +29,7 @@ public interface SlotAssigner {
Collection<SlotAssignment> assignSlots(
JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
+ Collection<PhysicalSlot> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotMatchingResolver.java
similarity index 50%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotMatchingResolver.java
index cb264e28d88..4e9acf9165f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotMatchingResolver.java
@@ -17,19 +17,28 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.util.FlinkRuntimeException;
import java.util.Collection;
+import java.util.function.Supplier;
-/** Interface for assigning slots to slot sharing groups. */
-@Internal
-public interface SlotAssigner {
+/** The interface that defines the methods of a request slot matching resolver. */
+public interface SlotMatchingResolver {
- Collection<SlotAssignment> assignSlots(
- JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
- VertexParallelism vertexParallelism,
- JobAllocationsInformation previousAllocations);
+ Supplier<FlinkRuntimeException> NO_SLOTS_EXCEPTION_GETTER =
+ () -> new FlinkRuntimeException("No suitable slots enough.");
+
+ /**
+ * Matches slots from the free slots with the given collection of requested
execution groups.
+ *
+ * @param requestGroups the requested execution slot sharing groups.
+ * @param freeSlots the free slots.
+ * @return The assignment result.
+ */
+ Collection<SlotAssignment> matchSlotSharingGroupWithSlots(
+ Collection<ExecutionSlotSharingGroup> requestGroups,
+ Collection<PhysicalSlot> freeSlots);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 58712498224..992fe071038 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -62,6 +63,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
private final @Nullable String executionTarget;
private final boolean minimalTaskManagerPreferred;
private final SlotSharingResolver slotSharingResolver =
DefaultSlotSharingResolver.INSTANCE;
+ private final SlotMatchingResolver slotMatchingResolver;
private SlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
@@ -69,13 +71,15 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
boolean localRecoveryEnabled,
@Nullable String executionTarget,
- boolean minimalTaskManagerPreferred) {
+ boolean minimalTaskManagerPreferred,
+ TaskManagerOptions.TaskManagerLoadBalanceMode
taskManagerLoadBalanceMode) {
this.reserveSlotFunction = reserveSlot;
this.freeSlotFunction = freeSlotFunction;
this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
this.localRecoveryEnabled = localRecoveryEnabled;
this.executionTarget = executionTarget;
this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+ this.slotMatchingResolver =
getSlotMatchingResolver(taskManagerLoadBalanceMode);
}
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
@@ -84,14 +88,16 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
boolean localRecoveryEnabled,
@Nullable String executionTarget,
- boolean minimalTaskManagerPreferred) {
+ boolean minimalTaskManagerPreferred,
+ TaskManagerOptions.TaskManagerLoadBalanceMode
taskManagerLoadBalanceMode) {
return new SlotSharingSlotAllocator(
reserveSlot,
freeSlotFunction,
isSlotAvailableAndFreeFunction,
localRecoveryEnabled,
executionTarget,
- minimalTaskManagerPreferred);
+ minimalTaskManagerPreferred,
+ taskManagerLoadBalanceMode);
}
@Override
@@ -146,7 +152,7 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
@Override
public Optional<JobSchedulingPlan>
determineParallelismAndCalculateAssignment(
JobInformation jobInformation,
- Collection<? extends SlotInfo> slots,
+ Collection<PhysicalSlot> slots,
JobAllocationsInformation jobAllocationsInformation) {
return determineParallelism(jobInformation, slots)
.map(
@@ -157,7 +163,8 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
: new DefaultSlotAssigner(
executionTarget,
minimalTaskManagerPreferred,
- slotSharingResolver);
+ slotSharingResolver,
+ slotMatchingResolver);
return new JobSchedulingPlan(
parallelism,
slotAssigner.assignSlots(
@@ -168,6 +175,22 @@ public class SlotSharingSlotAllocator implements
SlotAllocator {
});
}
+ private SlotMatchingResolver getSlotMatchingResolver(
+ TaskManagerOptions.TaskManagerLoadBalanceMode
taskManagerLoadBalanceMode) {
+ switch (taskManagerLoadBalanceMode) {
+ case NONE:
+ case MIN_RESOURCES:
+ return SimpleSlotMatchingResolver.INSTANCE;
+ case SLOTS:
+ return SlotsBalancedSlotMatchingResolver.INSTANCE;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported task manager load mode: %s",
+ taskManagerLoadBalanceMode));
+ }
+ }
+
/**
* Distributes free slots across the slot-sharing groups of the job. Slots
are distributed as
* evenly as possible. If a group requires less than an even share of
slots the remainder is
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotTaskExecutorWeight.java
similarity index 53%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotTaskExecutorWeight.java
index cb264e28d88..9b0bd549b52 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotTaskExecutorWeight.java
@@ -17,19 +17,25 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
-import java.util.Collection;
+import javax.annotation.Nonnull;
-/** Interface for assigning slots to slot sharing groups. */
-@Internal
-public interface SlotAssigner {
+/**
+ * Helper class to represent a slot and the load or slot-utilization
weight info of the task
+ * executor on which the slot is located.
+ */
+class SlotTaskExecutorWeight<T> {
+ final @Nonnull T taskExecutorWeight;
+ final @Nonnull PhysicalSlot physicalSlot;
+
+ SlotTaskExecutorWeight(@Nonnull T taskExecutorWeight, @Nonnull
PhysicalSlot physicalSlot) {
+ this.taskExecutorWeight = taskExecutorWeight;
+ this.physicalSlot = physicalSlot;
+ }
- Collection<SlotAssignment> assignSlots(
- JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
- VertexParallelism vertexParallelism,
- JobAllocationsInformation previousAllocations);
+ ResourceID getResourceID() {
+ return physicalSlot.getTaskManagerLocation().getResourceID();
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsBalancedSlotMatchingResolver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsBalancedSlotMatchingResolver.java
new file mode 100644
index 00000000000..1aea6baded6
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsBalancedSlotMatchingResolver.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.util.CollectionUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** The slots balanced request slot matching resolver implementation. */
+public enum SlotsBalancedSlotMatchingResolver implements SlotMatchingResolver {
+ INSTANCE;
+
+ @Override
+ public Collection<JobSchedulingPlan.SlotAssignment>
matchSlotSharingGroupWithSlots(
+ Collection<ExecutionSlotSharingGroup> requestGroups,
+ Collection<PhysicalSlot> freeSlots) {
+
+ final List<SlotAssignment> slotAssignments = new
ArrayList<>(requestGroups.size());
+ final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
+ AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
+ final Map<ResourceID, SlotsUtilization> taskExecutorSlotsUtilizations =
+ getSlotsUtilizationView(slotsPerTaskExecutor);
+ final TreeMap<Double, Set<PhysicalSlot>> utilizationSlotsMap =
+ getUtilizationSlotsMap(freeSlots,
taskExecutorSlotsUtilizations);
+
+ SlotTaskExecutorWeight<SlotsUtilization> best;
+ for (ExecutionSlotSharingGroup requestGroup : requestGroups) {
+ best = getTheBestSlotUtilization(utilizationSlotsMap,
taskExecutorSlotsUtilizations);
+ ResourceID resourceID = best.getResourceID();
+ slotAssignments.add(new SlotAssignment(best.physicalSlot,
requestGroup));
+ SlotsUtilization oldSlotsUtilization =
taskExecutorSlotsUtilizations.get(resourceID);
+ // Update the references
+ final SlotsUtilization newSlotsUtilization =
oldSlotsUtilization.incReserved(1);
+ taskExecutorSlotsUtilizations.put(resourceID, newSlotsUtilization);
+ updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
+ Set<PhysicalSlot> slotInfos =
slotsPerTaskExecutor.get(best.getResourceID());
+ updateUtilizationSlotsMap(utilizationSlotsMap, best, slotInfos,
newSlotsUtilization);
+ }
+ return slotAssignments;
+ }
+
+ private Map<ResourceID, SlotsUtilization> getSlotsUtilizationView(
+ Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor) {
+ return slotsPerTaskExecutor.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> new
SlotsUtilization(entry.getValue().size(), 0)));
+ }
+
+ private static void updateUtilizationSlotsMap(
+ Map<Double, Set<PhysicalSlot>> utilizationSlotsMap,
+ SlotTaskExecutorWeight<SlotsUtilization> best,
+ Set<PhysicalSlot> slotsToAdjust,
+ SlotsUtilization newSlotsUtilization) {
+ Double oldUtilization = best.taskExecutorWeight.getUtilization();
+ Double newUtilization = newSlotsUtilization.getUtilization();
+
+ Set<PhysicalSlot> physicalSlots =
utilizationSlotsMap.get(oldUtilization);
+
+ if (Objects.nonNull(physicalSlots)) {
+ physicalSlots.remove(best.physicalSlot);
+ if (Objects.nonNull(slotsToAdjust)) {
+ physicalSlots.removeAll(slotsToAdjust);
+ }
+ }
+ if (CollectionUtil.isNullOrEmpty(physicalSlots)) {
+ utilizationSlotsMap.remove(oldUtilization);
+ }
+ if (Objects.nonNull(slotsToAdjust)) {
+ utilizationSlotsMap
+ .computeIfAbsent(newUtilization, slotsUtilization -> new
HashSet<>())
+ .addAll(slotsToAdjust);
+ }
+ }
+
+ private static void updateSlotsPerTaskExecutor(
+ Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor,
+ SlotTaskExecutorWeight<SlotsUtilization> best) {
+ Set<PhysicalSlot> slotInfos =
slotsPerTaskExecutor.get(best.getResourceID());
+ if (Objects.nonNull(slotInfos)) {
+ slotInfos.remove(best.physicalSlot);
+ }
+ if (Objects.isNull(slotInfos) || slotInfos.isEmpty()) {
+ slotsPerTaskExecutor.remove(best.getResourceID());
+ }
+ }
+
+ private static TreeMap<Double, Set<PhysicalSlot>> getUtilizationSlotsMap(
+ Collection<PhysicalSlot> slots, Map<ResourceID, SlotsUtilization>
slotsUtilizations) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ physicalSlot ->
+ slotsUtilizations
+ .get(
+ physicalSlot
+
.getTaskManagerLocation()
+
.getResourceID())
+ .getUtilization(),
+ TreeMap::new,
+ Collectors.toSet()));
+ }
+
+ private static SlotTaskExecutorWeight<SlotsUtilization>
getTheBestSlotUtilization(
+ TreeMap<Double, Set<PhysicalSlot>> slotsByUtilization,
+ Map<ResourceID, SlotsUtilization> taskExecutorSlotsUtilizations) {
+ Map.Entry<Double, Set<PhysicalSlot>> firstEntry =
slotsByUtilization.firstEntry();
+ if (firstEntry == null
+ || firstEntry.getKey() == null
+ || CollectionUtil.isNullOrEmpty(firstEntry.getValue())) {
+ throw NO_SLOTS_EXCEPTION_GETTER.get();
+ }
+ PhysicalSlot slot = firstEntry.getValue().iterator().next();
+ ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+ SlotsUtilization slotsUtilization =
taskExecutorSlotsUtilizations.get(resourceID);
+ return new SlotTaskExecutorWeight<>(slotsUtilization, slot);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsUtilization.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsUtilization.java
new file mode 100644
index 00000000000..cbb540a74f5
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsUtilization.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/** Helper class to represent the slots utilization info. */
+class SlotsUtilization {
+
+ private final int total;
+ private final int reserved;
+
+ SlotsUtilization(int total, int reserved) {
+ Preconditions.checkArgument(
+ total >= reserved, "The total value must be >= reserved
value.");
+ Preconditions.checkArgument(reserved >= 0, "The reserved number must
not be negative.");
+ this.total = total;
+ this.reserved = reserved;
+ }
+
+ SlotsUtilization incReserved(int inc) {
+ Preconditions.checkArgument(inc > 0, "The increment number must be
greater than zero.");
+ Preconditions.checkArgument(
+ reserved + inc <= total,
+ "The increment result must be equal to or less than the total
value.");
+ return new SlotsUtilization(total, reserved + inc);
+ }
+
+ double getUtilization() {
+ if (total == 0 && reserved == 0) {
+ return 1.0;
+ }
+ return ((double) reserved) / total;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SlotsUtilization that = (SlotsUtilization) o;
+ return total == that.total && reserved == that.reserved;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(total, reserved);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index 14b3ef7e555..28ca994c55a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
@@ -96,7 +97,7 @@ public class StateLocalitySlotAssigner implements
SlotAssigner {
@Override
public Collection<SlotAssignment> assignSlots(
JobInformation jobInformation,
- Collection<? extends SlotInfo> freeSlots,
+ Collection<PhysicalSlot> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
checkMinimumRequiredSlots(jobInformation, freeSlots);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 405ef99baac..e8718be0ead 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -282,7 +283,9 @@ public class AdaptiveSchedulerBuilder {
jobMasterConfiguration.get(DeploymentOptions.TARGET),
jobMasterConfiguration.get(
JobManagerOptions
-
.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED))
+
.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED),
+ jobMasterConfiguration.get(
+
TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE))
: slotAllocator,
executorService,
userCodeLoader,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
index 35e2bb54db4..3f08094fe50 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -77,7 +78,8 @@ public class LocalRecoveryTest extends
AdaptiveSchedulerTestBase {
slotPool,
localRecoveryEnabled,
executionTarget,
- minimalTaskManagerPreferred),
+ minimalTaskManagerPreferred,
+
TaskManagerOptions.TaskManagerLoadBalanceMode.NONE),
capturedAllocations);
scheduler =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
index 3cd18490176..2122dd88198 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.scheduler.adaptive;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -65,7 +66,8 @@ class TaskBalancedSlotSharingResolverTest {
is_slot_free_function,
disable_local_recovery,
NULL_EXECUTION_TARGET,
- false);
+ false,
+ TaskManagerOptions.TaskManagerLoadBalanceMode.NONE);
private SlotSharingGroup slotSharingGroup1;
private SlotSharingGroup slotSharingGroup2;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
new file mode 100644
index 00000000000..5eadda12ea7
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.IntSummaryStatistics;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Shared fixture and test case for implementations of {@link SlotMatchingResolver}.
+ *
+ * <p>The fixture consists of three task manager locations with three free slots each and seven
+ * requested execution slot sharing groups that contain one to seven execution vertices.
+ * Subclasses supply the resolver under test and the assertions on the resulting assignments.
+ */
+abstract class AbstractSlotMatchingResolverTest {
+
+    public static final SlotSharingGroup SLOT_SHARING_GROUP = new SlotSharingGroup();
+
+    // Slots of the first task manager.
+    protected final TaskManagerLocation tml1 = new LocalTaskManagerLocation();
+    protected final TestingSlot slot1OfTml1 = createAnySlotOf(tml1);
+    protected final TestingSlot slot2OfTml1 = createAnySlotOf(tml1);
+    protected final TestingSlot slot3OfTml1 = createAnySlotOf(tml1);
+
+    // Slots of the second task manager.
+    protected final TaskManagerLocation tml2 = new LocalTaskManagerLocation();
+    protected final TestingSlot slot1OfTml2 = createAnySlotOf(tml2);
+    protected final TestingSlot slot2OfTml2 = createAnySlotOf(tml2);
+    protected final TestingSlot slot3OfTml2 = createAnySlotOf(tml2);
+
+    // Slots of the third task manager.
+    protected final TaskManagerLocation tml3 = new LocalTaskManagerLocation();
+    protected final TestingSlot slot1OfTml3 = createAnySlotOf(tml3);
+    protected final TestingSlot slot2OfTml3 = createAnySlotOf(tml3);
+    protected final TestingSlot slot3OfTml3 = createAnySlotOf(tml3);
+
+    // Requested groups; the numeric suffix equals the number of execution vertices in the group.
+    protected final ExecutionSlotSharingGroup requestGroup1 = createGroup(1);
+    protected final ExecutionSlotSharingGroup requestGroup2 = createGroup(2);
+    protected final ExecutionSlotSharingGroup requestGroup3 = createGroup(3);
+    protected final ExecutionSlotSharingGroup requestGroup4 = createGroup(4);
+    protected final ExecutionSlotSharingGroup requestGroup5 = createGroup(5);
+    protected final ExecutionSlotSharingGroup requestGroup6 = createGroup(6);
+    protected final ExecutionSlotSharingGroup requestGroup7 = createGroup(7);
+
+    protected final List<PhysicalSlot> freeSlots =
+            Arrays.asList(
+                    slot1OfTml1, slot2OfTml1, slot3OfTml1,
+                    slot1OfTml2, slot2OfTml2, slot3OfTml2,
+                    slot1OfTml3, slot2OfTml3, slot3OfTml3);
+
+    protected final List<ExecutionSlotSharingGroup> requestedGroups =
+            Arrays.asList(
+                    requestGroup1,
+                    requestGroup2,
+                    requestGroup3,
+                    requestGroup4,
+                    requestGroup5,
+                    requestGroup6,
+                    requestGroup7);
+
+    protected SlotMatchingResolver slotMatchingResolver;
+
+    @BeforeEach
+    protected void setUp() {
+        slotMatchingResolver = createSlotMatchingResolver();
+    }
+
+    /** Returns the resolver implementation under test. */
+    protected abstract SlotMatchingResolver createSlotMatchingResolver();
+
+    /** Verifies the assignments produced by the resolver for the shared fixture. */
+    protected abstract void assertAssignments(Collection<SlotAssignment> assignments);
+
+    @Test
+    void testMatchSlotSharingGroupWithSlots() {
+        assertAssignments(
+                slotMatchingResolver.matchSlotSharingGroupWithSlots(requestedGroups, freeSlots));
+    }
+
+    /** Groups the given assignments by the task manager location of their assigned slot. */
+    protected static @Nonnull Map<TaskManagerLocation, Set<SlotAssignment>>
+            getAssignmentsPerTaskManager(Collection<SlotAssignment> assignments) {
+        return assignments.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                AbstractSlotMatchingResolverTest::taskManagerOf,
+                                Collectors.toSet()));
+    }
+
+    /** Returns the task manager location of the slot referenced by the assignment. */
+    private static TaskManagerLocation taskManagerOf(SlotAssignment assignment) {
+        return assignment.getSlotInfo().getTaskManagerLocation();
+    }
+
+    /** Creates a slot with a fresh allocation id and the ANY profile on the given location. */
+    private static TestingSlot createAnySlotOf(TaskManagerLocation tml) {
+        final AllocationID allocationId = new AllocationID();
+        return new TestingSlot(allocationId, ResourceProfile.ANY, tml);
+    }
+
+    /** Creates a group of {@link #SLOT_SHARING_GROUP} with the given number of vertices. */
+    private static ExecutionSlotSharingGroup createGroup(int executionVertices) {
+        // Every vertex gets a fresh JobVertexID, so the generated ids are distinct set members.
+        final Set<ExecutionVertexID> vertexIds =
+                IntStream.range(0, executionVertices)
+                        .mapToObj(index -> new ExecutionVertexID(new JobVertexID(), 0))
+                        .collect(Collectors.toSet());
+        return new ExecutionSlotSharingGroup(SLOT_SHARING_GROUP, vertexIds);
+    }
+}
+
+/**
+ * Test for {@link SlotsBalancedSlotMatchingResolver}: the number of assignments per task manager
+ * may differ by at most one between any two task managers that received assignments.
+ */
+class SlotsBalancedSlotMatchingResolverTest extends AbstractSlotMatchingResolverTest {
+
+    @Override
+    protected SlotMatchingResolver createSlotMatchingResolver() {
+        return SlotsBalancedSlotMatchingResolver.INSTANCE;
+    }
+
+    @Override
+    protected void assertAssignments(Collection<SlotAssignment> assignments) {
+        // Summarize how many assignments landed on each task manager that got at least one.
+        final IntSummaryStatistics perTmCounts =
+                getAssignmentsPerTaskManager(assignments).values().stream()
+                        .mapToInt(Set::size)
+                        .summaryStatistics();
+        final int spread = perTmCounts.getMax() - perTmCounts.getMin();
+        assertThat(spread).isBetween(0, 1);
+    }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
index 86010f1a70f..d1b61b28237 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
@@ -69,7 +70,7 @@ class DefaultSlotAssignerTest {
@Parameter int parallelism;
@Parameter(value = 1)
- Collection<? extends SlotInfo> freeSlots;
+ Collection<PhysicalSlot> freeSlots;
@Parameter(value = 2)
List<TaskManagerLocation> expectedTaskManagerLocations;
@@ -80,7 +81,8 @@ class DefaultSlotAssignerTest {
new DefaultSlotAssigner(
APPLICATION_MODE_EXECUTION_TARGET,
true,
- DefaultSlotSharingResolver.INSTANCE);
+ DefaultSlotSharingResolver.INSTANCE,
+ SimpleSlotMatchingResolver.INSTANCE);
final Set<TaskManagerLocation> keptTaskExecutors =
slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
.map(SlotInfo::getTaskManagerLocation)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index e7ac761804e..0b74ecf2523 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -17,12 +17,14 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
@@ -44,7 +46,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.IntStream;
import static
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot.getSlots;
import static org.assertj.core.api.Assertions.assertThat;
@@ -64,6 +65,8 @@ class SlotSharingSlotAllocatorTest {
private static final boolean DISABLE_LOCAL_RECOVERY = false;
private static final String NULL_EXECUTION_TARGET = null;
private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED =
true;
+ private static final TaskManagerOptions.TaskManagerLoadBalanceMode
+ TASK_MANAGER_LOAD_BALANCE_MODE =
TaskManagerOptions.TaskManagerLoadBalanceMode.NONE;
private static final SlotSharingGroup slotSharingGroup1 = new
SlotSharingGroup();
private static final SlotSharingGroup slotSharingGroup2 = new
SlotSharingGroup();
@@ -83,7 +86,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final ResourceCounter resourceCounter =
slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1,
vertex2, vertex3));
@@ -104,7 +108,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -126,7 +131,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -151,7 +157,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
final JobInformation.VertexInformation vertex11 =
new TestVertexInformation(new JobVertexID(), 4,
slotSharingGroup1);
@@ -187,7 +194,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -207,7 +215,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 1, 8, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -238,7 +247,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -262,7 +272,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 2, 2,
slotSharingGroup);
@@ -286,7 +297,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 10, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -319,7 +331,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new
SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
@@ -352,7 +365,8 @@ class SlotSharingSlotAllocatorTest {
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -398,7 +412,8 @@ class SlotSharingSlotAllocatorTest {
ignored -> false,
DISABLE_LOCAL_RECOVERY,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE);
final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2,
vertex3));
@@ -453,8 +468,7 @@ class SlotSharingSlotAllocatorTest {
KeyGroupRange.of(1, 100),
1)));
- List<SlotInfo> freeSlots = new ArrayList<>();
- IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestingSlot(new
AllocationID())));
+ List<PhysicalSlot> freeSlots = new ArrayList<>(getSlots(10));
freeSlots.add(new TestingSlot(allocation1));
freeSlots.add(new TestingSlot(allocation2));
@@ -466,7 +480,8 @@ class SlotSharingSlotAllocatorTest {
id -> false,
true,
NULL_EXECUTION_TARGET,
- MINIMAL_TASK_MANAGER_PREFERRED_DISABLED)
+ MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+ TASK_MANAGER_LOAD_BALANCE_MODE)
.determineParallelismAndCalculateAssignment(
new TestJobInformation(Arrays.asList(vertex1,
vertex2, vertex3)),
freeSlots,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
index c44a1e0c2ac..3953eef7891 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
@@ -36,7 +36,7 @@ public class TestingSlot implements PhysicalSlot {
private final TaskManagerLocation taskManagerLocation;
public TestingSlot() {
- this(new AllocationID(), ResourceProfile.ANY);
+ this(new AllocationID());
}
public TestingSlot(AllocationID allocationId) {
@@ -55,7 +55,7 @@ public class TestingSlot implements PhysicalSlot {
this(allocationId, resourceProfile, new LocalTaskManagerLocation());
}
- private TestingSlot(
+ public TestingSlot(
AllocationID allocationId,
ResourceProfile resourceProfile,
TaskManagerLocation taskManagerLocation) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
index b0897d54f53..a4cc3e2581c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.function.TriFunction;
@@ -28,6 +29,7 @@ import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.stream.Collectors;
/** Testing implementation of {@link SlotAllocator}. */
public class TestingSlotAllocator implements SlotAllocator {
@@ -85,7 +87,7 @@ public class TestingSlotAllocator implements SlotAllocator {
@Override
public Optional<JobSchedulingPlan>
determineParallelismAndCalculateAssignment(
JobInformation jobInformation,
- Collection<? extends SlotInfo> slots,
+ Collection<PhysicalSlot> slots,
JobAllocationsInformation jobAllocationsInformation) {
return determineParallelismAndCalculateAssignmentFunction.apply(
jobInformation, slots, jobAllocationsInformation);
@@ -174,7 +176,11 @@ public class TestingSlotAllocator implements SlotAllocator
{
(jobInformation, slotInfos, jobAllocationsInformation)
-> {
capturedAllocations.add(jobAllocationsInformation);
return
slotAllocator.determineParallelismAndCalculateAssignment(
- jobInformation, slotInfos,
jobAllocationsInformation);
+ jobInformation,
+ slotInfos.stream()
+ .map(si -> (PhysicalSlot) si)
+ .collect(Collectors.toList()),
+ jobAllocationsInformation);
})
.build();
}