zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r431983175



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
        public boolean requiresPreviousExecutionGraphAllocations() {
                return slotSelectionStrategy instanceof 
PreviousAllocationSlotSelectionStrategy;
        }
+
+       @Override
+       public CompletableFuture<Collection<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
+                       final Collection<PhysicalSlotRequest> 
physicalSlotRequests,
+                       final Time timeout) {
+
+               final PhysicalSlotRequestBulk slotRequestBulk = new 
PhysicalSlotRequestBulk(physicalSlotRequests);
+
+               final List<CompletableFuture<PhysicalSlotRequest.Result>> 
resultFutures = new ArrayList<>(physicalSlotRequests.size());
+               for (PhysicalSlotRequest request : physicalSlotRequests) {
+                       final CompletableFuture<PhysicalSlotRequest.Result> 
resultFuture =
+                               allocatePhysicalSlot(request, 
timeout).thenApply(result -> {
+                                       slotRequestBulk.markRequestFulfilled(
+                                               result.getSlotRequestId(),
+                                               
result.getPhysicalSlot().getAllocationId());
+
+                                       return result;
+                               });
+                       resultFutures.add(resultFuture);
+               }
+
+               slotRequestBulkTracker.track(slotRequestBulk);
+               schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, 
timeout);
+
+               return FutureUtils.combineAll(resultFutures)
+                       .whenComplete((ignore, throwable) -> 
slotRequestBulkTracker.untrack(slotRequestBulk));
+       }
+
+       private CompletableFuture<PhysicalSlotRequest.Result> 
allocatePhysicalSlot(
+                       final PhysicalSlotRequest physicalSlotRequest,
+                       final Time timeout) {
+
+               final SlotRequestId slotRequestId = 
physicalSlotRequest.getSlotRequestId();
+               final SlotProfile slotProfile = 
physicalSlotRequest.getSlotProfile();
+
+               final Optional<SlotAndLocality> availablePhysicalSlot = 
tryAllocateFromAvailable(slotRequestId, slotProfile);
+
+               final CompletableFuture<PhysicalSlot> slotFuture;
+               if (availablePhysicalSlot.isPresent()) {
+                       slotFuture = 
CompletableFuture.completedFuture(availablePhysicalSlot.get().getSlot());
+               } else if 
(physicalSlotRequest.willSlotBeOccupiedIndefinitely()) {
+                       slotFuture = 
slotPool.requestNewAllocatedSlot(slotRequestId, 
slotProfile.getPhysicalSlotResourceProfile(), timeout);
+               } else {
+                       slotFuture = 
slotPool.requestNewAllocatedBatchSlot(slotRequestId, 
slotProfile.getPhysicalSlotResourceProfile());

Review comment:
       done via a758c3a88a0a6cc2e0b183b20a17fe618f8cbbb9




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to