Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6279#discussion_r201869163
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
---
@@ -536,7 +540,25 @@ private JobManagerRunner
createJobManagerRunner(JobGraph jobGraph) throws Except
private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean
cleanupHA) {
final CompletableFuture<Void> cleanupFuture = removeJob(jobId,
cleanupHA);
- registerOrphanedJobManagerTerminationFuture(cleanupFuture);
+ registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
+ }
+
+ private void registerJobManagerRunnerTerminationFuture(JobID jobId,
CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
+
Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
+
+ jobManagerTerminationFutures.put(jobId,
jobManagerRunnerTerminationFuture);
+
+ // clean up the pending termination future
+ jobManagerRunnerTerminationFuture.thenRunAsync(
+ () -> {
+ final CompletableFuture<Void> terminationFuture
= jobManagerTerminationFutures.remove(jobId);
+
+ //noinspection ObjectEquality
+ if (terminationFuture != null &&
terminationFuture != jobManagerRunnerTerminationFuture) {
+ jobManagerTerminationFutures.put(jobId,
terminationFuture);
--- End diff --
It can happen because we also clear the termination future in the callback
of the `Dispatcher#waitForTerminatingJobManager` method.
---