Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r156892139
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---
    @@ -266,104 +279,367 @@ public void disconnectResourceManager() {
        // ------------------------------------------------------------------------
     
        @Override
    -   public CompletableFuture<SimpleSlot> allocateSlot(
    -                   SlotRequestID requestId,
    -                   ScheduledUnit task,
    -                   ResourceProfile resources,
    -                   Iterable<TaskManagerLocation> locationPreferences,
    +   public CompletableFuture<LogicalSlot> allocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit scheduledUnit,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling,
                        Time timeout) {
     
    -           return internalAllocateSlot(requestId, task, resources, locationPreferences);
    +           return internalAllocateSlot(
    +                   slotRequestId,
    +                   scheduledUnit,
    +                   resourceProfile,
    +                   locationPreferences,
    +                   allowQueuedScheduling);
        }
     
    -   @Override
    -   public void returnAllocatedSlot(Slot slot) {
    -           internalReturnAllocatedSlot(slot);
    +   private CompletableFuture<LogicalSlot> internalAllocateSlot(
    +                   SlotRequestId slotRequestId,
    +                   ScheduledUnit task,
    +                   ResourceProfile resourceProfile,
    +                   Collection<TaskManagerLocation> locationPreferences,
    +                   boolean allowQueuedScheduling) {
    +
    +           final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
    +
    +           if (slotSharingGroupId != null) {
    +                   // allocate slot with slot sharing
    +                   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
    +                           slotSharingGroupId,
    +                           id -> new SlotSharingManager(
    +                                   id,
    +                                   this,
    +                                   providerAndOwner));
    +
    +                   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
    --- End diff --
    
    True, will rename it.


---

Reply via email to