[FLINK-4694] [rpc] Add termination futures to RpcEndpoint and RpcService. The termination futures can be used to wait for the termination of the respective component.
This closes #2558. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dfaf457 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dfaf457 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dfaf457 Branch: refs/heads/flip-6 Commit: 9dfaf457fcc282fb01a1ee11950416e6a0b51171 Parents: 5915613 Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Sep 27 18:17:42 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:43 2016 +0200 ---------------------------------------------------------------------- .../concurrent/impl/FlinkCompletableFuture.java | 11 +++--- .../apache/flink/runtime/rpc/RpcEndpoint.java | 9 +++++ .../apache/flink/runtime/rpc/RpcService.java | 7 ++++ .../apache/flink/runtime/rpc/SelfGateway.java | 34 ++++++++++++++++++ .../runtime/rpc/akka/AkkaInvocationHandler.java | 22 ++++++++++-- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 17 ++++++++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 32 +++++++++++++++-- .../runtime/rpc/TestingSerialRpcService.java | 10 +++++- .../runtime/rpc/akka/AkkaRpcActorTest.java | 36 ++++++++++++++++++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 29 ++++++++++++++++ 10 files changed, 193 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java index e648a71..14686d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.concurrent.impl; import akka.dispatch.Futures; import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.util.Preconditions; import scala.concurrent.Promise; import scala.concurrent.Promise$; @@ -52,8 +51,6 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements Complet @Override public boolean complete(T value) { - Preconditions.checkNotNull(value); - try { promise.success(value); @@ -65,10 +62,12 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements Complet @Override public boolean completeExceptionally(Throwable t) { - Preconditions.checkNotNull(t); - try { - promise.failure(t); + if (t == null) { + promise.failure(new NullPointerException("Throwable was null.")); + } else { + promise.failure(t); + } return true; } catch (IllegalStateException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 79961f7..f93a2e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -173,6 +173,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> { return rpcService; } + /** + * Return a future which is completed when the rpc endpoint has been terminated. + * + * @return Future which is completed when the rpc endpoint has been terminated. 
+ */ + public Future<Void> getTerminationFuture() { + return ((SelfGateway)self).getTerminationFuture(); + } + // ------------------------------------------------------------------------ // Asynchronous executions // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 96844ed..2052f98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -77,6 +77,13 @@ public interface RpcService { void stopService(); /** + * Returns a future indicating when the RPC service has been shut down. + * + * @return Termination future + */ + Future<Void> getTerminationFuture(); + + /** * Gets the executor, provided by this RPC service. This executor can be used for example for * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures. * http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java new file mode 100644 index 0000000..ed8ef9d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import org.apache.flink.runtime.concurrent.Future; + +/** + * Interface for self gateways + */ +public interface SelfGateway { + + /** + * Return a future which is completed when the rpc endpoint has been terminated. + * + * @return Future indicating when the rpc endpoint has been terminated + */ + Future<Void> getTerminationFuture(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 8f4deff..709ff92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadExecutable; +import org.apache.flink.runtime.rpc.SelfGateway; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import 
org.apache.flink.runtime.rpc.StartStoppable; @@ -52,7 +53,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ -class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable { +class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway { private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); private final String address; @@ -67,12 +68,22 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea private final long maximumFramesize; - AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) { + // null if gateway; otherwise non-null + private final Future<Void> terminationFuture; + + AkkaInvocationHandler( + String address, + ActorRef rpcEndpoint, + Time timeout, + long maximumFramesize, + Future<Void> terminationFuture) { + this.address = Preconditions.checkNotNull(address); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); this.timeout = Preconditions.checkNotNull(timeout); this.maximumFramesize = maximumFramesize; + this.terminationFuture = terminationFuture; } @Override @@ -83,7 +94,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || - declaringClass.equals(RpcGateway.class)) { + declaringClass.equals(RpcGateway.class) || declaringClass.equals(SelfGateway.class)) { result = method.invoke(this, args); } else { String methodName = method.getName(); @@ -300,4 +311,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThrea public String getAddress() { return address; } + + @Override + public Future<Void> getTerminationFuture() { + return terminationFuture; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 1b456a7..c21383a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -24,6 +24,7 @@ import akka.actor.UntypedActorWithStash; import akka.dispatch.Futures; import akka.japi.Procedure; import akka.pattern.Patterns; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; @@ -76,9 +77,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp /** the helper that tracks whether calls come from the main thread */ private final MainThreadValidatorUtil mainThreadValidator; - AkkaRpcActor(final T rpcEndpoint) { + private final CompletableFuture<Void> terminationFuture; + + AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture) { this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); + this.terminationFuture = checkNotNull(terminationFuture); + } + + @Override + public void postStop() { + super.postStop(); + + // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise + // we would complete the future and let the actor system restart the actor with a completed + // future. 
+ // Complete the termination future so that others know that we've stopped. + terminationFuture.complete(null); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index fb7896a..44719c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -32,9 +32,12 @@ import akka.dispatch.Mapper; import akka.pattern.Patterns; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadExecutable; +import org.apache.flink.runtime.rpc.SelfGateway; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -131,7 +134,12 @@ public class AkkaRpcService implements RpcService { final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); - InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler( + address, + actorRef, + timeout, + maximumFramesize, + null); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . 
That works better in cases where Flink runs embedded and all Flink @@ -156,7 +164,8 @@ public class AkkaRpcService implements RpcService { public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); - Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint); + CompletableFuture<Void> terminationFuture = new FlinkCompletableFuture<>(); + Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture); ActorRef actorRef; synchronized (lock) { @@ -169,7 +178,12 @@ public class AkkaRpcService implements RpcService { final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); - InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler( + address, + actorRef, + timeout, + maximumFramesize, + terminationFuture); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . 
That works better in cases where Flink runs embedded and all Flink @@ -181,6 +195,7 @@ public class AkkaRpcService implements RpcService { classLoader, new Class<?>[]{ rpcEndpoint.getSelfGatewayType(), + SelfGateway.class, MainThreadExecutable.class, StartStoppable.class, AkkaGateway.class}, @@ -231,6 +246,17 @@ public class AkkaRpcService implements RpcService { } @Override + public Future<Void> getTerminationFuture() { + return FlinkFuture.supplyAsync(new Callable<Void>(){ + @Override + public Void call() throws Exception { + actorSystem.awaitTermination(); + return null; + } + }, getExecutor()); + } + + @Override public Executor getExecutor() { return actorSystem.dispatcher(); } http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 2a004c5..88906a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc; import akka.dispatch.Futures; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; @@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; - /** * An RPC Service implementation for testing. This RPC service directly executes all asynchronous * calls one by one in the calling thread. 
@@ -48,10 +48,12 @@ public class TestingSerialRpcService implements RpcService { private final DirectExecutorService executorService; private final ConcurrentHashMap<String, RpcGateway> registeredConnections; + private final CompletableFuture<Void> terminationFuture; public TestingSerialRpcService() { executorService = new DirectExecutorService(); this.registeredConnections = new ConcurrentHashMap<>(16); + this.terminationFuture = new FlinkCompletableFuture<>(); } @Override @@ -89,6 +91,12 @@ public class TestingSerialRpcService implements RpcService { public void stopService() { executorService.shutdown(); registeredConnections.clear(); + terminationFuture.complete(null); + } + + @Override + public Future<Void> getTerminationFuture() { + return terminationFuture; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 5d76024..ba8eb11 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -22,6 +22,8 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; @@ -32,9 +34,15 @@ import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.Test; +import 
java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -148,6 +156,34 @@ public class AkkaRpcActorTest extends TestLogger { } } + /** + * Tests that we can wait for a RpcEndpoint to terminate. + * + * @throws ExecutionException + * @throws InterruptedException + */ + @Test(timeout=1000) + public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException { + final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); + rpcEndpoint.start(); + + Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture(); + + assertFalse(terminationFuture.isDone()); + + FlinkFuture.supplyAsync(new Callable<Void>() { + @Override + public Void call() throws Exception { + rpcEndpoint.shutDown(); + + return null; + } + }, actorSystem.dispatcher()); + + // wait until the rpc endpoint has terminated + terminationFuture.get(); + } + private interface DummyRpcGateway extends RpcGateway { Future<Integer> foobar(); } http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 3388011..7c8defa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -23,6 
+23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class AkkaRpcServiceTest extends TestLogger { @@ -120,4 +122,31 @@ public class AkkaRpcServiceTest extends TestLogger { public void testGetAddress() { assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress()); } + + /** + * Tests that we can wait for the termination of the rpc service + * + * @throws ExecutionException + * @throws InterruptedException + */ + @Test(timeout = 1000) + public void testTerminationFuture() throws ExecutionException, InterruptedException { + final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000)); + + Future<Void> terminationFuture = rpcService.getTerminationFuture(); + + assertFalse(terminationFuture.isDone()); + + FlinkFuture.supplyAsync(new Callable<Void>() { + @Override + public Void call() throws Exception { + rpcService.stopService(); + + return null; + } + }, actorSystem.dispatcher()); + + terminationFuture.get(); + } }