[FLINK-4414] [cluster] Add getAddress method to RpcGateway

The RpcGateway.getAddress method allows to retrieve the fully qualified address 
of the
associated RpcEndpoint.

This closes #2392.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c9f8844
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c9f8844
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c9f8844

Branch: refs/heads/flip-6
Commit: 6c9f8844952b91f9164aee4ebc75481c8f2eafef
Parents: a7e3579
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Aug 18 16:34:47 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Sep 8 17:26:58 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  6 +-----
 .../apache/flink/runtime/rpc/RpcGateway.java    |  7 +++++++
 .../apache/flink/runtime/rpc/RpcService.java    | 11 ----------
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 14 +++++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 21 ++++++--------------
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 16 +++++++++++++++
 6 files changed, 42 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index a28bc14..7b3f8a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -69,9 +69,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        /** Self gateway which can be used to schedule asynchronous calls on 
yourself */
        private final C self;
 
-       /** the fully qualified address of the this RPC endpoint */
-       private final String selfAddress;
-
        /** The main thread execution context to be used to execute future 
callbacks in the main thread
         * of the executing rpc server. */
        private final ExecutionContext mainThreadExecutionContext;
@@ -92,7 +89,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                this.selfGatewayType = 
ReflectionUtil.getTemplateType1(getClass());
                this.self = rpcService.startServer(this);
                
-               this.selfAddress = rpcService.getAddress(self);
                this.mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
        }
 
@@ -156,7 +152,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @return Fully qualified address of the underlying RPC endpoint
         */
        public String getAddress() {
-               return selfAddress;
+               return self.getAddress();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
index e3a16b4..81075ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -22,4 +22,11 @@ package org.apache.flink.runtime.rpc;
  * Rpc gateway interface which has to be implemented by Rpc gateways.
  */
 public interface RpcGateway {
+
+       /**
+        * Returns the fully qualified address under which the associated rpc 
endpoint is reachable.
+        *
+        * @return Fully qualified address under which the associated rpc 
endpoint is reachable
+        */
+       String getAddress();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index fabdb05..bc0f5cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -65,17 +65,6 @@ public interface RpcService {
        void stopService();
 
        /**
-        * Get the fully qualified address of the underlying rpc server 
represented by the self gateway.
-        * It must be possible to connect from a remote host to the rpc server 
via the returned fully
-        * qualified address.
-        *
-        * @param selfGateway Self gateway associated with the underlying rpc 
server
-        * @param <C> Type of the rpc gateway
-        * @return Fully qualified address
-        */
-       <C extends RpcGateway> String getAddress(C selfGateway);
-
-       /**
         * Gets the execution context, provided by this RPC service. This 
execution
         * context can be used for example for the {@code onComplete(...)} or 
{@code onSuccess(...)}
         * methods of Futures.

http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 524bf74..bfa04f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -23,6 +23,7 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
@@ -55,6 +56,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor, StartStoppable {
        private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
 
+       private final String address;
+
        private final ActorRef rpcEndpoint;
 
        // whether the actor ref is local and thus no message serialization is 
needed
@@ -65,7 +68,8 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
        private final long maximumFramesize;
 
-       AkkaInvocationHandler(ActorRef rpcEndpoint, Timeout timeout, long 
maximumFramesize) {
+       AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout 
timeout, long maximumFramesize) {
+               this.address = Preconditions.checkNotNull(address);
                this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
                this.isLocal = 
this.rpcEndpoint.path().address().hasLocalScope();
                this.timeout = Preconditions.checkNotNull(timeout);
@@ -79,7 +83,8 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
                Object result;
 
                if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutor.class) ||
-                       declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class)) {
+                       declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
+                       declaringClass.equals(RpcGateway.class)) {
                        result = method.invoke(this, args);
                } else {
                        String methodName = method.getName();
@@ -290,4 +295,9 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
                return false;
        }
+
+       @Override
+       public String getAddress() {
+               return address;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d987c2f..00a6932 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -102,7 +102,9 @@ public class AkkaRpcService implements RpcService {
                        public C apply(Object obj) {
                                ActorRef actorRef = ((ActorIdentity) 
obj).getRef();
 
-                               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(actorRef, timeout, maximumFramesize);
+                               final String address = 
AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+                               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
 
                                // Rather than using the System ClassLoader 
directly, we derive the ClassLoader
                                // from this class . That works better in cases 
where Flink runs embedded and all Flink
@@ -135,7 +137,9 @@ public class AkkaRpcService implements RpcService {
 
                LOG.info("Starting RPC endpoint for {} at {} .", 
rpcEndpoint.getClass().getName(), actorRef.path());
 
-               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(actorRef, timeout, maximumFramesize);
+               final String address = AkkaUtils.getAkkaURL(actorSystem, 
actorRef);
+
+               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
 
                // Rather than using the System ClassLoader directly, we derive 
the ClassLoader
                // from this class . That works better in cases where Flink 
runs embedded and all Flink
@@ -197,19 +201,6 @@ public class AkkaRpcService implements RpcService {
        }
 
        @Override
-       public String getAddress(RpcGateway selfGateway) {
-               checkState(!stopped, "RpcService is stopped");
-
-               if (selfGateway instanceof AkkaGateway) {
-                       ActorRef actorRef = ((AkkaGateway) 
selfGateway).getRpcEndpoint();
-                       return AkkaUtils.getAkkaURL(actorSystem, actorRef);
-               } else {
-                       String className = AkkaGateway.class.getName();
-                       throw new IllegalArgumentException("Cannot get address 
for non " + className + '.');
-               }
-       }
-
-       @Override
        public ExecutionContext getExecutionContext() {
                return actorSystem.dispatcher();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 1653fac..82d13f0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -34,6 +34,7 @@ import scala.concurrent.Future;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 public class AkkaRpcActorTest extends TestLogger {
@@ -57,6 +58,21 @@ public class AkkaRpcActorTest extends TestLogger {
        }
 
        /**
+        * Tests that the rpc endpoint and the associated rpc gateway have the 
same addresses.
+        * @throws Exception
+        */
+       @Test
+       public void testAddressResolution() throws Exception {
+               DummyRpcEndpoint rpcEndpoint = new 
DummyRpcEndpoint(akkaRpcService);
+
+               Future<DummyRpcGateway> futureRpcGateway = 
akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+
+               DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, 
timeout.duration());
+
+               assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress());
+       }
+
+       /**
         * Tests that the {@link AkkaRpcActor} stashes messages until the 
corresponding
         * {@link RpcEndpoint} has been started.
         */

Reply via email to