This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94c65494b20 [FLINK-38441][runtime] Fix 
ExecutionGraphCoLocationRestartTest failures (#27060)
94c65494b20 is described below

commit 94c65494b20268d6ae4bcecc0d7160ab42a5ae0e
Author: Mate Czagany <[email protected]>
AuthorDate: Tue Jan 6 06:23:30 2026 +0100

    [FLINK-38441][runtime] Fix ExecutionGraphCoLocationRestartTest failures 
(#27060)
---
 .../ExecutionGraphCoLocationRestartTest.java       | 38 ++++++++++++++++------
 1 file changed, 28 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index 27d735967ff..6aaa644e91e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobStatus;
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
@@ -57,6 +56,13 @@ class ExecutionGraphCoLocationRestartTest {
     static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
             TestingUtils.defaultExecutorExtension();
 
+    @RegisterExtension
+    static final TestingComponentMainThreadExecutor.Extension 
MAIN_EXECUTOR_RESOURCE =
+            new TestingComponentMainThreadExecutor.Extension();
+
+    private final TestingComponentMainThreadExecutor mainThreadExecutor =
+            MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
     private static final int NUM_TASKS = 31;
 
     @Test
@@ -85,7 +91,7 @@ class ExecutionGraphCoLocationRestartTest {
         final SchedulerBase scheduler =
                 new DefaultSchedulerBuilder(
                                 jobGraph,
-                                
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                mainThreadExecutor.getMainThreadExecutor(),
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setExecutionSlotAllocatorFactory(
                                 
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
@@ -106,7 +112,7 @@ class ExecutionGraphCoLocationRestartTest {
         // enable the queued scheduling for the slot pool
         assertThat(eg.getState()).isEqualTo(JobStatus.CREATED);
 
-        scheduler.startScheduling();
+        mainThreadExecutor.execute(scheduler::startScheduling);
 
         Predicate<AccessExecution> isDeploying =
                 
ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
@@ -117,7 +123,13 @@ class ExecutionGraphCoLocationRestartTest {
         // sanity checks
         validateConstraints(eg);
 
-        eg.getAllExecutionVertices().iterator().next().fail(new 
FlinkException("Test exception"));
+        mainThreadExecutor.execute(
+                () -> {
+                    eg.getAllExecutionVertices()
+                            .iterator()
+                            .next()
+                            .fail(new FlinkException("Test exception"));
+                });
 
         assertThat(eg.getState()).isEqualTo(JobStatus.RESTARTING);
 
@@ -125,11 +137,14 @@ class ExecutionGraphCoLocationRestartTest {
         // cancellation. This ensures the restarting actions to be performed 
in main thread.
         delayExecutor.triggerNonPeriodicScheduledTask();
 
-        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-            if (vertex.getExecutionState() == ExecutionState.CANCELING) {
-                vertex.getCurrentExecutionAttempt().completeCancelling();
-            }
-        }
+        mainThreadExecutor.execute(
+                () -> {
+                    for (ExecutionVertex vertex : 
eg.getAllExecutionVertices()) {
+                        if (vertex.getExecutionState() == 
ExecutionState.CANCELING) {
+                            
vertex.getCurrentExecutionAttempt().completeCancelling();
+                        }
+                    }
+                });
 
         // wait until we have restarted
         ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, 
timeout);
@@ -139,7 +154,10 @@ class ExecutionGraphCoLocationRestartTest {
         // checking execution vertex properties
         validateConstraints(eg);
 
-        ExecutionGraphTestUtils.finishAllVertices(eg);
+        mainThreadExecutor.execute(
+                () -> {
+                    ExecutionGraphTestUtils.finishAllVertices(eg);
+                });
 
         assertThat(eg.getState()).isEqualTo(FINISHED);
     }

Reply via email to