Repository: flink
Updated Branches:
  refs/heads/master 264f6df8e -> 97f178894


[FLINK-6064][flip6] fix BlobServer connection in TaskExecutor

The hostname used to connect to the BlobServer was set to the Akka (RPC)
address, which is not a valid hostname and is therefore invalid for this use.
Instead, this adds the hostname to the RpcGateway / AkkaInvocationHandler so
that this information is available to the TaskExecutor.

This closes #3551.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97f17889
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97f17889
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97f17889

Branch: refs/heads/master
Commit: 97f1788941b3a1a0710f530ddd83bad713098d56
Parents: 264f6df
Author: Nico Kruber <n...@data-artisans.com>
Authored: Wed Mar 15 16:09:38 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Mar 16 10:48:39 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcGateway.java    |  9 +++++++-
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 16 ++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 23 ++++++++++++++++----
 .../runtime/taskexecutor/TaskExecutor.java      |  6 ++---
 .../flink/runtime/rpc/TestingGatewayBase.java   |  5 +++++
 .../runtime/rpc/TestingSerialRpcService.java    |  7 ++++++
 .../taskexecutor/TaskExecutorITCase.java        |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  6 ++---
 8 files changed, 62 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
index 81075ee..aa1d102 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -26,7 +26,14 @@ public interface RpcGateway {
        /**
         * Returns the fully qualified address under which the associated rpc 
endpoint is reachable.
         *
-        * @return Fully qualified address under which the associated rpc 
endpoint is reachable
+        * @return Fully qualified (RPC) address under which the associated rpc 
endpoint is reachable
         */
        String getAddress();
+
+       /**
+        * Returns the fully qualified hostname under which the associated rpc 
endpoint is reachable.
+        *
+        * @return Fully qualified hostname under which the associated rpc 
endpoint is reachable
+        */
+       String getHostname();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index e211b27..56505f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -57,8 +57,17 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutable, StartStoppable, SelfGateway {
        private static final Logger LOG = 
LoggerFactory.getLogger(AkkaInvocationHandler.class);
 
+       /**
+        * The Akka (RPC) address of {@link #rpcEndpoint} including host and 
port of the ActorSystem in
+        * which the actor is running.
+        */
        private final String address;
 
+       /**
+        * Hostname of the host, {@link #rpcEndpoint} is running on.
+        */
+       private final String hostname;
+
        private final ActorRef rpcEndpoint;
 
        // whether the actor ref is local and thus no message serialization is 
needed
@@ -74,12 +83,14 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
        AkkaInvocationHandler(
                        String address,
+                       String hostname,
                        ActorRef rpcEndpoint,
                        Time timeout,
                        long maximumFramesize,
                        Future<Void> terminationFuture) {
 
                this.address = Preconditions.checkNotNull(address);
+               this.hostname = Preconditions.checkNotNull(hostname);
                this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
                this.isLocal = 
this.rpcEndpoint.path().address().hasLocalScope();
                this.timeout = Preconditions.checkNotNull(timeout);
@@ -314,6 +325,11 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
        }
 
        @Override
+       public String getHostname() {
+               return hostname;
+       }
+
+       @Override
        public Future<Void> getTerminationFuture() {
                return terminationFuture;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 4298021..f5ccdbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -29,7 +29,6 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
-
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -39,16 +38,16 @@ import 
org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
-import org.apache.flink.runtime.rpc.SelfGateway;
-import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.Nonnull;
@@ -143,9 +142,17 @@ public class AkkaRpcService implements RpcService {
                                        ActorRef actorRef = 
actorIdentity.getRef();
 
                                        final String address = 
AkkaUtils.getAkkaURL(actorSystem, actorRef);
+                                       final String hostname;
+                                       Option<String> host = 
actorRef.path().address().host();
+                                       if (host.isEmpty()) {
+                                               hostname = "localhost";
+                                       } else {
+                                               hostname = host.get();
+                                       }
 
                                        InvocationHandler akkaInvocationHandler 
= new AkkaInvocationHandler(
                                                address,
+                                               hostname,
                                                actorRef,
                                                timeout,
                                                maximumFramesize,
@@ -187,9 +194,17 @@ public class AkkaRpcService implements RpcService {
                LOG.info("Starting RPC endpoint for {} at {} .", 
rpcEndpoint.getClass().getName(), actorRef.path());
 
                final String address = AkkaUtils.getAkkaURL(actorSystem, 
actorRef);
+               final String hostname;
+               Option<String> host = actorRef.path().address().host();
+               if (host.isEmpty()) {
+                       hostname = "localhost";
+               } else {
+                       hostname = host.get();
+               }
 
                InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(
                        address,
+                       hostname,
                        actorRef,
                        timeout,
                        maximumFramesize,

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8db1d5b..df5765a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -788,17 +788,17 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        private JobManagerConnection associateWithJobManager(JobMasterGateway 
jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
                Preconditions.checkNotNull(jobManagerLeaderId);
                Preconditions.checkNotNull(jobMasterGateway);
-               Preconditions.checkArgument(blobPort > 0 || blobPort < 
MAX_BLOB_PORT, "Blob port is out of range.");
+               Preconditions.checkArgument(blobPort > 0 || blobPort < 
MAX_BLOB_PORT, "Blob server port is out of range.");
 
                TaskManagerActions taskManagerActions = new 
TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway);
 
                CheckpointResponder checkpointResponder = new 
RpcCheckpointResponder(jobMasterGateway);
 
-               InetSocketAddress address = new 
InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
+               InetSocketAddress blobServerAddress = new 
InetSocketAddress(jobMasterGateway.getHostname(), blobPort);
 
                final LibraryCacheManager libraryCacheManager;
                try {
-                       final BlobCache blobCache = new BlobCache(address, 
taskManagerConfiguration.getConfiguration(), haServices);
+                       final BlobCache blobCache = new 
BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), 
haServices);
                        libraryCacheManager = new BlobLibraryCacheManager(
                                blobCache,
                                taskManagerConfiguration.getCleanupInterval());

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index caf5e81..03fe84b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -68,6 +68,11 @@ public abstract class TestingGatewayBase implements 
RpcGateway {
                return address;
        }
 
+       @Override
+       public String getHostname() {
+               return address;
+       }
+
        // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 6280a46..25156e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -308,6 +308,13 @@ public class TestingSerialRpcService implements RpcService 
{
                        return address;
                }
 
+               // this is not a real hostname but the address above is also 
not a real akka RPC address
+               // and we keep it that way until actually needed by a test case
+               @Override
+               public String getHostname() {
+                       return address;
+               }
+
                @Override
                public void start() {
                        // do nothing

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 898584c..076d126 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -146,7 +146,7 @@ public class TaskExecutorITCase {
 
                when(jmGateway.registerTaskManager(any(String.class), 
any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
                        
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
-               when(jmGateway.getAddress()).thenReturn(jmAddress);
+               when(jmGateway.getHostname()).thenReturn(jmAddress);
 
 
                rpcService.registerGateway(rmAddress, 
resourceManager.getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 31bf9b8..d413a01 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -438,7 +438,7 @@ public class TaskExecutorTest extends TestLogger {
                                eq(jobManagerLeaderId),
                                any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
-               
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+               
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -551,7 +551,7 @@ public class TaskExecutorTest extends TestLogger {
                                eq(jobManagerLeaderId),
                                any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
-               
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+               
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
                when(jobMasterGateway.offerSlots(
                                any(ResourceID.class), any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class)))
@@ -754,7 +754,7 @@ public class TaskExecutorTest extends TestLogger {
                        eq(jobManagerLeaderId),
                        any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
-               
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+               
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);

Reply via email to