Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6279#discussion_r201329956

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -536,7 +540,25 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except
     	private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
     		final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
    -		registerOrphanedJobManagerTerminationFuture(cleanupFuture);
    +		registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
    +	}
    +
    +	private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
    +		Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
    +
    +		jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
    +
    +		// clean up the pending termination future
    +		jobManagerRunnerTerminationFuture.thenRunAsync(
    +			() -> {
    +				final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
    +
    +				//noinspection ObjectEquality
    +				if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
    +					jobManagerTerminationFutures.put(jobId, terminationFuture);
    --- End diff --

    Here you handle a case where a `terminationFuture` for a job got replaced. Under what circumstances can this happen? Doesn't the `checkState`:

    `Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));`

    prevent this?
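To make the question concrete, here is a minimal single-threaded sketch of the registration pattern from the diff. It is not the Dispatcher code: `TerminationFutureSketch`, the `String` job id, the plain `HashMap`, and the use of `thenRun` instead of `thenRunAsync` are all assumptions made to keep the example deterministic. It suggests that the state check rejects a second registration while the first entry is still in the map, so the replacement branch would only be reachable if some other code path removed the entry before the first future completed. Whether such a path exists in the Dispatcher is exactly what I am asking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the Dispatcher's bookkeeping; not Flink code.
class TerminationFutureSketch {
    // String replaces JobID to keep the sketch self-contained.
    final Map<String, CompletableFuture<Void>> futures = new HashMap<>();

    void register(String jobId, CompletableFuture<Void> future) {
        // Same contract as Preconditions.checkState: throw if already registered.
        if (futures.containsKey(jobId)) {
            throw new IllegalStateException("already registered: " + jobId);
        }
        futures.put(jobId, future);

        // Assumption: synchronous thenRun instead of thenRunAsync, for determinism.
        future.thenRun(() -> {
            CompletableFuture<Void> current = futures.remove(jobId);
            // Put the entry back if it belongs to a different registration.
            if (current != null && current != future) {
                futures.put(jobId, current);
            }
        });
    }

    // While the first entry is still present, the state check rejects a second one.
    static boolean secondRegistrationRejected() {
        TerminationFutureSketch s = new TerminationFutureSketch();
        s.register("job", new CompletableFuture<>());
        try {
            s.register("job", new CompletableFuture<>());
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // The replacement branch only triggers if the entry was removed elsewhere first.
    static boolean replacementSurvivesCleanup() {
        TerminationFutureSketch s = new TerminationFutureSketch();
        CompletableFuture<Void> first = new CompletableFuture<>();
        CompletableFuture<Void> second = new CompletableFuture<>();
        s.register("job", first);
        s.futures.remove("job");   // hypothetical removal by another code path
        s.register("job", second); // passes the state check again
        first.complete(null);      // cleanup for `first` finds `second` in the map
        return s.futures.get("job") == second;
    }
}
```

If no such external removal can happen, the `terminationFuture != jobManagerRunnerTerminationFuture` branch looks unreachable and could be dropped.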
---