This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a5b178fab3 [FLINK-39311][runtime] Fix application-related main thread execution in Dispatcher
4a5b178fab3 is described below

commit 4a5b178fab3613ac8f3ae125819298027cdde357
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Mar 24 15:37:56 2026 +0800

    [FLINK-39311][runtime] Fix application-related main thread execution in Dispatcher
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  5 ++-
 .../DispatcherApplicationResourceCleanupTest.java  | 23 ++++++++---
 .../dispatcher/DispatcherApplicationTest.java      | 44 +++++++++++-----------
 .../dispatcher/DispatcherCleanupITCase.java        | 16 +++++++-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 19 +++++++---
 5 files changed, 69 insertions(+), 38 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c9538955750..d5394704e32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -980,8 +980,9 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
             // Step 4: perform application cleanup and mark result clean after 
all cleanup
             // (including job cleanup) is done
-            applicationDirtyResultFuture.thenCompose(
-                    ignored -> removeApplication(applicationId, 
application.getJobs()));
+            applicationDirtyResultFuture.thenComposeAsync(
+                    ignored -> removeApplication(applicationId, 
application.getJobs()),
+                    getMainThreadExecutor());
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
index a649c766930..b3e21f124d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
@@ -130,7 +130,7 @@ public class DispatcherApplicationResourceCleanupTest {
         CompletableFuture<?> applicationTerminationFuture =
                 dispatcher.getApplicationTerminationFuture(applicationId);
 
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         applicationTerminationFuture.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
 
@@ -161,7 +161,7 @@ public class DispatcherApplicationResourceCleanupTest {
 
         submitApplicationAndWait();
 
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         assertThatNoCleanupWasTriggered();
 
@@ -213,7 +213,7 @@ public class DispatcherApplicationResourceCleanupTest {
         CompletableFuture<?> applicationTerminationFuture =
                 dispatcher.getApplicationTerminationFuture(applicationId);
 
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         // Mark as clean should not have been called yet
         assertFalse(markAsCleanFuture.isDone());
@@ -258,7 +258,7 @@ public class DispatcherApplicationResourceCleanupTest {
 
         submitApplicationAndWait();
 
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         // Fatal error should be reported
         final CompletableFuture<? extends Throwable> errorFuture =
@@ -291,7 +291,7 @@ public class DispatcherApplicationResourceCleanupTest {
 
         submitApplicationAndWait();
 
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         // No fatal error should be reported (mark as clean failure is handled 
gracefully)
         final CompletableFuture<? extends Throwable> errorFuture =
@@ -330,7 +330,7 @@ public class DispatcherApplicationResourceCleanupTest {
         CompletableFuture<?> applicationTerminationFuture =
                 dispatcher.getApplicationTerminationFuture(applicationId);
 
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         // Before the archiving is finished, the cleanup is not finished and 
the application is not
         // terminated
@@ -386,4 +386,15 @@ public class DispatcherApplicationResourceCleanupTest {
     private void submitApplicationAndWait() {
         submitApplication().join();
     }
+
+    private void mockApplicationFinished() throws Exception {
+        dispatcher
+                .callAsyncInMainThread(
+                        () -> {
+                            dispatcher.notifyApplicationStatusChange(
+                                    applicationId, ApplicationState.FINISHED);
+                            return CompletableFuture.completedFuture(null);
+                        })
+                .get();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
index accd62e7f4e..06e92581bd1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
@@ -53,8 +53,10 @@ import 
org.apache.flink.runtime.messages.FlinkApplicationTerminatedWithoutCancel
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.TestingApplicationResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.AfterAll;
@@ -211,11 +213,9 @@ public class DispatcherApplicationTest {
         // wait for archive to complete
         applicationTerminationFuture.get(TIMEOUT.toMillis(), 
TimeUnit.MILLISECONDS);
 
-        assertThrows(
-                IllegalStateException.class,
-                () ->
-                        dispatcher.notifyApplicationStatusChange(
-                                applicationId, ApplicationState.FAILED));
+        assertThatThrownBy(() -> 
mockApplicationStatusChange(ApplicationState.FAILED))
+                .extracting(ExceptionUtils::stripExecutionException)
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
@@ -581,14 +581,7 @@ public class DispatcherApplicationTest {
                 .isTrue();
 
         // complete the application - this should trigger cleanup of the 
remaining recovered job
-        dispatcher
-                .callAsyncInMainThread(
-                        () -> {
-                            dispatcher.notifyApplicationStatusChange(
-                                    applicationId, ApplicationState.FINISHED);
-                            return 
CompletableFuture.completedFuture(Acknowledge.get());
-                        })
-                .get();
+        mockApplicationStatusChange(ApplicationState.FINISHED);
 
         // verify that no jobs are recovered
         assertThat(jobManagerRunnerFactory.getQueueSize()).isZero();
@@ -654,13 +647,14 @@ public class DispatcherApplicationTest {
                                 .anyMatch(r -> r.getJobId().equals(jobId)))
                 .isTrue();
 
+        CompletableFuture<?> applicationTerminationFuture =
+                dispatcher.getApplicationTerminationFuture(applicationId);
+
         // complete the application
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationStatusChange(ApplicationState.FINISHED);
 
         // wait for application termination
-        dispatcher
-                .getApplicationTerminationFuture(applicationId)
-                .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+        applicationTerminationFuture.get(TIMEOUT.toMillis(), 
TimeUnit.MILLISECONDS);
 
         // wait for job termination
         dispatcher
@@ -781,12 +775,12 @@ public class DispatcherApplicationTest {
         assertThat(application.getApplicationId()).isEqualTo(applicationId);
         
assertThat(application.getApplicationStatus()).isEqualTo(ApplicationState.FINISHED);
 
-        assertThatFuture(
+        CommonTestUtils.waitUntilCondition(
+                () ->
                         haServices
                                 .getApplicationResultStore()
-                                
.hasCleanApplicationResultEntryAsync(applicationId))
-                .eventuallySucceeds()
-                .isEqualTo(true);
+                                
.hasCleanApplicationResultEntryAsync(applicationId)
+                                .get());
     }
 
     @Test
@@ -827,7 +821,13 @@ public class DispatcherApplicationTest {
     }
 
     private void mockApplicationStatusChange(ApplicationState targetState) 
throws Exception {
-        dispatcher.notifyApplicationStatusChange(applicationId, targetState);
+        dispatcher
+                .callAsyncInMainThread(
+                        () -> {
+                            
dispatcher.notifyApplicationStatusChange(applicationId, targetState);
+                            return CompletableFuture.completedFuture(null);
+                        })
+                .get();
     }
 
     private TestingDispatcher.Builder createTestingDispatcherBuilder() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index e9fc9833be5..da202b0dc44 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -417,7 +417,7 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
         final DispatcherGateway dispatcherGateway =
                 dispatcher.getSelfGateway(DispatcherGateway.class);
         dispatcherGateway.submitApplication(application, TIMEOUT).get();
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished(dispatcher, applicationId);
 
         successfulCleanupLatch.await();
 
@@ -524,7 +524,7 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
         final DispatcherGateway dispatcherGateway =
                 dispatcher.getSelfGateway(DispatcherGateway.class);
         dispatcherGateway.submitApplication(application, TIMEOUT).get();
-        dispatcher.notifyApplicationStatusChange(applicationId, 
ApplicationState.FINISHED);
+        mockApplicationFinished(dispatcher, applicationId);
 
         firstCleanupTriggered.await();
 
@@ -621,4 +621,16 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
             DispatcherGateway dispatcherGateway, ExecutionPlan executionPlan) 
throws Exception {
         dispatcherGateway.submitApplication(new 
SingleJobApplication(executionPlan), TIMEOUT).get();
     }
+
+    private void mockApplicationFinished(TestingDispatcher dispatcher, 
ApplicationID applicationId)
+            throws Exception {
+        dispatcher
+                .callAsyncInMainThread(
+                        () -> {
+                            dispatcher.notifyApplicationStatusChange(
+                                    applicationId, ApplicationState.FINISHED);
+                            return CompletableFuture.completedFuture(null);
+                        })
+                .get();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index c492d150c5a..0048ab5d86a 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -510,7 +510,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                                         .build())));
 
         // mock application termination so that its jobs can be marked clean 
and terminate
-        mockApplicationStatusChange(ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         // wait for job to finish
         dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
@@ -549,7 +549,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                                         .build())));
 
         // mock application termination so that its jobs can be marked clean 
and terminate
-        mockApplicationStatusChange(ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         // wait for job to finish
         dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
@@ -618,8 +618,15 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
                 .get();
     }
 
-    private void mockApplicationStatusChange(ApplicationState targetState) 
throws Exception {
-        dispatcher.notifyApplicationStatusChange(applicationId, targetState);
+    private void mockApplicationFinished() throws Exception {
+        dispatcher
+                .callAsyncInMainThread(
+                        () -> {
+                            dispatcher.notifyApplicationStatusChange(
+                                    applicationId, ApplicationState.FINISHED);
+                            return CompletableFuture.completedFuture(null);
+                        })
+                .get();
     }
 
     @Test
@@ -659,7 +666,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                         testFailure));
 
         // mock application termination so that its jobs can be marked clean 
and terminate
-        mockApplicationStatusChange(ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         // wait till job has failed
         dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
@@ -1270,7 +1277,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
         // mock application termination so that its jobs can be marked clean 
and terminate
-        mockApplicationStatusChange(ApplicationState.FINISHED);
+        mockApplicationFinished();
 
         dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
 

Reply via email to