Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77768256
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
         * RPC's main thread to avoid race condition).
         *
         * @param request The detailed request of the slot
    +    * @return SlotRequestRegistered The confirmation message to be send to 
the caller
         */
    -   public void requestSlot(final SlotRequest request) {
    +   public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +           final AllocationID allocationId = request.getAllocationId();
                if (isRequestDuplicated(request)) {
    -                   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
    -                   return;
    +                   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
    +                   return null;
                }
     
                // try to fulfil the request with current free slots
    -           ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +           final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
                if (slot != null) {
                        LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
    -                           request.getAllocationId(), request.getJobId());
    +                           allocationId, request.getJobId());
     
                        // record this allocation in bookkeeping
    -                   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
    +                   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
     
                        // remove selected slot from free pool
                        freeSlots.remove(slot.getSlotId());
     
    -                   // TODO: send slot request to TaskManager
    +                   slot.getTaskExecutorGateway()
    +                           .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
    --- End diff --
    
    ResourceManager keeps a relationship between resourceID and 
TaskExecutorGateway. Maybe we could fetch TaskExecutorGateway by resourceID 
using ResourceManager here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to