GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148553179
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -399,6 +399,26 @@ public void disconnectJobManager(final JobID jobId, final Exception cause) {
        }
     
        @Override
    +   public void cancelSlotRequest(JobID jobID, JobMasterId jobMasterId, AllocationID allocationID) {
    +
    +           // As the slot allocations are async, it can not avoid all redundent slots, but should best effort.
    +           JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobID);
    +
    +           if (null != jobManagerRegistration) {
    +                   if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
    +                           log.info("Cancel slot request for job {} with allocation id {}.",
    +                                           jobID, allocationID);
    +
    +                           slotManager.unregisterSlotRequest(allocationID);
    +                   } else {
    +                           log.info("Job manager {} is not the leader of job {}.", jobMasterId, jobID);
    +                   }
    +           } else {
    +                   log.warn("Could not find registered job manager for job {}.", jobID);
    +           }
    --- End diff --
    
    This could be simplified to a single call, `slotManager.unregisterSlotRequest(allocationID)`, if we change `cancelSlotRequest(JobID, JobMasterId, AllocationID)` to `cancelSlotRequest(AllocationID)`.
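
    A minimal sketch of what the narrower signature might look like. `AllocationID`, `SlotManager`, and `ResourceManagerSketch` below are simplified stand-ins written for this example, not the actual Flink classes, and `registerSlotRequest` and `pendingCount` are hypothetical helpers that exist only so the sketch can be exercised:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for Flink's AllocationID (identity equality is enough here). */
final class AllocationID {
}

/** Hypothetical stand-in for the slot manager; it only tracks pending slot requests. */
final class SlotManager {
    private final Set<AllocationID> pendingRequests = new HashSet<>();

    void registerSlotRequest(AllocationID allocationID) {
        pendingRequests.add(allocationID);
    }

    /** Removes the pending request if present; returns whether anything was removed. */
    boolean unregisterSlotRequest(AllocationID allocationID) {
        return pendingRequests.remove(allocationID);
    }

    int pendingCount() {
        return pendingRequests.size();
    }
}

/** Illustrative only: the resource manager reduced to the method under review. */
final class ResourceManagerSketch {
    private final SlotManager slotManager;

    ResourceManagerSketch(SlotManager slotManager) {
        this.slotManager = slotManager;
    }

    /** Best-effort cancellation keyed only by the allocation id. */
    public void cancelSlotRequest(AllocationID allocationID) {
        // No job or leader lookup: the allocation id alone identifies the request.
        slotManager.unregisterSlotRequest(allocationID);
    }
}
```

    Note that this form drops the leader check from the diff, so it presumably relies on allocation ids being unique and known only to the requesting job master.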

