This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3cdf228191a073c7008f88faa0b088c6a2428012 Author: Weihua Hu <huweihua....@gmail.com> AuthorDate: Mon Jun 26 17:08:38 2023 +0800 [FLINK-31843][runtime] Improve performance of SlotSelectionStrategy by bulk select. --- .../jobmaster/slotpool/PhysicalSlotProvider.java | 14 +-- .../slotpool/PhysicalSlotProviderImpl.java | 117 +++++++++++++-------- .../scheduler/SimpleExecutionSlotAllocator.java | 107 ++++++++++++------- .../SlotSharingExecutionSlotAllocator.java | 107 +++++++++++++------ .../slotpool/PhysicalSlotProviderResource.java | 6 +- .../scheduler/TestingPhysicalSlotProvider.java | 34 +++--- 6 files changed, 251 insertions(+), 134 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java index bab5693dcbc..9a4ef808ab9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java @@ -20,22 +20,24 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.jobmaster.SlotRequestId; +import java.util.Collection; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** The provider serves physical slot requests. */ public interface PhysicalSlotProvider { /** - * Submit a request to allocate a physical slot. + * Submit requests to allocate physical slots. * * <p>The physical slot can be either allocated from the slots, which are already available for - * the job, or a new one can be requeted from the resource manager. + * the job, or a new one can be requested from the resource manager. * - * @param physicalSlotRequest slot requirements - * @return a future of the allocated slot + * @param physicalSlotRequests physicalSlotRequest slot requirements + * @return futures of the allocated slots */ - CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot( - PhysicalSlotRequest physicalSlotRequest); + Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots( + Collection<PhysicalSlotRequest> physicalSlotRequests); /** * Cancels the slot request with the given {@link SlotRequestId}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java index 508f72bfad2..b6c80e2138c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java @@ -27,8 +27,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -52,53 +56,80 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider { } @Override - public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot( - PhysicalSlotRequest physicalSlotRequest) { - SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId(); - SlotProfile slotProfile = physicalSlotRequest.getSlotProfile(); - ResourceProfile resourceProfile = slotProfile.getPhysicalSlotResourceProfile(); - - LOG.debug( - "Received slot request [{}] with resource requirements: {}", - slotRequestId, - resourceProfile); - - Optional<PhysicalSlot> availablePhysicalSlot = - tryAllocateFromAvailable(slotRequestId, slotProfile); - - CompletableFuture<PhysicalSlot> slotFuture; - slotFuture = - availablePhysicalSlot - .map(CompletableFuture::completedFuture) - .orElseGet( - () -> - requestNewSlot( - slotRequestId, - resourceProfile, - slotProfile.getPreferredAllocations(), - physicalSlotRequest - .willSlotBeOccupiedIndefinitely())); - - return slotFuture.thenApply( - physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot)); + public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots( + Collection<PhysicalSlotRequest> physicalSlotRequests) { + + for (PhysicalSlotRequest physicalSlotRequest : physicalSlotRequests) { + LOG.debug( + "Received slot request [{}] with resource requirements: {}", + physicalSlotRequest.getSlotRequestId(), + physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile()); + } + + Map<SlotRequestId, PhysicalSlotRequest> physicalSlotRequestsById = + physicalSlotRequests.stream() + .collect( + Collectors.toMap( + PhysicalSlotRequest::getSlotRequestId, + Function.identity())); + Map<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots = + tryAllocateFromAvailable(physicalSlotRequestsById.values()); + + return availablePhysicalSlots.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> { + Optional<PhysicalSlot> availablePhysicalSlot = entry.getValue(); + SlotRequestId slotRequestId = entry.getKey(); + PhysicalSlotRequest physicalSlotRequest = + physicalSlotRequestsById.get(slotRequestId); + SlotProfile slotProfile = physicalSlotRequest.getSlotProfile(); + ResourceProfile resourceProfile = + slotProfile.getPhysicalSlotResourceProfile(); + + CompletableFuture<PhysicalSlot> slotFuture = + availablePhysicalSlot + .map(CompletableFuture::completedFuture) + .orElseGet( + () -> + requestNewSlot( + slotRequestId, + resourceProfile, + slotProfile + .getPreferredAllocations(), + physicalSlotRequest + .willSlotBeOccupiedIndefinitely())); + + return slotFuture.thenApply( + physicalSlot -> + new PhysicalSlotRequest.Result( + slotRequestId, physicalSlot)); + })); } - private Optional<PhysicalSlot> tryAllocateFromAvailable( - SlotRequestId slotRequestId, SlotProfile slotProfile) { + private Map<SlotRequestId, Optional<PhysicalSlot>> tryAllocateFromAvailable( + Collection<PhysicalSlotRequest> slotRequests) { FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker(); - Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot = - slotSelectionStrategy.selectBestSlotForProfile(freeSlotInfoTracker, slotProfile); - - return selectedAvailableSlot.flatMap( - slotInfoAndLocality -> { - freeSlotInfoTracker.reserveSlot( - slotInfoAndLocality.getSlotInfo().getAllocationId()); - return slotPool.allocateAvailableSlot( - slotRequestId, - slotInfoAndLocality.getSlotInfo().getAllocationId(), - slotProfile.getPhysicalSlotResourceProfile()); - }); + Map<SlotRequestId, Optional<PhysicalSlot>> allocateResult = new HashMap<>(); + for (PhysicalSlotRequest request : slotRequests) { + Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot = + slotSelectionStrategy.selectBestSlotForProfile( + freeSlotInfoTracker, request.getSlotProfile()); + allocateResult.put( + request.getSlotRequestId(), + slot.flatMap( + slotInfoAndLocality -> { + freeSlotInfoTracker.reserveSlot( + slotInfoAndLocality.getSlotInfo().getAllocationId()); + return slotPool.allocateAvailableSlot( + request.getSlotRequestId(), + slotInfoAndLocality.getSlotInfo().getAllocationId(), + request.getSlotProfile().getPhysicalSlotResourceProfile()); + })); + } + return allocateResult; } private CompletableFuture<PhysicalSlot> requestNewSlot( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java index 07251ef42cf..ca2a763296c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java @@ -32,13 +32,15 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.DualKeyLinkedMap; import org.apache.flink.util.FlinkException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -74,46 +76,75 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator { @Override public List<ExecutionSlotAssignment> allocateSlotsFor( List<ExecutionAttemptID> executionAttemptIds) { - return executionAttemptIds.stream() - .map(id -> new ExecutionSlotAssignment(id, allocateSlotFor(id))) - .collect(Collectors.toList()); + List<ExecutionSlotAssignment> result = new ArrayList<>(executionAttemptIds.size()); + + Map<SlotRequestId, ExecutionAttemptID> remainingExecutionsToSlotRequest = + new HashMap<>(executionAttemptIds.size()); + List<PhysicalSlotRequest> physicalSlotRequests = + new ArrayList<>(executionAttemptIds.size()); + + for (ExecutionAttemptID executionAttemptId : executionAttemptIds) { + if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) { + result.add( + new ExecutionSlotAssignment( + executionAttemptId, + requestedPhysicalSlots.getValueByKeyA(executionAttemptId))); + } else { + final SlotRequestId slotRequestId = new SlotRequestId(); + final ResourceProfile resourceProfile = + resourceProfileRetriever.apply(executionAttemptId); + Collection<TaskManagerLocation> preferredLocations = + preferredLocationsRetriever.getPreferredLocations( + executionAttemptId.getExecutionVertexId(), Collections.emptySet()); + final SlotProfile slotProfile = + SlotProfile.priorAllocation( + resourceProfile, + resourceProfile, + preferredLocations, + Collections.emptyList(), + Collections.emptySet()); + final PhysicalSlotRequest request = + new PhysicalSlotRequest( + slotRequestId, slotProfile, slotWillBeOccupiedIndefinitely); + physicalSlotRequests.add(request); + remainingExecutionsToSlotRequest.put(slotRequestId, executionAttemptId); + } + } + + result.addAll( + allocatePhysicalSlotsFor(remainingExecutionsToSlotRequest, physicalSlotRequests)); + + return result; } - private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId) { - if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) { - return requestedPhysicalSlots.getValueByKeyA(executionAttemptId); - } - final SlotRequestId slotRequestId = new SlotRequestId(); - final ResourceProfile resourceProfile = resourceProfileRetriever.apply(executionAttemptId); - Collection<TaskManagerLocation> preferredLocations = - preferredLocationsRetriever.getPreferredLocations( - executionAttemptId.getExecutionVertexId(), Collections.emptySet()); - final SlotProfile slotProfile = - SlotProfile.priorAllocation( - resourceProfile, - resourceProfile, - preferredLocations, - Collections.emptyList(), - Collections.emptySet()); - final PhysicalSlotRequest request = - new PhysicalSlotRequest(slotRequestId, slotProfile, slotWillBeOccupiedIndefinitely); - final CompletableFuture<LogicalSlot> slotFuture = - slotProvider - .allocatePhysicalSlot(request) - .thenApply( - physicalSlotRequest -> - allocateLogicalSlotFromPhysicalSlot( - slotRequestId, - physicalSlotRequest.getPhysicalSlot(), - slotWillBeOccupiedIndefinitely)); - slotFuture.exceptionally( - throwable -> { - this.requestedPhysicalSlots.removeKeyA(executionAttemptId); - this.slotProvider.cancelSlotRequest(slotRequestId, throwable); - return null; + private List<ExecutionSlotAssignment> allocatePhysicalSlotsFor( + Map<SlotRequestId, ExecutionAttemptID> executionAttemptIds, + List<PhysicalSlotRequest> slotRequests) { + List<ExecutionSlotAssignment> allocatedSlots = new ArrayList<>(); + Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> slotFutures = + slotProvider.allocatePhysicalSlots(slotRequests); + + slotFutures.forEach( + (slotRequestId, slotRequestResultFuture) -> { + ExecutionAttemptID executionAttemptId = executionAttemptIds.get(slotRequestId); + + final CompletableFuture<LogicalSlot> slotFuture = + slotRequestResultFuture.thenApply( + physicalSlotRequest -> + allocateLogicalSlotFromPhysicalSlot( + slotRequestId, + physicalSlotRequest.getPhysicalSlot(), + slotWillBeOccupiedIndefinitely)); + slotFuture.exceptionally( + throwable -> { + this.requestedPhysicalSlots.removeKeyA(executionAttemptId); + this.slotProvider.cancelSlotRequest(slotRequestId, throwable); + return null; + }); + requestedPhysicalSlots.put(executionAttemptId, slotRequestId, slotFuture); + allocatedSlots.add(new ExecutionSlotAssignment(executionAttemptId, slotFuture)); }); - this.requestedPhysicalSlots.put(executionAttemptId, slotRequestId, slotFuture); - return slotFuture; + return allocatedSlots; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java index f1995a7062a..9be00e497df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java @@ -37,11 +37,13 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -160,13 +162,23 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator { .collect( Collectors.groupingBy( slotSharingStrategy::getExecutionSlotSharingGroup)); - Map<ExecutionSlotSharingGroup, SharedSlot> slots = - executionsByGroup.keySet().stream() - .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever)) - .collect( - Collectors.toMap( - SharedSlot::getExecutionSlotSharingGroup, - Function.identity())); + + Map<ExecutionSlotSharingGroup, SharedSlot> slots = new HashMap<>(executionsByGroup.size()); + Set<ExecutionSlotSharingGroup> groupsToAssign = new HashSet<>(executionsByGroup.keySet()); + + Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots = + tryAssignExistingSharedSlots(groupsToAssign); + slots.putAll(assignedSlots); + groupsToAssign.removeAll(assignedSlots.keySet()); + + if (!groupsToAssign.isEmpty()) { + Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = + allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever); + slots.putAll(allocatedSlots); + groupsToAssign.removeAll(allocatedSlots.keySet()); + Preconditions.checkState(groupsToAssign.isEmpty()); + } + Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments = allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup); @@ -225,35 +237,64 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator { return assignments; } - private SharedSlot getOrAllocateSharedSlot( - ExecutionSlotSharingGroup executionSlotSharingGroup, + private Map<ExecutionSlotSharingGroup, SharedSlot> tryAssignExistingSharedSlots( + Set<ExecutionSlotSharingGroup> executionSlotSharingGroups) { + Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots = + new HashMap<>(executionSlotSharingGroups.size()); + for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) { + SharedSlot sharedSlot = sharedSlots.get(group); + if (sharedSlot != null) { + assignedSlots.put(group, sharedSlot); + } + } + return assignedSlots; + } + + private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots( + Set<ExecutionSlotSharingGroup> executionSlotSharingGroups, SharedSlotProfileRetriever sharedSlotProfileRetriever) { - return sharedSlots.computeIfAbsent( - executionSlotSharingGroup, - group -> { - SlotRequestId physicalSlotRequestId = new SlotRequestId(); - ResourceProfile physicalSlotResourceProfile = - getPhysicalSlotResourceProfile(group); - SlotProfile slotProfile = - sharedSlotProfileRetriever.getSlotProfile( - group, physicalSlotResourceProfile); - PhysicalSlotRequest physicalSlotRequest = - new PhysicalSlotRequest( - physicalSlotRequestId, - slotProfile, - slotWillBeOccupiedIndefinitely); + + List<PhysicalSlotRequest> slotRequests = new ArrayList<>(); + Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = new HashMap<>(); + + Map<SlotRequestId, ExecutionSlotSharingGroup> requestToGroup = new HashMap<>(); + Map<SlotRequestId, ResourceProfile> requestToPhysicalResources = new HashMap<>(); + + for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) { + SlotRequestId physicalSlotRequestId = new SlotRequestId(); + ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group); + SlotProfile slotProfile = + sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile); + PhysicalSlotRequest request = + new PhysicalSlotRequest( + physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely); + slotRequests.add(request); + requestToGroup.put(physicalSlotRequestId, group); + requestToPhysicalResources.put(physicalSlotRequestId, physicalSlotResourceProfile); + } + + Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult = + slotProvider.allocatePhysicalSlots(slotRequests); + + allocateResult.forEach( + (slotRequestId, resultCompletableFuture) -> { + ExecutionSlotSharingGroup group = requestToGroup.get(slotRequestId); CompletableFuture<PhysicalSlot> physicalSlotFuture = - slotProvider - .allocatePhysicalSlot(physicalSlotRequest) - .thenApply(PhysicalSlotRequest.Result::getPhysicalSlot); - return new SharedSlot( - physicalSlotRequestId, - physicalSlotResourceProfile, - group, - physicalSlotFuture, - slotWillBeOccupiedIndefinitely, - this::releaseSharedSlot); + resultCompletableFuture.thenApply( + PhysicalSlotRequest.Result::getPhysicalSlot); + SharedSlot slot = + new SharedSlot( + slotRequestId, + requestToPhysicalResources.get(slotRequestId), + group, + physicalSlotFuture, + slotWillBeOccupiedIndefinitely, + this::releaseSharedSlot); + allocatedSlots.put(group, slot); + Preconditions.checkState(!sharedSlots.containsKey(group)); + sharedSlots.put(group, slot); }); + return allocatedSlots; } private void releaseSharedSlot(ExecutionSlotSharingGroup executionSlotSharingGroup) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java index c21c29e3a11..2275a6492e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderResource.java @@ -29,6 +29,7 @@ import org.junit.rules.ExternalResource; import javax.annotation.Nonnull; import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -72,7 +73,10 @@ public class PhysicalSlotProviderResource extends ExternalResource { public CompletableFuture<PhysicalSlotRequest.Result> allocateSlot(PhysicalSlotRequest request) { return CompletableFuture.supplyAsync( - () -> physicalSlotProvider.allocatePhysicalSlot(request), + () -> + physicalSlotProvider + .allocatePhysicalSlots(Collections.singletonList(request)) + .get(request.getSlotRequestId()), mainThreadExecutor) .thenCompose(Function.identity()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java index 2f870ae82ea..856bb9414e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java @@ -108,19 +108,27 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider { } @Override - public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot( - PhysicalSlotRequest physicalSlotRequest) { - SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId(); - requests.put(slotRequestId, physicalSlotRequest); - - final CompletableFuture<TestingPhysicalSlot> resultFuture = - physicalSlotCreator.apply( - physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile()); - - responses.put(slotRequestId, resultFuture); - - return resultFuture.thenApply( - physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot)); + public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots( + Collection<PhysicalSlotRequest> physicalSlotRequests) { + Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> result = + new HashMap<>(physicalSlotRequests.size()); + for (PhysicalSlotRequest physicalSlotRequest : physicalSlotRequests) { + SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId(); + requests.put(slotRequestId, physicalSlotRequest); + + final CompletableFuture<TestingPhysicalSlot> resultFuture = + physicalSlotCreator.apply( + physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile()); + + responses.put(slotRequestId, resultFuture); + + CompletableFuture<PhysicalSlotRequest.Result> physicalSlotFuture = + resultFuture.thenApply( + physicalSlot -> + new PhysicalSlotRequest.Result(slotRequestId, physicalSlot)); + result.put(slotRequestId, physicalSlotFuture); + } + return result; } @Override