Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4887#discussion_r148553179 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -399,6 +399,26 @@ public void disconnectJobManager(final JobID jobId, final Exception cause) { } @Override + public void cancelSlotRequest(JobID jobID, JobMasterId jobMasterId, AllocationID allocationID) { + + // As the slot allocations are async, it can not avoid all redundent slots, but should best effort. + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobID); + + if (null != jobManagerRegistration) { + if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) { + log.info("Cancel slot request for job {} with allocation id {}.", + jobID, allocationID); + + slotManager.unregisterSlotRequest(allocationID); + } else { + log.info("Job manager {} is not the leader of job {}.", jobMasterId, jobID); + } + } else { + log.warn("Could not find registered job manager for job {}.", jobID); + } --- End diff -- This could be simplified to `slotManager.unregisterSlotRequest(allocationID)` if we change `cancelSlotRequest(JobID, JobMasterId, AllocationID)` to `cancelSlotRequest(AllocationID)`.
---