Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r156896571 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // ------------------------------------------------------------------------ @Override - public CompletableFuture<SimpleSlot> allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable<TaskManagerLocation> locationPreferences, + public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection<TaskManagerLocation> locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture<LogicalSlot> internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection<TaskManagerLocation> locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture<SlotAndLocality> slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return slotAndLocalityFuture.thenApply( + (SlotAndLocality slotAndLocality) -> { + final AllocatedSlot allocatedSlot = slotAndLocality.getSlot(); + + final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot( + slotRequestId, + allocatedSlot, + null, + slotAndLocality.getLocality(), + providerAndOwner); + + if (allocatedSlot.tryAssignPayload(singleTaskSlot)) { + return singleTaskSlot; + } else { + final FlinkException flinkException = new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.'); + releaseSlot(slotRequestId, null, flinkException); + throw new CompletionException(flinkException); + } + }); + } } - @Override - public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId) { - final PendingRequest pendingRequest = removePendingRequest(requestId); + /** + * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for the given {@link CoLocationConstraint}. + * + * <p>If allowQueuedScheduling is true, then the returned {@link SlotSharingManager.MultiTaskSlot} can be + * uncompleted. + * + * @param coLocationConstraint for which to allocate a {@link SlotSharingManager.MultiTaskSlot} + * @param multiTaskSlotManager responsible for the slot sharing group for which to allocate the slot + * @param resourceProfile specifying the requirements for the requested slot + * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot + * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false + * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated{@link SlotSharingManager.MultiTaskSlot} + * and its locality wrt the given location preferences + * @throws NoResourceAvailableException if no task slot could be allocated + */ + private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot( + CoLocationConstraint coLocationConstraint, + SlotSharingManager multiTaskSlotManager, + ResourceProfile resourceProfile, + Collection<TaskManagerLocation> locationPreferences, + boolean allowQueuedScheduling) throws NoResourceAvailableException { + final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId(); + + if (coLocationSlotRequestId != null) { + // we have a slot assigned --> try to retrieve it + final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId); + + if (taskSlot != null) { + Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot); + return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL); + } else { + // the slot may have been cancelled in the mean time + coLocationConstraint.setSlotRequestId(null); + } + } - if (pendingRequest != null) { - failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled.")); + final Collection<TaskManagerLocation> actualLocationPreferences; + + if (coLocationConstraint.isAssigned()) { + actualLocationPreferences = Collections.singleton(coLocationConstraint.getLocation()); } else { - final Slot slot = allocatedSlots.get(requestId); + actualLocationPreferences = locationPreferences; + } + + // get a new multi task slot + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot( + coLocationConstraint.getGroupId(), multiTaskSlotManager, + resourceProfile, + actualLocationPreferences, + allowQueuedScheduling); + + // check whether we fulfill the co-location constraint + if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) { + multiTaskSlotLocality.getMultiTaskSlot().release( + new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint.")); - if (slot != null) { - LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, requestId); - if (slot.markCancelled()) { - internalReturnAllocatedSlot(slot); + throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " + + "co location constraint " + coLocationConstraint + '.'); + } + + final SlotRequestId slotRequestId = new SlotRequestId(); + final SlotSharingManager.MultiTaskSlot coLocationSlot = multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot( + slotRequestId, + coLocationConstraint.getGroupId()); + + // mark the requested slot as co-located slot for other co-located tasks + coLocationConstraint.setSlotRequestId(slotRequestId); + + // lock the co-location constraint once we have obtained the allocated slot + coLocationSlot.getSlotContextFuture().whenComplete( + (SlotContext slotContext, Throwable throwable) -> { + if (throwable == null) { --- End diff -- I will log the throwable here.
---