This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 106acbced18 [FLINK-39387][Test] Fix flaky scheduler benchmark tests 
caused by thread assertion failure in async TDD creation
106acbced18 is described below

commit 106acbced18b0da5fa17181ef6a72738e5ab211c
Author: Liu Jiangang <[email protected]>
AuthorDate: Fri Apr 10 10:25:25 2026 +0800

    [FLINK-39387][Test] Fix flaky scheduler benchmark tests caused by thread 
assertion failure in async TDD creation
---
 ...MainThreadCheckComponentMainThreadExecutor.java | 70 ++++++++++++++++++++++
 .../coordination/EventReceivingTasks.java          | 53 +---------------
 .../benchmark/SchedulerBenchmarkUtils.java         |  4 +-
 3 files changed, 73 insertions(+), 54 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
new file mode 100644
index 00000000000..894f6365206
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A synchronous {@link ComponentMainThreadExecutor} that executes tasks 
directly on the calling
+ * thread without performing strict thread identity checks.
+ *
+ * <p>Unlike {@link 
ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does
+ * not assert that the current thread is the main thread, avoiding flaky test 
failures when {@link
+ * java.util.concurrent.CompletableFuture} callbacks are dispatched from 
background threads.
+ */
+public class NoMainThreadCheckComponentMainThreadExecutor implements 
ComponentMainThreadExecutor {
+
+    private final DirectScheduledExecutorService executor = new 
DirectScheduledExecutorService();
+
+    @Override
+    public void assertRunningInMainThread() {
+        // No-op: Skip thread assertion to avoid flaky test failures
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        executor.execute(command);
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+        return executor.schedule(command, delay, unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+        return executor.schedule(callable, delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command, long initialDelay, long period, TimeUnit unit) {
+        return executor.scheduleAtFixedRate(command, initialDelay, period, 
unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+            Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        return executor.scheduleWithFixedDelay(command, initialDelay, delay, 
unit);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
index aa27bb2f907..2ceddc7176b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
@@ -18,18 +18,13 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import 
org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -39,8 +34,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -242,48 +235,4 @@ public class EventReceivingTasks implements 
SubtaskAccess.SubtaskAccessFactory {
             return taskFailoverReasons;
         }
     }
-
-    /**
-     * An implementation of {@link ComponentMainThreadExecutor} that executes 
Runnables with a
-     * wrapped {@link ScheduledExecutor} and disables {@link 
#assertRunningInMainThread()} checks.
-     */
-    private static class NoMainThreadCheckComponentMainThreadExecutor
-            implements ComponentMainThreadExecutor {
-        private final ScheduledExecutor scheduledExecutor;
-
-        private NoMainThreadCheckComponentMainThreadExecutor() {
-            this.scheduledExecutor =
-                    new ScheduledExecutorServiceAdapter(new 
DirectScheduledExecutorService());
-        }
-
-        @Override
-        public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-            return scheduledExecutor.schedule(command, delay, unit);
-        }
-
-        @Override
-        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
-            return scheduledExecutor.schedule(callable, delay, unit);
-        }
-
-        @Override
-        public ScheduledFuture<?> scheduleAtFixedRate(
-                Runnable command, long initialDelay, long period, TimeUnit 
unit) {
-            return scheduledExecutor.scheduleAtFixedRate(command, 
initialDelay, period, unit);
-        }
-
-        @Override
-        public ScheduledFuture<?> scheduleWithFixedDelay(
-                Runnable command, long initialDelay, long delay, TimeUnit 
unit) {
-            return scheduledExecutor.scheduleAtFixedRate(command, 
initialDelay, delay, unit);
-        }
-
-        @Override
-        public void assertRunningInMainThread() {}
-
-        @Override
-        public void execute(@Nonnull Runnable command) {
-            scheduledExecutor.execute(command);
-        }
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
index baab5227f09..988f90b6d57 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.scheduler.benchmark;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import 
org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -111,7 +111,7 @@ public class SchedulerBenchmarkUtils {
         final JobGraph jobGraph = createJobGraph(jobVertices, 
jobConfiguration);
 
         final ComponentMainThreadExecutor mainThreadExecutor =
-                ComponentMainThreadExecutorServiceAdapter.forMainThread();
+                new NoMainThreadCheckComponentMainThreadExecutor();
 
         DefaultSchedulerBuilder schedulerBuilder =
                 new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, 
scheduledExecutorService)

Reply via email to