This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa3c124b536adb42d219a57594d399379397bf0a
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Mon Aug 29 13:28:25 2022 +0200

    [FLINK-27030][tests] Prevent race-condition
---
 .../java/org/apache/flink/runtime/rpc/RpcEndpoint.java | 17 ++++++++++++++---
 .../org/apache/flink/runtime/rpc/RpcEndpointTest.java  | 12 +++++++-----
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 2aa90b7df91..1eb32a0549b 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
@@ -470,11 +471,21 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 
         MainThreadExecutor(
                 MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
+            this(
+                    gateway,
+                    mainThreadCheck,
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(endpointId + "-main-scheduler")));
+        }
+
+        @VisibleForTesting
+        MainThreadExecutor(
+                MainThreadExecutable gateway,
+                Runnable mainThreadCheck,
+                ScheduledExecutorService mainScheduledExecutor) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-            this.mainScheduledExecutor =
-                    Executors.newSingleThreadScheduledExecutor(
-                            new ExecutorThreadFactory(endpointId + "-main-scheduler"));
+            this.mainScheduledExecutor = mainScheduledExecutor;
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 20e45ab92b8..07216016a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.AfterAll;
@@ -44,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor scheduling command. */
 @ExtendWith(TestLoggerExtension.class)
@@ -412,20 +412,22 @@ public class RpcEndpointTest {
     private static void testCancelScheduledTask(
             BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Long>, ScheduledFuture<?>>
                     scheduler) {
-        final String endpointId = "foobar";
-
         final MainThreadExecutable mainThreadExecutable =
                 new TestMainThreadExecutable(Runnable::run);
 
+        final ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService =
+                new ManuallyTriggeredScheduledExecutorService();
+
         final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
-                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
+                new RpcEndpoint.MainThreadExecutor(
+                        mainThreadExecutable, () -> {}, manuallyTriggeredScheduledExecutorService);
         final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>();
 
         ScheduledFuture<?> scheduledFuture =
                 scheduler.apply(mainThreadExecutor, actualDelayMsFuture);
         scheduledFuture.cancel(true);
+        manuallyTriggeredScheduledExecutorService.triggerAllNonPeriodicTasks();
 
-        assumeTrue(!actualDelayMsFuture.isDone(), "The command is done and no need to cancel it.");
         assertTrue(scheduledFuture.isCancelled());
         assertFalse(actualDelayMsFuture.isDone());
         mainThreadExecutor.close();