This is an automated email from the ASF dual-hosted git repository. junrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b0607a15e62 [FLINK-37697][runtime] Fix application mode cluster shut down before all job termination futures complete b0607a15e62 is described below commit b0607a15e62b664d15efbda0b0e991f72e45a467 Author: Yi Zhang <zhangyi0...@163.com> AuthorDate: Tue Apr 15 18:51:46 2025 +0800 [FLINK-37697][runtime] Fix application mode cluster shut down before all job termination futures complete --- .../flink/runtime/dispatcher/Dispatcher.java | 19 +++++++++++++++--- .../flink/runtime/dispatcher/DispatcherTest.java | 23 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 5c9930be711..13a10850ee4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -1093,13 +1093,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> @Override public CompletableFuture<Acknowledge> shutDownCluster() { - return shutDownCluster(ApplicationStatus.SUCCEEDED); + return internalShutDownCluster(ApplicationStatus.SUCCEEDED, false); } @Override public CompletableFuture<Acknowledge> shutDownCluster( final ApplicationStatus applicationStatus) { - shutDownFuture.complete(applicationStatus); + return internalShutDownCluster(applicationStatus, true); + } + + private CompletableFuture<Acknowledge> internalShutDownCluster( + final ApplicationStatus applicationStatus, + final boolean waitForAllJobTerminationFutures) { + final CompletableFuture<Void> allJobsTerminationFuture = + waitForAllJobTerminationFutures + ? FutureUtils.completeAll(jobManagerRunnerTerminationFutures.values()) + : CompletableFuture.completedFuture(null); + + FutureUtils.runAfterwards( + allJobsTerminationFuture, () -> shutDownFuture.complete(applicationStatus)); return CompletableFuture.completedFuture(Acknowledge.get()); } @@ -1256,7 +1268,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> } } - private void registerJobManagerRunnerTerminationFuture( + @VisibleForTesting + void registerJobManagerRunnerTerminationFuture( JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) { Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId)); jobManagerRunnerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index fd1931b6c84..a6325b252ff 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -1041,6 +1041,29 @@ public class DispatcherTest extends AbstractDispatcherTest { dispatcher.getShutDownFuture().get(); } + @Test + public void testShutDownFutureCompletesAfterJobTerminationFutures() throws Exception { + dispatcher = + createAndStartDispatcher( + heartbeatServices, + haServices, + JobMasterServiceLeadershipRunnerFactory.INSTANCE); + final DispatcherGateway dispatcherGateway = + dispatcher.getSelfGateway(DispatcherGateway.class); + + CompletableFuture<Void> jobTerminationFuture = new CompletableFuture<>(); + dispatcher.registerJobManagerRunnerTerminationFuture(new JobID(), jobTerminationFuture); + + dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED).get(); + + assertThatThrownBy(() -> dispatcher.getShutDownFuture().get(10L, TimeUnit.MILLISECONDS)) + .isInstanceOf(TimeoutException.class); + + jobTerminationFuture.complete(null); + + dispatcher.getShutDownFuture().get(); + } + @Test public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception { final CompletableFuture<JobID> removeJobGraphFuture = new CompletableFuture<>();