[FLINK-4687] [rpc] Add getAddress to RpcService This closes #2551.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07512e06 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07512e06 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07512e06 Branch: refs/heads/flip-6 Commit: 07512e06acfc4bb3c48f1286ce52478c64ffb259 Parents: 59d9e67 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Sep 26 18:01:47 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:41 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/runtime/rpc/RpcService.java | 8 ++++++++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 16 ++++++++++++++++ .../apache/flink/runtime/rpc/TestingRpcService.java | 5 +++-- .../flink/runtime/rpc/TestingSerialRpcService.java | 6 ++++++ .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 8 ++++---- .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 5 +++++ 6 files changed, 42 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 437e08b..96844ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -33,6 +33,14 @@ import java.util.concurrent.TimeUnit; public interface RpcService { /** + * Return the address under which the rpc service can be reached. If the rpc service cannot be + * contacted remotely, then it will return an empty string. + * + * @return Address of the rpc service or empty string if local rpc service + */ + String getAddress(); + + /** * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can * be used to communicate with the rpc server. If the connection failed, then the returned * future is failed with a {@link RpcConnectionException}. http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index cee19c4..6825557 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -22,6 +22,7 @@ import akka.actor.ActorIdentity; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; @@ -75,6 +76,8 @@ public class AkkaRpcService implements RpcService { private final Set<ActorRef> actors = new HashSet<>(4); private final long maximumFramesize; + private final String address; + private volatile boolean stopped; public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { @@ -87,6 +90,19 @@ public class AkkaRpcService implements RpcService { // only local communication maximumFramesize = Long.MAX_VALUE; } + + Address actorSystemAddress = AkkaUtils.getAddress(actorSystem); + + if (actorSystemAddress.host().isDefined()) { + address = actorSystemAddress.host().get(); + } else { + address = ""; + } + } + + @Override + public String getAddress() { + return address; } // this method does not mutate state and is thus thread-safe http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index f164056..47c9e24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import java.net.UnknownHostException; import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -57,14 +58,14 @@ public class TestingRpcService extends AkkaRpcService { /** * Creates a new {@code TestingRpcService}. */ - public TestingRpcService() { + public TestingRpcService() throws UnknownHostException { this(new Configuration()); } /** * Creates a new {@code TestingRpcService}, using the given configuration. */ - public TestingRpcService(Configuration configuration) { + public TestingRpcService(Configuration configuration) throws UnknownHostException { super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10)); this.registeredConnections = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index c58ea20..5b8e6e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -30,6 +30,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.net.InetAddress; import java.util.BitSet; import java.util.UUID; import java.util.concurrent.Callable; @@ -121,6 +122,11 @@ public class TestingSerialRpcService implements RpcService { } @Override + public String getAddress() { + return ""; + } + + @Override public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) { RpcGateway gateway = registeredConnections.get(address); http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 1e8c9a6..5d76024 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -133,7 +133,7 @@ public class AkkaRpcActorTest extends TestLogger { Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class); - WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration()); + WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit()); // since it is a tell operation we won't receive a RpcConnectionException, it's only logged gateway.tell("foobar"); @@ -141,10 +141,10 @@ public class AkkaRpcActorTest extends TestLogger { Future<Boolean> result = gateway.barfoo(); try { - Await.result(result, timeout.duration()); + result.get(timeout.getSize(), timeout.getUnit()); fail("We expected a RpcConnectionException."); - } catch (RpcConnectionException rpcConnectionException) { - // we expect this exception here + } catch (ExecutionException executionException) { + assertTrue(executionException.getCause() instanceof RpcConnectionException); } } http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 5550cb5..3388011 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -115,4 +115,9 @@ public class AkkaRpcServiceTest extends TestLogger { assertEquals(expected, actual); assertTrue(latch.isTriggered()); } + + @Test + public void testGetAddress() { + assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress()); + } }