[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15469776#comment-15469776 ]

ASF GitHub Bot commented on FLINK-4538:
---------------------------------------

Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77768256
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
         * RPC's main thread to avoid race condition).
         *
         * @param request The detailed request of the slot
    +    * @return SlotRequestRegistered The confirmation message to be send to the caller
         */
    -   public void requestSlot(final SlotRequest request) {
    +   public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +           final AllocationID allocationId = request.getAllocationId();
                if (isRequestDuplicated(request)) {
    -                   LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -                   return;
    +                   LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +                   return null;
                }
     
                // try to fulfil the request with current free slots
    -           ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +           final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
                if (slot != null) {
                        LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -                           request.getAllocationId(), request.getJobId());
    +                           allocationId, request.getJobId());
     
                        // record this allocation in bookkeeping
    -                   allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +                   allocationMap.addAllocation(slot.getSlotId(), allocationId);
     
                        // remove selected slot from free pool
                        freeSlots.remove(slot.getSlotId());
     
    -                   // TODO: send slot request to TaskManager
    +                   slot.getTaskExecutorGateway()
    +                           .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --
    
    The ResourceManager already keeps a mapping from resourceID to
    TaskExecutorGateway. Maybe we could look up the TaskExecutorGateway by
    resourceID through the ResourceManager here, rather than calling
    slot.getTaskExecutorGateway()?
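
    A rough sketch of what that lookup could look like, under stated
    assumptions: `GatewayRegistry`, `sendSlotRequest`, and the String-typed
    IDs are hypothetical stand-ins invented for illustration, and the nested
    `TaskExecutorGateway` interface is a simplified placeholder, not Flink's
    actual interface. It only shows the idea of resolving the gateway by
    resourceID at call time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class GatewayLookupSketch {

    /** Hypothetical, simplified placeholder for a task executor gateway. */
    interface TaskExecutorGateway {
        void requestSlot(String allocationId, String leaderId);
    }

    /** Hypothetical stand-in for the resourceID -> gateway bookkeeping. */
    static class GatewayRegistry {
        private final Map<String, TaskExecutorGateway> gateways = new HashMap<>();

        void register(String resourceId, TaskExecutorGateway gateway) {
            gateways.put(resourceId, gateway);
        }

        Optional<TaskExecutorGateway> lookup(String resourceId) {
            return Optional.ofNullable(gateways.get(resourceId));
        }
    }

    /**
     * Resolves the gateway by resourceID and forwards the slot request.
     * Returns false when no gateway is registered for that resourceID.
     */
    static boolean sendSlotRequest(GatewayRegistry registry, String resourceId,
                                   String allocationId, String leaderId) {
        Optional<TaskExecutorGateway> gateway = registry.lookup(resourceId);
        gateway.ifPresent(g -> g.requestSlot(allocationId, leaderId));
        return gateway.isPresent();
    }

    public static void main(String[] args) {
        GatewayRegistry registry = new GatewayRegistry();
        registry.register("tm-1", (allocationId, leaderId) ->
            System.out.println("requestSlot(" + allocationId + ", " + leaderId + ")"));

        // Known resourceID: the request is forwarded to the registered gateway.
        System.out.println(sendSlotRequest(registry, "tm-1", "alloc-42", "leader-1"));
        // Unknown resourceID: nothing is sent and the caller can react.
        System.out.println(sendSlotRequest(registry, "tm-2", "alloc-43", "leader-1"));
    }
}
```

    With this shape the slot would only need to carry its resourceID, and a
    missing gateway (for example, a TaskExecutor that has since disconnected)
    becomes an explicit case the caller has to handle.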


> Implement slot allocation protocol with JobMaster
> -------------------------------------------------
>
>                 Key: FLINK-4538
>                 URL: https://issues.apache.org/jira/browse/FLINK-4538
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
