[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551229#comment-15551229 ]
ASF GitHub Bot commented on FLINK-4348:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2571#discussion_r82132624

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -136,53 +127,34 @@ public SlotRequestRegistered requestSlot(final SlotRequest request) {
                 // record this allocation in bookkeeping
                 allocationMap.addAllocation(slot.getSlotId(), allocationId);
                 // remove selected slot from free pool
    -            final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
    -
    -            final Future<SlotRequestReply> slotRequestReplyFuture =
    -                slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
    -
    -            slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
    -                @Override
    -                public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
    -                    if (throwable != null) {
    -                        // we failed, put the slot and the request back again
    -                        if (allocationMap.isAllocated(slot.getSlotId())) {
    -                            // only re-add if the slot hasn't been removed in the meantime
    -                            freeSlots.put(slot.getSlotId(), removedSlot);
    -                        }
    -                        pendingSlotRequests.put(allocationId, request);
    -                    }
    -                    return null;
    -                }
    -            }, resourceManagerServices.getExecutor());
    +            freeSlots.remove(slot.getSlotId());
    +
    +            sendSlotRequest(slot, request);
             } else {
                 LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
                     "AllocationID:{}, JobID:{}", allocationId, request.getJobId());
    -            Preconditions.checkState(resourceManagerServices != null,
    +            Preconditions.checkState(rmServices != null,
                     "Attempted to allocate resources but no ResourceManagerServices set.");
    -            resourceManagerServices.allocateResource(request.getResourceProfile());
    +            rmServices.allocateResource(request.getResourceProfile());
                 pendingSlotRequests.put(allocationId, request);
             }

    -        return new SlotRequestRegistered(allocationId);
    +        return new RMSlotRequestRegistered(allocationId);
         }

         /**
    -     * Sync slot status with TaskManager's SlotReport.
    +     * Notifies the SlotManager that a slot is available again after being allocated.
    +     * @param slotID slot id of available slot
          */
    -    public void updateSlotStatus(final SlotReport slotReport) {
    -        for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
    -            updateSlotStatus(slotStatus);
    +    public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
    +        if (!allocationMap.isAllocated(slotID)) {
    +            throw new IllegalStateException("Slot was not previously allocated but " +
    +                "TaskManager reports it as available again");
             }
    -    }
    -
    -    /**
    -     * Registers a TaskExecutor
    -     * @param resourceID TaskExecutor's ResourceID
    -     * @param gateway TaskExcutor's gateway
    -     */
    -    public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
    -        this.taskManagerGateways.put(resourceID, gateway);
    +        allocationMap.removeAllocation(slotID);
    +        final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
    +        ResourceSlot freeSlot = slots.get(slotID);
    --- End diff --

    Better to check whether the slot exists, in case the TM reports an unknown free slot.


> Implement slot allocation protocol with TaskExecutor
> ----------------------------------------------------
>
>                 Key: FLINK-4348
>                 URL: https://issues.apache.org/jira/browse/FLINK-4348
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: Maximilian Michels
>
> When slotManager finds a proper slot in the free pool for a slot request,
> slotManager marks the slot as occupied, then tells the taskExecutor to give
> the slot to the specified JobMaster.
> When a slot request is sent to the taskExecutor, it should contain the
> following parameters: AllocationID, JobID, slotID,
> resourceManagerLeaderSessionID.
> There are three possible responses from the taskExecutor; we will discuss
> when each one happens and how to handle it.
> 1. Ack request, which means the taskExecutor gives the slot to the specified
> jobMaster as expected.
> 2. Decline request, if the slot is already occupied by another AllocationID.
> 3. Timeout, which could be caused by loss of the request message or the
> response message, or by slow network transfer.
> In the first case, ResourceManager needs to do nothing. However, in the
> second and third cases, ResourceManager needs to notify slotManager;
> slotManager will first verify and clear all previous allocation information
> for this slot request, then try to find a proper slot for the slot request
> again. This may cause duplicate allocation, e.g. the slot request to a
> TaskManager succeeds but the response is lost, so we may request a slot on
> another TaskManager. This causes two slots to be assigned to one request,
> but it can be taken care of by rejecting the registration at the JobMaster.
> There are still some questions that need further discussion.
> 1. Who sends the slotRequest to the taskExecutor, SlotManager or
> ResourceManager? I think it's better that SlotManager delegates the RPC call
> to ResourceManager when SlotManager needs to communicate with the outside
> world. ResourceManager knows which taskExecutor to send the request to,
> based on the ResourceID. Besides, this RPC call, which is used to request a
> slot from the taskExecutor, should not be an RpcMethod, because we want only
> SlotManager to have permission to call the method; other components, for
> example JobMaster and TaskExecutor, cannot call it directly.
> 2. If JobMaster rejects the slot offer from a TaskExecutor, should the
> TaskExecutor notify ResourceManager of the free slot immediately, or wait
> for the next heartbeat sync? The advantage of the first way is that the
> resourceManager’s view is updated faster. The advantage of the second way is
> that it saves an RPC method in ResourceManager.
> 3. There are two communication types. First, the slot request could be sent
> as an ask operation where the response is returned as a future. Second,
> resourceManager sends the slot request in a fire-and-forget way, and the
> response is returned by an RPC call. I prefer the first one because it is
> simpler and saves an RPC method in ResourceManager (for the callback in the
> second way).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
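
Editor's illustration of the ask-style option from question 3 of the issue description, combined with the three possible responses (ack, decline, timeout). This is only a minimal sketch using plain java.util.concurrent: the class AskStyleSlotRequest, the enums Reply and Outcome, and the method handleReply are hypothetical names made up for this example and are not part of Flink or of the pull request. It assumes Java 9 or later for orTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: none of these names come from the Flink code base.
class AskStyleSlotRequest {

    /** Hypothetical reply a TaskExecutor could send back. */
    enum Reply { ACK, DECLINE }

    /** Hypothetical follow-up action on the requesting side. */
    enum Outcome { ALLOCATED, RETRY_AFTER_DECLINE, RETRY_AFTER_TIMEOUT }

    /** Maps the reply future of an ask-style request to one of the three cases. */
    static CompletableFuture<Outcome> handleReply(CompletableFuture<Reply> reply, long timeoutMillis) {
        return reply
            // orTimeout (Java 9+) fails the future with a TimeoutException
            // if no reply arrives within the given time.
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .handle((r, failure) -> {
                if (failure == null) {
                    // Case 1 (ack): nothing more to do. Case 2 (decline): retry elsewhere.
                    return r == Reply.ACK ? Outcome.ALLOCATED : Outcome.RETRY_AFTER_DECLINE;
                }
                // Unwrap a CompletionException if the failure came from an earlier stage.
                Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                    ? failure.getCause()
                    : failure;
                if (cause instanceof TimeoutException) {
                    // Case 3 (timeout): request or response lost, or the network is slow.
                    return Outcome.RETRY_AFTER_TIMEOUT;
                }
                // Any other failure is outside the three cases and is propagated.
                throw new CompletionException(cause);
            });
    }

    public static void main(String[] args) {
        System.out.println(handleReply(CompletableFuture.completedFuture(Reply.ACK), 1000).join());
        System.out.println(handleReply(CompletableFuture.completedFuture(Reply.DECLINE), 1000).join());
        // A future that is never completed stands in for a lost message.
        System.out.println(handleReply(new CompletableFuture<>(), 50).join());
    }
}
```

With a single future per request, the decline and timeout paths both end in the same place (clear the bookkeeping, then retry), which is the reason the description gives for preferring the ask style over a separate callback RPC.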