[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551229#comment-15551229
 ] 

ASF GitHub Bot commented on FLINK-4348:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2571#discussion_r82132624
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -136,53 +127,34 @@ public SlotRequestRegistered requestSlot(final SlotRequest request) {
                        // record this allocation in bookkeeping
                        allocationMap.addAllocation(slot.getSlotId(), allocationId);
                        // remove selected slot from free pool
    -                   final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
    -
    -                   final Future<SlotRequestReply> slotRequestReplyFuture =
    -                           slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
    -
    -                   slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
    -                           @Override
    -                           public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
    -                                   if (throwable != null) {
    -                                           // we failed, put the slot and the request back again
    -                                           if (allocationMap.isAllocated(slot.getSlotId())) {
    -                                                   // only re-add if the slot hasn't been removed in the meantime
    -                                                   freeSlots.put(slot.getSlotId(), removedSlot);
    -                                           }
    -                                           pendingSlotRequests.put(allocationId, request);
    -                                   }
    -                                   return null;
    -                           }
    -                   }, resourceManagerServices.getExecutor());
    +                   freeSlots.remove(slot.getSlotId());
    +
    +                   sendSlotRequest(slot, request);
                } else {
                        LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
                                "AllocationID:{}, JobID:{}", allocationId, request.getJobId());
    -                   Preconditions.checkState(resourceManagerServices != null,
    +                   Preconditions.checkState(rmServices != null,
                                "Attempted to allocate resources but no ResourceManagerServices set.");
    -                   resourceManagerServices.allocateResource(request.getResourceProfile());
    +                   rmServices.allocateResource(request.getResourceProfile());
                        pendingSlotRequests.put(allocationId, request);
                }
     
    -           return new SlotRequestRegistered(allocationId);
    +           return new RMSlotRequestRegistered(allocationId);
        }
     
        /**
    -    * Sync slot status with TaskManager's SlotReport.
    +    * Notifies the SlotManager that a slot is available again after being allocated.
    +    * @param slotID slot id of available slot
         */
    -   public void updateSlotStatus(final SlotReport slotReport) {
    -           for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
    -                   updateSlotStatus(slotStatus);
    +   public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
    +           if (!allocationMap.isAllocated(slotID)) {
    +                   throw new IllegalStateException("Slot was not previously allocated but " +
    +                           "TaskManager reports it as available again");
                }
    -   }
    -
    -   /**
    -    * Registers a TaskExecutor
    -    * @param resourceID TaskExecutor's ResourceID
    -    * @param gateway TaskExcutor's gateway
    -    */
    -   public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
    -           this.taskManagerGateways.put(resourceID, gateway);
    +           allocationMap.removeAllocation(slotID);
    +           final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
    +           ResourceSlot freeSlot = slots.get(slotID);
    --- End diff --
    
    Better to check whether the slot exists, in case the TM reports an unknown free slot.
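
The check suggested above could look roughly like the following. This is a minimal sketch, not the actual SlotManager code: the class SlotAvailabilitySketch, its fields, and the use of plain Strings in place of ResourceID, SlotID and ResourceSlot are all assumptions made for illustration. It looks up the TaskManager and the slot before touching the allocation bookkeeping, and ignores the report if either is unknown. Whether an unknown slot should be ignored, logged, or treated as an error is left open here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the SlotManager bookkeeping; Strings replace ResourceID/SlotID. */
class SlotAvailabilitySketch {
    // resourceID -> (slotID -> slot), loosely mirroring registeredSlots in the diff
    private final Map<String, Map<String, String>> registeredSlots = new HashMap<>();
    private final Set<String> allocatedSlots = new HashSet<>();
    private final Map<String, String> freeSlots = new HashMap<>();

    void registerSlot(String resourceID, String slotID) {
        registeredSlots.computeIfAbsent(resourceID, k -> new HashMap<>())
                .put(slotID, "slot:" + slotID);
    }

    void allocate(String slotID) {
        allocatedSlots.add(slotID);
    }

    /** Returns true if the slot went back to the free pool, false if the report was ignored. */
    boolean notifySlotAvailable(String resourceID, String slotID) {
        Map<String, String> slots = registeredSlots.get(resourceID);
        if (slots == null) {
            return false; // report from a TaskManager we do not know
        }
        String slot = slots.get(slotID);
        if (slot == null) {
            return false; // the TaskManager reported a slot we never registered
        }
        if (!allocatedSlots.remove(slotID)) {
            return false; // slot was not allocated, nothing to release
        }
        freeSlots.put(slotID, slot);
        return true;
    }

    boolean isFree(String slotID) {
        return freeSlots.containsKey(slotID);
    }

    public static void main(String[] args) {
        SlotAvailabilitySketch manager = new SlotAvailabilitySketch();
        manager.registerSlot("tm-1", "slot-0");
        manager.allocate("slot-0");
        System.out.println(manager.notifySlotAvailable("tm-1", "slot-9"));
        System.out.println(manager.notifySlotAvailable("tm-1", "slot-0"));
    }
}
```

Doing both lookups before removing the allocation keeps the bookkeeping unchanged when the report refers to something unknown.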


> Implement slot allocation protocol with TaskExecutor
> ----------------------------------------------------
>
>                 Key: FLINK-4348
>                 URL: https://issues.apache.org/jira/browse/FLINK-4348
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: Maximilian Michels
>
> When the SlotManager finds a suitable slot in the free pool for a slot
> request, it marks the slot as occupied and then tells the TaskExecutor to
> give the slot to the specified JobMaster.
> When a slot request is sent to the TaskExecutor, it should contain the
> following parameters: AllocationID, JobID, slotID,
> resourceManagerLeaderSessionID.
> There are three possible responses from the TaskExecutor. For each one, we
> discuss when it happens and how to handle it.
> 1. Ack: the TaskExecutor gives the slot to the specified JobMaster as
> expected.
> 2. Decline: the slot is already occupied by another AllocationID.
> 3. Timeout: the request or response message was lost, or the network
> transfer was slow.
> In the first case, the ResourceManager does not need to do anything. In the
> second and third cases, however, the ResourceManager needs to notify the
> SlotManager, which first verifies and clears all previous allocation
> information for this slot request and then tries again to find a suitable
> slot for it. This may cause duplicate allocations: for example, if the slot
> request to a TaskManager succeeds but the response is lost, we may request a
> slot on another TaskManager, so that two slots are assigned to one request.
> This can be handled by rejecting the registration at the JobMaster.
> Some questions still need further discussion:
> 1. Who sends the slot request to the TaskExecutor: the SlotManager or the
> ResourceManager? I think it is better for the SlotManager to delegate the RPC
> call to the ResourceManager whenever it needs to communicate with the outside
> world. The ResourceManager knows which TaskExecutor to send the request to,
> based on the ResourceID. Besides, the method used to request a slot from the
> TaskExecutor should not be an RpcMethod, because only the SlotManager should
> be allowed to call it; other components, such as the JobMaster and the
> TaskExecutor, must not be able to call it directly.
> 2. If the JobMaster rejects the slot offer from a TaskExecutor, should the
> TaskExecutor notify the ResourceManager of the free slot immediately, or wait
> for the next heartbeat sync? The advantage of the first approach is that the
> ResourceManager's view is updated faster. The advantage of the second is that
> it saves an RPC method in the ResourceManager.
> 3. There are two ways to communicate. First, the slot request could be sent
> as an ask operation, where the response is returned as a future. Second, the
> ResourceManager could send the slot request in a fire-and-forget way, with the
> response returned by a separate RPC call. I prefer the first one because it
> is simpler and saves an RPC method in the ResourceManager (for the callback
> in the second approach).
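
To make the three outcomes in the issue description concrete, here is a minimal sketch of the "ask" variant using the JDK's CompletableFuture. It is only an illustration under stated assumptions: the class SlotRequestProtocolSketch, the Reply and Action enums, and handleReply are hypothetical names, not Flink APIs, and the real TaskExecutor gateway call is replaced by a future passed in by the caller. It requires Java 9 or later for orTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of handling a TaskExecutor's reply to a slot request. */
class SlotRequestProtocolSketch {
    /** The two explicit replies a TaskExecutor can give (assumed names). */
    enum Reply { ACK, DECLINE }

    /** What the ResourceManager side does next (assumed names). */
    enum Action { NONE, CLEAR_AND_RETRY }

    /**
     * Maps the reply future to a follow-up action:
     * Ack means nothing to do; Decline or Timeout means the previous
     * allocation is cleared and a new slot is looked for.
     */
    static CompletableFuture<Action> handleReply(CompletableFuture<Reply> reply, long timeoutMillis) {
        return reply
                // fail the future with a TimeoutException if no reply arrives in time
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((result, failure) -> {
                    if (failure == null && result == Reply.ACK) {
                        return Action.NONE; // case 1: slot was given to the JobMaster
                    }
                    // case 2 (decline) and case 3 (timeout or other failure)
                    return Action.CLEAR_AND_RETRY;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Reply> acked = CompletableFuture.completedFuture(Reply.ACK);
        CompletableFuture<Reply> lost = new CompletableFuture<>(); // never completed: simulates a lost message
        System.out.println(handleReply(acked, 1000).join());
        System.out.println(handleReply(lost, 50).join());
    }
}
```

With the ask style, the timeout and the reply are handled in one place on the returned future, which is why no extra callback RPC method is needed on the ResourceManager side.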



