This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 94c65494b20 [FLINK-38441][runtime] Fix ExecutionGraphCoLocationRestartTest failures (#27060)
94c65494b20 is described below
commit 94c65494b20268d6ae4bcecc0d7160ab42a5ae0e
Author: Mate Czagany <[email protected]>
AuthorDate: Tue Jan 6 06:23:30 2026 +0100
[FLINK-38441][runtime] Fix ExecutionGraphCoLocationRestartTest failures (#27060)
---
.../ExecutionGraphCoLocationRestartTest.java | 38 ++++++++++++++++------
1 file changed, 28 insertions(+), 10 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index 27d735967ff..6aaa644e91e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobStatus;
-import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.ExecutionState;
import
org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
@@ -57,6 +56,13 @@ class ExecutionGraphCoLocationRestartTest {
static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();
+ @RegisterExtension
+ static final TestingComponentMainThreadExecutor.Extension
MAIN_EXECUTOR_RESOURCE =
+ new TestingComponentMainThreadExecutor.Extension();
+
+ private final TestingComponentMainThreadExecutor mainThreadExecutor =
+ MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
private static final int NUM_TASKS = 31;
@Test
@@ -85,7 +91,7 @@ class ExecutionGraphCoLocationRestartTest {
final SchedulerBase scheduler =
new DefaultSchedulerBuilder(
jobGraph,
-
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ mainThreadExecutor.getMainThreadExecutor(),
EXECUTOR_RESOURCE.getExecutor())
.setExecutionSlotAllocatorFactory(
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
@@ -106,7 +112,7 @@ class ExecutionGraphCoLocationRestartTest {
// enable the queued scheduling for the slot pool
assertThat(eg.getState()).isEqualTo(JobStatus.CREATED);
- scheduler.startScheduling();
+ mainThreadExecutor.execute(scheduler::startScheduling);
Predicate<AccessExecution> isDeploying =
ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
@@ -117,7 +123,13 @@ class ExecutionGraphCoLocationRestartTest {
// sanity checks
validateConstraints(eg);
- eg.getAllExecutionVertices().iterator().next().fail(new
FlinkException("Test exception"));
+ mainThreadExecutor.execute(
+ () -> {
+ eg.getAllExecutionVertices()
+ .iterator()
+ .next()
+ .fail(new FlinkException("Test exception"));
+ });
assertThat(eg.getState()).isEqualTo(JobStatus.RESTARTING);
@@ -125,11 +137,14 @@ class ExecutionGraphCoLocationRestartTest {
// cancellation. This ensures the restarting actions to be performed
in main thread.
delayExecutor.triggerNonPeriodicScheduledTask();
- for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
- if (vertex.getExecutionState() == ExecutionState.CANCELING) {
- vertex.getCurrentExecutionAttempt().completeCancelling();
- }
- }
+ mainThreadExecutor.execute(
+ () -> {
+ for (ExecutionVertex vertex :
eg.getAllExecutionVertices()) {
+ if (vertex.getExecutionState() ==
ExecutionState.CANCELING) {
+
vertex.getCurrentExecutionAttempt().completeCancelling();
+ }
+ }
+ });
// wait until we have restarted
ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING,
timeout);
@@ -139,7 +154,10 @@ class ExecutionGraphCoLocationRestartTest {
// checking execution vertex properties
validateConstraints(eg);
- ExecutionGraphTestUtils.finishAllVertices(eg);
+ mainThreadExecutor.execute(
+ () -> {
+ ExecutionGraphTestUtils.finishAllVertices(eg);
+ });
assertThat(eg.getState()).isEqualTo(FINISHED);
}