[FLINK-4687] [rpc] Add getAddress to RpcService

This closes #2551.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07512e06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07512e06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07512e06

Branch: refs/heads/flip-6
Commit: 07512e06acfc4bb3c48f1286ce52478c64ffb259
Parents: 59d9e67
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Sep 26 18:01:47 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:41 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/rpc/RpcService.java    |  8 ++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java      | 16 ++++++++++++++++
 .../apache/flink/runtime/rpc/TestingRpcService.java |  5 +++--
 .../flink/runtime/rpc/TestingSerialRpcService.java  |  6 ++++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java    |  8 ++++----
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java  |  5 +++++
 6 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 437e08b..96844ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,6 +33,14 @@ import java.util.concurrent.TimeUnit;
 public interface RpcService {
 
        /**
+        * Return the address under which the rpc service can be reached. If the rpc service cannot be
+        * contacted remotely, then it will return an empty string.
+        *
+        * @return Address of the rpc service or empty string if local rpc service
+        */
+       String getAddress();
+
+       /**
         * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
         * be used to communicate with the rpc server. If the connection failed, then the returned
         * future is failed with a {@link RpcConnectionException}.

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index cee19c4..6825557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -22,6 +22,7 @@ import akka.actor.ActorIdentity;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -75,6 +76,8 @@ public class AkkaRpcService implements RpcService {
        private final Set<ActorRef> actors = new HashSet<>(4);
        private final long maximumFramesize;
 
+       private final String address;
+
        private volatile boolean stopped;
 
        public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
@@ -87,6 +90,19 @@ public class AkkaRpcService implements RpcService {
                        // only local communication
                        maximumFramesize = Long.MAX_VALUE;
                }
+
+               Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+
+               if (actorSystemAddress.host().isDefined()) {
+                       address = actorSystemAddress.host().get();
+               } else {
+                       address = "";
+               }
+       }
+
+       @Override
+       public String getAddress() {
+               return address;
        }
 
        // this method does not mutate state and is thus thread-safe

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f164056..47c9e24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
+import java.net.UnknownHostException;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,14 +58,14 @@ public class TestingRpcService extends AkkaRpcService {
        /**
         * Creates a new {@code TestingRpcService}. 
         */
-       public TestingRpcService() {
+       public TestingRpcService() throws UnknownHostException {
                this(new Configuration());
        }
 
        /**
         * Creates a new {@code TestingRpcService}, using the given configuration.
         */
-       public TestingRpcService(Configuration configuration) {
+       public TestingRpcService(Configuration configuration) throws UnknownHostException {
                super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
 
                this.registeredConnections = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index c58ea20..5b8e6e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -30,6 +30,7 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.net.InetAddress;
 import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -121,6 +122,11 @@ public class TestingSerialRpcService implements RpcService {
        }
 
        @Override
+       public String getAddress() {
+               return "";
+       }
+
+       @Override
        public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
                RpcGateway gateway = registeredConnections.get(address);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 1e8c9a6..5d76024 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -133,7 +133,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
                Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
 
-               WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration());
+               WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
 
                // since it is a tell operation we won't receive a RpcConnectionException, it's only logged
                gateway.tell("foobar");
@@ -141,10 +141,10 @@ public class AkkaRpcActorTest extends TestLogger {
                Future<Boolean> result = gateway.barfoo();
 
                try {
-                       Await.result(result, timeout.duration());
+                       result.get(timeout.getSize(), timeout.getUnit());
                        fail("We expected a RpcConnectionException.");
-               } catch (RpcConnectionException rpcConnectionException) {
-                       // we expect this exception here
+               } catch (ExecutionException executionException) {
+                       assertTrue(executionException.getCause() instanceof RpcConnectionException);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 5550cb5..3388011 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -115,4 +115,9 @@ public class AkkaRpcServiceTest extends TestLogger {
                assertEquals(expected, actual);
                assertTrue(latch.isTriggered());
        }
+
+       @Test
+       public void testGetAddress() {
+               assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
+       }
 }

Reply via email to