[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15469801#comment-15469801 ]
ASF GitHub Bot commented on FLINK-4538:
---------------------------------------

Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77769044

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
          * RPC's main thread to avoid race condition).
          *
          * @param request The detailed request of the slot
    +     * @return SlotRequestRegistered The confirmation message to be send to the caller
          */
    -    public void requestSlot(final SlotRequest request) {
    +    public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +        final AllocationID allocationId = request.getAllocationId();
             if (isRequestDuplicated(request)) {
    -            LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -            return;
    +            LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +            return null;
             }

             // try to fulfil the request with current free slots
    -        ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +        final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
             if (slot != null) {
                 LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -                request.getAllocationId(), request.getJobId());
    +                allocationId, request.getJobId());

                 // record this allocation in bookkeeping
    -            allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +            allocationMap.addAllocation(slot.getSlotId(), allocationId);

                 // remove selected slot from free pool
                 freeSlots.remove(slot.getSlotId());

    -            // TODO: send slot request to TaskManager
    +            slot.getTaskExecutorGateway()
    +                .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --

    There are three possible responses from the taskExecutor:
    1. Ack: the taskExecutor gives the slot to the specified jobMaster as expected.
    2. Decline: the slot is already occupied by another AllocationID.
    3. Timeout: this could be caused by loss of the request message or the response message, or by slow network transfer.

    In the first case, the SlotManager does not need to do anything. In the second and third cases, however, the SlotManager needs to first verify and clear all previous allocation information for this slot request, and then try again to find a suitable slot for it.

    I think we should add logic to handle these three possible responses from the taskExecutor.


> Implement slot allocation protocol with JobMaster
> -------------------------------------------------
>
>                 Key: FLINK-4538
>                 URL: https://issues.apache.org/jira/browse/FLINK-4538
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
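
For illustration only, the handling of the three taskExecutor responses proposed in the review comment (ack, decline, timeout) could be sketched roughly as below. This is not code from the pull request: the class SlotResponseSketch, the Response enum, and the String-based slot and allocation IDs are hypothetical stand-ins for Flink's SlotManager bookkeeping. The sketch also assumes that a slot which declined or timed out is not put back into the free pool, since its real state is unknown to the manager.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, simplified stand-in for slot bookkeeping; not Flink's actual classes. */
public class SlotResponseSketch {

    /** The three outcomes listed in the review comment. */
    public enum Response { ACK, DECLINE, TIMEOUT }

    // slotId -> allocationId for slots that are currently assigned
    private final Map<String, String> allocations = new HashMap<>();
    // slots believed to be free, in insertion order
    private final Set<String> freeSlots = new LinkedHashSet<>();

    public void addFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    /** Returns the allocation recorded for a slot, or null if there is none. */
    public String allocationOf(String slotId) {
        return allocations.get(slotId);
    }

    /** Takes the first free slot (if any) and records the allocation for it. */
    public Optional<String> allocate(String allocationId) {
        Iterator<String> it = freeSlots.iterator();
        if (!it.hasNext()) {
            return Optional.empty();
        }
        String slotId = it.next();
        it.remove();
        allocations.put(slotId, allocationId);
        return Optional.of(slotId);
    }

    /**
     * Reacts to the reply for a pending slot request and returns the slot
     * that now serves the allocation, or empty if no free slot is left.
     */
    public Optional<String> onResponse(String slotId, String allocationId, Response response) {
        switch (response) {
            case ACK:
                // Case 1: the slot was handed over as expected, nothing to do.
                return Optional.of(slotId);
            case DECLINE:
            case TIMEOUT:
                // Cases 2 and 3: clear the bookkeeping entry, but only if it
                // still belongs to this request (the "verify" step).
                allocations.remove(slotId, allocationId);
                // Assumption: the failed slot is NOT returned to the free pool.
                return allocate(allocationId);
            default:
                throw new IllegalStateException("Unknown response: " + response);
        }
    }
}
```

In this sketch a decline and a timeout share one code path, which mirrors the comment's suggestion that both lead to clearing the old allocation and retrying; a real implementation would likely need to distinguish them, for example to decide when a timed-out slot can be trusted again.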