This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f5cdecaa2a314ae6f00cb0c8bb392bb62a381c09 Author: shammon <zjur...@gmail.com> AuthorDate: Thu Mar 17 09:38:13 2022 +0800 [FLINK-25085][runtime] Support schedule task in MainThreadExecutor local thread pool This closes #18303. --- .../flink/runtime/rpc/FencedRpcEndpoint.java | 34 +++++- .../org/apache/flink/runtime/rpc/RpcEndpoint.java | 135 +++++++++++++++++++-- .../flink/runtime/rpc/FencedRpcEndpointTest.java | 5 + .../apache/flink/runtime/rpc/RpcEndpointTest.java | 119 ++++++++++++++++-- 4 files changed, 268 insertions(+), 25 deletions(-) diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java index 76bc3bb..4177a31 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.util.Preconditions; @@ -56,10 +57,12 @@ public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpo this.fencingToken = fencingToken; this.unfencedMainThreadExecutor = new UnfencedMainThreadExecutor((FencedMainThreadExecutable) rpcServer); - this.fencedMainThreadExecutor = + + MainThreadExecutable mainThreadExecutable = + getRpcService().fenceRpcServer(rpcServer, fencingToken); + setFencedMainThreadExecutor( new MainThreadExecutor( - getRpcService().fenceRpcServer(rpcServer, fencingToken), - this::validateRunsInMainThread); + mainThreadExecutable, this::validateRunsInMainThread, endpointId)); } protected FencedRpcEndpoint(RpcService rpcService, @Nullable F fencingToken) { @@ -80,9 +83,23 @@ public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpo // which is bound to the new fencing token MainThreadExecutable mainThreadExecutable = getRpcService().fenceRpcServer(rpcServer, newFencingToken); + setFencedMainThreadExecutor( + new MainThreadExecutor( + mainThreadExecutable, this::validateRunsInMainThread, getEndpointId())); + } - this.fencedMainThreadExecutor = - new MainThreadExecutor(mainThreadExecutable, this::validateRunsInMainThread); + /** + * Set fenced main thread executor and register it to closeable register. + * + * @param fencedMainThreadExecutor the given fenced main thread executor + */ + private void setFencedMainThreadExecutor(MainThreadExecutor fencedMainThreadExecutor) { + if (this.fencedMainThreadExecutor != null) { + this.fencedMainThreadExecutor.close(); + unregisterResource(this.fencedMainThreadExecutor); + } + this.fencedMainThreadExecutor = fencedMainThreadExecutor; + registerResource(this.fencedMainThreadExecutor); } /** @@ -142,6 +159,13 @@ public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpo } } + @VisibleForTesting + public boolean validateResourceClosed() { + return super.validateResourceClosed() + && (fencedMainThreadExecutor == null + || fencedMainThreadExecutor.validateScheduledExecutorClosed()); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 9b7572e..e67509e 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -19,21 +19,28 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.io.Closeable; +import java.io.IOException; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -107,6 +114,12 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { private final MainThreadExecutor mainThreadExecutor; /** + * Register endpoint closeable resource to the registry and close them when the server is + * stopped. + */ + private final CloseableRegistry resourceRegistry; + + /** * Indicates whether the RPC endpoint is started and not stopped or being stopped. * * <p>IMPORTANT: the running state is not thread safe and can be used only in the main thread of @@ -125,8 +138,11 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { this.endpointId = checkNotNull(endpointId, "endpointId"); this.rpcServer = rpcService.startServer(this); + this.resourceRegistry = new CloseableRegistry(); - this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread); + this.mainThreadExecutor = + new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId); + registerResource(this.mainThreadExecutor); } /** @@ -211,12 +227,44 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { */ public final CompletableFuture<Void> internalCallOnStop() { validateRunsInMainThread(); - CompletableFuture<Void> stopFuture = onStop(); + CompletableFuture<Void> stopFuture = new CompletableFuture<>(); + try { + resourceRegistry.close(); + stopFuture.complete(null); + } catch (IOException e) { + stopFuture.completeExceptionally( + new RuntimeException("Close resource registry fail", e)); + } + stopFuture = CompletableFuture.allOf(stopFuture, onStop()); isRunning = false; return stopFuture; } /** + * Register the given closeable resource to {@link CloseableRegistry}. + * + * @param closeableResource the given closeable resource + */ + protected void registerResource(Closeable closeableResource) { + try { + resourceRegistry.registerCloseable(closeableResource); + } catch (IOException e) { + throw new RuntimeException( + "Registry closeable resource " + closeableResource + " fail", e); + } + } + + /** + * Unregister the given closeable resource from {@link CloseableRegistry}. + * + * @param closeableResource the given closeable resource + * @return true if the given resource unregister successful, otherwise false + */ + protected boolean unregisterResource(Closeable closeableResource) { + return resourceRegistry.unregisterCloseable(closeableResource); + } + + /** * User overridable callback which is called from {@link #internalCallOnStop()}. * * <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed to @@ -395,23 +443,38 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { assert MainThreadValidatorUtil.isRunningInExpectedThread(currentMainThread.get()); } + /** + * Validate whether all the resources are closed. + * + * @return true if all the resources are closed, otherwise false + */ + boolean validateResourceClosed() { + return mainThreadExecutor.validateScheduledExecutorClosed() && resourceRegistry.isClosed(); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ /** Executor which executes runnables in the main thread context. */ - protected static class MainThreadExecutor implements ComponentMainThreadExecutor { + protected static class MainThreadExecutor implements ComponentMainThreadExecutor, Closeable { + private static final Logger log = LoggerFactory.getLogger(MainThreadExecutor.class); private final MainThreadExecutable gateway; private final Runnable mainThreadCheck; - - MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) { + /** + * The main scheduled executor manages the scheduled tasks and send them to gateway when + * they should be executed. + */ + private final ScheduledExecutorService mainScheduledExecutor; + + MainThreadExecutor( + MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) { this.gateway = Preconditions.checkNotNull(gateway); this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck); - } - - private void scheduleRunAsync(Runnable runnable, long delayMillis) { - gateway.scheduleRunAsync(runnable, delayMillis); + this.mainScheduledExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory(endpointId + "-main-scheduler")); } @Override @@ -419,19 +482,52 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { gateway.runAsync(command); } + /** + * The mainScheduledExecutor manages the task and sends it to the gateway after the given + * delay. + * + * @param command the task to execute in the future + * @param delay the time from now to delay the execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture representing the completion of the scheduled task + */ @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit); FutureTask<Void> ft = new FutureTask<>(command, null); - scheduleRunAsync(ft, delayMillis); + if (mainScheduledExecutor.isShutdown()) { + log.warn( + "The scheduled executor service is shutdown and ignores the command {}", + command); + } else { + mainScheduledExecutor.schedule( + () -> gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS); + } return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS); } + /** + * The mainScheduledExecutor manages the given callable and sends it to the gateway after + * the given delay. + * + * @param callable the callable to execute + * @param delay the time from now to delay the execution + * @param unit the time unit of the delay parameter + * @param <V> result type of the callable + * @return a ScheduledFuture which holds the future value of the given callable + */ @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit); FutureTask<V> ft = new FutureTask<>(callable); - scheduleRunAsync(ft, delayMillis); + if (mainScheduledExecutor.isShutdown()) { + log.warn( + "The scheduled executor service is shutdown and ignores the callable {}", + callable); + } else { + mainScheduledExecutor.schedule( + () -> gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS); + } return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS); } @@ -453,5 +549,22 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { public void assertRunningInMainThread() { mainThreadCheck.run(); } + + /** Shutdown the {@link ScheduledThreadPoolExecutor} and remove all the pending tasks. */ + @Override + public void close() { + if (!mainScheduledExecutor.isShutdown()) { + mainScheduledExecutor.shutdownNow(); + } + } + + /** + * Validate whether the scheduled executor is closed. + * + * @return true if the scheduled executor is shutdown, otherwise false + */ + final boolean validateScheduledExecutorClosed() { + return mainScheduledExecutor.isShutdown(); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java index 24b1c1b..d4d342b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -110,6 +110,7 @@ public class FencedRpcEndpointTest { assertEquals(newFencingToken, fencedTestingEndpoint.getFencingToken()); } finally { RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout); + fencedTestingEndpoint.validateResourceClosed(); } } @@ -178,6 +179,7 @@ public class FencedRpcEndpointTest { } finally { RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout); + fencedTestingEndpoint.validateResourceClosed(); } } @@ -245,6 +247,7 @@ public class FencedRpcEndpointTest { } } finally { RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout); + fencedTestingEndpoint.validateResourceClosed(); } } @@ -294,6 +297,7 @@ public class FencedRpcEndpointTest { } finally { RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout); + fencedTestingEndpoint.validateResourceClosed(); } } @@ -335,6 +339,7 @@ public class FencedRpcEndpointTest { } } finally { RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout); + fencedTestingEndpoint.validateResourceClosed(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java index baec000..0deab41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java @@ -32,9 +32,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -42,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assumptions.assumeTrue; /** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor scheduling command. */ @ExtendWith(TestLoggerExtension.class) @@ -79,6 +83,8 @@ public class RpcEndpointTest { assertEquals(Integer.valueOf(expectedValue), foobar.get()); } finally { RpcUtils.terminateRpcEndpoint(baseEndpoint, TIMEOUT); + + baseEndpoint.validateResourceClosed(); } } @@ -104,6 +110,8 @@ public class RpcEndpointTest { "Expected to fail with a RuntimeException since we requested the wrong gateway type."); } finally { RpcUtils.terminateRpcEndpoint(baseEndpoint, TIMEOUT); + + baseEndpoint.validateResourceClosed(); } }); } @@ -134,6 +142,7 @@ public class RpcEndpointTest { assertEquals(foo, differentGateway.foo().get()); } finally { RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); + endpoint.validateResourceClosed(); } } @@ -152,6 +161,7 @@ public class RpcEndpointTest { assertTrue(gateway.queryIsRunningFlag().get()); } finally { RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); + endpoint.validateResourceClosed(); } } @@ -172,6 +182,7 @@ public class RpcEndpointTest { stopFuture.complete(null); terminationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + endpoint.validateResourceClosed(); } public interface BaseGateway extends RpcGateway { @@ -281,6 +292,7 @@ public class RpcEndpointTest { asyncExecutionFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()); } finally { RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); + endpoint.validateResourceClosed(); } } @@ -301,6 +313,28 @@ public class RpcEndpointTest { } @Test + public void testScheduleRunnableAfterClose() throws Exception { + testScheduleAfterClose( + (mainThreadExecutor, expectedDelay) -> + mainThreadExecutor.schedule( + () -> {}, expectedDelay.toMillis() / 1000, TimeUnit.SECONDS)); + } + + @Test + public void testCancelScheduledRunnable() throws Exception { + testCancelScheduledTask( + (mainThreadExecutor, longCompletableFuture) -> { + final Duration delayDuration = Duration.ofMillis(2); + return mainThreadExecutor.schedule( + () -> { + longCompletableFuture.complete(delayDuration.toMillis()); + }, + delayDuration.toMillis(), + TimeUnit.MILLISECONDS); + }); + } + + @Test public void testScheduleCallableWithDelayInMilliseconds() throws Exception { testScheduleWithDelay( (mainThreadExecutor, expectedDelay) -> @@ -316,22 +350,87 @@ public class RpcEndpointTest { () -> 1, expectedDelay.toMillis() / 1000, TimeUnit.SECONDS)); } + @Test + public void testScheduleCallableAfterClose() throws Exception { + testScheduleAfterClose( + (mainThreadExecutor, expectedDelay) -> + mainThreadExecutor.schedule( + () -> 1, expectedDelay.toMillis() / 1000, TimeUnit.SECONDS)); + } + + @Test + public void testCancelScheduledCallable() { + testCancelScheduledTask( + (mainThreadExecutor, longCompletableFuture) -> { + final Duration delayDuration = Duration.ofMillis(2); + return mainThreadExecutor.schedule( + () -> { + longCompletableFuture.complete(delayDuration.toMillis()); + return null; + }, + delayDuration.toMillis(), + TimeUnit.MILLISECONDS); + }); + } + private static void testScheduleWithDelay( BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> scheduler) throws Exception { - final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>(); + final CompletableFuture<Void> taskCompletedFuture = new CompletableFuture<>(); + final String endpointId = "foobar"; final MainThreadExecutable mainThreadExecutable = - new TestMainThreadExecutable( - (runnable, delay) -> actualDelayMsFuture.complete(delay)); + new TestMainThreadExecutable((runnable) -> taskCompletedFuture.complete(null)); final RpcEndpoint.MainThreadExecutor mainThreadExecutor = - new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}); + new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId); final Duration expectedDelay = Duration.ofSeconds(1); scheduler.accept(mainThreadExecutor, expectedDelay); - assertEquals(actualDelayMsFuture.get(), expectedDelay.toMillis()); + taskCompletedFuture.get(); + mainThreadExecutor.close(); + } + + private static void testScheduleAfterClose( + BiFunction<RpcEndpoint.MainThreadExecutor, Duration, ScheduledFuture<?>> scheduler) { + final CompletableFuture<Void> taskCompletedFuture = new CompletableFuture<>(); + final String endpointId = "foobar"; + + final MainThreadExecutable mainThreadExecutable = + new TestMainThreadExecutable((runnable) -> taskCompletedFuture.complete(null)); + + final RpcEndpoint.MainThreadExecutor mainThreadExecutor = + new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId); + + mainThreadExecutor.close(); + + final Duration expectedDelay = Duration.ofSeconds(0); + ScheduledFuture<?> future = scheduler.apply(mainThreadExecutor, expectedDelay); + assertFalse(taskCompletedFuture.isDone()); + assertFalse(future.isDone()); + } + + private static void testCancelScheduledTask( + BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Long>, ScheduledFuture<?>> + scheduler) { + final String endpointId = "foobar"; + + final MainThreadExecutable mainThreadExecutable = + new TestMainThreadExecutable(Runnable::run); + + final RpcEndpoint.MainThreadExecutor mainThreadExecutor = + new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId); + final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>(); + + ScheduledFuture<?> scheduledFuture = + scheduler.apply(mainThreadExecutor, actualDelayMsFuture); + scheduledFuture.cancel(true); + + assumeTrue(!actualDelayMsFuture.isDone(), "The command is done and no need to cancel it."); + assertTrue(scheduledFuture.isCancelled()); + assertFalse(actualDelayMsFuture.isDone()); + mainThreadExecutor.close(); } /** @@ -356,6 +455,7 @@ public class RpcEndpointTest { assertEquals(expectedInteger, integerFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit())); } finally { RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); + endpoint.validateResourceClosed(); } } @@ -387,20 +487,21 @@ public class RpcEndpointTest { } finally { latch.countDown(); RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); + endpoint.validateResourceClosed(); } } private static class TestMainThreadExecutable implements MainThreadExecutable { - private final BiConsumer<Runnable, Long> scheduleRunAsyncConsumer; + private final Consumer<Runnable> scheduleRunAsyncConsumer; - private TestMainThreadExecutable(BiConsumer<Runnable, Long> scheduleRunAsyncConsumer) { + private TestMainThreadExecutable(Consumer<Runnable> scheduleRunAsyncConsumer) { this.scheduleRunAsyncConsumer = scheduleRunAsyncConsumer; } @Override public void runAsync(Runnable runnable) { - throw new UnsupportedOperationException(); + scheduleRunAsyncConsumer.accept(runnable); } @Override @@ -410,7 +511,7 @@ public class RpcEndpointTest { @Override public void scheduleRunAsync(Runnable runnable, long delay) { - scheduleRunAsyncConsumer.accept(runnable, delay); + scheduleRunAsyncConsumer.accept(runnable); } } }