[runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b89855a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b89855a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b89855a

Branch: refs/heads/release-0.9.0-milestone-1
Commit: 4b89855aaab50ec785a0c5e0e19124f8f9ea9440
Parents: 09bd1f8
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 7 17:40:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 7 18:13:01 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala      | 15 ++++++++++++---
 .../flink/runtime/taskmanager/TaskManagerTest.java   |  2 +-
 2 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7d60c00..d6b91ec 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -608,6 +608,16 @@ extends Actor with ActorLogMessages with ActorLogging {
                                       id: InstanceID,
                                       blobPort: Int): Unit = {
 
+    if (jobManager == null) {
+      throw new NullPointerException("jobManager may not be null")
+    }
+    if (id == null) {
+      throw new NullPointerException("instance ID may not be null")
+    }
+    if (blobPort <= 0 || blobPort > 65535) {
+      throw new IllegalArgumentException("blob port is out of range: " + blobPort)
+    }
+
     // sanity check that we are not currently registered with a different JobManager
     if (isConnected) {
       if (currentJobManager.get == jobManager) {
@@ -644,9 +654,8 @@ extends Actor with ActorLogMessages with ActorLogging {
 
     // start a blob service, if a blob server is specified
     if (blobPort > 0) {
-      val address = new InetSocketAddress(
-        currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
-        blobPort)
+      val jmHost = jobManager.path.address.host.getOrElse("localhost")
+      val address = new InetSocketAddress(jmHost, blobPort)
 
       LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e736a55..760b14e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -566,7 +566,7 @@ public class TaskManagerTest {
                        if (message instanceof RegistrationMessages.RegisterTaskManager) {
                                final InstanceID iid = new InstanceID();
                                final ActorRef self = getSelf();
-                               getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self);
+                               getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self);
                        }
                        else if(message instanceof TaskMessages.UpdateTaskExecutionState){
                                getSender().tell(true, getSelf());

Reply via email to