This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4a5b178fab3 [FLINK-39311][runtime] Fix application-related main thread execution in Dispatcher
4a5b178fab3 is described below
commit 4a5b178fab3613ac8f3ae125819298027cdde357
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Mar 24 15:37:56 2026 +0800
[FLINK-39311][runtime] Fix application-related main thread execution in Dispatcher
---
.../flink/runtime/dispatcher/Dispatcher.java | 5 ++-
.../DispatcherApplicationResourceCleanupTest.java | 23 ++++++++---
.../dispatcher/DispatcherApplicationTest.java | 44 +++++++++++-----------
.../dispatcher/DispatcherCleanupITCase.java | 16 +++++++-
.../flink/runtime/dispatcher/DispatcherTest.java | 19 +++++++---
5 files changed, 69 insertions(+), 38 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c9538955750..d5394704e32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -980,8 +980,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
// Step 4: perform application cleanup and mark result clean after
all cleanup
// (including job cleanup) is done
- applicationDirtyResultFuture.thenCompose(
- ignored -> removeApplication(applicationId,
application.getJobs()));
+ applicationDirtyResultFuture.thenComposeAsync(
+ ignored -> removeApplication(applicationId,
application.getJobs()),
+ getMainThreadExecutor());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
index a649c766930..b3e21f124d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
@@ -130,7 +130,7 @@ public class DispatcherApplicationResourceCleanupTest {
CompletableFuture<?> applicationTerminationFuture =
dispatcher.getApplicationTerminationFuture(applicationId);
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished();
applicationTerminationFuture.get(timeout.toMillis(),
TimeUnit.MILLISECONDS);
@@ -161,7 +161,7 @@ public class DispatcherApplicationResourceCleanupTest {
submitApplicationAndWait();
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished();
assertThatNoCleanupWasTriggered();
@@ -213,7 +213,7 @@ public class DispatcherApplicationResourceCleanupTest {
CompletableFuture<?> applicationTerminationFuture =
dispatcher.getApplicationTerminationFuture(applicationId);
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished();
// Mark as clean should not have been called yet
assertFalse(markAsCleanFuture.isDone());
@@ -258,7 +258,7 @@ public class DispatcherApplicationResourceCleanupTest {
submitApplicationAndWait();
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished();
// Fatal error should be reported
final CompletableFuture<? extends Throwable> errorFuture =
@@ -291,7 +291,7 @@ public class DispatcherApplicationResourceCleanupTest {
submitApplicationAndWait();
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished();
// No fatal error should be reported (mark as clean failure is handled
gracefully)
final CompletableFuture<? extends Throwable> errorFuture =
@@ -330,7 +330,7 @@ public class DispatcherApplicationResourceCleanupTest {
CompletableFuture<?> applicationTerminationFuture =
dispatcher.getApplicationTerminationFuture(applicationId);
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished();
// Before the archiving is finished, the cleanup is not finished and
the application is not
// terminated
@@ -386,4 +386,15 @@ public class DispatcherApplicationResourceCleanupTest {
private void submitApplicationAndWait() {
submitApplication().join();
}
+
+ private void mockApplicationFinished() throws Exception {
+ dispatcher
+ .callAsyncInMainThread(
+ () -> {
+ dispatcher.notifyApplicationStatusChange(
+ applicationId, ApplicationState.FINISHED);
+ return CompletableFuture.completedFuture(null);
+ })
+ .get();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
index accd62e7f4e..06e92581bd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
@@ -53,8 +53,10 @@ import
org.apache.flink.runtime.messages.FlinkApplicationTerminatedWithoutCancel
import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingApplicationResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.AfterAll;
@@ -211,11 +213,9 @@ public class DispatcherApplicationTest {
// wait for archive to complete
applicationTerminationFuture.get(TIMEOUT.toMillis(),
TimeUnit.MILLISECONDS);
- assertThrows(
- IllegalStateException.class,
- () ->
- dispatcher.notifyApplicationStatusChange(
- applicationId, ApplicationState.FAILED));
+ assertThatThrownBy(() ->
mockApplicationStatusChange(ApplicationState.FAILED))
+ .extracting(ExceptionUtils::stripExecutionException)
+ .isInstanceOf(IllegalStateException.class);
}
@Test
@@ -581,14 +581,7 @@ public class DispatcherApplicationTest {
.isTrue();
// complete the application - this should trigger cleanup of the
remaining recovered job
- dispatcher
- .callAsyncInMainThread(
- () -> {
- dispatcher.notifyApplicationStatusChange(
- applicationId, ApplicationState.FINISHED);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- })
- .get();
+ mockApplicationStatusChange(ApplicationState.FINISHED);
// verify that no jobs are recovered
assertThat(jobManagerRunnerFactory.getQueueSize()).isZero();
@@ -654,13 +647,14 @@ public class DispatcherApplicationTest {
.anyMatch(r -> r.getJobId().equals(jobId)))
.isTrue();
+ CompletableFuture<?> applicationTerminationFuture =
+ dispatcher.getApplicationTerminationFuture(applicationId);
+
// complete the application
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationStatusChange(ApplicationState.FINISHED);
// wait for application termination
- dispatcher
- .getApplicationTerminationFuture(applicationId)
- .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ applicationTerminationFuture.get(TIMEOUT.toMillis(),
TimeUnit.MILLISECONDS);
// wait for job termination
dispatcher
@@ -781,12 +775,12 @@ public class DispatcherApplicationTest {
assertThat(application.getApplicationId()).isEqualTo(applicationId);
assertThat(application.getApplicationStatus()).isEqualTo(ApplicationState.FINISHED);
- assertThatFuture(
+ CommonTestUtils.waitUntilCondition(
+ () ->
haServices
.getApplicationResultStore()
-
.hasCleanApplicationResultEntryAsync(applicationId))
- .eventuallySucceeds()
- .isEqualTo(true);
+
.hasCleanApplicationResultEntryAsync(applicationId)
+ .get());
}
@Test
@@ -827,7 +821,13 @@ public class DispatcherApplicationTest {
}
private void mockApplicationStatusChange(ApplicationState targetState)
throws Exception {
- dispatcher.notifyApplicationStatusChange(applicationId, targetState);
+ dispatcher
+ .callAsyncInMainThread(
+ () -> {
+
dispatcher.notifyApplicationStatusChange(applicationId, targetState);
+ return CompletableFuture.completedFuture(null);
+ })
+ .get();
}
private TestingDispatcher.Builder createTestingDispatcherBuilder() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index e9fc9833be5..da202b0dc44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -417,7 +417,7 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitApplication(application, TIMEOUT).get();
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished(dispatcher, applicationId);
successfulCleanupLatch.await();
@@ -524,7 +524,7 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitApplication(application, TIMEOUT).get();
- dispatcher.notifyApplicationStatusChange(applicationId,
ApplicationState.FINISHED);
+ mockApplicationFinished(dispatcher, applicationId);
firstCleanupTriggered.await();
@@ -621,4 +621,16 @@ public class DispatcherCleanupITCase extends
AbstractDispatcherTest {
DispatcherGateway dispatcherGateway, ExecutionPlan executionPlan)
throws Exception {
dispatcherGateway.submitApplication(new
SingleJobApplication(executionPlan), TIMEOUT).get();
}
+
+ private void mockApplicationFinished(TestingDispatcher dispatcher,
ApplicationID applicationId)
+ throws Exception {
+ dispatcher
+ .callAsyncInMainThread(
+ () -> {
+ dispatcher.notifyApplicationStatusChange(
+ applicationId, ApplicationState.FINISHED);
+ return CompletableFuture.completedFuture(null);
+ })
+ .get();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index c492d150c5a..0048ab5d86a 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -510,7 +510,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
.build())));
// mock application termination so that its jobs can be marked clean
and terminate
- mockApplicationStatusChange(ApplicationState.FINISHED);
+ mockApplicationFinished();
// wait for job to finish
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
@@ -549,7 +549,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
.build())));
// mock application termination so that its jobs can be marked clean
and terminate
- mockApplicationStatusChange(ApplicationState.FINISHED);
+ mockApplicationFinished();
// wait for job to finish
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
@@ -618,8 +618,15 @@ public class DispatcherTest extends AbstractDispatcherTest
{
.get();
}
- private void mockApplicationStatusChange(ApplicationState targetState)
throws Exception {
- dispatcher.notifyApplicationStatusChange(applicationId, targetState);
+ private void mockApplicationFinished() throws Exception {
+ dispatcher
+ .callAsyncInMainThread(
+ () -> {
+ dispatcher.notifyApplicationStatusChange(
+ applicationId, ApplicationState.FINISHED);
+ return CompletableFuture.completedFuture(null);
+ })
+ .get();
}
@Test
@@ -659,7 +666,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
testFailure));
// mock application termination so that its jobs can be marked clean
and terminate
- mockApplicationStatusChange(ApplicationState.FINISHED);
+ mockApplicationFinished();
// wait till job has failed
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
@@ -1270,7 +1277,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
// mock application termination so that its jobs can be marked clean
and terminate
- mockApplicationStatusChange(ApplicationState.FINISHED);
+ mockApplicationFinished();
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();