This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0bbee3047ed [FLINK-29249][rpc] Drop RpcService#execute/scheduleRunnable 0bbee3047ed is described below commit 0bbee3047ed81d39d25fd736946c527e627a2bad Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Tue Sep 13 20:42:58 2022 +0200 [FLINK-29249][rpc] Drop RpcService#execute/scheduleRunnable --- .../flink/runtime/rpc/akka/AkkaRpcService.java | 24 ----- .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 29 +----- .../rpc/akka/ContextClassLoadingSettingTest.java | 103 ++++++++++++++------- .../org/apache/flink/runtime/rpc/RpcService.java | 40 -------- .../runtime/registration/RetryingRegistration.java | 19 ++-- .../runtime/metrics/util/MetricUtilsTest.java | 7 +- .../registration/RetryingRegistrationTest.java | 10 -- .../flink/runtime/rpc/TestingRpcService.java | 18 ---- .../OperatorEventSendingCheckpointITCase.java | 17 ---- 9 files changed, 91 insertions(+), 176 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 0b5a4819164..40bb5bbbb09 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -38,7 +38,6 @@ import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; -import org.apache.flink.util.function.FunctionUtils; import akka.actor.AbstractActor; import akka.actor.ActorRef; @@ -64,12 +63,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -79,7 +76,6 @@ import scala.reflect.ClassTag$; import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.guardCompletionWithContextClassLoader; import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader; import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.withContextClassLoader; -import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -464,26 +460,6 @@ public class AkkaRpcService implements RpcService { return internalScheduledExecutor; } - @Override - public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { - checkNotNull(runnable, "runnable"); - checkNotNull(unit, "unit"); - checkArgument(delay >= 0L, "delay must be zero or larger"); - - return internalScheduledExecutor.schedule(runnable, delay, unit); - } - - @Override - public void execute(Runnable runnable) { - getScheduledExecutor().execute(runnable); - } - - @Override - public <T> CompletableFuture<T> execute(Callable<T> callable) { - return CompletableFuture.supplyAsync( - FunctionUtils.uncheckedSupplier(callable::call), getScheduledExecutor()); - } - // --------------------------------------------------------------------------------------- // Private helper methods // --------------------------------------------------------------------------------------- diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index f4512f81cd1..1db75ee82bb 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -85,7 +85,6 @@ class AkkaRpcServiceTest { // ------------------------------------------------------------------------ // tests // ------------------------------------------------------------------------ - @Test void testScheduleRunnable() throws Exception { final OneShotLatch latch = new OneShotLatch(); @@ -93,7 +92,9 @@ class AkkaRpcServiceTest { final long start = System.nanoTime(); ScheduledFuture<?> scheduledFuture = - akkaRpcService.scheduleRunnable(latch::trigger, delay, TimeUnit.MILLISECONDS); + akkaRpcService + .getScheduledExecutor() + .schedule(latch::trigger, delay, TimeUnit.MILLISECONDS); scheduledFuture.get(); @@ -110,33 +111,11 @@ class AkkaRpcServiceTest { void testExecuteRunnable() throws Exception { final OneShotLatch latch = new OneShotLatch(); - akkaRpcService.execute(latch::trigger); + akkaRpcService.getScheduledExecutor().execute(latch::trigger); latch.await(30L, TimeUnit.SECONDS); } - /** - * Tests that the {@link AkkaRpcService} can execute callables and returns their result as a - * {@link CompletableFuture}. - */ - @Test - void testExecuteCallable() throws Exception { - final OneShotLatch latch = new OneShotLatch(); - final int expected = 42; - - CompletableFuture<Integer> result = - akkaRpcService.execute( - () -> { - latch.trigger(); - return expected; - }); - - int actual = result.get(30L, TimeUnit.SECONDS); - - assertThat(actual).isEqualTo(expected); - assertThat(latch.isTriggered()).isTrue(); - } - @Test void testGetAddress() { assertThat(akkaRpcService.getAddress()) diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java index 48318d5a36e..c3bfc0f600b 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java @@ -40,8 +40,9 @@ import java.io.Serializable; import java.net.URL; import java.net.URLClassLoader; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; -import java.util.concurrent.Callable; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -107,49 +108,89 @@ class ContextClassLoadingSettingTest { void testAkkaRpcService_ExecuteRunnableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException { final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>(); - akkaRpcService.execute( - () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader())); - assertIsFlinkClassLoader(contextClassLoader.get()); + akkaRpcService + .getScheduledExecutor() + .execute( + () -> + contextClassLoader.complete( + Thread.currentThread().getContextClassLoader())); + assertThat(contextClassLoader.get()).isSameAs(pretendFlinkClassLoader); } @Test - void testAkkaRpcService_ExecuteCallableSetsFlinkContextClassLoader() + void testAkkaRpcService_ScheduleCallableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException { - final CompletableFuture<ClassLoader> contextClassLoader = - akkaRpcService.execute(() -> Thread.currentThread().getContextClassLoader()); - assertIsFlinkClassLoader(contextClassLoader.get()); + final ClassLoader contextClassLoader = + akkaRpcService + .getScheduledExecutor() + .schedule( + () -> Thread.currentThread().getContextClassLoader(), + 0, + TimeUnit.MILLISECONDS) + .get(); + assertThat(contextClassLoader).isSameAs(pretendFlinkClassLoader); } @Test - void testAkkaRpcService_ExecuteCallableResultCompletedWithFlinkContextClassLoader() + void testAkkaRpcService_ScheduleRunnableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException { - - final CompletableFuture<Void> blocker = new CompletableFuture<>(); - - final CompletableFuture<ClassLoader> contextClassLoader = - runWithContextClassLoader( + final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>(); + akkaRpcService + .getScheduledExecutor() + .schedule( () -> - akkaRpcService - .execute((Callable<Void>) blocker::get) - .thenApply( - ignored -> - Thread.currentThread() - .getContextClassLoader()), - testClassLoader); - blocker.complete(null); + contextClassLoader.complete( + Thread.currentThread().getContextClassLoader()), + 5, + TimeUnit.MILLISECONDS); + assertThat(contextClassLoader.get()).isSameAs(pretendFlinkClassLoader); + } - assertIsFlinkClassLoader(contextClassLoader.get()); + @Test + void testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader() { + final int numberOfScheduledRuns = 2; + final List<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns); + akkaRpcService + .getScheduledExecutor() + .scheduleAtFixedRate( + () -> { + if (contextClassLoaders.size() < numberOfScheduledRuns) { + contextClassLoaders.add( + Thread.currentThread().getContextClassLoader()); + } else { + throw new RuntimeException("cancel task"); + } + }, + 0, + 1, + TimeUnit.MILLISECONDS); + + assertThat(contextClassLoaders) + .allSatisfy( + classLoader -> assertThat(classLoader).isSameAs(pretendFlinkClassLoader)); } @Test - void testAkkaRpcService_ScheduleSetsFlinkContextClassLoader() - throws ExecutionException, InterruptedException { - final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>(); - akkaRpcService.scheduleRunnable( - () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()), - 5, - TimeUnit.MILLISECONDS); - assertThat(contextClassLoader.get()).isSameAs(pretendFlinkClassLoader); + void testAkkaRpcService_ScheduleRunnableWithFixedDelaySetsFlinkContextClassLoader() { + final int numberOfScheduledRuns = 2; + final List<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns); + akkaRpcService + .getScheduledExecutor() + .scheduleWithFixedDelay( + () -> { + if (contextClassLoaders.size() < numberOfScheduledRuns) { + contextClassLoaders.add( + Thread.currentThread().getContextClassLoader()); + } else { + throw new RuntimeException("cancel task"); + } + }, + 0, + 1, + TimeUnit.MILLISECONDS); + assertThat(contextClassLoaders) + .allSatisfy( + classLoader -> assertThat(classLoader).isSameAs(pretendFlinkClassLoader)); } @Test diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index a09ae9ead37..4edf3f70e10 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -22,10 +22,7 @@ import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.concurrent.ScheduledExecutor; import java.io.Serializable; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; /** * Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}. @@ -121,41 +118,4 @@ public interface RpcService { * @return The RPC service provided scheduled executor */ ScheduledExecutor getScheduledExecutor(); - - /** - * Execute the runnable in the execution context of this RPC Service, as returned by {@link - * #getScheduledExecutor()} ()}, after a scheduled delay. - * - * @param runnable Runnable to be executed - * @param delay The delay after which the runnable will be executed - */ - ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit); - - /** - * Execute the given runnable in the executor of the RPC service. This method can be used to run - * code outside of the main thread of a {@link RpcEndpoint}. - * - * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any - * concurrent invocations and is therefore not suitable to run completion methods of futures - * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link - * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}. - * - * @param runnable to execute - */ - void execute(Runnable runnable); - - /** - * Execute the given callable and return its result as a {@link CompletableFuture}. This method - * can be used to run code outside of the main thread of a {@link RpcEndpoint}. - * - * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any - * concurrent invocations and is therefore not suitable to run completion methods of futures - * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link - * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}. - * - * @param callable to execute - * @param <T> is the return value type - * @return Future containing the callable's future result - */ - <T> CompletableFuture<T> execute(Callable<T> callable); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index c7b2ca43a0b..299962b4e4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -329,19 +329,18 @@ public abstract class RetryingRegistration< private void registerLater( final G gateway, final int attempt, final long timeoutMillis, long delay) { - rpcService.scheduleRunnable( - new Runnable() { - @Override - public void run() { - register(gateway, attempt, timeoutMillis); - } - }, - delay, - TimeUnit.MILLISECONDS); + rpcService + .getScheduledExecutor() + .schedule( + () -> register(gateway, attempt, timeoutMillis), + delay, + TimeUnit.MILLISECONDS); } private void startRegistrationLater(final long delay) { - rpcService.scheduleRunnable(this::startRegistration, delay, TimeUnit.MILLISECONDS); + rpcService + .getScheduledExecutor() + .schedule(this::startRegistration, delay, TimeUnit.MILLISECONDS); } static final class RetryingRegistrationResult<G, S, R> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java index 7966c6f4e4b..28da2209cee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java @@ -49,6 +49,7 @@ import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_FLINK; import static org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_MANAGED_MEMORY; @@ -84,7 +85,11 @@ public class MetricUtilsTest extends TestLogger { try { final int threadPriority = - rpcService.execute(() -> Thread.currentThread().getPriority()).get(); + rpcService + .getScheduledExecutor() + .schedule( + () -> Thread.currentThread().getPriority(), 0, TimeUnit.SECONDS) + .get(); assertThat(threadPriority, is(expectedThreadPriority)); } finally { rpcService.stopService().get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index 9d1bd05fabc..4c02c443e02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -31,7 +31,6 @@ import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; @@ -52,7 +51,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -171,14 +169,6 @@ public class RetryingRegistrationTest extends TestLogger { testGateway) // second connection attempt succeeds ); when(rpc.getScheduledExecutor()).thenReturn(executor); - when(rpc.scheduleRunnable(any(Runnable.class), anyLong(), any(TimeUnit.class))) - .thenAnswer( - (InvocationOnMock invocation) -> { - final Runnable runnable = invocation.getArgument(0); - final long delay = invocation.getArgument(1); - final TimeUnit timeUnit = invocation.getArgument(2); - return executor.schedule(runnable, delay, timeUnit); - }); TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index 1b7990cb447..b2c34c1b958 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -23,11 +23,8 @@ import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; import java.io.Serializable; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -211,19 +208,4 @@ public class TestingRpcService implements RpcService { public ScheduledExecutor getScheduledExecutor() { return backingRpcService.getScheduledExecutor(); } - - @Override - public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { - return backingRpcService.scheduleRunnable(runnable, delay, unit); - } - - @Override - public void execute(Runnable runnable) { - backingRpcService.execute(runnable); - } - - @Override - public <T> CompletableFuture<T> execute(Callable<T> callable) { - return backingRpcService.execute(callable); - } } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java index aed994c9bbf..9757de12a2d 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java @@ -72,9 +72,7 @@ import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -516,21 +514,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { return rpcService.getScheduledExecutor(); } - @Override - public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { - return rpcService.scheduleRunnable(runnable, delay, unit); - } - - @Override - public void execute(Runnable runnable) { - rpcService.execute(runnable); - } - - @Override - public <T> CompletableFuture<T> execute(Callable<T> callable) { - return rpcService.execute(callable); - } - @SuppressWarnings("unchecked") private <C extends RpcGateway> CompletableFuture<C> decorateTmGateway( CompletableFuture<C> future) {