Repository: flink Updated Branches: refs/heads/master 264f6df8e -> 97f178894
[FLINK-6064][flip6] fix BlobServer connection in TaskExecutor The hostname used for the BlobServer was set to the akka address which is invalid for this use. Instead, this adds the hostname to the RpcGateway / AkkaInvocationHandler so that this information is available to the TaskExecutor. This closes #3551. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97f17889 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97f17889 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97f17889 Branch: refs/heads/master Commit: 97f1788941b3a1a0710f530ddd83bad713098d56 Parents: 264f6df Author: Nico Kruber <n...@data-artisans.com> Authored: Wed Mar 15 16:09:38 2017 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Mar 16 10:48:39 2017 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/rpc/RpcGateway.java | 9 +++++++- .../runtime/rpc/akka/AkkaInvocationHandler.java | 16 ++++++++++++++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 23 ++++++++++++++++---- .../runtime/taskexecutor/TaskExecutor.java | 6 ++--- .../flink/runtime/rpc/TestingGatewayBase.java | 5 +++++ .../runtime/rpc/TestingSerialRpcService.java | 7 ++++++ .../taskexecutor/TaskExecutorITCase.java | 2 +- .../runtime/taskexecutor/TaskExecutorTest.java | 6 ++--- 8 files changed, 62 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java index 81075ee..aa1d102 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java @@ -26,7 +26,14 @@ public interface RpcGateway { /** * Returns the fully qualified address under which the associated rpc endpoint is reachable. * - * @return Fully qualified address under which the associated rpc endpoint is reachable + * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable */ String getAddress(); + + /** + * Returns the fully qualified hostname under which the associated rpc endpoint is reachable. + * + * @return Fully qualified hostname under which the associated rpc endpoint is reachable + */ + String getHostname(); } http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index e211b27..56505f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -57,8 +57,17 @@ import static org.apache.flink.util.Preconditions.checkArgument; class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway { private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class); + /** + * The Akka (RPC) address of {@link #rpcEndpoint} including host and port of the ActorSystem in + * which the actor is running. + */ private final String address; + /** + * Hostname of the host, {@link #rpcEndpoint} is running on. 
+ */ + private final String hostname; + private final ActorRef rpcEndpoint; // whether the actor ref is local and thus no message serialization is needed @@ -74,12 +83,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea AkkaInvocationHandler( String address, + String hostname, ActorRef rpcEndpoint, Time timeout, long maximumFramesize, Future<Void> terminationFuture) { this.address = Preconditions.checkNotNull(address); + this.hostname = Preconditions.checkNotNull(hostname); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); this.timeout = Preconditions.checkNotNull(timeout); @@ -314,6 +325,11 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea } @Override + public String getHostname() { + return hostname; + } + + @Override public Future<Void> getTerminationFuture() { return terminationFuture; } http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 4298021..f5ccdbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -29,7 +29,6 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.dispatch.Futures; import akka.dispatch.Mapper; - import akka.pattern.Patterns; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; @@ -39,16 +38,16 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import 
org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadExecutable; -import org.apache.flink.runtime.rpc.SelfGateway; -import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.SelfGateway; import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import scala.Option; import scala.concurrent.duration.FiniteDuration; import javax.annotation.Nonnull; @@ -143,9 +142,17 @@ public class AkkaRpcService implements RpcService { ActorRef actorRef = actorIdentity.getRef(); final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); + final String hostname; + Option<String> host = actorRef.path().address().host(); + if (host.isEmpty()) { + hostname = "localhost"; + } else { + hostname = host.get(); + } InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler( address, + hostname, actorRef, timeout, maximumFramesize, @@ -187,9 +194,17 @@ public class AkkaRpcService implements RpcService { LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); + final String hostname; + Option<String> host = actorRef.path().address().host(); + if (host.isEmpty()) { + hostname = "localhost"; + } else { + hostname = host.get(); + } InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler( address, + hostname, actorRef, timeout, maximumFramesize, http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 8db1d5b..df5765a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -788,17 +788,17 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) { Preconditions.checkNotNull(jobManagerLeaderId); Preconditions.checkNotNull(jobMasterGateway); - Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob port is out of range."); + Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range."); TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway); CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway); - InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort); + InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), blobPort); final LibraryCacheManager libraryCacheManager; try { - final BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration(), haServices); + final BlobCache blobCache = new BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices); libraryCacheManager = new BlobLibraryCacheManager( blobCache, taskManagerConfiguration.getCleanupInterval()); http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java index caf5e81..03fe84b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java @@ -68,6 +68,11 @@ public abstract class TestingGatewayBase implements RpcGateway { return address; } + @Override + public String getHostname() { + return address; + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 6280a46..25156e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -308,6 +308,13 @@ public class TestingSerialRpcService implements RpcService { return address; } + // this is not a real hostname but the address above is also not a real akka RPC address + // and we keep it that way until actually needed by a test case + @Override + public String getHostname() { + return address; + } + @Override public void start() { // do nothing http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 898584c..076d126 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -146,7 +146,7 @@ public class TaskExecutorITCase { when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class))) .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234))); - when(jmGateway.getAddress()).thenReturn(jmAddress); + when(jmGateway.getHostname()).thenReturn(jmAddress); rpcService.registerGateway(rmAddress, resourceManager.getSelf()); http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 31bf9b8..d413a01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -438,7 +438,7 @@ public class TaskExecutorTest extends TestLogger { eq(jobManagerLeaderId), any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); - when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, 
jobMasterGateway); @@ -551,7 +551,7 @@ public class TaskExecutorTest extends TestLogger { eq(jobManagerLeaderId), any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); - when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); when(jobMasterGateway.offerSlots( any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) @@ -754,7 +754,7 @@ public class TaskExecutorTest extends TestLogger { eq(jobManagerLeaderId), any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); - when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);