This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5cdecaa2a314ae6f00cb0c8bb392bb62a381c09
Author: shammon <zjur...@gmail.com>
AuthorDate: Thu Mar 17 09:38:13 2022 +0800

    [FLINK-25085][runtime] Support schedule task in MainThreadExecutor local 
thread pool
    
    This closes #18303.
---
 .../flink/runtime/rpc/FencedRpcEndpoint.java       |  34 +++++-
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 135 +++++++++++++++++++--
 .../flink/runtime/rpc/FencedRpcEndpointTest.java   |   5 +
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 119 ++++++++++++++++--
 4 files changed, 268 insertions(+), 25 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
index 76bc3bb..4177a31 100644
--- 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
+++ 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
 
@@ -56,10 +57,12 @@ public abstract class FencedRpcEndpoint<F extends 
Serializable> extends RpcEndpo
         this.fencingToken = fencingToken;
         this.unfencedMainThreadExecutor =
                 new UnfencedMainThreadExecutor((FencedMainThreadExecutable) 
rpcServer);
-        this.fencedMainThreadExecutor =
+
+        MainThreadExecutable mainThreadExecutable =
+                getRpcService().fenceRpcServer(rpcServer, fencingToken);
+        setFencedMainThreadExecutor(
                 new MainThreadExecutor(
-                        getRpcService().fenceRpcServer(rpcServer, 
fencingToken),
-                        this::validateRunsInMainThread);
+                        mainThreadExecutable, this::validateRunsInMainThread, 
endpointId));
     }
 
     protected FencedRpcEndpoint(RpcService rpcService, @Nullable F 
fencingToken) {
@@ -80,9 +83,23 @@ public abstract class FencedRpcEndpoint<F extends 
Serializable> extends RpcEndpo
         // which is bound to the new fencing token
         MainThreadExecutable mainThreadExecutable =
                 getRpcService().fenceRpcServer(rpcServer, newFencingToken);
+        setFencedMainThreadExecutor(
+                new MainThreadExecutor(
+                        mainThreadExecutable, this::validateRunsInMainThread, 
getEndpointId()));
+    }
 
-        this.fencedMainThreadExecutor =
-                new MainThreadExecutor(mainThreadExecutable, 
this::validateRunsInMainThread);
+    /**
+     * Set fenced main thread executor and register it to closeable register.
+     *
+     * @param fencedMainThreadExecutor the given fenced main thread executor
+     */
+    private void setFencedMainThreadExecutor(MainThreadExecutor 
fencedMainThreadExecutor) {
+        if (this.fencedMainThreadExecutor != null) {
+            this.fencedMainThreadExecutor.close();
+            unregisterResource(this.fencedMainThreadExecutor);
+        }
+        this.fencedMainThreadExecutor = fencedMainThreadExecutor;
+        registerResource(this.fencedMainThreadExecutor);
     }
 
     /**
@@ -142,6 +159,13 @@ public abstract class FencedRpcEndpoint<F extends 
Serializable> extends RpcEndpo
         }
     }
 
+    @VisibleForTesting
+    public boolean validateResourceClosed() {
+        return super.validateResourceClosed()
+                && (fencedMainThreadExecutor == null
+                        || 
fencedMainThreadExecutor.validateScheduledExecutorClosed());
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 9b7572e..e67509e 100644
--- 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,21 +19,28 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -107,6 +114,12 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
     private final MainThreadExecutor mainThreadExecutor;
 
     /**
+     * Register endpoint closeable resource to the registry and close them 
when the server is
+     * stopped.
+     */
+    private final CloseableRegistry resourceRegistry;
+
+    /**
      * Indicates whether the RPC endpoint is started and not stopped or being 
stopped.
      *
      * <p>IMPORTANT: the running state is not thread safe and can be used only 
in the main thread of
@@ -125,8 +138,11 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
         this.endpointId = checkNotNull(endpointId, "endpointId");
 
         this.rpcServer = rpcService.startServer(this);
+        this.resourceRegistry = new CloseableRegistry();
 
-        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, 
this::validateRunsInMainThread);
+        this.mainThreadExecutor =
+                new MainThreadExecutor(rpcServer, 
this::validateRunsInMainThread, endpointId);
+        registerResource(this.mainThreadExecutor);
     }
 
     /**
@@ -211,12 +227,44 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
      */
     public final CompletableFuture<Void> internalCallOnStop() {
         validateRunsInMainThread();
-        CompletableFuture<Void> stopFuture = onStop();
+        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+        try {
+            resourceRegistry.close();
+            stopFuture.complete(null);
+        } catch (IOException e) {
+            stopFuture.completeExceptionally(
+                    new RuntimeException("Close resource registry fail", e));
+        }
+        stopFuture = CompletableFuture.allOf(stopFuture, onStop());
         isRunning = false;
         return stopFuture;
     }
 
     /**
+     * Register the given closeable resource to {@link CloseableRegistry}.
+     *
+     * @param closeableResource the given closeable resource
+     */
+    protected void registerResource(Closeable closeableResource) {
+        try {
+            resourceRegistry.registerCloseable(closeableResource);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Registry closeable resource " + closeableResource + " 
fail", e);
+        }
+    }
+
+    /**
+     * Unregister the given closeable resource from {@link CloseableRegistry}.
+     *
+     * @param closeableResource the given closeable resource
+     * @return true if the given resource unregister successful, otherwise 
false
+     */
+    protected boolean unregisterResource(Closeable closeableResource) {
+        return resourceRegistry.unregisterCloseable(closeableResource);
+    }
+
+    /**
      * User overridable callback which is called from {@link 
#internalCallOnStop()}.
      *
      * <p>This method is called when the RpcEndpoint is being shut down. The 
method is guaranteed to
@@ -395,23 +443,38 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
         assert 
MainThreadValidatorUtil.isRunningInExpectedThread(currentMainThread.get());
     }
 
+    /**
+     * Validate whether all the resources are closed.
+     *
+     * @return true if all the resources are closed, otherwise false
+     */
+    boolean validateResourceClosed() {
+        return mainThreadExecutor.validateScheduledExecutorClosed() && 
resourceRegistry.isClosed();
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
 
     /** Executor which executes runnables in the main thread context. */
-    protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor {
+    protected static class MainThreadExecutor implements 
ComponentMainThreadExecutor, Closeable {
+        private static final Logger log = 
LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable 
mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send 
them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledExecutorService mainScheduledExecutor;
+
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String 
endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(endpointId + 
"-main-scheduler"));
         }
 
         @Override
@@ -419,19 +482,52 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
             gateway.runAsync(command);
         }
 
+        /**
+         * The mainScheduledExecutor manages the task and sends it to the 
gateway after the given
+         * delay.
+         *
+         * @param command the task to execute in the future
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @return a ScheduledFuture representing the completion of the 
scheduled task
+         */
         @Override
         public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
             final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
             FutureTask<Void> ft = new FutureTask<>(command, null);
-            scheduleRunAsync(ft, delayMillis);
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and 
ignores the command {}",
+                        command);
+            } else {
+                mainScheduledExecutor.schedule(
+                        () -> gateway.runAsync(ft), delayMillis, 
TimeUnit.MILLISECONDS);
+            }
             return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
         }
 
+        /**
+         * The mainScheduledExecutor manages the given callable and sends it 
to the gateway after
+         * the given delay.
+         *
+         * @param callable the callable to execute
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @param <V> result type of the callable
+         * @return a ScheduledFuture which holds the future value of the given 
callable
+         */
         @Override
         public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
             final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
             FutureTask<V> ft = new FutureTask<>(callable);
-            scheduleRunAsync(ft, delayMillis);
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and 
ignores the callable {}",
+                        callable);
+            } else {
+                mainScheduledExecutor.schedule(
+                        () -> gateway.runAsync(ft), delayMillis, 
TimeUnit.MILLISECONDS);
+            }
             return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
         }
 
@@ -453,5 +549,22 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
         public void assertRunningInMainThread() {
             mainThreadCheck.run();
         }
+
+        /** Shutdown the {@link ScheduledThreadPoolExecutor} and remove all 
the pending tasks. */
+        @Override
+        public void close() {
+            if (!mainScheduledExecutor.isShutdown()) {
+                mainScheduledExecutor.shutdownNow();
+            }
+        }
+
+        /**
+         * Validate whether the scheduled executor is closed.
+         *
+         * @return true if the scheduled executor is shutdown, otherwise false
+         */
+        final boolean validateScheduledExecutorClosed() {
+            return mainScheduledExecutor.isShutdown();
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index 24b1c1b..d4d342b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -110,6 +110,7 @@ public class FencedRpcEndpointTest {
             assertEquals(newFencingToken, 
fencedTestingEndpoint.getFencingToken());
         } finally {
             RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
+            fencedTestingEndpoint.validateResourceClosed();
         }
     }
 
@@ -178,6 +179,7 @@ public class FencedRpcEndpointTest {
 
         } finally {
             RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
+            fencedTestingEndpoint.validateResourceClosed();
         }
     }
 
@@ -245,6 +247,7 @@ public class FencedRpcEndpointTest {
             }
         } finally {
             RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
+            fencedTestingEndpoint.validateResourceClosed();
         }
     }
 
@@ -294,6 +297,7 @@ public class FencedRpcEndpointTest {
 
         } finally {
             RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
+            fencedTestingEndpoint.validateResourceClosed();
         }
     }
 
@@ -335,6 +339,7 @@ public class FencedRpcEndpointTest {
             }
         } finally {
             RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
+            fencedTestingEndpoint.validateResourceClosed();
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index baec000..0deab41 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -32,9 +32,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -42,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor 
scheduling command. */
 @ExtendWith(TestLoggerExtension.class)
@@ -79,6 +83,8 @@ public class RpcEndpointTest {
             assertEquals(Integer.valueOf(expectedValue), foobar.get());
         } finally {
             RpcUtils.terminateRpcEndpoint(baseEndpoint, TIMEOUT);
+
+            baseEndpoint.validateResourceClosed();
         }
     }
 
@@ -104,6 +110,8 @@ public class RpcEndpointTest {
                                 "Expected to fail with a RuntimeException 
since we requested the wrong gateway type.");
                     } finally {
                         RpcUtils.terminateRpcEndpoint(baseEndpoint, TIMEOUT);
+
+                        baseEndpoint.validateResourceClosed();
                     }
                 });
     }
@@ -134,6 +142,7 @@ public class RpcEndpointTest {
             assertEquals(foo, differentGateway.foo().get());
         } finally {
             RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+            endpoint.validateResourceClosed();
         }
     }
 
@@ -152,6 +161,7 @@ public class RpcEndpointTest {
             assertTrue(gateway.queryIsRunningFlag().get());
         } finally {
             RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+            endpoint.validateResourceClosed();
         }
     }
 
@@ -172,6 +182,7 @@ public class RpcEndpointTest {
 
         stopFuture.complete(null);
         terminationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+        endpoint.validateResourceClosed();
     }
 
     public interface BaseGateway extends RpcGateway {
@@ -281,6 +292,7 @@ public class RpcEndpointTest {
             asyncExecutionFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
         } finally {
             RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+            endpoint.validateResourceClosed();
         }
     }
 
@@ -301,6 +313,28 @@ public class RpcEndpointTest {
     }
 
     @Test
+    public void testScheduleRunnableAfterClose() throws Exception {
+        testScheduleAfterClose(
+                (mainThreadExecutor, expectedDelay) ->
+                        mainThreadExecutor.schedule(
+                                () -> {}, expectedDelay.toMillis() / 1000, 
TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testCancelScheduledRunnable() throws Exception {
+        testCancelScheduledTask(
+                (mainThreadExecutor, longCompletableFuture) -> {
+                    final Duration delayDuration = Duration.ofMillis(2);
+                    return mainThreadExecutor.schedule(
+                            () -> {
+                                
longCompletableFuture.complete(delayDuration.toMillis());
+                            },
+                            delayDuration.toMillis(),
+                            TimeUnit.MILLISECONDS);
+                });
+    }
+
+    @Test
     public void testScheduleCallableWithDelayInMilliseconds() throws Exception 
{
         testScheduleWithDelay(
                 (mainThreadExecutor, expectedDelay) ->
@@ -316,22 +350,87 @@ public class RpcEndpointTest {
                                 () -> 1, expectedDelay.toMillis() / 1000, 
TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testScheduleCallableAfterClose() throws Exception {
+        testScheduleAfterClose(
+                (mainThreadExecutor, expectedDelay) ->
+                        mainThreadExecutor.schedule(
+                                () -> 1, expectedDelay.toMillis() / 1000, 
TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testCancelScheduledCallable() {
+        testCancelScheduledTask(
+                (mainThreadExecutor, longCompletableFuture) -> {
+                    final Duration delayDuration = Duration.ofMillis(2);
+                    return mainThreadExecutor.schedule(
+                            () -> {
+                                
longCompletableFuture.complete(delayDuration.toMillis());
+                                return null;
+                            },
+                            delayDuration.toMillis(),
+                            TimeUnit.MILLISECONDS);
+                });
+    }
+
     private static void testScheduleWithDelay(
             BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> scheduler) 
throws Exception {
-        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> taskCompletedFuture = new 
CompletableFuture<>();
+        final String endpointId = "foobar";
 
         final MainThreadExecutable mainThreadExecutable =
-                new TestMainThreadExecutable(
-                        (runnable, delay) -> 
actualDelayMsFuture.complete(delay));
+                new TestMainThreadExecutable((runnable) -> 
taskCompletedFuture.complete(null));
 
         final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
-                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> 
{});
+                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> 
{}, endpointId);
 
         final Duration expectedDelay = Duration.ofSeconds(1);
 
         scheduler.accept(mainThreadExecutor, expectedDelay);
 
-        assertEquals(actualDelayMsFuture.get(), expectedDelay.toMillis());
+        taskCompletedFuture.get();
+        mainThreadExecutor.close();
+    }
+
+    private static void testScheduleAfterClose(
+            BiFunction<RpcEndpoint.MainThreadExecutor, Duration, 
ScheduledFuture<?>> scheduler) {
+        final CompletableFuture<Void> taskCompletedFuture = new 
CompletableFuture<>();
+        final String endpointId = "foobar";
+
+        final MainThreadExecutable mainThreadExecutable =
+                new TestMainThreadExecutable((runnable) -> 
taskCompletedFuture.complete(null));
+
+        final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
+                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> 
{}, endpointId);
+
+        mainThreadExecutor.close();
+
+        final Duration expectedDelay = Duration.ofSeconds(0);
+        ScheduledFuture<?> future = scheduler.apply(mainThreadExecutor, 
expectedDelay);
+        assertFalse(taskCompletedFuture.isDone());
+        assertFalse(future.isDone());
+    }
+
+    private static void testCancelScheduledTask(
+            BiFunction<RpcEndpoint.MainThreadExecutor, 
CompletableFuture<Long>, ScheduledFuture<?>>
+                    scheduler) {
+        final String endpointId = "foobar";
+
+        final MainThreadExecutable mainThreadExecutable =
+                new TestMainThreadExecutable(Runnable::run);
+
+        final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
+                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> 
{}, endpointId);
+        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+
+        ScheduledFuture<?> scheduledFuture =
+                scheduler.apply(mainThreadExecutor, actualDelayMsFuture);
+        scheduledFuture.cancel(true);
+
+        assumeTrue(!actualDelayMsFuture.isDone(), "The command is done and no 
need to cancel it.");
+        assertTrue(scheduledFuture.isCancelled());
+        assertFalse(actualDelayMsFuture.isDone());
+        mainThreadExecutor.close();
     }
 
     /**
@@ -356,6 +455,7 @@ public class RpcEndpointTest {
             assertEquals(expectedInteger, integerFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit()));
         } finally {
             RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+            endpoint.validateResourceClosed();
         }
     }
 
@@ -387,20 +487,21 @@ public class RpcEndpointTest {
         } finally {
             latch.countDown();
             RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+            endpoint.validateResourceClosed();
         }
     }
 
     private static class TestMainThreadExecutable implements 
MainThreadExecutable {
 
-        private final BiConsumer<Runnable, Long> scheduleRunAsyncConsumer;
+        private final Consumer<Runnable> scheduleRunAsyncConsumer;
 
-        private TestMainThreadExecutable(BiConsumer<Runnable, Long> 
scheduleRunAsyncConsumer) {
+        private TestMainThreadExecutable(Consumer<Runnable> 
scheduleRunAsyncConsumer) {
             this.scheduleRunAsyncConsumer = scheduleRunAsyncConsumer;
         }
 
         @Override
         public void runAsync(Runnable runnable) {
-            throw new UnsupportedOperationException();
+            scheduleRunAsyncConsumer.accept(runnable);
         }
 
         @Override
@@ -410,7 +511,7 @@ public class RpcEndpointTest {
 
         @Override
         public void scheduleRunAsync(Runnable runnable, long delay) {
-            scheduleRunAsyncConsumer.accept(runnable, delay);
+            scheduleRunAsyncConsumer.accept(runnable);
         }
     }
 }

Reply via email to