Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155604499 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // ------------------------------------------------------------------------ @Override - public CompletableFuture<SimpleSlot> allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable<TaskManagerLocation> locationPreferences, + public CompletableFuture<LogicalSlot> allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection<TaskManagerLocation> locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture<LogicalSlot> internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection<TaskManagerLocation> locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; --- End diff -- The variable name is confusing. `multiTaskSlotFuture` is not of type `Future`.
---