[TaskManager] Add test for failure behavior on TaskManager startup

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ed009ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ed009ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ed009ec

Branch: refs/heads/master
Commit: 4ed009ec9921e998161f82bc78137429986e2ba3
Parents: 0ca1f0c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 18 14:29:11 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 7 10:14:13 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  28 +-
 .../runtime/taskmanager/RegistrationTest.java   | 379 ----------------
 .../TaskManagerRegistrationTest.java            | 443 +++++++++++++++++++
 .../taskmanager/TestManagerStartupTest.java     | 166 +++++++
 4 files changed, 626 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ed009ec/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index af13b74..7d60c00 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1368,7 +1368,7 @@ object TaskManager {
           val cause = t.getCause()
           if (cause != null && 
t.getCause().isInstanceOf[java.net.BindException]) {
             val address = taskManagerHostname + ":" + actorSystemPort
-            throw new Exception("Unable to bind TaskManager actor system to 
address " +
+            throw new IOException("Unable to bind TaskManager actor system to 
address " +
               address + " - " + cause.getMessage(), t)
           }
         }
@@ -1532,22 +1532,28 @@ object TaskManager {
     }
 
     // now start the memory manager
-    val memoryManager = new DefaultMemoryManager(memorySize,
-                                                 
taskManagerConfig.numberOfSlots,
-                                                 netConfig.networkBufferSize)
+    val memoryManager = try {
+      new DefaultMemoryManager(memorySize,
+                               taskManagerConfig.numberOfSlots,
+                               netConfig.networkBufferSize)
+    } catch {
+      case e: OutOfMemoryError => throw new Exception(
+        "OutOfMemory error (" + e.getMessage + ") while allocating the 
TaskManager memory (" +
+          memorySize + " bytes).", e)
+    }
 
     // start the I/O manager last, it will create some temp directories.
     val ioManager: IOManager = new 
IOManagerAsync(taskManagerConfig.tmpDirPaths)
 
     // create the actor properties (which define the actor constructor 
parameters)
     val tmProps = Props(taskManagerClass,
-                        taskManagerConfig,
-                        connectionInfo,
-                        jobManagerAkkaUrl,
-                        memoryManager,
-                        ioManager,
-                        network,
-                        taskManagerConfig.numberOfSlots)
+      taskManagerConfig,
+      connectionInfo,
+      jobManagerAkkaUrl,
+      memoryManager,
+      ioManager,
+      network,
+      taskManagerConfig.numberOfSlots)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/4ed009ec/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
deleted file mode 100644
index 1b4f5f3..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import 
org.apache.flink.runtime.messages.RegistrationMessages.AcknowledgeRegistration;
-import 
org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager;
-import 
org.apache.flink.runtime.messages.RegistrationMessages.RefuseRegistration;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.Option;
-import scala.Some;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * The tests in this class verify the behavior of the TaskManager
- * when connecting to the JobManager, and when the JobManager
- * is unreachable.
- */
-public class RegistrationTest {
-
-       private static final Option<String> NONE_STRING = Option.empty();
-
-       // use one actor system throughout all tests
-       private static ActorSystem actorSystem;
-
-       @BeforeClass
-       public static void startActorSystem() {
-               Configuration config = new Configuration();
-               config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
-               config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"200 ms");
-               config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 
s");
-               config.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
-
-               actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-       }
-
-       @AfterClass
-       public static void shutdownActorSystem() {
-               if (actorSystem != null) {
-                       actorSystem.shutdown();
-               }
-       }
-
-       /**
-        * A test that verifies that two TaskManagers correctly register at the
-        * JobManager.
-        */
-       @Test
-       public void testSimpleRegistration() {
-               new JavaTestKit(actorSystem) {{
-                       try {
-                               // a simple JobManager
-                               ActorRef jobManager = startJobManager();
-
-                               // start two TaskManagers. it will 
automatically try to register
-                               final ActorRef taskManager1 = 
startTaskManager(jobManager);
-                               final ActorRef taskManager2 = 
startTaskManager(jobManager);
-
-                               // check that the TaskManagers are registered
-                               Future<Object> responseFuture1 = Patterns.ask(
-                                               taskManager1,
-                                               
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
-                                               5000);
-
-                               Future<Object> responseFuture2 = Patterns.ask(
-                                               taskManager2,
-                                               
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
-                                               5000);
-
-                               Object response1 = 
Await.result(responseFuture1, new FiniteDuration(5, TimeUnit.SECONDS));
-                               Object response2 = 
Await.result(responseFuture2, new FiniteDuration(5, TimeUnit.SECONDS));
-
-                               // this is a hack to work around the way Java 
can interact with scala case objects
-                               Class<?> confirmClass = 
TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
-                               assertTrue(response1 != null && 
confirmClass.isAssignableFrom(response1.getClass()));
-                               assertTrue(response2 != null && 
confirmClass.isAssignableFrom(response2.getClass()));
-
-                               // check that the JobManager has 2 TaskManagers 
registered
-                               Future<Object> numTaskManagersFuture = 
Patterns.ask(
-                                               jobManager,
-                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                               1000);
-
-                               Integer count = (Integer) 
Await.result(numTaskManagersFuture, new FiniteDuration(1, TimeUnit.SECONDS));
-                               assertEquals(2, count.intValue());
-
-                               stopActor(taskManager1);
-                               stopActor(taskManager2);
-                               stopActor(jobManager);
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-               }};
-       }
-
-       /**
-        * A test that verifies that two TaskManagers correctly register at the
-        * JobManager.
-        */
-       @Test
-       public void testDelayedRegistration() {
-               new JavaTestKit(actorSystem) {{
-                       try {
-                               // start a TaskManager that tries to register 
at the JobManager before the JobManager is
-                               // available. we give it the regular JobManager 
akka URL
-                               final ActorRef taskManager = 
startTaskManager(JobManager.getLocalJobManagerAkkaURL(),
-                                                                               
                                                new Configuration());
-                               // let it try for a bit
-                               Thread.sleep(6000);
-
-                               // now start the JobManager, with the regular 
akka URL
-                               final ActorRef jobManager = 
JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
-
-                               // check that the TaskManagers are registered
-                               Future<Object> responseFuture = Patterns.ask(
-                                               taskManager,
-                                               
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
-                                               30000);
-
-                               Object response = Await.result(responseFuture, 
new FiniteDuration(30, TimeUnit.SECONDS));
-
-                               // this is a hack to work around the way Java 
can interact with scala case objects
-                               Class<?> confirmClass = 
TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
-                               assertTrue(response != null && 
confirmClass.isAssignableFrom(response.getClass()));
-
-                               stopActor(taskManager);
-                               stopActor(jobManager);
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-               }};
-       }
-
-       /**
-        * Tests that the TaskManager shuts down when it cannot register at the
-        * JobManager within the given maximum duration.
-        *
-        * Unfortunately, this test does not give good error messages.
-        * (I have not figured out how to get any better message out of the
-        * Akka TestKit than "ask timeout exception".)
-        *
-        * Anyways: An "ask timeout exception" here means that the TaskManager
-        * did not shut down after its registration timeout expired.
-        */
-       @Test
-       public void testShutdownAfterRegistrationDurationExpired() {
-               new JavaTestKit(actorSystem) {{
-                       try {
-                               // registration timeout of 1 second
-                               Configuration tmConfig = new Configuration();
-                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 
ms");
-
-                               // start the taskManager actor
-                               final ActorRef taskManager = 
startTaskManager(JobManager.getLocalJobManagerAkkaURL(), tmConfig);
-
-                               // make sure it terminates in time, since it 
cannot register at a JobManager
-                               watch(taskManager);
-                               new Within(new FiniteDuration(10, 
TimeUnit.SECONDS)) {
-
-                                       @Override
-                                       protected void run() {
-                                               expectTerminated(taskManager);
-                                       }
-                               };
-
-                               stopActor(taskManager);
-                       }
-                       catch (Throwable e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-               }};
-       }
-
-       /**
-        * Make sure that the TaskManager keeps trying to register, even after
-        * registration attempts have been refused.
-        */
-       @Test
-       public void testTaskManagerResumesConnectAfterRefusedRegistration() {
-               new JavaTestKit(actorSystem) {{
-                       try {
-                               // we make the test actor (the test kit) the 
JobManager to intercept
-                               // the messages
-                               final ActorRef taskManager = 
startTaskManager(getTestActor());
-
-                               // check and decline initial registration
-                               new Within(new FiniteDuration(2, 
TimeUnit.SECONDS)) {
-
-                                       @Override
-                                       protected void run() {
-                                               // the TaskManager should try 
to register
-                                               
expectMsgClass(RegisterTaskManager.class);
-
-                                               // we decline the registration
-                                               getLastSender().tell(new 
RefuseRegistration("test reason"), getTestActor());
-                                       }
-                               };
-
-                               // the TaskManager should wait a bit an retry...
-                               FiniteDuration maxDelay = (FiniteDuration) 
TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0);
-                               new Within(maxDelay) {
-
-                                       @Override
-                                       protected void run() {
-                                               
expectMsgClass(RegisterTaskManager.class);
-                                       }
-                               };
-
-                               stopActor(taskManager);
-                       }
-                       catch (Throwable e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-               }};
-       }
-
-       /**
-        * Validate that the TaskManager attempts to re-connect after it lost 
the connection
-        * to the JobManager.
-        */
-       @Test
-       public void testTaskManagerResumesConnectAfterJobManagerFailure() {
-               new JavaTestKit(actorSystem) {{
-                       try {
-                               final Props fakeJmProps = 
Props.create(ForwardingActor.class, getTestActor());
-                               final String jobManagerName = 
"FAKE_JOB_MANAGER";
-
-                               final ActorRef fakeJobManager1 = 
actorSystem.actorOf(fakeJmProps, jobManagerName);
-
-
-                               // we make the test actor (the test kit) the 
JobManager to intercept
-                               // the messages
-                               final ActorRef taskManager = 
startTaskManager(fakeJobManager1);
-
-                               // validate initial registration
-                               new Within(new FiniteDuration(2, 
TimeUnit.SECONDS)) {
-
-                                       @Override
-                                       protected void run() {
-                                               // the TaskManager should try 
to register
-                                               
expectMsgClass(RegisterTaskManager.class);
-
-                                               // we accept the registration
-                                               taskManager.tell(new 
AcknowledgeRegistration(fakeJobManager1, new InstanceID(), 45234),
-                                                                               
fakeJobManager1);
-                                       }
-                               };
-
-                               // kill the first forwarding JobManager
-                               watch(fakeJobManager1);
-                               stopActor(fakeJobManager1);
-
-                               // wait for the killing to be completed
-                               new Within(new FiniteDuration(2, 
TimeUnit.SECONDS)) {
-
-                                       @Override
-                                       protected void run() {
-                                               
expectTerminated(fakeJobManager1);
-                                       }
-                               };
-
-                               // now start the second fake JobManager and 
expect that
-                               // the TaskManager registers again
-                               // the second fake JM needs to have the same 
actor URL
-                               final ActorRef fakeJobManager2 = 
actorSystem.actorOf(fakeJmProps, jobManagerName);
-
-                               // expect the next registration
-                               new Within(new FiniteDuration(10, 
TimeUnit.SECONDS)) {
-
-                                       @Override
-                                       protected void run() {
-                                               
expectMsgClass(RegisterTaskManager.class);
-
-                                               // we accept the registration
-                                               taskManager.tell(new 
AcknowledgeRegistration(fakeJobManager2, new InstanceID(), 45234),
-                                                                               
fakeJobManager2);
-                                       }
-                               };
-
-                               stopActor(taskManager);
-                               stopActor(fakeJobManager2);
-                       }
-                       catch (Throwable e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-               }};
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Utility Functions
-       // 
--------------------------------------------------------------------------------------------
-
-       private static ActorRef startJobManager() throws Exception {
-               // start the actors. don't give names, so they get generated 
names and we
-               // avoid conflicts with the actor names
-               return JobManager.startJobManagerActors(new Configuration(), 
actorSystem, NONE_STRING, NONE_STRING)._1();
-       }
-
-       private static ActorRef startTaskManager(ActorRef jobManager) throws 
Exception {
-               return startTaskManager(jobManager.path().toString(), new 
Configuration());
-       }
-
-       private static ActorRef startTaskManager(String jobManagerUrl, 
Configuration config) throws Exception {
-               config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
1);
-
-               return TaskManager.startTaskManagerComponentsAndActor(
-                               config, actorSystem, "localhost",
-                               NONE_STRING, // no actor name -> random
-                               new Some<String>(jobManagerUrl), // job manager 
path
-                               true, // local network stack only
-                               TaskManager.class);
-       }
-
-       private static void stopActor(ActorRef actor) {
-               actor.tell(Kill.getInstance(), ActorRef.noSender());
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Utility Actor that only forwards messages
-       // 
--------------------------------------------------------------------------------------------
-
-       public static class ForwardingActor extends UntypedActor {
-
-               private final ActorRef target;
-
-               public ForwardingActor(ActorRef target) {
-                       this.target = target;
-               }
-
-               @Override
-               public void onReceive(Object message) throws Exception {
-                       target.forward(message, context());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ed009ec/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
new file mode 100644
index 0000000..7421837
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.RegistrationMessages.AcknowledgeRegistration;
+import 
org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager;
+import 
org.apache.flink.runtime.messages.RegistrationMessages.RefuseRegistration;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * The tests in this class verify the behavior of the TaskManager
+ * when connecting to the JobManager, and when the JobManager
+ * is unreachable.
+ */
+public class TaskManagerRegistrationTest {
+
+       private static final Option<String> NONE_STRING = Option.empty();
+
+       // use one actor system throughout all tests
+       private static ActorSystem actorSystem;
+
+       @BeforeClass
+       public static void startActorSystem() {
+               Configuration config = new Configuration();
+               config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+               config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"200 ms");
+               config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 
s");
+               config.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
+
+               actorSystem = AkkaUtils.createLocalActorSystem(config);
+       }
+
+       @AfterClass
+       public static void shutdownActorSystem() {
+               if (actorSystem != null) {
+                       actorSystem.shutdown();
+               }
+       }
+
+       /**
+        * A test that verifies that two TaskManagers correctly register at the
+        * JobManager.
+        */
+       @Test
+       public void testSimpleRegistration() {
+               new JavaTestKit(actorSystem) {{
+                       try {
+                               // a simple JobManager
+                               ActorRef jobManager = startJobManager();
+
+                               // start two TaskManagers. it will 
automatically try to register
+                               final ActorRef taskManager1 = 
startTaskManager(jobManager);
+                               final ActorRef taskManager2 = 
startTaskManager(jobManager);
+
+                               // check that the TaskManagers are registered
+                               Future<Object> responseFuture1 = Patterns.ask(
+                                               taskManager1,
+                                               
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+                                               5000);
+
+                               Future<Object> responseFuture2 = Patterns.ask(
+                                               taskManager2,
+                                               
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+                                               5000);
+
+                               Object response1 = 
Await.result(responseFuture1, new FiniteDuration(5, TimeUnit.SECONDS));
+                               Object response2 = 
Await.result(responseFuture2, new FiniteDuration(5, TimeUnit.SECONDS));
+
+                               // this is a hack to work around the way Java 
can interact with scala case objects
+                               Class<?> confirmClass = 
TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
+                               assertTrue(response1 != null && 
confirmClass.isAssignableFrom(response1.getClass()));
+                               assertTrue(response2 != null && 
confirmClass.isAssignableFrom(response2.getClass()));
+
+                               // check that the JobManager has 2 TaskManagers 
registered
+                               Future<Object> numTaskManagersFuture = 
Patterns.ask(
+                                               jobManager,
+                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+                                               1000);
+
+                               Integer count = (Integer) 
Await.result(numTaskManagersFuture, new FiniteDuration(1, TimeUnit.SECONDS));
+                               assertEquals(2, count.intValue());
+
+                               stopActor(taskManager1);
+                               stopActor(taskManager2);
+                               stopActor(jobManager);
+                       }
+                       catch (Exception e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+               }};
+       }
+
+       /**
+        * A test that verifies that a TaskManager started before the JobManager
+        * is available still registers once the JobManager has been started.
+        */
+       @Test
+       public void testDelayedRegistration() {
+               new JavaTestKit(actorSystem) {{
+                       try {
+                               // start a TaskManager that tries to register 
at the JobManager before the JobManager is
+                               // available. we give it the regular JobManager 
akka URL
+                               final ActorRef taskManager = 
startTaskManager(JobManager.getLocalJobManagerAkkaURL(),
+                                                                               
                                                new Configuration());
+                               // let it try for a bit
+                               Thread.sleep(6000);
+
+                               // now start the JobManager, with the regular 
akka URL
+                               final ActorRef jobManager = 
JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
+
+                               // check that the TaskManagers are registered
+                               Future<Object> responseFuture = Patterns.ask(
+                                               taskManager,
+                                               
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+                                               30000);
+
+                               Object response = Await.result(responseFuture, 
new FiniteDuration(30, TimeUnit.SECONDS));
+
+                               // this is a hack to work around the way Java 
can interact with scala case objects
+                               Class<?> confirmClass = 
TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
+                               assertTrue(response != null && 
confirmClass.isAssignableFrom(response.getClass()));
+
+                               stopActor(taskManager);
+                               stopActor(jobManager);
+                       }
+                       catch (Exception e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+               }};
+       }
+
+       /**
+        * Tests that the TaskManager shuts down when it cannot register at the
+        * JobManager within the given maximum duration.
+        *
+        * Unfortunately, this test does not give good error messages.
+        * (I have not figured out how to get any better message out of the
+        * Akka TestKit than "ask timeout exception".)
+        *
+        * Anyways: An "ask timeout exception" here means that the TaskManager
+        * did not shut down after its registration timeout expired.
+        */
+       @Test
+       public void testShutdownAfterRegistrationDurationExpired() {
+               new JavaTestKit(actorSystem) {{
+                       try {
+                               // registration timeout of 500 milliseconds
+                               Configuration tmConfig = new Configuration();
+                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 
ms");
+
+                               // start the taskManager actor
+                               final ActorRef taskManager = 
startTaskManager(JobManager.getLocalJobManagerAkkaURL(), tmConfig);
+
+                               // make sure it terminates in time, since it 
cannot register at a JobManager
+                               watch(taskManager);
+                               new Within(new FiniteDuration(10, 
TimeUnit.SECONDS)) {
+
+                                       @Override
+                                       protected void run() {
+                                               expectTerminated(taskManager);
+                                       }
+                               };
+
+                               stopActor(taskManager);
+                       }
+                       catch (Throwable e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+               }};
+       }
+
+       /**
+        * Make sure that the TaskManager keeps trying to register, even after
+        * registration attempts have been refused.
+        */
+       @Test
+       public void testTaskManagerResumesConnectAfterRefusedRegistration() {
+               new JavaTestKit(actorSystem) {{
+                       try {
+                               // we make the test actor (the test kit) the 
JobManager to intercept
+                               // the messages
+                               final ActorRef taskManager = 
startTaskManager(getTestActor());
+
+                               // check and decline initial registration
+                               new Within(new FiniteDuration(2, 
TimeUnit.SECONDS)) {
+
+                                       @Override
+                                       protected void run() {
+                                               // the TaskManager should try 
to register
+                                               
expectMsgClass(RegisterTaskManager.class);
+
+                                               // we decline the registration
+                                               getLastSender().tell(new 
RefuseRegistration("test reason"), getTestActor());
+                                       }
+                               };
+
+                               // the TaskManager should wait a bit and retry...
+                               FiniteDuration maxDelay = (FiniteDuration) 
TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0);
+                               new Within(maxDelay) {
+
+                                       @Override
+                                       protected void run() {
+                                               
expectMsgClass(RegisterTaskManager.class);
+                                       }
+                               };
+
+                               stopActor(taskManager);
+                       }
+                       catch (Throwable e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+               }};
+       }
+
+       /**
+        * Validate that the TaskManager attempts to re-connect after it lost 
the connection
+        * to the JobManager.
+        */
+       @Test
+       public void testTaskManagerResumesConnectAfterJobManagerFailure() {
+               new JavaTestKit(actorSystem) {{
+                       try {
+                               final Props fakeJmProps = 
Props.create(ForwardingActor.class, getTestActor());
+                               final String jobManagerName = 
"FAKE_JOB_MANAGER";
+
+                               final ActorRef fakeJobManager1 = 
actorSystem.actorOf(fakeJmProps, jobManagerName);
+
+
+                               // the TaskManager talks to the forwarding fake 
JobManager, which relays
+                               // all messages to the test actor
+                               final ActorRef taskManager = 
startTaskManager(fakeJobManager1);
+
+                               // validate initial registration
+                               new Within(new FiniteDuration(2, 
TimeUnit.SECONDS)) {
+
+                                       @Override
+                                       protected void run() {
+                                               // the TaskManager should try 
to register
+                                               
expectMsgClass(RegisterTaskManager.class);
+
+                                               // we accept the registration
+                                               taskManager.tell(new 
AcknowledgeRegistration(fakeJobManager1, new InstanceID(), 45234),
+                                                                               
fakeJobManager1);
+                                       }
+                               };
+
+                               // kill the first forwarding JobManager
+                               watch(fakeJobManager1);
+                               stopActor(fakeJobManager1);
+
+                               // wait for the killing to be completed
+                               new Within(new FiniteDuration(2, 
TimeUnit.SECONDS)) {
+
+                                       @Override
+                                       protected void run() {
+                                               
expectTerminated(fakeJobManager1);
+                                       }
+                               };
+
+                               // now start the second fake JobManager and 
expect that
+                               // the TaskManager registers again
+                               // the second fake JM needs to have the same 
actor URL
+                               final ActorRef fakeJobManager2 = 
actorSystem.actorOf(fakeJmProps, jobManagerName);
+
+                               // expect the next registration
+                               new Within(new FiniteDuration(10, 
TimeUnit.SECONDS)) {
+
+                                       @Override
+                                       protected void run() {
+                                               
expectMsgClass(RegisterTaskManager.class);
+
+                                               // we accept the registration
+                                               taskManager.tell(new 
AcknowledgeRegistration(fakeJobManager2, new InstanceID(), 45234),
+                                                                               
fakeJobManager2);
+                                       }
+                               };
+
+                               stopActor(taskManager);
+                               stopActor(fakeJobManager2);
+                       }
+                       catch (Throwable e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+               }};
+       }
+
+
+       @Test
+       public void testStartupWhenNetworkStackFailsToInitialize() {
+
+               ServerSocket blocker = null;
+               try {
+                       blocker = new ServerSocket(0, 50, 
InetAddress.getByName("localhost"));
+
+                       final Configuration cfg = new Configuration();
+                       
cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 
blocker.getLocalPort());
+                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+
+                       new JavaTestKit(actorSystem) {{
+                               try {
+                                       // a simple JobManager
+                                       final ActorRef jobManager = 
startJobManager();
+
+                                       // start a task manager with a 
configuration that provides a blocked port
+                                       final ActorRef taskManager = 
TaskManager.startTaskManagerComponentsAndActor(
+                                                       cfg, actorSystem, 
"localhost",
+                                                       NONE_STRING, // no 
actor name -> random
+                                                       new 
Some<String>(jobManager.path().toString()), // job manager path
+                                                       false, // init network 
stack !!!
+                                                       TaskManager.class);
+
+                                       watch(taskManager);
+
+                                       expectTerminated(new FiniteDuration(20, 
TimeUnit.SECONDS), taskManager);
+
+                                       stopActor(taskManager);
+                                       stopActor(jobManager);
+                               }
+                               catch (Exception e) {
+                                       e.printStackTrace();
+                                       fail(e.getMessage());
+                               }
+                       }};
+               }
+               catch (Exception e) {
+                       // setup failed unexpectedly: report it as a test failure
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (blocker != null) {
+                               try {
+                                       blocker.close();
+                               }
+                               catch (IOException e) {
+                                       // ignore, best effort
+                               }
+                       }
+               }
+       }
+
+       @Test
+       public void testStartupWhenBlobDirectoriesAreNotWritable() {
+               // TODO: not implemented yet; this test currently passes vacuously
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Utility Functions
+       // 
--------------------------------------------------------------------------------------------
+
+       private static ActorRef startJobManager() throws Exception {
+               // start the actors. don't give names, so they get generated 
names and we
+               // avoid conflicts with the actor names
+               return JobManager.startJobManagerActors(new Configuration(), 
actorSystem, NONE_STRING, NONE_STRING)._1();
+       }
+
+       private static ActorRef startTaskManager(ActorRef jobManager) throws 
Exception {
+               return startTaskManager(jobManager.path().toString(), new 
Configuration());
+       }
+
+       private static ActorRef startTaskManager(String jobManagerUrl, 
Configuration config) throws Exception {
+               config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
1);
+
+               return TaskManager.startTaskManagerComponentsAndActor(
+                               config, actorSystem, "localhost",
+                               NONE_STRING, // no actor name -> random
+                               new Some<String>(jobManagerUrl), // job manager 
path
+                               true, // local network stack only
+                               TaskManager.class);
+       }
+
+       private static void stopActor(ActorRef actor) {
+               actor.tell(Kill.getInstance(), ActorRef.noSender());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Utility Actor that only forwards messages
+       // 
--------------------------------------------------------------------------------------------
+
+       public static class ForwardingActor extends UntypedActor {
+
+               private final ActorRef target;
+
+               public ForwardingActor(ActorRef target) {
+                       this.target = target;
+               }
+
+               @Override
+               public void onReceive(Object message) throws Exception {
+                       target.forward(message, context());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ed009ec/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
new file mode 100644
index 0000000..2b945e1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.UUID;
+
+/**
+ * Tests that check how the TaskManager behaves when encountering startup 
problems.
+ */
+public class TestManagerStartupTest {
+
+       /**
+        * Tests that the TaskManager fails synchronously when the actor system 
port
+        * is in use.
+        */
+       @Test
+       public void testStartupWhenTaskmanagerActorPortIsUsed() {
+               ServerSocket blocker = null;
+               try {
+                       final String localHostName = "localhost";
+                       final InetAddress localAddress = 
InetAddress.getByName(localHostName);
+
+                       // block some port
+                       blocker = new ServerSocket(0, 50, localAddress);
+                       final int port = blocker.getLocalPort();
+
+                       try {
+                               TaskManager.runTaskManager(localHostName, port, 
new Configuration(), TaskManager.class);
+                               fail("This should fail with an IOException");
+                       }
+                       catch (IOException e) {
+                               // expected. validate the error message
+                               assertNotNull(e.getMessage());
+                               assertTrue(e.getMessage().contains("Address 
already in use"));
+                       }
+
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (blocker != null) {
+                               try {
+                                       blocker.close();
+                               }
+                               catch (IOException e) {
+                                       // no need to log here
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Tests that the TaskManager startup fails synchronously when the I/O 
directories are
+        * not writable.
+        */
+       @Test
+       public void testIODirectoryNotWritable() {
+               File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
+               File nonWritable = new File(tempDir, 
UUID.randomUUID().toString());
+
+               if (!nonWritable.mkdirs() || !nonWritable.setWritable(false, 
false)) {
+                       System.err.println("Cannot create non-writable 
temporary file directory. Skipping test.");
+                       return;
+               }
+
+               try {
+                       Configuration cfg = new Configuration();
+                       cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, 
nonWritable.getAbsolutePath());
+                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+                       
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+                       
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
+
+                       try {
+                               TaskManager.runTaskManager("localhost", 0, cfg);
+                               fail("Should fail synchronously with an 
exception");
+                       }
+                       catch (IOException e) {
+                               // splendid!
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       //noinspection ResultOfMethodCallIgnored
+                       nonWritable.setWritable(true, false);
+                       try {
+                               FileUtils.deleteDirectory(nonWritable);
+                       }
+                       catch (IOException e) {
+                               // best effort
+                       }
+               }
+       }
+
+       /**
+        * Tests that the TaskManager startup fails synchronously when the 
configured memory
+        * size is invalid (negative) or far too large to be allocated.
+        */
+       @Test
+       public void testMemoryConfigWrong() {
+               try {
+                       Configuration cfg = new Configuration();
+                       
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+                       
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
+
+                       // something invalid
+                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
+                       try {
+                               TaskManager.runTaskManager("localhost", 0, cfg);
+                               fail("Should fail synchronously with an 
exception");
+                       }
+                       catch (IllegalConfigurationException e) {
+                               // splendid!
+                       }
+
+                       // something ridiculously high
+                       final long memSize = (((long) Integer.MAX_VALUE - 1) *
+                                                                       
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20;
+                       
cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
+                       try {
+                               TaskManager.runTaskManager("localhost", 0, cfg);
+                               fail("Should fail synchronously with an 
exception");
+                       }
+                       catch (Exception e) {
+                               // splendid!
+                               assertTrue(e.getCause() instanceof 
OutOfMemoryError);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

Reply via email to