Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2571#discussion_r82132624
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
    @@ -136,53 +127,34 @@ public SlotRequestRegistered requestSlot(final 
SlotRequest request) {
                        // record this allocation in bookkeeping
                        allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
                        // remove selected slot from free pool
    -                   final ResourceSlot removedSlot = 
freeSlots.remove(slot.getSlotId());
    -
    -                   final Future<SlotRequestReply> slotRequestReplyFuture =
    -                           
slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
    -
    -                   slotRequestReplyFuture.handleAsync(new 
BiFunction<SlotRequestReply, Throwable, Object>() {
    -                           @Override
    -                           public Object apply(SlotRequestReply 
slotRequestReply, Throwable throwable) {
    -                                   if (throwable != null) {
    -                                           // we failed, put the slot and 
the request back again
    -                                           if 
(allocationMap.isAllocated(slot.getSlotId())) {
    -                                                   // only re-add if the 
slot hasn't been removed in the meantime
    -                                                   
freeSlots.put(slot.getSlotId(), removedSlot);
    -                                           }
    -                                           
pendingSlotRequests.put(allocationId, request);
    -                                   }
    -                                   return null;
    -                           }
    -                   }, resourceManagerServices.getExecutor());
    +                   freeSlots.remove(slot.getSlotId());
    +
    +                   sendSlotRequest(slot, request);
                } else {
                        LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
                                "AllocationID:{}, JobID:{}", allocationId, 
request.getJobId());
    -                   Preconditions.checkState(resourceManagerServices != 
null,
    +                   Preconditions.checkState(rmServices != null,
                                "Attempted to allocate resources but no 
ResourceManagerServices set.");
    -                   
resourceManagerServices.allocateResource(request.getResourceProfile());
    +                   
rmServices.allocateResource(request.getResourceProfile());
                        pendingSlotRequests.put(allocationId, request);
                }
     
    -           return new SlotRequestRegistered(allocationId);
    +           return new RMSlotRequestRegistered(allocationId);
        }
     
        /**
    -    * Sync slot status with TaskManager's SlotReport.
    +    * Notifies the SlotManager that a slot is available again after being 
allocated.
    +    * @param slotID slot id of available slot
         */
    -   public void updateSlotStatus(final SlotReport slotReport) {
    -           for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
    -                   updateSlotStatus(slotStatus);
    +   public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
    +           if (!allocationMap.isAllocated(slotID)) {
    +                   throw new IllegalStateException("Slot was not 
previously allocated but " +
    +                           "TaskManager reports it as available again");
                }
    -   }
    -
    -   /**
    -    * Registers a TaskExecutor
    -    * @param resourceID TaskExecutor's ResourceID
    -    * @param gateway TaskExcutor's gateway
    -    */
    -   public void registerTaskExecutor(ResourceID resourceID, 
TaskExecutorGateway gateway) {
    -           this.taskManagerGateways.put(resourceID, gateway);
    +           allocationMap.removeAllocation(slotID);
    +           final Map<SlotID, ResourceSlot> slots = 
registeredSlots.get(resourceID);
    +           ResourceSlot freeSlot = slots.get(slotID);
    --- End diff --
    
    Better to check whether the slot exists in case of TM report an unknown 
free slot


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to