This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bfd13de1520 [FLINK-38406][runtime] Fix 
DefaultSchedulerCheckpointCoordinatorTest failures (#27061)
bfd13de1520 is described below

commit bfd13de1520f842c03b73939ecd651dd3005ebdc
Author: Mate Czagany <[email protected]>
AuthorDate: Mon Jan 12 11:13:49 2026 +0100

    [FLINK-38406][runtime] Fix DefaultSchedulerCheckpointCoordinatorTest 
failures (#27061)
    
    * [FLINK-38406][runtime] Fix DefaultSchedulerCheckpointCoordinatorTest 
failures
    
    * [FLINK-38406][runtime] Run ExecutionGraph operations from main thread
---
 .../DefaultSchedulerCheckpointCoordinatorTest.java | 56 +++++++++++++++-------
 1 file changed, 38 insertions(+), 18 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
index 363fe516b8e..51562ef19ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -56,6 +56,13 @@ class DefaultSchedulerCheckpointCoordinatorTest {
     private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
             TestingUtils.defaultExecutorExtension();
 
+    @RegisterExtension
+    static final TestingComponentMainThreadExecutor.Extension 
MAIN_EXECUTOR_RESOURCE =
+            new TestingComponentMainThreadExecutor.Extension();
+
+    private final TestingComponentMainThreadExecutor mainThreadExecutor =
+            MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
     /** Tests that the checkpoint coordinator is shut down if the execution 
graph is failed. */
     @Test
     void 
testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph()
@@ -77,9 +84,14 @@ class DefaultSchedulerCheckpointCoordinatorTest {
         assertThat(checkpointCoordinator).isNotNull();
         assertThat(checkpointCoordinator.isShutdown()).isFalse();
 
-        graph.failJob(new Exception("Test Exception"), 
System.currentTimeMillis());
-
-        scheduler.closeAsync().get();
+        mainThreadExecutor
+                .execute(
+                        () -> {
+                            graph.failJob(
+                                    new Exception("Test Exception"), 
System.currentTimeMillis());
+                            return scheduler.closeAsync();
+                        })
+                .get();
 
         assertThat(checkpointCoordinator.isShutdown()).isTrue();
         
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FAILED);
@@ -107,9 +119,13 @@ class DefaultSchedulerCheckpointCoordinatorTest {
         assertThat(checkpointCoordinator).isNotNull();
         assertThat(checkpointCoordinator.isShutdown()).isFalse();
 
-        graph.suspend(new Exception("Test Exception"));
-
-        scheduler.closeAsync().get();
+        mainThreadExecutor
+                .execute(
+                        () -> {
+                            graph.suspend(new Exception("Test Exception"));
+                            return scheduler.closeAsync();
+                        })
+                .get();
 
         assertThat(checkpointCoordinator.isShutdown()).isTrue();
         
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.SUSPENDED);
@@ -137,18 +153,22 @@ class DefaultSchedulerCheckpointCoordinatorTest {
         assertThat(checkpointCoordinator).isNotNull();
         assertThat(checkpointCoordinator.isShutdown()).isFalse();
 
-        scheduler.startScheduling();
-
-        for (ExecutionVertex executionVertex : 
graph.getAllExecutionVertices()) {
-            final Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
-            scheduler.updateTaskExecutionState(
-                    new TaskExecutionState(
-                            currentExecutionAttempt.getAttemptId(), 
ExecutionState.FINISHED));
-        }
+        mainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    for (ExecutionVertex executionVertex : 
graph.getAllExecutionVertices()) {
+                        final Execution currentExecutionAttempt =
+                                executionVertex.getCurrentExecutionAttempt();
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        currentExecutionAttempt.getAttemptId(),
+                                        ExecutionState.FINISHED));
+                    }
+                });
 
         
assertThat(graph.getTerminationFuture()).isCompletedWithValue(JobStatus.FINISHED);
 
-        scheduler.closeAsync().get();
+        mainThreadExecutor.execute(scheduler::closeAsync).get();
 
         assertThat(checkpointCoordinator.isShutdown()).isTrue();
         
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FINISHED);
@@ -176,7 +196,7 @@ class DefaultSchedulerCheckpointCoordinatorTest {
         assertThat(checkpointCoordinator).isNotNull();
         assertThat(checkpointCoordinator.isShutdown()).isFalse();
 
-        scheduler.closeAsync().get();
+        mainThreadExecutor.execute(scheduler::closeAsync).get();
 
         assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);
         assertThat(checkpointCoordinator.isShutdown()).isTrue();
@@ -208,7 +228,7 @@ class DefaultSchedulerCheckpointCoordinatorTest {
 
         return new DefaultSchedulerBuilder(
                         jobGraph,
-                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        mainThreadExecutor.getMainThreadExecutor(),
                         EXECUTOR_EXTENSION.getExecutor())
                 .setCheckpointRecoveryFactory(new 
TestingCheckpointRecoveryFactory(store, counter))
                 .setRpcTimeout(timeout)

Reply via email to