Repository: flink Updated Branches: refs/heads/flip-6 ad6693ee6 -> 08af7def6
[FLINK-4368] [distributed runtime] Eagerly initialize the RPC endpoint members This closes #2351 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08af7def Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08af7def Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08af7def Branch: refs/heads/flip-6 Commit: 08af7def60d54c22126b902e6fa57101d5fbb8fa Parents: ad6693e Author: Stephan Ewen <se...@apache.org> Authored: Wed Aug 10 18:27:21 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Aug 11 15:18:11 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/rpc/MainThreadExecutor.java | 9 +- .../apache/flink/runtime/rpc/RpcEndpoint.java | 156 +++++++++++-------- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 4 +- 3 files changed, 99 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/08af7def/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java index e06711e..14b2997 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java @@ -26,22 +26,23 @@ import java.util.concurrent.TimeoutException; /** * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying - * rpc server. + * RPC endpoint. * - * This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint} + * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint} * implementation which allows to dispatch local procedures to the main thread of the underlying * rpc server. */ public interface MainThreadExecutor { + /** - * Execute the runnable in the main thread of the underlying rpc server. + * Execute the runnable in the main thread of the underlying RPC endpoint. * * @param runnable Runnable to be executed */ void runAsync(Runnable runnable); /** - * Execute the callable in the main thread of the underlying rpc server and return a future for + * Execute the callable in the main thread of the underlying RPC endpoint and return a future for * the callable result. If the future is not completed within the given timeout, the returned * future will throw a {@link TimeoutException}. * http://git-wip-us.apache.org/repos/asf/flink/blob/08af7def/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 3d8757f..0d928a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -19,85 +19,116 @@ package org.apache.flink.runtime.rpc; import akka.util.Timeout; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import java.util.concurrent.Callable; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * Base class for rpc endpoints. Distributed components which offer remote procedure calls have to - * extend the rpc endpoint base class. + * Base class for RPC endpoints. Distributed components which offer remote procedure calls have to + * extend the RPC endpoint base class. An RPC endpoint is backed by an {@link RpcService}. + * + * <h1>Endpoint and Gateway</h1> + * + * To be done... + * + * <h1>Single Threaded Endpoint Execution </h1> + * + * <p>All RPC calls on the same endpoint are called by the same thread + * (referred to as the endpoint's <i>main thread</i>). + * Thus, by executing all state changing operations within the main + * thread, we don't have to reason about concurrent accesses, in the same way in the Actor Model + * of Erlang or Akka. * - * The main idea is that a rpc endpoint is backed by a rpc server which has a single thread - * processing the rpc calls. Thus, by executing all state changing operations within the main - * thread, we don't have to reason about concurrent accesses. The rpc provides provides - * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the - * {@link #getMainThreadExecutionContext()} to execute code in the rpc server's main thread. + * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} + * and the {@link #getMainThreadExecutionContext()} to execute code in the RPC endoint's main thread. * - * @param <C> Rpc gateway counterpart for the implementing rpc endpoint + * @param <C> The RPC gateway counterpart for the implementing RPC endpoint */ public abstract class RpcEndpoint<C extends RpcGateway> { protected final Logger log = LoggerFactory.getLogger(getClass()); - /** Rpc service to be used to start the rpc server and to obtain rpc gateways */ + // ------------------------------------------------------------------------ + + /** RPC service to be used to start the RPC server and to obtain rpc gateways */ private final RpcService rpcService; /** Self gateway which can be used to schedule asynchronous calls on yourself */ - private C self; + private final C self; + + /** the fully qualified address of the this RPC endpoint */ + private final String selfAddress; + + /** The main thread execution context to be used to execute future callbacks in the main thread + * of the executing rpc server. */ + private final MainThreadExecutionContext mainThreadExecutionContext; + /** - * The main thread execution context to be used to execute future callbacks in the main thread - * of the executing rpc server. - * - * IMPORTANT: The main thread context is only available after the rpc server has been started. + * Initializes the RPC endpoint. + * + * @param rpcService The RPC server that dispatches calls to this RPC endpoint. */ - private MainThreadExecutionContext mainThreadExecutionContext; - public RpcEndpoint(RpcService rpcService) { - this.rpcService = rpcService; + this.rpcService = checkNotNull(rpcService, "rpcService"); + this.self = rpcService.startServer(this); + this.selfAddress = rpcService.getAddress(self); + this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); } + // ------------------------------------------------------------------------ + // Shutdown + // ------------------------------------------------------------------------ + /** - * Get self-gateway which should be used to run asynchronous rpc calls on this endpoint. - * - * IMPORTANT: Always issue local method calls via the self-gateway if the current thread - * is not the main thread of the underlying rpc server, e.g. from within a future callback. - * - * @return Self gateway + * Shuts down the underlying RPC endpoint via the RPC service. + * After this method was called, the RPC endpoint will no longer be reachable, neither remotely, + * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread + * any more (via {@link #callAsync(Callable, Timeout)} and {@link #runAsync(Runnable)}). + * + * <p>This method can be overridden to add RPC endpoint specific shut down code. + * The overridden method should always call the parent shut down method. */ - public C getSelf() { - return self; + public void shutDown() { + rpcService.stopServer(self); } + // ------------------------------------------------------------------------ + // Basic RPC endpoint properties + // ------------------------------------------------------------------------ + /** - * Execute the runnable in the main thread of the underlying rpc server. + * Get self-gateway which should be used to run asynchronous RPC calls on this endpoint. + * + * <p><b>IMPORTANT</b>: Always issue local method calls via the self-gateway if the current thread + * is not the main thread of the underlying rpc server, e.g. from within a future callback. * - * @param runnable Runnable to be executed in the main thread of the underlying rpc server + * @return The self gateway */ - public void runAsync(Runnable runnable) { - ((MainThreadExecutor) self).runAsync(runnable); + public C getSelf() { + return self; } /** - * Execute the callable in the main thread of the underlying rpc server returning a future for - * the result of the callable. If the callable is not completed within the given timeout, then - * the future will be failed with a {@link java.util.concurrent.TimeoutException}. + * Gets the address of the underlying RPC endpoint. The address should be fully qualified so that + * a remote system can connect to this RPC endpoint via this address. * - * @param callable Callable to be executed in the main thread of the underlying rpc server - * @param timeout Timeout for the callable to be completed - * @param <V> Return type of the callable - * @return Future for the result of the callable. + * @return Fully qualified address of the underlying RPC endpoint */ - public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) { - return ((MainThreadExecutor) self).callAsync(callable, timeout); + public String getAddress() { + return selfAddress; } /** * Gets the main thread execution context. The main thread execution context can be used to - * execute tasks in the main thread of the underlying rpc server. + * execute tasks in the main thread of the underlying RPC endpoint. * * @return Main thread execution context */ @@ -106,52 +137,51 @@ public abstract class RpcEndpoint<C extends RpcGateway> { } /** - * Gets the used rpc service. + * Gets the endpoint's RPC service. * - * @return Rpc service + * @return The endpoint's RPC service */ public RpcService getRpcService() { return rpcService; } - /** - * Starts the underlying rpc server via the rpc service and creates the main thread execution - * context. This makes the rpc endpoint effectively reachable from the outside. - * - * Can be overriden to add rpc endpoint specific start up code. Should always call the parent - * start method. - */ - public void start() { - self = rpcService.startServer(this); - mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); - } - + // ------------------------------------------------------------------------ + // Asynchronous executions + // ------------------------------------------------------------------------ /** - * Shuts down the underlying rpc server via the rpc service. + * Execute the runnable in the main thread of the underlying RPC endpoint. * - * Can be overriden to add rpc endpoint specific shut down code. Should always call the parent - * shut down method. + * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint */ - public void shutDown() { - rpcService.stopServer(self); + public void runAsync(Runnable runnable) { + ((MainThreadExecutor) self).runAsync(runnable); } /** - * Gets the address of the underlying rpc server. The address should be fully qualified so that - * a remote system can connect to this rpc server via this address. + * Execute the callable in the main thread of the underlying RPC service, returning a future for + * the result of the callable. If the callable is not completed within the given timeout, then + * the future will be failed with a {@link java.util.concurrent.TimeoutException}. * - * @return Fully qualified address of the underlying rpc server + * @param callable Callable to be executed in the main thread of the underlying rpc server + * @param timeout Timeout for the callable to be completed + * @param <V> Return type of the callable + * @return Future for the result of the callable. */ - public String getAddress() { - return rpcService.getAddress(self); + public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) { + return ((MainThreadExecutor) self).callAsync(callable, timeout); } + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + /** * Execution context which executes runnables in the main thread context. A reported failure * will cause the underlying rpc server to shut down. */ private class MainThreadExecutionContext implements ExecutionContext { + private final MainThreadExecutor gateway; MainThreadExecutionContext(MainThreadExecutor gateway) { http://git-wip-us.apache.org/repos/asf/flink/blob/08af7def/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index c5bac94..642a380 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -54,15 +54,13 @@ public class AkkaRpcServiceTest extends TestLogger { ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService); JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService); - resourceManager.start(); - ResourceManagerGateway rm = resourceManager.getSelf(); assertTrue(rm instanceof AkkaGateway); AkkaGateway akkaClient = (AkkaGateway) rm; - jobMaster.start(); + jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef())); // wait for successful registration