This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08050f8c26381fef9a9a746643e593443df977a6
Author: Roc Marshal <[email protected]>
AuthorDate: Mon Dec 2 23:47:53 2024 +0800

    [FLINK-33389][refactor] Extract the assignment logic of task to slot as 
SlotSharingResolver then refactor the implementation of the default resolver.
---
 .../adaptive/allocator/AllocatorUtil.java          | 27 ------------
 .../adaptive/allocator/DefaultSlotAssigner.java    | 16 ++++---
 ...orUtil.java => DefaultSlotSharingResolver.java} | 51 +++++++++-------------
 .../adaptive/allocator/SlotSharingResolver.java    | 39 +++++++++++++++++
 .../allocator/SlotSharingSlotAllocator.java        | 31 ++++++++++---
 .../allocator/StateLocalitySlotAssigner.java       | 21 +++++----
 .../allocator/DefaultSlotAssignerTest.java         |  5 ++-
 .../allocator/SlotSharingSlotAllocatorTest.java    |  4 +-
 .../allocator/StateLocalitySlotAssignerTest.java   |  2 +-
 9 files changed, 111 insertions(+), 85 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
index d516aae2596..24c81ab0fb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
@@ -19,17 +19,10 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -61,24 +54,4 @@ class AllocatorUtil {
                 freeSlots.size(),
                 minimumRequiredSlots);
     }
-
-    static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
-            createExecutionSlotSharingGroups(
-                    VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
-        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
-        slotSharingGroup
-                .getJobVertexIds()
-                .forEach(
-                        jobVertexId -> {
-                            int parallelism = 
vertexParallelism.getParallelism(jobVertexId);
-                            for (int subtaskIdx = 0; subtaskIdx < parallelism; 
subtaskIdx++) {
-                                sharedSlotToVertexAssignment
-                                        .computeIfAbsent(subtaskIdx, ignored 
-> new HashSet<>())
-                                        .add(new 
ExecutionVertexID(jobVertexId, subtaskIdx));
-                            }
-                        });
-        return sharedSlotToVertexAssignment.values().stream()
-                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
-                .collect(Collectors.toList());
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 2dc0f6dac56..4adcd4a090b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
@@ -38,7 +37,6 @@ import java.util.stream.Collectors;
 
 import static java.util.function.Function.identity;
 import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
-import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
 
 /**
  * Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. Specifically,
@@ -53,10 +51,15 @@ public class DefaultSlotAssigner implements SlotAssigner {
 
     private final @Nullable String executionTarget;
     private final boolean minimalTaskManagerPreferred;
+    private final SlotSharingResolver slotSharingResolver;
 
-    DefaultSlotAssigner(@Nullable String executionTarget, boolean 
minimalTaskManagerPreferred) {
+    DefaultSlotAssigner(
+            @Nullable String executionTarget,
+            boolean minimalTaskManagerPreferred,
+            SlotSharingResolver slotSharingResolver) {
         this.executionTarget = executionTarget;
         this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+        this.slotSharingResolver = slotSharingResolver;
     }
 
     @Override
@@ -67,10 +70,9 @@ public class DefaultSlotAssigner implements SlotAssigner {
             JobAllocationsInformation previousAllocations) {
         checkMinimumRequiredSlots(jobInformation, freeSlots);
 
-        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
-        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
-            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
-        }
+        final Collection<ExecutionSlotSharingGroup> allGroups =
+                slotSharingResolver.getExecutionSlotSharingGroups(
+                        jobInformation, vertexParallelism);
 
         final Collection<? extends SlotInfo> pickedSlots =
                 pickSlotsIfNeeded(allGroups.size(), freeSlots);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotSharingResolver.java
similarity index 60%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotSharingResolver.java
index d516aae2596..d927b973719 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotSharingResolver.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,35 +32,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.util.Preconditions.checkState;
+/** The default implementation of the {@link SlotSharingResolver}. */
+@Internal
+public enum DefaultSlotSharingResolver implements SlotSharingResolver {
+    INSTANCE;
 
-/** The allocator util class. */
-class AllocatorUtil {
-
-    private AllocatorUtil() {}
-
-    static Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
-            getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
-        return 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
-    }
-
-    static int getMinimumRequiredSlots(
-            Map<SlotSharingGroupId, 
SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
-                    slotSharingGroupMetaInfos) {
-        return slotSharingGroupMetaInfos.values().stream()
-                
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
-                .reduce(0, Integer::sum);
-    }
-
-    static void checkMinimumRequiredSlots(
-            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
-        final int minimumRequiredSlots =
-                
getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation));
-        checkState(
-                freeSlots.size() >= minimumRequiredSlots,
-                "Not enough slots to allocate all the execution slot sharing 
groups (have: %s, need: %s)",
-                freeSlots.size(),
-                minimumRequiredSlots);
+    @Override
+    public Collection<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups(
+            JobInformation jobInformation, VertexParallelism 
vertexParallelism) {
+        final List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> 
allGroups =
+                new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
+        }
+        return allGroups;
     }
 
     static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
@@ -78,7 +64,10 @@ class AllocatorUtil {
                             }
                         });
         return sharedSlotToVertexAssignment.values().stream()
-                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
+                .map(
+                        executionVertexIDs ->
+                                new 
SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
+                                        slotSharingGroup, executionVertexIDs))
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingResolver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingResolver.java
new file mode 100644
index 00000000000..dc0eb36e350
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingResolver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+
+import java.util.Collection;
+
+/**
+ * The resolver that defines how to construct execution slot sharing groups from 
the execution vertices
+ * of a job, given the vertices' parallelisms.
+ */
+public interface SlotSharingResolver {
+    /**
+     * Gets the execution slot sharing groups of the job, based on 
the vertices'
+     * parallelisms.
+     *
+     * @param jobInformation the job information.
+     * @param vertexParallelism the parallelisms for the vertices of the job 
information.
+     * @return all the execution slot sharing groups for the deployment of the 
job.
+     */
+    Collection<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups(
+            JobInformation jobInformation, VertexParallelism 
vertexParallelism);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 425159cfe0f..58712498224 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -30,6 +31,7 @@ import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -59,6 +61,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     private final boolean localRecoveryEnabled;
     private final @Nullable String executionTarget;
     private final boolean minimalTaskManagerPreferred;
+    private final SlotSharingResolver slotSharingResolver = 
DefaultSlotSharingResolver.INSTANCE;
 
     private SlotSharingSlotAllocator(
             ReserveSlotFunction reserveSlot,
@@ -150,9 +153,11 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                         parallelism -> {
                             SlotAssigner slotAssigner =
                                     localRecoveryEnabled && 
!jobAllocationsInformation.isEmpty()
-                                            ? new StateLocalitySlotAssigner()
+                                            ? new 
StateLocalitySlotAssigner(slotSharingResolver)
                                             : new DefaultSlotAssigner(
-                                                    executionTarget, 
minimalTaskManagerPreferred);
+                                                    executionTarget,
+                                                    
minimalTaskManagerPreferred,
+                                                    slotSharingResolver);
                             return new JobSchedulingPlan(
                                     parallelism,
                                     slotAssigner.assignSlots(
@@ -295,18 +300,30 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                                 slotInfo.getAllocationId(), null, 
System.currentTimeMillis()));
     }
 
-    static class ExecutionSlotSharingGroup {
+    /** The execution slot sharing group for adaptive scheduler. */
+    public static class ExecutionSlotSharingGroup {
         private final String id;
+        private final SlotSharingGroup slotSharingGroup;
         private final Set<ExecutionVertexID> containedExecutionVertices;
 
-        public ExecutionSlotSharingGroup(Set<ExecutionVertexID> 
containedExecutionVertices) {
-            this(containedExecutionVertices, UUID.randomUUID().toString());
+        public ExecutionSlotSharingGroup(
+                SlotSharingGroup slotSharingGroup,
+                Set<ExecutionVertexID> containedExecutionVertices) {
+            this(UUID.randomUUID().toString(), slotSharingGroup, 
containedExecutionVertices);
         }
 
         public ExecutionSlotSharingGroup(
-                Set<ExecutionVertexID> containedExecutionVertices, String id) {
-            this.containedExecutionVertices = containedExecutionVertices;
+                String id,
+                SlotSharingGroup slotSharingGroup,
+                Set<ExecutionVertexID> containedExecutionVertices) {
             this.id = id;
+            this.slotSharingGroup = 
Preconditions.checkNotNull(slotSharingGroup);
+            this.containedExecutionVertices = containedExecutionVertices;
+        }
+
+        @VisibleForTesting
+        public SlotSharingGroup getSlotSharingGroup() {
+            return slotSharingGroup;
         }
 
         public String getId() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
index b9524627508..14b3ef7e555 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
@@ -36,7 +35,6 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.stream.Collectors;
@@ -44,7 +42,6 @@ import java.util.stream.Collectors;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
 import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
-import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
@@ -90,6 +87,12 @@ public class StateLocalitySlotAssigner implements 
SlotAssigner {
         }
     }
 
+    private final SlotSharingResolver slotSharingResolver;
+
+    StateLocalitySlotAssigner(SlotSharingResolver slotSharingResolver) {
+        this.slotSharingResolver = slotSharingResolver;
+    }
+
     @Override
     public Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
@@ -98,10 +101,10 @@ public class StateLocalitySlotAssigner implements 
SlotAssigner {
             JobAllocationsInformation previousAllocations) {
         checkMinimumRequiredSlots(jobInformation, freeSlots);
 
-        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
-        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
-            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
-        }
+        final Collection<ExecutionSlotSharingGroup> allGroups =
+                slotSharingResolver.getExecutionSlotSharingGroups(
+                        jobInformation, vertexParallelism);
+
         final Map<JobVertexID, Integer> parallelism = 
getParallelism(allGroups);
         final PriorityQueue<AllocationScore> scores =
                 calculateScores(jobInformation, previousAllocations, 
allGroups, parallelism);
@@ -140,7 +143,7 @@ public class StateLocalitySlotAssigner implements 
SlotAssigner {
     private PriorityQueue<AllocationScore> calculateScores(
             JobInformation jobInformation,
             JobAllocationsInformation previousAllocations,
-            List<ExecutionSlotSharingGroup> allGroups,
+            Collection<ExecutionSlotSharingGroup> allGroups,
             Map<JobVertexID, Integer> parallelism) {
         // PQ orders the pairs (allocationID, groupID) by score, decreasing
         // the score is computed as the potential amount of state that would 
reside locally
@@ -153,7 +156,7 @@ public class StateLocalitySlotAssigner implements 
SlotAssigner {
     }
 
     private static Map<JobVertexID, Integer> getParallelism(
-            List<ExecutionSlotSharingGroup> groups) {
+            Collection<ExecutionSlotSharingGroup> groups) {
         final Map<JobVertexID, Integer> parallelism = new HashMap<>();
         for (ExecutionSlotSharingGroup group : groups) {
             for (ExecutionVertexID evi : 
group.getContainedExecutionVertices()) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
index 60a1067efb0..86010f1a70f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java
@@ -77,7 +77,10 @@ class DefaultSlotAssignerTest {
     @TestTemplate
     void testPickSlotsIfNeeded() {
         final DefaultSlotAssigner slotAssigner =
-                new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET, 
true);
+                new DefaultSlotAssigner(
+                        APPLICATION_MODE_EXECUTION_TARGET,
+                        true,
+                        DefaultSlotSharingResolver.INSTANCE);
         final Set<TaskManagerLocation> keptTaskExecutors =
                 slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream()
                         .map(SlotInfo::getTaskManagerLocation)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index dc5800b1719..e7ac761804e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -484,8 +484,8 @@ class SlotSharingSlotAllocatorTest {
                 set.add(id.getJobVertexId());
             }
         }
-        
assertThat(allocated.get(allocation1)).contains(vertex1.getJobVertexID());
-        
assertThat(allocated.get(allocation1)).contains(vertex2.getJobVertexID());
+        assertThat(allocated.get(allocation1))
+                .contains(vertex1.getJobVertexID(), vertex2.getJobVertexID());
         
assertThat(allocated.get(allocation2)).contains(vertex3.getJobVertexID());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
index 9be8dc41ec7..bca7c549bdb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
@@ -225,7 +225,7 @@ class StateLocalitySlotAssignerTest {
             VertexInformation vertexInformation,
             List<AllocationID> allocationIDs,
             List<VertexAllocationInformation> allocations) {
-        return new StateLocalitySlotAssigner()
+        return new 
StateLocalitySlotAssigner(DefaultSlotSharingResolver.INSTANCE)
                 .assignSlots(
                         new 
TestJobInformation(singletonList(vertexInformation)),
                         
allocationIDs.stream().map(TestingSlot::new).collect(Collectors.toList()),

Reply via email to