Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r155751694
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---
    @@ -266,104 +279,367 @@ public void disconnectResourceManager() {
        // 
------------------------------------------------------------------------
     
        @Override
    -   public CompletableFuture<SimpleSlot> allocateSlot(
    -                   SlotRequestID requestId,
    -                   ScheduledUnit task,
    -                   ResourceProfile resources,
    -                   Iterable<TaskManagerLocation> locationPreferences,
    +   public CompletableFuture<LogicalSlot> allocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit scheduledUnit,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling,
                        Time timeout) {
     
    -           return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
    +           return internalAllocateSlot(
    +                   slotRequestId,
    +                   scheduledUnit,
    +                   resourceProfile,
    +                   locationPreferences,
    +                   allowQueuedScheduling);
        }
     
    -   @Override
    -   public void returnAllocatedSlot(Slot slot) {
    -           internalReturnAllocatedSlot(slot);
    +   private CompletableFuture<LogicalSlot> internalAllocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit task,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) {
    +
    +           final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
    +
    +           if (slotSharingGroupId != null) {
    +                   // allocate slot with slot sharing
    +                   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
    +                           slotSharingGroupId,
    +                           id -> new SlotSharingManager(
    +                                   id,
    +                                   this,
    +                                   providerAndOwner));
    +
    +                   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
    +
    +                   try {
    +                           if (task.getCoLocationConstraint() != null) {
    +                                   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
    +                                           task.getCoLocationConstraint(),
    +                                           multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           } else {
    +                                   multiTaskSlotFuture = 
allocateMultiTaskSlot(
    +                                           task.getJobVertexId(), 
multiTaskSlotManager,
    +                                           resourceProfile,
    +                                           locationPreferences,
    +                                           allowQueuedScheduling);
    +                           }
    +                   } catch (NoResourceAvailableException 
noResourceException) {
    +                           return 
FutureUtils.completedExceptionally(noResourceException);
    +                   }
    +
    +                   // sanity check
    +                   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
    +
    +                   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
    +                           slotRequestId,
    +                           task.getJobVertexId(),
    +                           multiTaskSlotFuture.getLocality());
    +
    +                   return leave.getLogicalSlotFuture();
    +           } else {
    +                   // request an allocated slot to assign a single logical 
slot to
    +                   CompletableFuture<SlotAndLocality> 
slotAndLocalityFuture = requestAllocatedSlot(
    +                           slotRequestId,
    +                           resourceProfile,
    +                           locationPreferences,
    +                           allowQueuedScheduling);
    +
    +                   return slotAndLocalityFuture.thenApply(
    +                           (SlotAndLocality slotAndLocality) -> {
    +                                   final AllocatedSlot allocatedSlot = 
slotAndLocality.getSlot();
    +
    +                                   final SingleLogicalSlot singleTaskSlot 
= new SingleLogicalSlot(
    +                                           slotRequestId,
    +                                           allocatedSlot,
    +                                           null,
    +                                           slotAndLocality.getLocality(),
    +                                           providerAndOwner);
    +
    +                                   if 
(allocatedSlot.tryAssignPayload(singleTaskSlot)) {
    +                                           return singleTaskSlot;
    +                                   } else {
    +                                           final FlinkException 
flinkException = new FlinkException("Could not assign payload to allocated slot 
" + allocatedSlot.getAllocationId() + '.');
    +                                           releaseSlot(slotRequestId, 
null, flinkException);
    +                                           throw new 
CompletionException(flinkException);
    +                                   }
    +                           });
    +           }
        }
     
    -   @Override
    -   public CompletableFuture<Acknowledge> 
cancelSlotAllocation(SlotRequestID requestId) {
    -           final PendingRequest pendingRequest = 
removePendingRequest(requestId);
    +   /**
    +    * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for 
the given {@link CoLocationConstraint}.
    +    *
    +    * <p>If allowQueuedScheduling is true, then the returned {@link 
SlotSharingManager.MultiTaskSlot} can be
    +    * uncompleted.
    +    *
    +    * @param coLocationConstraint for which to allocate a {@link 
SlotSharingManager.MultiTaskSlot}
    +    * @param multiTaskSlotManager responsible for the slot sharing group 
for which to allocate the slot
    +    * @param resourceProfile specifying the requirements for the requested 
slot
    +    * @param locationPreferences containing preferred TaskExecutors on 
which to allocate the slot
    +    * @param allowQueuedScheduling true if queued scheduling (the returned 
task slot must not be completed yet) is allowed, otherwise false
    +    * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which 
contains the allocated{@link SlotSharingManager.MultiTaskSlot}
    +    *              and its locality wrt the given location preferences
    +    * @throws NoResourceAvailableException if no task slot could be 
allocated
    +    */
    +   private SlotSharingManager.MultiTaskSlotLocality 
allocateCoLocatedMultiTaskSlot(
    +                   CoLocationConstraint coLocationConstraint,
    +                   SlotSharingManager multiTaskSlotManager,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) throws 
NoResourceAvailableException {
    +           final SlotRequestId coLocationSlotRequestId = 
coLocationConstraint.getSlotRequestId();
    +
    +           if (coLocationSlotRequestId != null) {
    +                   // we have a slot assigned --> try to retrieve it
    +                   final SlotSharingManager.TaskSlot taskSlot = 
multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
    +
    +                   if (taskSlot != null) {
    +                           Preconditions.checkState(taskSlot instanceof 
SlotSharingManager.MultiTaskSlot);
    +                           return 
SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) 
taskSlot), Locality.LOCAL);
    +                   } else {
    +                           // the slot may have been cancelled in the mean 
time
    +                           coLocationConstraint.setSlotRequestId(null);
    +                   }
    +           }
     
    -           if (pendingRequest != null) {
    -                   failPendingRequest(pendingRequest, new 
CancellationException("Allocation with request id" + requestId + " 
cancelled."));
    +           final Collection<TaskManagerLocation> actualLocationPreferences;
    +
    +           if (coLocationConstraint.isAssigned()) {
    +                   actualLocationPreferences = 
Collections.singleton(coLocationConstraint.getLocation());
                } else {
    -                   final Slot slot = allocatedSlots.get(requestId);
    +                   actualLocationPreferences = locationPreferences;
    +           }
    +
    +           // get a new multi task slot
    +           final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality = allocateMultiTaskSlot(
    +                   coLocationConstraint.getGroupId(), multiTaskSlotManager,
    +                   resourceProfile,
    +                   actualLocationPreferences,
    +                   allowQueuedScheduling);
    +
    +           // check whether we fulfill the co-location constraint
    +           if (coLocationConstraint.isAssigned() && 
multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
    +                   multiTaskSlotLocality.getMultiTaskSlot().release(
    +                           new FlinkException("Multi task slot is not 
local and, thus, does not fulfill the co-location constraint."));
     
    -                   if (slot != null) {
    -                           LOG.info("Returning allocated slot {} because 
the corresponding allocation request {} was cancelled.", slot, requestId);
    -                           if (slot.markCancelled()) {
    -                                   internalReturnAllocatedSlot(slot);
    +                   throw new NoResourceAvailableException("Could not 
allocate a local multi task slot for the " +
    +                           "co location constraint " + 
coLocationConstraint + '.');
    +           }
    +
    +           final SlotRequestId slotRequestId = new SlotRequestId();
    +           final SlotSharingManager.MultiTaskSlot coLocationSlot = 
multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
    +                   slotRequestId,
    +                   coLocationConstraint.getGroupId());
    +
    +           // mark the requested slot as co-located slot for other 
co-located tasks
    +           coLocationConstraint.setSlotRequestId(slotRequestId);
    +
    +           // lock the co-location constraint once we have obtained the 
allocated slot
    +           coLocationSlot.getSlotContextFuture().whenComplete(
    +                   (SlotContext slotContext, Throwable throwable) -> {
    +                           if (throwable == null) {
    --- End diff --
    
    Is it acceptable to swallow the `Throwable` here? 


---

Reply via email to