This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3cdf228191a073c7008f88faa0b088c6a2428012
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Mon Jun 26 17:08:38 2023 +0800

    [FLINK-31843][runtime] Improve performance of SlotSelectionStrategy by bulk select.
---
 .../jobmaster/slotpool/PhysicalSlotProvider.java   |  14 +--
 .../slotpool/PhysicalSlotProviderImpl.java         | 117 +++++++++++++--------
 .../scheduler/SimpleExecutionSlotAllocator.java    | 107 ++++++++++++-------
 .../SlotSharingExecutionSlotAllocator.java         | 107 +++++++++++++------
 .../slotpool/PhysicalSlotProviderResource.java     |   6 +-
 .../scheduler/TestingPhysicalSlotProvider.java     |  34 +++---
 6 files changed, 251 insertions(+), 134 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
index bab5693dcbc..9a4ef808ab9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
@@ -20,22 +20,24 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /** The provider serves physical slot requests. */
 public interface PhysicalSlotProvider {
 
     /**
-     * Submit a request to allocate a physical slot.
+     * Submit requests to allocate physical slots.
      *
      * <p>The physical slot can be either allocated from the slots, which are 
already available for
-     * the job, or a new one can be requeted from the resource manager.
+     * the job, or a new one can be requested from the resource manager.
      *
-     * @param physicalSlotRequest slot requirements
-     * @return a future of the allocated slot
+     * @param physicalSlotRequests physicalSlotRequest slot requirements
+     * @return futures of the allocated slots
      */
-    CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
-            PhysicalSlotRequest physicalSlotRequest);
+    Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
+            Collection<PhysicalSlotRequest> physicalSlotRequests);
 
     /**
      * Cancels the slot request with the given {@link SlotRequestId}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 508f72bfad2..b6c80e2138c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -27,8 +27,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -52,53 +56,80 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
     }
 
     @Override
-    public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
-            PhysicalSlotRequest physicalSlotRequest) {
-        SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
-        SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
-        ResourceProfile resourceProfile = 
slotProfile.getPhysicalSlotResourceProfile();
-
-        LOG.debug(
-                "Received slot request [{}] with resource requirements: {}",
-                slotRequestId,
-                resourceProfile);
-
-        Optional<PhysicalSlot> availablePhysicalSlot =
-                tryAllocateFromAvailable(slotRequestId, slotProfile);
-
-        CompletableFuture<PhysicalSlot> slotFuture;
-        slotFuture =
-                availablePhysicalSlot
-                        .map(CompletableFuture::completedFuture)
-                        .orElseGet(
-                                () ->
-                                        requestNewSlot(
-                                                slotRequestId,
-                                                resourceProfile,
-                                                
slotProfile.getPreferredAllocations(),
-                                                physicalSlotRequest
-                                                        
.willSlotBeOccupiedIndefinitely()));
-
-        return slotFuture.thenApply(
-                physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, 
physicalSlot));
+    public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
+            Collection<PhysicalSlotRequest> physicalSlotRequests) {
+
+        for (PhysicalSlotRequest physicalSlotRequest : physicalSlotRequests) {
+            LOG.debug(
+                    "Received slot request [{}] with resource requirements: 
{}",
+                    physicalSlotRequest.getSlotRequestId(),
+                    
physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());
+        }
+
+        Map<SlotRequestId, PhysicalSlotRequest> physicalSlotRequestsById =
+                physicalSlotRequests.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        PhysicalSlotRequest::getSlotRequestId,
+                                        Function.identity()));
+        Map<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots =
+                tryAllocateFromAvailable(physicalSlotRequestsById.values());
+
+        return availablePhysicalSlots.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> {
+                                    Optional<PhysicalSlot> 
availablePhysicalSlot = entry.getValue();
+                                    SlotRequestId slotRequestId = 
entry.getKey();
+                                    PhysicalSlotRequest physicalSlotRequest =
+                                            
physicalSlotRequestsById.get(slotRequestId);
+                                    SlotProfile slotProfile = 
physicalSlotRequest.getSlotProfile();
+                                    ResourceProfile resourceProfile =
+                                            
slotProfile.getPhysicalSlotResourceProfile();
+
+                                    CompletableFuture<PhysicalSlot> slotFuture 
=
+                                            availablePhysicalSlot
+                                                    
.map(CompletableFuture::completedFuture)
+                                                    .orElseGet(
+                                                            () ->
+                                                                    
requestNewSlot(
+                                                                            
slotRequestId,
+                                                                            
resourceProfile,
+                                                                            
slotProfile
+                                                                               
     .getPreferredAllocations(),
+                                                                            
physicalSlotRequest
+                                                                               
     .willSlotBeOccupiedIndefinitely()));
+
+                                    return slotFuture.thenApply(
+                                            physicalSlot ->
+                                                    new 
PhysicalSlotRequest.Result(
+                                                            slotRequestId, 
physicalSlot));
+                                }));
     }
 
-    private Optional<PhysicalSlot> tryAllocateFromAvailable(
-            SlotRequestId slotRequestId, SlotProfile slotProfile) {
+    private Map<SlotRequestId, Optional<PhysicalSlot>> 
tryAllocateFromAvailable(
+            Collection<PhysicalSlotRequest> slotRequests) {
         FreeSlotInfoTracker freeSlotInfoTracker = 
slotPool.getFreeSlotInfoTracker();
 
-        Optional<SlotSelectionStrategy.SlotInfoAndLocality> 
selectedAvailableSlot =
-                
slotSelectionStrategy.selectBestSlotForProfile(freeSlotInfoTracker, 
slotProfile);
-
-        return selectedAvailableSlot.flatMap(
-                slotInfoAndLocality -> {
-                    freeSlotInfoTracker.reserveSlot(
-                            
slotInfoAndLocality.getSlotInfo().getAllocationId());
-                    return slotPool.allocateAvailableSlot(
-                            slotRequestId,
-                            
slotInfoAndLocality.getSlotInfo().getAllocationId(),
-                            slotProfile.getPhysicalSlotResourceProfile());
-                });
+        Map<SlotRequestId, Optional<PhysicalSlot>> allocateResult = new 
HashMap<>();
+        for (PhysicalSlotRequest request : slotRequests) {
+            Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot =
+                    slotSelectionStrategy.selectBestSlotForProfile(
+                            freeSlotInfoTracker, request.getSlotProfile());
+            allocateResult.put(
+                    request.getSlotRequestId(),
+                    slot.flatMap(
+                            slotInfoAndLocality -> {
+                                freeSlotInfoTracker.reserveSlot(
+                                        
slotInfoAndLocality.getSlotInfo().getAllocationId());
+                                return slotPool.allocateAvailableSlot(
+                                        request.getSlotRequestId(),
+                                        
slotInfoAndLocality.getSlotInfo().getAllocationId(),
+                                        
request.getSlotProfile().getPhysicalSlotResourceProfile());
+                            }));
+        }
+        return allocateResult;
     }
 
     private CompletableFuture<PhysicalSlot> requestNewSlot(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
index 07251ef42cf..ca2a763296c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -32,13 +32,15 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.DualKeyLinkedMap;
 import org.apache.flink.util.FlinkException;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -74,46 +76,75 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
     @Override
     public List<ExecutionSlotAssignment> allocateSlotsFor(
             List<ExecutionAttemptID> executionAttemptIds) {
-        return executionAttemptIds.stream()
-                .map(id -> new ExecutionSlotAssignment(id, 
allocateSlotFor(id)))
-                .collect(Collectors.toList());
+        List<ExecutionSlotAssignment> result = new 
ArrayList<>(executionAttemptIds.size());
+
+        Map<SlotRequestId, ExecutionAttemptID> 
remainingExecutionsToSlotRequest =
+                new HashMap<>(executionAttemptIds.size());
+        List<PhysicalSlotRequest> physicalSlotRequests =
+                new ArrayList<>(executionAttemptIds.size());
+
+        for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
+            if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
+                result.add(
+                        new ExecutionSlotAssignment(
+                                executionAttemptId,
+                                
requestedPhysicalSlots.getValueByKeyA(executionAttemptId)));
+            } else {
+                final SlotRequestId slotRequestId = new SlotRequestId();
+                final ResourceProfile resourceProfile =
+                        resourceProfileRetriever.apply(executionAttemptId);
+                Collection<TaskManagerLocation> preferredLocations =
+                        preferredLocationsRetriever.getPreferredLocations(
+                                executionAttemptId.getExecutionVertexId(), 
Collections.emptySet());
+                final SlotProfile slotProfile =
+                        SlotProfile.priorAllocation(
+                                resourceProfile,
+                                resourceProfile,
+                                preferredLocations,
+                                Collections.emptyList(),
+                                Collections.emptySet());
+                final PhysicalSlotRequest request =
+                        new PhysicalSlotRequest(
+                                slotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
+                physicalSlotRequests.add(request);
+                remainingExecutionsToSlotRequest.put(slotRequestId, 
executionAttemptId);
+            }
+        }
+
+        result.addAll(
+                allocatePhysicalSlotsFor(remainingExecutionsToSlotRequest, 
physicalSlotRequests));
+
+        return result;
     }
 
-    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID 
executionAttemptId) {
-        if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
-            return requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
-        }
-        final SlotRequestId slotRequestId = new SlotRequestId();
-        final ResourceProfile resourceProfile = 
resourceProfileRetriever.apply(executionAttemptId);
-        Collection<TaskManagerLocation> preferredLocations =
-                preferredLocationsRetriever.getPreferredLocations(
-                        executionAttemptId.getExecutionVertexId(), 
Collections.emptySet());
-        final SlotProfile slotProfile =
-                SlotProfile.priorAllocation(
-                        resourceProfile,
-                        resourceProfile,
-                        preferredLocations,
-                        Collections.emptyList(),
-                        Collections.emptySet());
-        final PhysicalSlotRequest request =
-                new PhysicalSlotRequest(slotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
-        final CompletableFuture<LogicalSlot> slotFuture =
-                slotProvider
-                        .allocatePhysicalSlot(request)
-                        .thenApply(
-                                physicalSlotRequest ->
-                                        allocateLogicalSlotFromPhysicalSlot(
-                                                slotRequestId,
-                                                
physicalSlotRequest.getPhysicalSlot(),
-                                                
slotWillBeOccupiedIndefinitely));
-        slotFuture.exceptionally(
-                throwable -> {
-                    this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
-                    this.slotProvider.cancelSlotRequest(slotRequestId, 
throwable);
-                    return null;
+    private List<ExecutionSlotAssignment> allocatePhysicalSlotsFor(
+            Map<SlotRequestId, ExecutionAttemptID> executionAttemptIds,
+            List<PhysicalSlotRequest> slotRequests) {
+        List<ExecutionSlotAssignment> allocatedSlots = new ArrayList<>();
+        Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> 
slotFutures =
+                slotProvider.allocatePhysicalSlots(slotRequests);
+
+        slotFutures.forEach(
+                (slotRequestId, slotRequestResultFuture) -> {
+                    ExecutionAttemptID executionAttemptId = 
executionAttemptIds.get(slotRequestId);
+
+                    final CompletableFuture<LogicalSlot> slotFuture =
+                            slotRequestResultFuture.thenApply(
+                                    physicalSlotRequest ->
+                                            
allocateLogicalSlotFromPhysicalSlot(
+                                                    slotRequestId,
+                                                    
physicalSlotRequest.getPhysicalSlot(),
+                                                    
slotWillBeOccupiedIndefinitely));
+                    slotFuture.exceptionally(
+                            throwable -> {
+                                
this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
+                                
this.slotProvider.cancelSlotRequest(slotRequestId, throwable);
+                                return null;
+                            });
+                    requestedPhysicalSlots.put(executionAttemptId, 
slotRequestId, slotFuture);
+                    allocatedSlots.add(new 
ExecutionSlotAssignment(executionAttemptId, slotFuture));
                 });
-        this.requestedPhysicalSlots.put(executionAttemptId, slotRequestId, 
slotFuture);
-        return slotFuture;
+        return allocatedSlots;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index f1995a7062a..9be00e497df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -37,11 +37,13 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -160,13 +162,23 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
                         .collect(
                                 Collectors.groupingBy(
                                         
slotSharingStrategy::getExecutionSlotSharingGroup));
-        Map<ExecutionSlotSharingGroup, SharedSlot> slots =
-                executionsByGroup.keySet().stream()
-                        .map(group -> getOrAllocateSharedSlot(group, 
sharedSlotProfileRetriever))
-                        .collect(
-                                Collectors.toMap(
-                                        
SharedSlot::getExecutionSlotSharingGroup,
-                                        Function.identity()));
+
+        Map<ExecutionSlotSharingGroup, SharedSlot> slots = new 
HashMap<>(executionsByGroup.size());
+        Set<ExecutionSlotSharingGroup> groupsToAssign = new 
HashSet<>(executionsByGroup.keySet());
+
+        Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =
+                tryAssignExistingSharedSlots(groupsToAssign);
+        slots.putAll(assignedSlots);
+        groupsToAssign.removeAll(assignedSlots.keySet());
+
+        if (!groupsToAssign.isEmpty()) {
+            Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots =
+                    allocateSharedSlots(groupsToAssign, 
sharedSlotProfileRetriever);
+            slots.putAll(allocatedSlots);
+            groupsToAssign.removeAll(allocatedSlots.keySet());
+            Preconditions.checkState(groupsToAssign.isEmpty());
+        }
+
         Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments =
                 allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);
 
@@ -225,35 +237,64 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
         return assignments;
     }
 
-    private SharedSlot getOrAllocateSharedSlot(
-            ExecutionSlotSharingGroup executionSlotSharingGroup,
+    private Map<ExecutionSlotSharingGroup, SharedSlot> 
tryAssignExistingSharedSlots(
+            Set<ExecutionSlotSharingGroup> executionSlotSharingGroups) {
+        Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =
+                new HashMap<>(executionSlotSharingGroups.size());
+        for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) {
+            SharedSlot sharedSlot = sharedSlots.get(group);
+            if (sharedSlot != null) {
+                assignedSlots.put(group, sharedSlot);
+            }
+        }
+        return assignedSlots;
+    }
+
+    private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots(
+            Set<ExecutionSlotSharingGroup> executionSlotSharingGroups,
             SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-        return sharedSlots.computeIfAbsent(
-                executionSlotSharingGroup,
-                group -> {
-                    SlotRequestId physicalSlotRequestId = new SlotRequestId();
-                    ResourceProfile physicalSlotResourceProfile =
-                            getPhysicalSlotResourceProfile(group);
-                    SlotProfile slotProfile =
-                            sharedSlotProfileRetriever.getSlotProfile(
-                                    group, physicalSlotResourceProfile);
-                    PhysicalSlotRequest physicalSlotRequest =
-                            new PhysicalSlotRequest(
-                                    physicalSlotRequestId,
-                                    slotProfile,
-                                    slotWillBeOccupiedIndefinitely);
+
+        List<PhysicalSlotRequest> slotRequests = new ArrayList<>();
+        Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = new 
HashMap<>();
+
+        Map<SlotRequestId, ExecutionSlotSharingGroup> requestToGroup = new 
HashMap<>();
+        Map<SlotRequestId, ResourceProfile> requestToPhysicalResources = new 
HashMap<>();
+
+        for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) {
+            SlotRequestId physicalSlotRequestId = new SlotRequestId();
+            ResourceProfile physicalSlotResourceProfile = 
getPhysicalSlotResourceProfile(group);
+            SlotProfile slotProfile =
+                    sharedSlotProfileRetriever.getSlotProfile(group, 
physicalSlotResourceProfile);
+            PhysicalSlotRequest request =
+                    new PhysicalSlotRequest(
+                            physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
+            slotRequests.add(request);
+            requestToGroup.put(physicalSlotRequestId, group);
+            requestToPhysicalResources.put(physicalSlotRequestId, 
physicalSlotResourceProfile);
+        }
+
+        Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> 
allocateResult =
+                slotProvider.allocatePhysicalSlots(slotRequests);
+
+        allocateResult.forEach(
+                (slotRequestId, resultCompletableFuture) -> {
+                    ExecutionSlotSharingGroup group = 
requestToGroup.get(slotRequestId);
                     CompletableFuture<PhysicalSlot> physicalSlotFuture =
-                            slotProvider
-                                    .allocatePhysicalSlot(physicalSlotRequest)
-                                    
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                    return new SharedSlot(
-                            physicalSlotRequestId,
-                            physicalSlotResourceProfile,
-                            group,
-                            physicalSlotFuture,
-                            slotWillBeOccupiedIndefinitely,
-                            this::releaseSharedSlot);
+                            resultCompletableFuture.thenApply(
+                                    
PhysicalSlotRequest.Result::getPhysicalSlot);
+                    SharedSlot slot =
+                            new SharedSlot(
+                                    slotRequestId,
+                                    
requestToPhysicalResources.get(slotRequestId),
+                                    group,
+                                    physicalSlotFuture,
+                                    slotWillBeOccupiedIndefinitely,
+                                    this::releaseSharedSlot);
+                    allocatedSlots.put(group, slot);
+                    Preconditions.checkState(!sharedSlots.containsKey(group));
+                    sharedSlots.put(group, slot);
                 });
+        return allocatedSlots;
     }
 
     private void releaseSharedSlot(ExecutionSlotSharingGroup 
executionSlotSharingGroup) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java
index c21c29e3a11..2275a6492e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java
@@ -29,6 +29,7 @@ import org.junit.rules.ExternalResource;
 import javax.annotation.Nonnull;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -72,7 +73,10 @@ public class PhysicalSlotProviderResource extends ExternalResource {
 
     public CompletableFuture<PhysicalSlotRequest.Result> 
allocateSlot(PhysicalSlotRequest request) {
         return CompletableFuture.supplyAsync(
-                        () -> 
physicalSlotProvider.allocatePhysicalSlot(request),
+                        () ->
+                                physicalSlotProvider
+                                        
.allocatePhysicalSlots(Collections.singletonList(request))
+                                        .get(request.getSlotRequestId()),
                         mainThreadExecutor)
                 .thenCompose(Function.identity());
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
index 2f870ae82ea..856bb9414e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
@@ -108,19 +108,27 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
     }
 
     @Override
-    public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
-            PhysicalSlotRequest physicalSlotRequest) {
-        SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
-        requests.put(slotRequestId, physicalSlotRequest);
-
-        final CompletableFuture<TestingPhysicalSlot> resultFuture =
-                physicalSlotCreator.apply(
-                        
physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());
-
-        responses.put(slotRequestId, resultFuture);
-
-        return resultFuture.thenApply(
-                physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, 
physicalSlot));
+    public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
+            Collection<PhysicalSlotRequest> physicalSlotRequests) {
+        Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> 
result =
+                new HashMap<>(physicalSlotRequests.size());
+        for (PhysicalSlotRequest physicalSlotRequest : physicalSlotRequests) {
+            SlotRequestId slotRequestId = 
physicalSlotRequest.getSlotRequestId();
+            requests.put(slotRequestId, physicalSlotRequest);
+
+            final CompletableFuture<TestingPhysicalSlot> resultFuture =
+                    physicalSlotCreator.apply(
+                            
physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());
+
+            responses.put(slotRequestId, resultFuture);
+
+            CompletableFuture<PhysicalSlotRequest.Result> physicalSlotFuture =
+                    resultFuture.thenApply(
+                            physicalSlot ->
+                                    new 
PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
+            result.put(slotRequestId, physicalSlotFuture);
+        }
+        return result;
     }
 
     @Override

Reply via email to