[FLINK-4414] [cluster] Add getAddress method to RpcGateway The RpcGateway.getAddress method allows retrieving the fully qualified address of the associated RpcEndpoint.
This closes #2392. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c9f8844 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c9f8844 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c9f8844 Branch: refs/heads/flip-6 Commit: 6c9f8844952b91f9164aee4ebc75481c8f2eafef Parents: a7e3579 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Aug 18 16:34:47 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Sep 8 17:26:58 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/rpc/RpcEndpoint.java | 6 +----- .../apache/flink/runtime/rpc/RpcGateway.java | 7 +++++++ .../apache/flink/runtime/rpc/RpcService.java | 11 ---------- .../runtime/rpc/akka/AkkaInvocationHandler.java | 14 +++++++++++-- .../flink/runtime/rpc/akka/AkkaRpcService.java | 21 ++++++-------------- .../runtime/rpc/akka/AkkaRpcActorTest.java | 16 +++++++++++++++ 6 files changed, 42 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index a28bc14..7b3f8a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -69,9 +69,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> { /** Self gateway which can be used to schedule asynchronous calls on yourself */ private final C self; - /** the fully qualified address of the this RPC endpoint */ - private final String selfAddress; - /** The main thread execution context to be 
used to execute future callbacks in the main thread * of the executing rpc server. */ private final ExecutionContext mainThreadExecutionContext; @@ -92,7 +89,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> { this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass()); this.self = rpcService.startServer(this); - this.selfAddress = rpcService.getAddress(self); this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); } @@ -156,7 +152,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * @return Fully qualified address of the underlying RPC endpoint */ public String getAddress() { - return selfAddress; + return self.getAddress(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java index e3a16b4..81075ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java @@ -22,4 +22,11 @@ package org.apache.flink.runtime.rpc; * Rpc gateway interface which has to be implemented by Rpc gateways. */ public interface RpcGateway { + + /** + * Returns the fully qualified address under which the associated rpc endpoint is reachable. 
+ * + * @return Fully qualified address under which the associated rpc endpoint is reachable + */ + String getAddress(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index fabdb05..bc0f5cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -65,17 +65,6 @@ public interface RpcService { void stopService(); /** - * Get the fully qualified address of the underlying rpc server represented by the self gateway. - * It must be possible to connect from a remote host to the rpc server via the returned fully - * qualified address. - * - * @param selfGateway Self gateway associated with the underlying rpc server - * @param <C> Type of the rpc gateway - * @return Fully qualified address - */ - <C extends RpcGateway> String getAddress(C selfGateway); - - /** * Gets the execution context, provided by this RPC service. This execution * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)} * methods of Futures. 
http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 524bf74..bfa04f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -23,6 +23,7 @@ import akka.pattern.Patterns; import akka.util.Timeout; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; @@ -55,6 +56,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable { private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); + private final String address; + private final ActorRef rpcEndpoint; // whether the actor ref is local and thus no message serialization is needed @@ -65,7 +68,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea private final long maximumFramesize; - AkkaInvocationHandler(ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) { + AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) { + this.address = Preconditions.checkNotNull(address); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); this.timeout = Preconditions.checkNotNull(timeout); 
@@ -79,7 +83,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea Object result; if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || - declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class)) { + declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || + declaringClass.equals(RpcGateway.class)) { result = method.invoke(this, args); } else { String methodName = method.getName(); @@ -290,4 +295,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea return false; } + + @Override + public String getAddress() { + return address; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index d987c2f..00a6932 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -102,7 +102,9 @@ public class AkkaRpcService implements RpcService { public C apply(Object obj) { ActorRef actorRef = ((ActorIdentity) obj).getRef(); - InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout, maximumFramesize); + final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); + + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . 
That works better in cases where Flink runs embedded and all Flink @@ -135,7 +137,9 @@ public class AkkaRpcService implements RpcService { LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); - InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout, maximumFramesize); + final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); + + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink @@ -197,19 +201,6 @@ public class AkkaRpcService implements RpcService { } @Override - public String getAddress(RpcGateway selfGateway) { - checkState(!stopped, "RpcService is stopped"); - - if (selfGateway instanceof AkkaGateway) { - ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcEndpoint(); - return AkkaUtils.getAkkaURL(actorSystem, actorRef); - } else { - String className = AkkaGateway.class.getName(); - throw new IllegalArgumentException("Cannot get address for non " + className + '.'); - } - } - - @Override public ExecutionContext getExecutionContext() { return actorSystem.dispatcher(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6c9f8844/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 1653fac..82d13f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -34,6 +34,7 @@ import scala.concurrent.Future; import 
java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; public class AkkaRpcActorTest extends TestLogger { @@ -57,6 +58,21 @@ public class AkkaRpcActorTest extends TestLogger { } /** + * Tests that the rpc endpoint and the associated rpc gateway have the same addresses. + * @throws Exception + */ + @Test + public void testAddressResolution() throws Exception { + DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); + + Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class); + + DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, timeout.duration()); + + assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress()); + } + + /** * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding * {@link RpcEndpoint} has been started. */