Repository: flink
Updated Branches:
  refs/heads/flip-6 ad6693ee6 -> 08af7def6


[FLINK-4368] [distributed runtime] Eagerly initialize the RPC endpoint members

This closes #2351


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08af7def
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08af7def
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08af7def

Branch: refs/heads/flip-6
Commit: 08af7def60d54c22126b902e6fa57101d5fbb8fa
Parents: ad6693e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 10 18:27:21 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 11 15:18:11 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/MainThreadExecutor.java   |   9 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 156 +++++++++++--------
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   4 +-
 3 files changed, 99 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08af7def/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index e06711e..14b2997 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -26,22 +26,23 @@ import java.util.concurrent.TimeoutException;
 
 /**
  * Interface to execute {@link Runnable} and {@link Callable} in the main 
thread of the underlying
- * rpc server.
+ * RPC endpoint.
  *
- * This interface is intended to be implemented by the self gateway in a 
{@link RpcEndpoint}
+ * <p>This interface is intended to be implemented by the self gateway in a 
{@link RpcEndpoint}
  * implementation which allows to dispatch local procedures to the main thread 
of the underlying
  * rpc server.
  */
 public interface MainThreadExecutor {
+
        /**
-        * Execute the runnable in the main thread of the underlying rpc server.
+        * Execute the runnable in the main thread of the underlying RPC 
endpoint.
         *
         * @param runnable Runnable to be executed
         */
        void runAsync(Runnable runnable);
 
        /**
-        * Execute the callable in the main thread of the underlying rpc server 
and return a future for
+        * Execute the callable in the main thread of the underlying RPC 
endpoint and return a future for
         * the callable result. If the future is not completed within the given 
timeout, the returned
         * future will throw a {@link TimeoutException}.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/08af7def/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 3d8757f..0d928a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,85 +19,116 @@
 package org.apache.flink.runtime.rpc;
 
 import akka.util.Timeout;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
 import java.util.concurrent.Callable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Base class for rpc endpoints. Distributed components which offer remote 
procedure calls have to
- * extend the rpc endpoint base class.
+ * Base class for RPC endpoints. Distributed components which offer remote 
procedure calls have to
+ * extend the RPC endpoint base class. An RPC endpoint is backed by an {@link 
RpcService}. 
+ * 
+ * <h1>Endpoint and Gateway</h1>
+ * 
+ * To be done...
+ * 
+ * <h1>Single Threaded Endpoint Execution </h1>
+ * 
+ * <p>All RPC calls on the same endpoint are called by the same thread
+ * (referred to as the endpoint's <i>main thread</i>).
+ * Thus, by executing all state changing operations within the main 
+ * thread, we don't have to reason about concurrent accesses, in the same way 
in the Actor Model
+ * of Erlang or Akka.
  *
- * The main idea is that a rpc endpoint is backed by a rpc server which has a 
single thread
- * processing the rpc calls. Thus, by executing all state changing operations 
within the main
- * thread, we don't have to reason about concurrent accesses. The rpc provides 
provides
- * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the
- * {@link #getMainThreadExecutionContext()} to execute code in the rpc 
server's main thread.
+ * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link 
#callAsync(Callable, Timeout)}
+  * and the {@link #getMainThreadExecutionContext()} to execute code in the 
RPC endoint's main thread.
  *
- * @param <C> Rpc gateway counterpart for the implementing rpc endpoint
+ * @param <C> The RPC gateway counterpart for the implementing RPC endpoint
  */
 public abstract class RpcEndpoint<C extends RpcGateway> {
 
        protected final Logger log = LoggerFactory.getLogger(getClass());
 
-       /** Rpc service to be used to start the rpc server and to obtain rpc 
gateways */
+       // 
------------------------------------------------------------------------
+
+       /** RPC service to be used to start the RPC server and to obtain rpc 
gateways */
        private final RpcService rpcService;
 
        /** Self gateway which can be used to schedule asynchronous calls on 
yourself */
-       private C self;
+       private final C self;
+
+       /** the fully qualified address of the this RPC endpoint */
+       private final String selfAddress;
+
+       /** The main thread execution context to be used to execute future 
callbacks in the main thread
+        * of the executing rpc server. */
+       private final MainThreadExecutionContext mainThreadExecutionContext;
+
 
        /**
-        * The main thread execution context to be used to execute future 
callbacks in the main thread
-        * of the executing rpc server.
-        *
-        * IMPORTANT: The main thread context is only available after the rpc 
server has been started.
+        * Initializes the RPC endpoint.
+        * 
+        * @param rpcService The RPC server that dispatches calls to this RPC 
endpoint. 
         */
-       private MainThreadExecutionContext mainThreadExecutionContext;
-
        public RpcEndpoint(RpcService rpcService) {
-               this.rpcService = rpcService;
+               this.rpcService = checkNotNull(rpcService, "rpcService");
+               this.self = rpcService.startServer(this);
+               this.selfAddress = rpcService.getAddress(self);
+               this.mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Shutdown
+       // 
------------------------------------------------------------------------
+
        /**
-        * Get self-gateway which should be used to run asynchronous rpc calls 
on this endpoint.
-        *
-        * IMPORTANT: Always issue local method calls via the self-gateway if 
the current thread
-        * is not the main thread of the underlying rpc server, e.g. from 
within a future callback.
-        *
-        * @return Self gateway
+        * Shuts down the underlying RPC endpoint via the RPC service.
+        * After this method was called, the RPC endpoint will no longer be 
reachable, neither remotely,
+        * not via its {@link #getSelf() self gateway}. It will also not 
accepts executions in main thread
+        * any more (via {@link #callAsync(Callable, Timeout)} and {@link 
#runAsync(Runnable)}).
+        * 
+        * <p>This method can be overridden to add RPC endpoint specific shut 
down code.
+        * The overridden method should always call the parent shut down method.
         */
-       public C getSelf() {
-               return self;
+       public void shutDown() {
+               rpcService.stopServer(self);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Basic RPC endpoint properties
+       // 
------------------------------------------------------------------------
+
        /**
-        * Execute the runnable in the main thread of the underlying rpc server.
+        * Get self-gateway which should be used to run asynchronous RPC calls 
on this endpoint.
+        *
+        * <p><b>IMPORTANT</b>: Always issue local method calls via the 
self-gateway if the current thread
+        * is not the main thread of the underlying rpc server, e.g. from 
within a future callback.
         *
-        * @param runnable Runnable to be executed in the main thread of the 
underlying rpc server
+        * @return The self gateway
         */
-       public void runAsync(Runnable runnable) {
-               ((MainThreadExecutor) self).runAsync(runnable);
+       public C getSelf() {
+               return self;
        }
 
        /**
-        * Execute the callable in the main thread of the underlying rpc server 
returning a future for
-        * the result of the callable. If the callable is not completed within 
the given timeout, then
-        * the future will be failed with a {@link 
java.util.concurrent.TimeoutException}.
+        * Gets the address of the underlying RPC endpoint. The address should 
be fully qualified so that
+        * a remote system can connect to this RPC endpoint via this address.
         *
-        * @param callable Callable to be executed in the main thread of the 
underlying rpc server
-        * @param timeout Timeout for the callable to be completed
-        * @param <V> Return type of the callable
-        * @return Future for the result of the callable.
+        * @return Fully qualified address of the underlying RPC endpoint
         */
-       public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
-               return ((MainThreadExecutor) self).callAsync(callable, timeout);
+       public String getAddress() {
+               return selfAddress;
        }
 
        /**
         * Gets the main thread execution context. The main thread execution 
context can be used to
-        * execute tasks in the main thread of the underlying rpc server.
+        * execute tasks in the main thread of the underlying RPC endpoint.
         *
         * @return Main thread execution context
         */
@@ -106,52 +137,51 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        }
 
        /**
-        * Gets the used rpc service.
+        * Gets the endpoint's RPC service.
         *
-        * @return Rpc service
+        * @return The endpoint's RPC service
         */
        public RpcService getRpcService() {
                return rpcService;
        }
 
-       /**
-        * Starts the underlying rpc server via the rpc service and creates the 
main thread execution
-        * context. This makes the rpc endpoint effectively reachable from the 
outside.
-        *
-        * Can be overriden to add rpc endpoint specific start up code. Should 
always call the parent
-        * start method.
-        */
-       public void start() {
-               self = rpcService.startServer(this);
-               mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
-       }
-
+       // 
------------------------------------------------------------------------
+       //  Asynchronous executions
+       // 
------------------------------------------------------------------------
 
        /**
-        * Shuts down the underlying rpc server via the rpc service.
+        * Execute the runnable in the main thread of the underlying RPC 
endpoint.
         *
-        * Can be overriden to add rpc endpoint specific shut down code. Should 
always call the parent
-        * shut down method.
+        * @param runnable Runnable to be executed in the main thread of the 
underlying RPC endpoint
         */
-       public void shutDown() {
-               rpcService.stopServer(self);
+       public void runAsync(Runnable runnable) {
+               ((MainThreadExecutor) self).runAsync(runnable);
        }
 
        /**
-        * Gets the address of the underlying rpc server. The address should be 
fully qualified so that
-        * a remote system can connect to this rpc server via this address.
+        * Execute the callable in the main thread of the underlying RPC 
service, returning a future for
+        * the result of the callable. If the callable is not completed within 
the given timeout, then
+        * the future will be failed with a {@link 
java.util.concurrent.TimeoutException}.
         *
-        * @return Fully qualified address of the underlying rpc server
+        * @param callable Callable to be executed in the main thread of the 
underlying rpc server
+        * @param timeout Timeout for the callable to be completed
+        * @param <V> Return type of the callable
+        * @return Future for the result of the callable.
         */
-       public String getAddress() {
-               return rpcService.getAddress(self);
+       public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+               return ((MainThreadExecutor) self).callAsync(callable, timeout);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       
        /**
         * Execution context which executes runnables in the main thread 
context. A reported failure
         * will cause the underlying rpc server to shut down.
         */
        private class MainThreadExecutionContext implements ExecutionContext {
+
                private final MainThreadExecutor gateway;
 
                MainThreadExecutionContext(MainThreadExecutor gateway) {

http://git-wip-us.apache.org/repos/asf/flink/blob/08af7def/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index c5bac94..642a380 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -54,15 +54,13 @@ public class AkkaRpcServiceTest extends TestLogger {
                ResourceManager resourceManager = new 
ResourceManager(akkaRpcService, executorService);
                JobMaster jobMaster = new JobMaster(akkaRpcService2, 
executorService);
 
-               resourceManager.start();
-
                ResourceManagerGateway rm = resourceManager.getSelf();
 
                assertTrue(rm instanceof AkkaGateway);
 
                AkkaGateway akkaClient = (AkkaGateway) rm;
 
-               jobMaster.start();
+               
                
jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, 
akkaClient.getActorRef()));
 
                // wait for successful registration

Reply via email to