This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3625868f30d [FLINK-38970][runtime/tests] Fix flaky
AdaptiveBatchSchedulerTest by using synchronous executor without thread checks
(#27660)
3625868f30d is described below
commit 3625868f30d62ed625c479330880c482b3b2ed00
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Thu Mar 19 21:46:12 2026 +0530
[FLINK-38970][runtime/tests] Fix flaky AdaptiveBatchSchedulerTest by using
synchronous executor without thread checks (#27660)
---
.../adaptivebatch/AdaptiveBatchSchedulerTest.java | 55 +++++++++++++++++++++-
1 file changed, 54 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index abbeb4e54d8..9b5b1c3d334 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.scheduler.SchedulerBase;
import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -67,9 +68,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -98,10 +102,59 @@ class AdaptiveBatchSchedulerTest {
@BeforeEach
void setUp() {
- mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();
+ // Use a DirectScheduledExecutorService-based executor that maintains
synchronous
+ // execution semantics without strict thread identity checks. This
fixes flaky test
+ // failures (FLINK-38970) caused by main thread constraint violations
when
+ // CompletableFuture callbacks are dispatched from background IO
executor threads.
+ // The synchronous execution is preserved while eliminating the race
condition.
+ mainThreadExecutor = new SynchronousComponentMainThreadExecutor();
taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
}
+ /**
+ * A synchronous ComponentMainThreadExecutor that runs tasks immediately
on the calling thread.
+ * Unlike {@link
ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does
+ * not perform strict thread identity checks, avoiding flaky failures when
CompletableFuture
+ * callbacks are dispatched from background threads.
+ */
+ private static class SynchronousComponentMainThreadExecutor
+ implements ComponentMainThreadExecutor {
+ private final DirectScheduledExecutorService executor =
+ new DirectScheduledExecutorService();
+
+ @Override
+ public void assertRunningInMainThread() {
+ // No-op: Skip thread assertion to avoid flaky failures
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
+ return executor.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long
delay, TimeUnit unit) {
+ return executor.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit
unit) {
+ return executor.scheduleAtFixedRate(command, initialDelay, period,
unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit
unit) {
+ return executor.scheduleWithFixedDelay(command, initialDelay,
delay, unit);
+ }
+ }
+
@Test
void testVertexInitializationFailureIsLabeled() throws Exception {
final JobGraph jobGraph = createBrokenJobGraph();