[runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b89855a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b89855a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b89855a Branch: refs/heads/release-0.9.0-milestone-1 Commit: 4b89855aaab50ec785a0c5e0e19124f8f9ea9440 Parents: 09bd1f8 Author: Stephan Ewen <se...@apache.org> Authored: Tue Apr 7 17:40:01 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Apr 7 18:13:01 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/taskmanager/TaskManager.scala | 15 ++++++++++++--- .../flink/runtime/taskmanager/TaskManagerTest.java | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7d60c00..d6b91ec 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -608,6 +608,16 @@ extends Actor with ActorLogMessages with ActorLogging { id: InstanceID, blobPort: Int): Unit = { + if (jobManager == null) { + throw new NullPointerException("jobManager may not be null") + } + if (id == null) { + throw new NullPointerException("instance ID may not be null") + } + if (blobPort <= 0 || blobPort > 65535) { + throw new IllegalArgumentException("blob port is out of range: " + blobPort) + } + // sanity check that we are not currently registered with a different JobManager if (isConnected) { if (currentJobManager.get == jobManager) { @@ -644,9 +654,8 @@ extends Actor with ActorLogMessages with ActorLogging { // start a blob service, if a blob server is specified if (blobPort > 0) { - val address = new InetSocketAddress( - currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"), - blobPort) + val jmHost = jobManager.path.address.host.getOrElse("localhost") + val address = new InetSocketAddress(jmHost, blobPort) LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address) http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index e736a55..760b14e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -566,7 +566,7 @@ public class TaskManagerTest { if (message instanceof RegistrationMessages.RegisterTaskManager) { final InstanceID iid = new InstanceID(); final ActorRef self = getSelf(); - getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self); + getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self); } else if(message instanceof TaskMessages.UpdateTaskExecutionState){ getSender().tell(true, getSelf());