GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r156901374
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
    @@ -266,104 +279,367 @@ public void disconnectResourceManager() {
        // 
------------------------------------------------------------------------
     
        @Override
    -   public CompletableFuture<SimpleSlot> allocateSlot(
    -                   SlotRequestID requestId,
    -                   ScheduledUnit task,
    -                   ResourceProfile resources,
    -                   Iterable<TaskManagerLocation> locationPreferences,
    +   public CompletableFuture<LogicalSlot> allocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit scheduledUnit,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling,
                        Time timeout) {
     
    -           return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
    +           return internalAllocateSlot(
    +                   slotRequestId,
    +                   scheduledUnit,
    +                   resourceProfile,
    +                   locationPreferences,
    +                   allowQueuedScheduling);
        }
     
    -   @Override
    -   public void returnAllocatedSlot(Slot slot) {
    -           internalReturnAllocatedSlot(slot);
    +   private CompletableFuture<LogicalSlot> internalAllocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit task,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) {
    +
    +           final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
    +
    +           if (slotSharingGroupId != null) {
    +                   // allocate slot with slot sharing
    +                   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
    +                           slotSharingGroupId,
    +                           id -> new SlotSharingManager(
    +                                   id,
    +                                   this,
    +                                   providerAndOwner));
    +
    +                   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
    +
    +                   try {
    +                           if (task.getCoLocationConstraint() != null) {
    +                                   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
    +                                           task.getCoLocationConstraint(),
    +                                           multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           } else {
    +                                   multiTaskSlotFuture = 
allocateMultiTaskSlot(
    +                                           task.getJobVertexId(), 
multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           }
    +                   } catch (NoResourceAvailableException 
noResourceException) {
    +                           return 
FutureUtils.completedExceptionally(noResourceException);
    +                   }
    +
    +                   // sanity check
    +                   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
    +
    +                   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
    +                           slotRequestId,
    +                           task.getJobVertexId(),
    +                           multiTaskSlotFuture.getLocality());
    +
    +                   return leave.getLogicalSlotFuture();
    +           } else {
    +                   // request an allocated slot to assign a single logical 
slot to
    +                   CompletableFuture<SlotAndLocality> 
slotAndLocalityFuture = requestAllocatedSlot(
    +                           slotRequestId,
    +                           resourceProfile,
    +                           locationPreferences,
    +                           allowQueuedScheduling);
    +
    +                   return slotAndLocalityFuture.thenApply(
    +                           (SlotAndLocality slotAndLocality) -> {
    +                                   final AllocatedSlot allocatedSlot = 
slotAndLocality.getSlot();
    +
    +                                   final SingleLogicalSlot singleTaskSlot 
= new SingleLogicalSlot(
    +                                           slotRequestId,
    +                                           allocatedSlot,
    +                                           null,
    +                                           slotAndLocality.getLocality(),
    +                                           providerAndOwner);
    +
    +                                   if 
(allocatedSlot.tryAssignPayload(singleTaskSlot)) {
    +                                           return singleTaskSlot;
    +                                   } else {
    +                                           final FlinkException 
flinkException = new FlinkException("Could not assign payload to allocated slot 
" + allocatedSlot.getAllocationId() + '.');
    +                                           releaseSlot(slotRequestId, 
null, flinkException);
    +                                           throw new 
CompletionException(flinkException);
    +                                   }
    +                           });
    +           }
        }
     
    -   @Override
    -   public CompletableFuture<Acknowledge> 
cancelSlotAllocation(SlotRequestID requestId) {
    -           final PendingRequest pendingRequest = 
removePendingRequest(requestId);
    +   /**
    +    * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for 
the given {@link CoLocationConstraint}.
    +    *
    +    * <p>If allowQueuedScheduling is true, then the returned {@link 
SlotSharingManager.MultiTaskSlot} can be
    +    * uncompleted.
    +    *
    +    * @param coLocationConstraint for which to allocate a {@link 
SlotSharingManager.MultiTaskSlot}
    +    * @param multiTaskSlotManager responsible for the slot sharing group 
for which to allocate the slot
    +    * @param resourceProfile specifying the requirements for the requested 
slot
    +    * @param locationPreferences containing preferred TaskExecutors on 
which to allocate the slot
    +    * @param allowQueuedScheduling true if queued scheduling (the returned 
task slot need not be completed yet) is allowed, otherwise false
    +    * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which 
contains the allocated {@link SlotSharingManager.MultiTaskSlot}
    +    *              and its locality wrt the given location preferences
    +    * @throws NoResourceAvailableException if no task slot could be 
allocated
    +    */
    +   private SlotSharingManager.MultiTaskSlotLocality 
allocateCoLocatedMultiTaskSlot(
    +                   CoLocationConstraint coLocationConstraint,
    +                   SlotSharingManager multiTaskSlotManager,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) throws 
NoResourceAvailableException {
    +           final SlotRequestId coLocationSlotRequestId = 
coLocationConstraint.getSlotRequestId();
    +
    +           if (coLocationSlotRequestId != null) {
    +                   // we have a slot assigned --> try to retrieve it
    +                   final SlotSharingManager.TaskSlot taskSlot = 
multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
    +
    +                   if (taskSlot != null) {
    +                           Preconditions.checkState(taskSlot instanceof 
SlotSharingManager.MultiTaskSlot);
    +                           return 
SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) 
taskSlot), Locality.LOCAL);
    +                   } else {
    +                           // the slot may have been cancelled in the mean 
time
    +                           coLocationConstraint.setSlotRequestId(null);
    +                   }
    +           }
     
    -           if (pendingRequest != null) {
    -                   failPendingRequest(pendingRequest, new 
CancellationException("Allocation with request id" + requestId + " 
cancelled."));
    +           final Collection<TaskManagerLocation> actualLocationPreferences;
    +
    +           if (coLocationConstraint.isAssigned()) {
    +                   actualLocationPreferences = 
Collections.singleton(coLocationConstraint.getLocation());
                } else {
    -                   final Slot slot = allocatedSlots.get(requestId);
    +                   actualLocationPreferences = locationPreferences;
    +           }
    +
    +           // get a new multi task slot
    +           final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality = allocateMultiTaskSlot(
    +                   coLocationConstraint.getGroupId(), multiTaskSlotManager,
    +                   resourceProfile,
    +                   actualLocationPreferences,
    +                   allowQueuedScheduling);
    +
    +           // check whether we fulfill the co-location constraint
    +           if (coLocationConstraint.isAssigned() && 
multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
    +                   multiTaskSlotLocality.getMultiTaskSlot().release(
    +                           new FlinkException("Multi task slot is not 
local and, thus, does not fulfill the co-location constraint."));
     
    -                   if (slot != null) {
    -                           LOG.info("Returning allocated slot {} because 
the corresponding allocation request {} was cancelled.", slot, requestId);
    -                           if (slot.markCancelled()) {
    -                                   internalReturnAllocatedSlot(slot);
    +                   throw new NoResourceAvailableException("Could not 
allocate a local multi task slot for the " +
    +                           "co location constraint " + 
coLocationConstraint + '.');
    +           }
    +
    +           final SlotRequestId slotRequestId = new SlotRequestId();
    +           final SlotSharingManager.MultiTaskSlot coLocationSlot = 
multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
    +                   slotRequestId,
    +                   coLocationConstraint.getGroupId());
    +
    +           // mark the requested slot as co-located slot for other 
co-located tasks
    +           coLocationConstraint.setSlotRequestId(slotRequestId);
    +
    +           // lock the co-location constraint once we have obtained the 
allocated slot
    +           coLocationSlot.getSlotContextFuture().whenComplete(
    +                   (SlotContext slotContext, Throwable throwable) -> {
    +                           if (throwable == null) {
    +                                   // check whether we are still assigned 
to the co-location constraint
    +                                   if 
(Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
    +                                           
coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
    +                                   }
                                }
    +                   });
    +
    +           return 
SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, 
multiTaskSlotLocality.getLocality());
    +   }
    +
    +   /**
    +    * Allocates a {@link SlotSharingManager.MultiTaskSlot} for the given 
groupId which is in the
    +    * slot sharing group for which the given {@link SlotSharingManager} is 
responsible.
    +    *
    +    * <p>If allowQueuedScheduling is true, then the method can return an 
uncompleted {@link SlotSharingManager.MultiTaskSlot}.
    +    *
    +    * @param groupId for which to allocate a new {@link 
SlotSharingManager.MultiTaskSlot}
    +    * @param slotSharingManager responsible for the slot sharing group for 
which to allocate the slot
    +    * @param resourceProfile specifying the requirements for the requested 
slot
    +    * @param locationPreferences containing preferred TaskExecutors on 
which to allocate the slot
    +    * @param allowQueuedScheduling true if queued scheduling (the returned 
task slot need not be completed yet) is allowed, otherwise false
    +    * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which 
contains the allocated {@link SlotSharingManager.MultiTaskSlot}
    +    *              and its locality wrt the given location preferences
    +    * @throws NoResourceAvailableException if no task slot could be 
allocated
    +    */
    +   private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
    +                   AbstractID groupId,
    +                   SlotSharingManager slotSharingManager,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) throws 
NoResourceAvailableException {
    +
    +           // check first whether we have a resolved root slot which we 
can use
    +           SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality 
= slotSharingManager.getResolvedRootSlot(
    +                   groupId,
    +                   locationPreferences);
    +
    +           if (multiTaskSlotLocality != null && 
multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
    +                   return multiTaskSlotLocality;
    +           }
    +
    +           final SlotRequestId allocatedSlotRequestId = new 
SlotRequestId();
    +           final SlotRequestId multiTaskSlotRequestId = new 
SlotRequestId();
    +
    +           // check whether we have an allocated slot available which we 
can use to create a new multi task slot in
    +           final SlotAndLocality slotAndLocality = 
pollAndAllocateSlot(allocatedSlotRequestId, resourceProfile, 
locationPreferences);
    +
    +           if (slotAndLocality != null && (slotAndLocality.getLocality() 
== Locality.LOCAL || multiTaskSlotLocality == null)) {
    +
    +                   final AllocatedSlot allocatedSlot = 
slotAndLocality.getSlot();
    +                   final SlotSharingManager.MultiTaskSlot multiTaskSlot = 
slotSharingManager.createRootSlot(
    +                           multiTaskSlotRequestId,
    +                           
CompletableFuture.completedFuture(slotAndLocality.getSlot()),
    +                           allocatedSlotRequestId);
    +
    +                   if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
    +                           return 
SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, 
slotAndLocality.getLocality());
                        } else {
    -                           LOG.debug("There was no slot allocation with {} 
to be cancelled.", requestId);
    +                           multiTaskSlot.release(new FlinkException("Could 
not assign payload to allocated slot " +
    +                                   allocatedSlot.getAllocationId() + '.'));
                        }
                }
     
    -           return CompletableFuture.completedFuture(Acknowledge.get());
    -   }
    +           if (multiTaskSlotLocality != null) {
    +                   // prefer slot sharing group slots over unused slots
    +                   if (slotAndLocality != null) {
    +                           releaseSlot(
    +                                   allocatedSlotRequestId,
    +                                   null,
    +                                   new FlinkException("Locality constraint 
is not better fulfilled by allocated slot."));
    +                   }
    +                   return multiTaskSlotLocality;
    +           }
     
    -   CompletableFuture<SimpleSlot> internalAllocateSlot(
    -                   SlotRequestID requestId,
    -                   ScheduledUnit task,
    -                   ResourceProfile resources,
    -                   Iterable<TaskManagerLocation> locationPreferences) {
    +           if (allowQueuedScheduling) {
    +                   // there is no slot immediately available --> check 
first for uncompleted slots at the slot sharing group
    +                   SlotSharingManager.MultiTaskSlot multiTaskSlotFuture = 
slotSharingManager.getUnresolvedRootSlot(groupId);
    +
    +                   if (multiTaskSlotFuture == null) {
    +                           // it seems as if we have to request a new slot 
from the resource manager, this is always the last resort!!!
    +                           final CompletableFuture<AllocatedSlot> 
futureSlot = requestNewAllocatedSlot(allocatedSlotRequestId, resourceProfile);
    +
    +                           multiTaskSlotFuture = 
slotSharingManager.createRootSlot(
    +                                   multiTaskSlotRequestId,
    +                                   
futureSlot.thenApply(Function.identity()),
    --- End diff --
    
    Good point. Will change it.


---

Reply via email to