Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6279#discussion_r201329956
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -536,7 +540,25 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except
        private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
                final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
     
    -           registerOrphanedJobManagerTerminationFuture(cleanupFuture);
    +           registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
    +   }
    +
    +   private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
    +           Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
    +
    +           jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
    +
    +           // clean up the pending termination future
    +           jobManagerRunnerTerminationFuture.thenRunAsync(
    +                   () -> {
    +                           final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
    +
    +                           //noinspection ObjectEquality
    +                           if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
    +                                   jobManagerTerminationFutures.put(jobId, terminationFuture);
    --- End diff --
    
    Here you handle the case where the terminationFuture registered for a job has been replaced by a different one. Under what circumstances can this happen? Doesn't the `Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));` at the top of the method prevent this?
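To make the question concrete, here is a minimal sketch of one hypothetical interleaving in which the cleanup callback could observe a different future than the one it was registered with. It uses only JDK types and is not the Dispatcher code: the class name, the `String` job ID, the queueing executor, and above all the second code path that removes the entry before the callback runs are assumptions made for illustration. Whether such a path exists in the Dispatcher is exactly what the comment asks.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class TerminationFutureSketch {

    // Stand-in for jobManagerTerminationFutures (String replaces JobID).
    static final Map<String, CompletableFuture<Void>> futures = new HashMap<>();

    // Hypothetical executor that only queues tasks, so the order of events is explicit.
    static final Queue<Runnable> pending = new ArrayDeque<>();
    static final Executor queued = pending::add;

    // Mirrors the shape of the method in the diff: precondition, put, async cleanup.
    static void register(String jobId, CompletableFuture<Void> future) {
        if (futures.containsKey(jobId)) {
            throw new IllegalStateException("termination future already registered");
        }
        futures.put(jobId, future);

        future.thenRunAsync(() -> {
            CompletableFuture<Void> current = futures.remove(jobId);
            // Restore an entry that belongs to a later registration.
            if (current != null && current != future) {
                futures.put(jobId, current);
            }
        }, queued);
    }

    static boolean replacedEntrySurvives() {
        futures.clear();
        pending.clear();

        CompletableFuture<Void> first = new CompletableFuture<>();
        register("job-1", first);
        first.complete(null); // the cleanup callback is queued but has not run yet

        // Assumption: some other code path removes the entry in the meantime ...
        futures.remove("job-1");
        // ... so the precondition passes for a second registration.
        CompletableFuture<Void> second = new CompletableFuture<>();
        register("job-1", second);

        pending.remove().run(); // delayed cleanup of the first future runs now
        return futures.get("job-1") == second;
    }

    public static void main(String[] args) {
        System.out.println(replacedEntrySurvives());
    }
}
```

If every removal of an entry goes through this callback alone, the precondition would already rule out the replaced-future branch, and the extra `put` could be dropped.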

