This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 106acbced18 [FLINK-39387][Test] Fix flaky scheduler benchmark tests
caused by thread assertion failure in async TDD creation
106acbced18 is described below
commit 106acbced18b0da5fa17181ef6a72738e5ab211c
Author: Liu Jiangang <[email protected]>
AuthorDate: Fri Apr 10 10:25:25 2026 +0800
[FLINK-39387][Test] Fix flaky scheduler benchmark tests caused by thread
assertion failure in async TDD creation
---
...MainThreadCheckComponentMainThreadExecutor.java | 70 ++++++++++++++++++++++
.../coordination/EventReceivingTasks.java | 53 +---------------
.../benchmark/SchedulerBenchmarkUtils.java | 4 +-
3 files changed, 73 insertions(+), 54 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
new file mode 100644
index 00000000000..894f6365206
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A synchronous {@link ComponentMainThreadExecutor} that executes tasks
directly on the calling
+ * thread without performing strict thread identity checks.
+ *
+ * <p>Unlike {@link
ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does
+ * not assert that the current thread is the main thread, avoiding flaky test
failures when {@link
+ * java.util.concurrent.CompletableFuture} callbacks are dispatched from
background threads.
+ */
+public class NoMainThreadCheckComponentMainThreadExecutor implements
ComponentMainThreadExecutor {
+
+ private final DirectScheduledExecutorService executor = new
DirectScheduledExecutorService();
+
+ @Override
+ public void assertRunningInMainThread() {
+ // No-op: Skip thread assertion to avoid flaky test failures
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit
unit) {
+ return executor.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
+ return executor.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return executor.scheduleAtFixedRate(command, initialDelay, period,
unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return executor.scheduleWithFixedDelay(command, initialDelay, delay,
unit);
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
index aa27bb2f907..2ceddc7176b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
@@ -18,18 +18,13 @@
package org.apache.flink.runtime.operators.coordination;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import
org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
import
org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,8 +34,6 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -242,48 +235,4 @@ public class EventReceivingTasks implements
SubtaskAccess.SubtaskAccessFactory {
return taskFailoverReasons;
}
}
-
- /**
- * An implementation of {@link ComponentMainThreadExecutor} that executes
Runnables with a
- * wrapped {@link ScheduledExecutor} and disables {@link
#assertRunningInMainThread()} checks.
- */
- private static class NoMainThreadCheckComponentMainThreadExecutor
- implements ComponentMainThreadExecutor {
- private final ScheduledExecutor scheduledExecutor;
-
- private NoMainThreadCheckComponentMainThreadExecutor() {
- this.scheduledExecutor =
- new ScheduledExecutorServiceAdapter(new
DirectScheduledExecutorService());
- }
-
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
- return scheduledExecutor.schedule(command, delay, unit);
- }
-
- @Override
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long
delay, TimeUnit unit) {
- return scheduledExecutor.schedule(callable, delay, unit);
- }
-
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(
- Runnable command, long initialDelay, long period, TimeUnit
unit) {
- return scheduledExecutor.scheduleAtFixedRate(command,
initialDelay, period, unit);
- }
-
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(
- Runnable command, long initialDelay, long delay, TimeUnit
unit) {
- return scheduledExecutor.scheduleAtFixedRate(command,
initialDelay, delay, unit);
- }
-
- @Override
- public void assertRunningInMainThread() {}
-
- @Override
- public void execute(@Nonnull Runnable command) {
- scheduledExecutor.execute(command);
- }
- }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
index baab5227f09..988f90b6d57 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.scheduler.benchmark;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import
org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -111,7 +111,7 @@ public class SchedulerBenchmarkUtils {
final JobGraph jobGraph = createJobGraph(jobVertices,
jobConfiguration);
final ComponentMainThreadExecutor mainThreadExecutor =
- ComponentMainThreadExecutorServiceAdapter.forMainThread();
+ new NoMainThreadCheckComponentMainThreadExecutor();
DefaultSchedulerBuilder schedulerBuilder =
new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor,
scheduledExecutorService)