This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 874b837da48 [FLINK-33390][runtime] Support slot balancing at TM level 
for Adaptive Scheduler
874b837da48 is described below

commit 874b837da48a17e4968b6ec557eb0037dcecc91f
Author: Roc Marshal <[email protected]>
AuthorDate: Mon Oct 14 13:47:26 2024 +0800

    [FLINK-33390][runtime] Support slot balancing at TM level for Adaptive 
Scheduler
---
 .../adaptive/AdaptiveSchedulerFactory.java         |  11 +-
 .../adaptive/allocator/AllocatorUtil.java          |  13 ++
 .../adaptive/allocator/DefaultSlotAssigner.java    |  39 +++---
 ...signer.java => SimpleSlotMatchingResolver.java} |  29 ++--
 .../adaptive/allocator/SlotAllocator.java          |   3 +-
 .../scheduler/adaptive/allocator/SlotAssigner.java |   4 +-
 ...SlotAssigner.java => SlotMatchingResolver.java} |  29 ++--
 .../allocator/SlotSharingSlotAllocator.java        |  33 ++++-
 ...otAssigner.java => SlotTaskExecutorWeight.java} |  30 ++--
 .../SlotsBalancedSlotMatchingResolver.java         | 147 ++++++++++++++++++++
 .../adaptive/allocator/SlotsUtilization.java       |  69 ++++++++++
 .../allocator/StateLocalitySlotAssigner.java       |   3 +-
 .../adaptive/AdaptiveSchedulerBuilder.java         |   5 +-
 .../scheduler/adaptive/LocalRecoveryTest.java      |   4 +-
 .../TaskBalancedSlotSharingResolverTest.java       |   4 +-
 .../AbstractSlotMatchingResolverTest.java          | 153 +++++++++++++++++++++
 .../allocator/DefaultSlotAssignerTest.java         |   6 +-
 .../allocator/SlotSharingSlotAllocatorTest.java    |  47 ++++---
 .../scheduler/adaptive/allocator/TestingSlot.java  |   4 +-
 .../adaptive/allocator/TestingSlotAllocator.java   |  10 +-
 20 files changed, 553 insertions(+), 90 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 9f182bf3587..a6e06aa5ac2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
@@ -123,7 +124,9 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
                         
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY),
                         jobMasterConfiguration.get(DeploymentOptions.TARGET),
                         jobMasterConfiguration.get(
-                                
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
+                                
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED),
+                        jobMasterConfiguration.get(
+                                
TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE));
 
         final ExecutionGraphFactory executionGraphFactory =
                 new DefaultExecutionGraphFactory(
@@ -169,13 +172,15 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
             DeclarativeSlotPool declarativeSlotPool,
             boolean localRecoveryEnabled,
             @Nullable String executionTarget,
-            boolean minimalTaskManagerPreferred) {
+            boolean minimalTaskManagerPreferred,
+            TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
         return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                 declarativeSlotPool::reserveFreeSlot,
                 declarativeSlotPool::freeReservedSlot,
                 declarativeSlotPool::containsFreeSlot,
                 localRecoveryEnabled,
                 executionTarget,
-                minimalTaskManagerPreferred);
+                minimalTaskManagerPreferred,
+                taskManagerLoadBalanceMode);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index 24c81ab0fb8..8fbe44c3a1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -54,4 +58,13 @@ class AllocatorUtil {
                 freeSlots.size(),
                 minimumRequiredSlots);
     }
+
+    static Map<ResourceID, Set<PhysicalSlot>> getSlotsPerTaskExecutor(
+            Collection<PhysicalSlot> physicalSlots) {
+        return physicalSlots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                slot -> 
slot.getTaskManagerLocation().getResourceID(),
+                                Collectors.toSet()));
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 4adcd4a090b..9a5a13f0785 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -52,20 +53,23 @@ public class DefaultSlotAssigner implements SlotAssigner {
     private final @Nullable String executionTarget;
     private final boolean minimalTaskManagerPreferred;
     private final SlotSharingResolver slotSharingResolver;
+    private final SlotMatchingResolver slotMatchingResolver;
 
     DefaultSlotAssigner(
             @Nullable String executionTarget,
             boolean minimalTaskManagerPreferred,
-            SlotSharingResolver slotSharingResolver) {
+            SlotSharingResolver slotSharingResolver,
+            SlotMatchingResolver slotMatchingResolver) {
         this.executionTarget = executionTarget;
         this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
         this.slotSharingResolver = slotSharingResolver;
+        this.slotMatchingResolver = slotMatchingResolver;
     }
 
     @Override
     public Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
+            Collection<PhysicalSlot> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
         checkMinimumRequiredSlots(jobInformation, freeSlots);
@@ -74,26 +78,20 @@ public class DefaultSlotAssigner implements SlotAssigner {
                 slotSharingResolver.getExecutionSlotSharingGroups(
                         jobInformation, vertexParallelism);
 
-        final Collection<? extends SlotInfo> pickedSlots =
-                pickSlotsIfNeeded(allGroups.size(), freeSlots);
+        final Collection<PhysicalSlot> pickedSlots = 
pickSlotsIfNeeded(allGroups.size(), freeSlots);
 
-        Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
-        Collection<SlotAssignment> assignments = new ArrayList<>();
-        for (ExecutionSlotSharingGroup group : allGroups) {
-            assignments.add(new SlotAssignment(iterator.next(), group));
-        }
-        return assignments;
+        return slotMatchingResolver.matchSlotSharingGroupWithSlots(allGroups, 
pickedSlots);
     }
 
     @VisibleForTesting
-    Collection<? extends SlotInfo> pickSlotsIfNeeded(
-            int requestExecutionSlotSharingGroups, Collection<? extends 
SlotInfo> freeSlots) {
-        Collection<? extends SlotInfo> pickedSlots = freeSlots;
+    Collection<PhysicalSlot> pickSlotsIfNeeded(
+            int requestExecutionSlotSharingGroups, Collection<PhysicalSlot> 
freeSlots) {
+        Collection<PhysicalSlot> pickedSlots = freeSlots;
         if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
                 && minimalTaskManagerPreferred
                 // To avoid the sort-work loading.
                 && freeSlots.size() > requestExecutionSlotSharingGroups) {
-            final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsPerTaskExecutor =
+            final Map<TaskManagerLocation, Set<PhysicalSlot>> 
slotsPerTaskExecutor =
                     getSlotsPerTaskExecutor(freeSlots);
             pickedSlots =
                     pickSlotsInMinimalTaskExecutors(
@@ -127,21 +125,20 @@ public class DefaultSlotAssigner implements SlotAssigner {
      * @param requestedGroups the number of the request execution slot sharing 
groups.
      * @return the target slots that are distributed on the minimal task 
executors.
      */
-    private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
-            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsByTaskExecutor,
-            int requestedGroups) {
-        final List<SlotInfo> pickedSlots = new ArrayList<>();
+    private Collection<PhysicalSlot> pickSlotsInMinimalTaskExecutors(
+            Map<TaskManagerLocation, Set<PhysicalSlot>> slotsByTaskExecutor, 
int requestedGroups) {
+        final List<PhysicalSlot> pickedSlots = new ArrayList<>();
         final Iterator<TaskManagerLocation> sortedTaskExecutors =
                 getSortedTaskExecutors(slotsByTaskExecutor);
         while (pickedSlots.size() < requestedGroups) {
-            Set<? extends SlotInfo> slotInfos = 
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+            Set<PhysicalSlot> slotInfos = 
slotsByTaskExecutor.get(sortedTaskExecutors.next());
             pickedSlots.addAll(slotInfos);
         }
         return pickedSlots;
     }
 
-    private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
getSlotsPerTaskExecutor(
-            Collection<? extends SlotInfo> slots) {
+    private Map<TaskManagerLocation, Set<PhysicalSlot>> 
getSlotsPerTaskExecutor(
+            Collection<PhysicalSlot> slots) {
         return slots.stream()
                 .collect(
                         Collectors.groupingBy(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SimpleSlotMatchingResolver.java
similarity index 50%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SimpleSlotMatchingResolver.java
index cb264e28d88..127e1bb38d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SimpleSlotMatchingResolver.java
@@ -17,19 +17,28 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
-import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 
-/** Interface for assigning slots to slot sharing groups. */
-@Internal
-public interface SlotAssigner {
+/** The simple slot matching resolver implementation. */
+public enum SimpleSlotMatchingResolver implements SlotMatchingResolver {
+    INSTANCE;
 
-    Collection<SlotAssignment> assignSlots(
-            JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
-            VertexParallelism vertexParallelism,
-            JobAllocationsInformation previousAllocations);
+    @Override
+    public Collection<JobSchedulingPlan.SlotAssignment> 
matchSlotSharingGroupWithSlots(
+            Collection<ExecutionSlotSharingGroup> requestGroups,
+            Collection<PhysicalSlot> freeSlots) {
+        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Collection<JobSchedulingPlan.SlotAssignment> assignments = new 
ArrayList<>();
+        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup group : 
requestGroups) {
+            assignments.add(new 
JobSchedulingPlan.SlotAssignment(iterator.next(), group));
+        }
+        return assignments;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
index 629a57f5122..e5dd04eb00e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import org.apache.flink.runtime.util.ResourceCounter;
 
@@ -61,7 +62,7 @@ public interface SlotAllocator {
      */
     Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
             JobInformation jobInformation,
-            Collection<? extends SlotInfo> slots,
+            Collection<PhysicalSlot> slots,
             JobAllocationsInformation jobAllocationsInformation);
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
index cb264e28d88..7a549569606 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
@@ -18,7 +18,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 
 import java.util.Collection;
@@ -29,7 +29,7 @@ public interface SlotAssigner {
 
     Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
+            Collection<PhysicalSlot> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotMatchingResolver.java
similarity index 50%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotMatchingResolver.java
index cb264e28d88..4e9acf9165f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotMatchingResolver.java
@@ -17,19 +17,28 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.Collection;
+import java.util.function.Supplier;
 
-/** Interface for assigning slots to slot sharing groups. */
-@Internal
-public interface SlotAssigner {
+/** The interface to define the methods for request slot matching resolver. */
+public interface SlotMatchingResolver {
 
-    Collection<SlotAssignment> assignSlots(
-            JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
-            VertexParallelism vertexParallelism,
-            JobAllocationsInformation previousAllocations);
+    Supplier<FlinkRuntimeException> NO_SLOTS_EXCEPTION_GETTER =
+            () -> new FlinkRuntimeException("No suitable slots enough.");
+
+    /**
+     * Match slots from the free slots with the given collection of requests 
execution groups.
+     *
+     * @param requestGroups the requested execution slot sharing groups.
+     * @param freeSlots the free slots.
+     * @return The assignment result.
+     */
+    Collection<SlotAssignment> matchSlotSharingGroupWithSlots(
+            Collection<ExecutionSlotSharingGroup> requestGroups,
+            Collection<PhysicalSlot> freeSlots);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 58712498224..992fe071038 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -62,6 +63,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     private final @Nullable String executionTarget;
     private final boolean minimalTaskManagerPreferred;
     private final SlotSharingResolver slotSharingResolver = 
DefaultSlotSharingResolver.INSTANCE;
+    private final SlotMatchingResolver slotMatchingResolver;
 
     private SlotSharingSlotAllocator(
             ReserveSlotFunction reserveSlot,
@@ -69,13 +71,15 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
             boolean localRecoveryEnabled,
             @Nullable String executionTarget,
-            boolean minimalTaskManagerPreferred) {
+            boolean minimalTaskManagerPreferred,
+            TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
         this.reserveSlotFunction = reserveSlot;
         this.freeSlotFunction = freeSlotFunction;
         this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
         this.localRecoveryEnabled = localRecoveryEnabled;
         this.executionTarget = executionTarget;
         this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+        this.slotMatchingResolver = 
getSlotMatchingResolver(taskManagerLoadBalanceMode);
     }
 
     public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
@@ -84,14 +88,16 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
             boolean localRecoveryEnabled,
             @Nullable String executionTarget,
-            boolean minimalTaskManagerPreferred) {
+            boolean minimalTaskManagerPreferred,
+            TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
         return new SlotSharingSlotAllocator(
                 reserveSlot,
                 freeSlotFunction,
                 isSlotAvailableAndFreeFunction,
                 localRecoveryEnabled,
                 executionTarget,
-                minimalTaskManagerPreferred);
+                minimalTaskManagerPreferred,
+                taskManagerLoadBalanceMode);
     }
 
     @Override
@@ -146,7 +152,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     @Override
     public Optional<JobSchedulingPlan> 
determineParallelismAndCalculateAssignment(
             JobInformation jobInformation,
-            Collection<? extends SlotInfo> slots,
+            Collection<PhysicalSlot> slots,
             JobAllocationsInformation jobAllocationsInformation) {
         return determineParallelism(jobInformation, slots)
                 .map(
@@ -157,7 +163,8 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                                             : new DefaultSlotAssigner(
                                                     executionTarget,
                                                     
minimalTaskManagerPreferred,
-                                                    slotSharingResolver);
+                                                    slotSharingResolver,
+                                                    slotMatchingResolver);
                             return new JobSchedulingPlan(
                                     parallelism,
                                     slotAssigner.assignSlots(
@@ -168,6 +175,22 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                         });
     }
 
+    private SlotMatchingResolver getSlotMatchingResolver(
+            TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
+        switch (taskManagerLoadBalanceMode) {
+            case NONE:
+            case MIN_RESOURCES:
+                return SimpleSlotMatchingResolver.INSTANCE;
+            case SLOTS:
+                return SlotsBalancedSlotMatchingResolver.INSTANCE;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported task manager load mode: %s",
+                                taskManagerLoadBalanceMode));
+        }
+    }
+
     /**
      * Distributes free slots across the slot-sharing groups of the job. Slots 
are distributed as
      * evenly as possible. If a group requires less than an even share of 
slots the remainder is
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotTaskExecutorWeight.java
similarity index 53%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotTaskExecutorWeight.java
index cb264e28d88..9b0bd549b52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotTaskExecutorWeight.java
@@ -17,19 +17,25 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 
-import java.util.Collection;
+import javax.annotation.Nonnull;
 
-/** Interface for assigning slots to slot sharing groups. */
-@Internal
-public interface SlotAssigner {
+/**
+ * Helper class to represent the slot and the loading or slots utilization 
weight info of the task
+ * executor where the slot is located at.
+ */
+class SlotTaskExecutorWeight<T> {
+    final @Nonnull T taskExecutorWeight;
+    final @Nonnull PhysicalSlot physicalSlot;
+
+    SlotTaskExecutorWeight(@Nonnull T taskExecutorWeight, @Nonnull 
PhysicalSlot physicalSlot) {
+        this.taskExecutorWeight = taskExecutorWeight;
+        this.physicalSlot = physicalSlot;
+    }
 
-    Collection<SlotAssignment> assignSlots(
-            JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
-            VertexParallelism vertexParallelism,
-            JobAllocationsInformation previousAllocations);
+    ResourceID getResourceID() {
+        return physicalSlot.getTaskManagerLocation().getResourceID();
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsBalancedSlotMatchingResolver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsBalancedSlotMatchingResolver.java
new file mode 100644
index 00000000000..1aea6baded6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsBalancedSlotMatchingResolver.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.util.CollectionUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** The slots balanced request slot matching resolver implementation. */
+public enum SlotsBalancedSlotMatchingResolver implements SlotMatchingResolver {
+    INSTANCE;
+
+    @Override
+    public Collection<JobSchedulingPlan.SlotAssignment> 
matchSlotSharingGroupWithSlots(
+            Collection<ExecutionSlotSharingGroup> requestGroups,
+            Collection<PhysicalSlot> freeSlots) {
+
+        final List<SlotAssignment> slotAssignments = new 
ArrayList<>(requestGroups.size());
+        final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
+                AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
+        final Map<ResourceID, SlotsUtilization> taskExecutorSlotsUtilizations =
+                getSlotsUtilizationView(slotsPerTaskExecutor);
+        final TreeMap<Double, Set<PhysicalSlot>> utilizationSlotsMap =
+                getUtilizationSlotsMap(freeSlots, 
taskExecutorSlotsUtilizations);
+
+        SlotTaskExecutorWeight<SlotsUtilization> best;
+        for (ExecutionSlotSharingGroup requestGroup : requestGroups) {
+            best = getTheBestSlotUtilization(utilizationSlotsMap, 
taskExecutorSlotsUtilizations);
+            ResourceID resourceID = best.getResourceID();
+            slotAssignments.add(new SlotAssignment(best.physicalSlot, 
requestGroup));
+            SlotsUtilization oldSlotsUtilization = 
taskExecutorSlotsUtilizations.get(resourceID);
+            // Update the references
+            final SlotsUtilization newSlotsUtilization = 
oldSlotsUtilization.incReserved(1);
+            taskExecutorSlotsUtilizations.put(resourceID, newSlotsUtilization);
+            updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
+            Set<PhysicalSlot> slotInfos = 
slotsPerTaskExecutor.get(best.getResourceID());
+            updateUtilizationSlotsMap(utilizationSlotsMap, best, slotInfos, 
newSlotsUtilization);
+        }
+        return slotAssignments;
+    }
+
+    private Map<ResourceID, SlotsUtilization> getSlotsUtilizationView(
+            Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor) {
+        return slotsPerTaskExecutor.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> new 
SlotsUtilization(entry.getValue().size(), 0)));
+    }
+
+    private static void updateUtilizationSlotsMap(
+            Map<Double, Set<PhysicalSlot>> utilizationSlotsMap,
+            SlotTaskExecutorWeight<SlotsUtilization> best,
+            Set<PhysicalSlot> slotsToAdjust,
+            SlotsUtilization newSlotsUtilization) {
+        Double oldUtilization = best.taskExecutorWeight.getUtilization();
+        Double newUtilization = newSlotsUtilization.getUtilization();
+
+        Set<PhysicalSlot> physicalSlots = 
utilizationSlotsMap.get(oldUtilization);
+
+        if (Objects.nonNull(physicalSlots)) {
+            physicalSlots.remove(best.physicalSlot);
+            if (Objects.nonNull(slotsToAdjust)) {
+                physicalSlots.removeAll(slotsToAdjust);
+            }
+        }
+        if (CollectionUtil.isNullOrEmpty(physicalSlots)) {
+            utilizationSlotsMap.remove(oldUtilization);
+        }
+        if (Objects.nonNull(slotsToAdjust)) {
+            utilizationSlotsMap
+                    .computeIfAbsent(newUtilization, slotsUtilization -> new 
HashSet<>())
+                    .addAll(slotsToAdjust);
+        }
+    }
+
+    private static void updateSlotsPerTaskExecutor(
+            Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor,
+            SlotTaskExecutorWeight<SlotsUtilization> best) {
+        Set<PhysicalSlot> slotInfos = 
slotsPerTaskExecutor.get(best.getResourceID());
+        if (Objects.nonNull(slotInfos)) {
+            slotInfos.remove(best.physicalSlot);
+        }
+        if (Objects.isNull(slotInfos) || slotInfos.isEmpty()) {
+            slotsPerTaskExecutor.remove(best.getResourceID());
+        }
+    }
+
+    private static TreeMap<Double, Set<PhysicalSlot>> getUtilizationSlotsMap(
+            Collection<PhysicalSlot> slots, Map<ResourceID, SlotsUtilization> 
slotsUtilizations) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                physicalSlot ->
+                                        slotsUtilizations
+                                                .get(
+                                                        physicalSlot
+                                                                
.getTaskManagerLocation()
+                                                                
.getResourceID())
+                                                .getUtilization(),
+                                TreeMap::new,
+                                Collectors.toSet()));
+    }
+
+    private static SlotTaskExecutorWeight<SlotsUtilization> 
getTheBestSlotUtilization(
+            TreeMap<Double, Set<PhysicalSlot>> slotsByUtilization,
+            Map<ResourceID, SlotsUtilization> taskExecutorSlotsUtilizations) {
+        Map.Entry<Double, Set<PhysicalSlot>> firstEntry = 
slotsByUtilization.firstEntry();
+        if (firstEntry == null
+                || firstEntry.getKey() == null
+                || CollectionUtil.isNullOrEmpty(firstEntry.getValue())) {
+            throw NO_SLOTS_EXCEPTION_GETTER.get();
+        }
+        PhysicalSlot slot = firstEntry.getValue().iterator().next();
+        ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+        SlotsUtilization slotsUtilization = 
taskExecutorSlotsUtilizations.get(resourceID);
+        return new SlotTaskExecutorWeight<>(slotsUtilization, slot);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsUtilization.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsUtilization.java
new file mode 100644
index 00000000000..cbb540a74f5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotsUtilization.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/** Help class to represent the slots utilization info. */
+class SlotsUtilization {
+
+    private final int total;
+    private final int reserved;
+
+    SlotsUtilization(int total, int reserved) {
+        Preconditions.checkArgument(
+                total >= reserved, "The total value must be >= reserved 
value.");
+        Preconditions.checkArgument(reserved >= 0, "The reserved number must 
not be negative.");
+        this.total = total;
+        this.reserved = reserved;
+    }
+
+    SlotsUtilization incReserved(int inc) {
+        Preconditions.checkArgument(inc > 0, "The increment number must be 
greater than zero.");
+        Preconditions.checkArgument(
+                reserved + inc <= total,
+                "The increment result must be equal to or less than the total 
value.");
+        return new SlotsUtilization(total, reserved + inc);
+    }
+
+    double getUtilization() {
+        if (total == 0 && reserved == 0) {
+            return 1.0;
+        }
+        return ((double) reserved) / total;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SlotsUtilization that = (SlotsUtilization) o;
+        return total == that.total && reserved == that.reserved;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(total, reserved);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index 14b3ef7e555..28ca994c55a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
@@ -96,7 +97,7 @@ public class StateLocalitySlotAssigner implements 
SlotAssigner {
     @Override
     public Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
+            Collection<PhysicalSlot> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
         checkMinimumRequiredSlots(jobInformation, freeSlots);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 405ef99baac..e8718be0ead 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -282,7 +283,9 @@ public class AdaptiveSchedulerBuilder {
                                 
jobMasterConfiguration.get(DeploymentOptions.TARGET),
                                 jobMasterConfiguration.get(
                                         JobManagerOptions
-                                                
.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED))
+                                                
.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED),
+                                jobMasterConfiguration.get(
+                                        
TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE))
                         : slotAllocator,
                 executorService,
                 userCodeLoader,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
index 35e2bb54db4..3f08094fe50 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -77,7 +78,8 @@ public class LocalRecoveryTest extends 
AdaptiveSchedulerTestBase {
                                 slotPool,
                                 localRecoveryEnabled,
                                 executionTarget,
-                                minimalTaskManagerPreferred),
+                                minimalTaskManagerPreferred,
+                                
TaskManagerOptions.TaskManagerLoadBalanceMode.NONE),
                         capturedAllocations);
 
         scheduler =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
index 3cd18490176..2122dd88198 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TaskBalancedSlotSharingResolverTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -65,7 +66,8 @@ class TaskBalancedSlotSharingResolverTest {
                     is_slot_free_function,
                     disable_local_recovery,
                     NULL_EXECUTION_TARGET,
-                    false);
+                    false,
+                    TaskManagerOptions.TaskManagerLoadBalanceMode.NONE);
 
     private SlotSharingGroup slotSharingGroup1;
     private SlotSharingGroup slotSharingGroup2;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
new file mode 100644
index 00000000000..5eadda12ea7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.IntSummaryStatistics;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base testing class for the implementations of {@link 
SlotMatchingResolver}. */
+abstract class AbstractSlotMatchingResolverTest {
+
+    public static final SlotSharingGroup SLOT_SHARING_GROUP = new 
SlotSharingGroup();
+
+    protected final TaskManagerLocation tml1 = new LocalTaskManagerLocation();
+    protected final TestingSlot slot1OfTml1 = createAnySlotOf(tml1);
+    protected final TestingSlot slot2OfTml1 = createAnySlotOf(tml1);
+    protected final TestingSlot slot3OfTml1 = createAnySlotOf(tml1);
+
+    protected final TaskManagerLocation tml2 = new LocalTaskManagerLocation();
+    protected final TestingSlot slot1OfTml2 = createAnySlotOf(tml2);
+    protected final TestingSlot slot2OfTml2 = createAnySlotOf(tml2);
+    protected final TestingSlot slot3OfTml2 = createAnySlotOf(tml2);
+
+    protected final TaskManagerLocation tml3 = new LocalTaskManagerLocation();
+    protected final TestingSlot slot1OfTml3 = createAnySlotOf(tml3);
+    protected final TestingSlot slot2OfTml3 = createAnySlotOf(tml3);
+    protected final TestingSlot slot3OfTml3 = createAnySlotOf(tml3);
+
+    protected final ExecutionSlotSharingGroup requestGroup1 = createGroup(1);
+    protected final ExecutionSlotSharingGroup requestGroup2 = createGroup(2);
+    protected final ExecutionSlotSharingGroup requestGroup3 = createGroup(3);
+    protected final ExecutionSlotSharingGroup requestGroup4 = createGroup(4);
+    protected final ExecutionSlotSharingGroup requestGroup5 = createGroup(5);
+    protected final ExecutionSlotSharingGroup requestGroup6 = createGroup(6);
+    protected final ExecutionSlotSharingGroup requestGroup7 = createGroup(7);
+
+    protected final List<PhysicalSlot> freeSlots =
+            Arrays.asList(
+                    slot1OfTml1,
+                    slot2OfTml1,
+                    slot3OfTml1,
+                    slot1OfTml2,
+                    slot2OfTml2,
+                    slot3OfTml2,
+                    slot1OfTml3,
+                    slot2OfTml3,
+                    slot3OfTml3);
+
+    protected final List<ExecutionSlotSharingGroup> requestedGroups =
+            Arrays.asList(
+                    requestGroup1,
+                    requestGroup2,
+                    requestGroup3,
+                    requestGroup4,
+                    requestGroup5,
+                    requestGroup6,
+                    requestGroup7);
+
+    protected SlotMatchingResolver slotMatchingResolver;
+
+    @BeforeEach
+    protected void setUp() {
+        this.slotMatchingResolver = createSlotMatchingResolver();
+    }
+
+    protected abstract SlotMatchingResolver createSlotMatchingResolver();
+
+    protected abstract void assertAssignments(Collection<SlotAssignment> 
assignments);
+
+    @Test
+    void testMatchSlotSharingGroupWithSlots() {
+        Collection<SlotAssignment> slotAssignments =
+                
slotMatchingResolver.matchSlotSharingGroupWithSlots(requestedGroups, freeSlots);
+        assertAssignments(slotAssignments);
+    }
+
+    protected static @Nonnull Map<TaskManagerLocation, Set<SlotAssignment>>
+            getAssignmentsPerTaskManager(Collection<SlotAssignment> 
assignments) {
+        return assignments.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                assignment -> 
assignment.getSlotInfo().getTaskManagerLocation(),
+                                Collectors.toSet()));
+    }
+
+    private static TestingSlot createAnySlotOf(TaskManagerLocation tml) {
+        return new TestingSlot(new AllocationID(), ResourceProfile.ANY, tml);
+    }
+
+    private static ExecutionSlotSharingGroup createGroup(int 
executionVertices) {
+        return new ExecutionSlotSharingGroup(
+                SLOT_SHARING_GROUP,
+                IntStream.range(0, executionVertices)
+                        .mapToObj(ignored -> new ExecutionVertexID(new 
JobVertexID(), 0))
+                        .collect(Collectors.toSet()));
+    }
+}
+
+/** Test for {@link SlotsBalancedSlotMatchingResolver}. */
+class SlotsBalancedSlotMatchingResolverTest extends 
AbstractSlotMatchingResolverTest {
+
+    @Override
+    protected SlotMatchingResolver createSlotMatchingResolver() {
+        return SlotsBalancedSlotMatchingResolver.INSTANCE;
+    }
+
+    @Override
+    protected void assertAssignments(Collection<SlotAssignment> assignments) {
+        Map<TaskManagerLocation, Set<SlotAssignment>> assignmentsPerTm =
+                getAssignmentsPerTaskManager(assignments);
+        IntSummaryStatistics stats =
+                
assignmentsPerTm.values().stream().collect(Collectors.summarizingInt(Set::size));
+        assertThat(stats.getMax() - stats.getMin()).isBetween(0, 1);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
index 86010f1a70f..d1b61b28237 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
@@ -69,7 +70,7 @@ class DefaultSlotAssignerTest {
     @Parameter int parallelism;
 
     @Parameter(value = 1)
-    Collection<? extends SlotInfo> freeSlots;
+    Collection<PhysicalSlot> freeSlots;
 
     @Parameter(value = 2)
     List<TaskManagerLocation> expectedTaskManagerLocations;
@@ -80,7 +81,8 @@ class DefaultSlotAssignerTest {
                 new DefaultSlotAssigner(
                         APPLICATION_MODE_EXECUTION_TARGET,
                         true,
-                        DefaultSlotSharingResolver.INSTANCE);
+                        DefaultSlotSharingResolver.INSTANCE,
+                        SimpleSlotMatchingResolver.INSTANCE);
         final Set<TaskManagerLocation> keptTaskExecutors =
                 slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
                         .map(SlotInfo::getTaskManagerLocation)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index e7ac761804e..0b74ecf2523 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
@@ -44,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.IntStream;
 
 import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot.getSlots;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -64,6 +65,8 @@ class SlotSharingSlotAllocatorTest {
     private static final boolean DISABLE_LOCAL_RECOVERY = false;
     private static final String NULL_EXECUTION_TARGET = null;
     private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED = 
true;
+    private static final TaskManagerOptions.TaskManagerLoadBalanceMode
+            TASK_MANAGER_LOAD_BALANCE_MODE = 
TaskManagerOptions.TaskManagerLoadBalanceMode.NONE;
 
     private static final SlotSharingGroup slotSharingGroup1 = new 
SlotSharingGroup();
     private static final SlotSharingGroup slotSharingGroup2 = new 
SlotSharingGroup();
@@ -83,7 +86,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
 
         final ResourceCounter resourceCounter =
                 slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, 
vertex2, vertex3));
@@ -104,7 +108,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -126,7 +131,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -151,7 +157,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
         final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
         final JobInformation.VertexInformation vertex11 =
                 new TestVertexInformation(new JobVertexID(), 4, 
slotSharingGroup1);
@@ -187,7 +194,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -207,7 +215,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 1, 8, new 
SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -238,7 +247,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 4, new 
SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -262,7 +272,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
         SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 2, 2, 
slotSharingGroup);
@@ -286,7 +297,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 10, new 
SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -319,7 +331,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
         final JobInformation.VertexInformation vertex1 =
                 new TestVertexInformation(new JobVertexID(), 4, 4, new 
SlotSharingGroup());
         final JobInformation.VertexInformation vertex2 =
@@ -352,7 +365,8 @@ class SlotSharingSlotAllocatorTest {
                         TEST_IS_SLOT_FREE_FUNCTION,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -398,7 +412,8 @@ class SlotSharingSlotAllocatorTest {
                         ignored -> false,
                         DISABLE_LOCAL_RECOVERY,
                         NULL_EXECUTION_TARGET,
-                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED);
+                        MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                        TASK_MANAGER_LOAD_BALANCE_MODE);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -453,8 +468,7 @@ class SlotSharingSlotAllocatorTest {
                                 KeyGroupRange.of(1, 100),
                                 1)));
 
-        List<SlotInfo> freeSlots = new ArrayList<>();
-        IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestingSlot(new 
AllocationID())));
+        List<PhysicalSlot> freeSlots = new ArrayList<>(getSlots(10));
         freeSlots.add(new TestingSlot(allocation1));
         freeSlots.add(new TestingSlot(allocation2));
 
@@ -466,7 +480,8 @@ class SlotSharingSlotAllocatorTest {
                                 id -> false,
                                 true,
                                 NULL_EXECUTION_TARGET,
-                                MINIMAL_TASK_MANAGER_PREFERRED_DISABLED)
+                                MINIMAL_TASK_MANAGER_PREFERRED_DISABLED,
+                                TASK_MANAGER_LOAD_BALANCE_MODE)
                         .determineParallelismAndCalculateAssignment(
                                 new TestJobInformation(Arrays.asList(vertex1, 
vertex2, vertex3)),
                                 freeSlots,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
index c44a1e0c2ac..3953eef7891 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlot.java
@@ -36,7 +36,7 @@ public class TestingSlot implements PhysicalSlot {
     private final TaskManagerLocation taskManagerLocation;
 
     public TestingSlot() {
-        this(new AllocationID(), ResourceProfile.ANY);
+        this(new AllocationID());
     }
 
     public TestingSlot(AllocationID allocationId) {
@@ -55,7 +55,7 @@ public class TestingSlot implements PhysicalSlot {
         this(allocationId, resourceProfile, new LocalTaskManagerLocation());
     }
 
-    private TestingSlot(
+    public TestingSlot(
             AllocationID allocationId,
             ResourceProfile resourceProfile,
             TaskManagerLocation taskManagerLocation) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
index b0897d54f53..a4cc3e2581c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.function.TriFunction;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** Testing implementation of {@link SlotAllocator}. */
 public class TestingSlotAllocator implements SlotAllocator {
@@ -85,7 +87,7 @@ public class TestingSlotAllocator implements SlotAllocator {
     @Override
     public Optional<JobSchedulingPlan> 
determineParallelismAndCalculateAssignment(
             JobInformation jobInformation,
-            Collection<? extends SlotInfo> slots,
+            Collection<PhysicalSlot> slots,
             JobAllocationsInformation jobAllocationsInformation) {
         return determineParallelismAndCalculateAssignmentFunction.apply(
                 jobInformation, slots, jobAllocationsInformation);
@@ -174,7 +176,11 @@ public class TestingSlotAllocator implements SlotAllocator 
{
                         (jobInformation, slotInfos, jobAllocationsInformation) 
-> {
                             capturedAllocations.add(jobAllocationsInformation);
                             return 
slotAllocator.determineParallelismAndCalculateAssignment(
-                                    jobInformation, slotInfos, 
jobAllocationsInformation);
+                                    jobInformation,
+                                    slotInfos.stream()
+                                            .map(si -> (PhysicalSlot) si)
+                                            .collect(Collectors.toList()),
+                                    jobAllocationsInformation);
                         })
                 .build();
     }

Reply via email to