This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b0607a15e62 [FLINK-37697][runtime] Fix application mode cluster shut down before all job termination futures complete
b0607a15e62 is described below

commit b0607a15e62b664d15efbda0b0e991f72e45a467
Author: Yi Zhang <zhangyi0...@163.com>
AuthorDate: Tue Apr 15 18:51:46 2025 +0800

    [FLINK-37697][runtime] Fix application mode cluster shut down before all job termination futures complete
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 19 +++++++++++++++---
 .../flink/runtime/dispatcher/DispatcherTest.java   | 23 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5c9930be711..13a10850ee4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -1093,13 +1093,25 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
     @Override
     public CompletableFuture<Acknowledge> shutDownCluster() {
-        return shutDownCluster(ApplicationStatus.SUCCEEDED);
+        return internalShutDownCluster(ApplicationStatus.SUCCEEDED, false);
     }
 
     @Override
     public CompletableFuture<Acknowledge> shutDownCluster(
             final ApplicationStatus applicationStatus) {
-        shutDownFuture.complete(applicationStatus);
+        return internalShutDownCluster(applicationStatus, true);
+    }
+
+    private CompletableFuture<Acknowledge> internalShutDownCluster(
+            final ApplicationStatus applicationStatus,
+            final boolean waitForAllJobTerminationFutures) {
+        final CompletableFuture<Void> allJobsTerminationFuture =
+                waitForAllJobTerminationFutures
+                        ? 
FutureUtils.completeAll(jobManagerRunnerTerminationFutures.values())
+                        : CompletableFuture.completedFuture(null);
+
+        FutureUtils.runAfterwards(
+                allJobsTerminationFuture, () -> 
shutDownFuture.complete(applicationStatus));
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
@@ -1256,7 +1268,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
         }
     }
 
-    private void registerJobManagerRunnerTerminationFuture(
+    @VisibleForTesting
+    void registerJobManagerRunnerTerminationFuture(
             JobID jobId, CompletableFuture<Void> 
jobManagerRunnerTerminationFuture) {
         
Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId));
         jobManagerRunnerTerminationFutures.put(jobId, 
jobManagerRunnerTerminationFuture);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index fd1931b6c84..a6325b252ff 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -1041,6 +1041,29 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         dispatcher.getShutDownFuture().get();
     }
 
+    @Test
+    public void testShutDownFutureCompletesAfterJobTerminationFutures() throws 
Exception {
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        JobMasterServiceLeadershipRunnerFactory.INSTANCE);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        CompletableFuture<Void> jobTerminationFuture = new 
CompletableFuture<>();
+        dispatcher.registerJobManagerRunnerTerminationFuture(new JobID(), 
jobTerminationFuture);
+
+        dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED).get();
+
+        assertThatThrownBy(() -> dispatcher.getShutDownFuture().get(10L, 
TimeUnit.MILLISECONDS))
+                .isInstanceOf(TimeoutException.class);
+
+        jobTerminationFuture.complete(null);
+
+        dispatcher.getShutDownFuture().get();
+    }
+
     @Test
     public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
         final CompletableFuture<JobID> removeJobGraphFuture = new 
CompletableFuture<>();

Reply via email to