Repository: flink
Updated Branches:
  refs/heads/master 0ca1f0c7b -> 5cd9e9d94


[runtime] Improve robustness of test TaskManagerRegistrationTest.

Also rename jobmanager.TaskManagerRegistrationTest to avoid name conflicts.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cd9e9d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cd9e9d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cd9e9d9

Branch: refs/heads/master
Commit: 5cd9e9d94bb3af4a5a868789f663af262672cac8
Parents: 4ed009e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 6 20:00:21 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 7 10:14:13 2015 +0200

----------------------------------------------------------------------
 .../TaskManagerRegistrationTest.java            |  20 ++-
 .../jobmanager/JobManagerRegistrationTest.scala | 139 +++++++++++++++++++
 .../TaskManagerRegistrationTest.scala           | 139 -------------------
 3 files changed, 156 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 7421837..69964ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.InvalidActorNameException;
 import akka.actor.Kill;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
@@ -308,9 +309,22 @@ public class TaskManagerRegistrationTest {
                                // now start the second fake JobManager and 
expect that
                                // the TaskManager registers again
                                // the second fake JM needs to have the same 
actor URL
-                               final ActorRef fakeJobManager2 = 
actorSystem.actorOf(fakeJmProps, jobManagerName);
+                               ActorRef fakeJobManager2 = null;
+
+                               // since we cannot reliably wait until the 
actor is unregistered (name is
+                               // available again) we loop with multiple tries 
for 20 seconds
+                               long deadline = 20000000000L + 
System.nanoTime();
+                               do {
+                                       try {
+                                               fakeJobManager2 = 
actorSystem.actorOf(fakeJmProps, jobManagerName);
+                                       } catch (InvalidActorNameException e) {
+                                               // wait and retry
+                                               Thread.sleep(100);
+                                       }
+                               } while (fakeJobManager2 == null && 
System.nanoTime() < deadline);
 
                                // expect the next registration
+                               final ActorRef jm2Closure = fakeJobManager2;
                                new Within(new FiniteDuration(10, 
TimeUnit.SECONDS)) {
 
                                        @Override
@@ -318,8 +332,8 @@ public class TaskManagerRegistrationTest {
                                                
expectMsgClass(RegisterTaskManager.class);
 
                                                // we accept the registration
-                                               taskManager.tell(new 
AcknowledgeRegistration(fakeJobManager2, new InstanceID(), 45234),
-                                                                               
fakeJobManager2);
+                                               taskManager.tell(new 
AcknowledgeRegistration(jm2Closure, new InstanceID(), 45234),
+                                                               jm2Closure);
                                        }
                                };
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
new file mode 100644
index 0000000..5fde5ea
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import java.net.InetAddress
+
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.{HardwareDescription, 
InstanceConnectionInfo, InstanceID}
+import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
+import org.junit.Assert.{assertNotEquals, assertNotNull}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/**
+ * Tests for the JobManager's behavior when a TaskManager solicits 
registration.
+ * It also tests the JobManager's response to heartbeats from TaskManagers it 
does
+ * not know.
+ */
+class JobManagerRegistrationTest(_system: ActorSystem) extends 
TestKit(_system) with
+ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The JobManager" should {
+
+    "assign a TaskManager a unique instance ID" in {
+      val jm = startTestingJobManager(_system)
+
+      val tmDummy1 = 
_system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
+      val tmDummy2 = 
_system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
+
+      try {
+        val connectionInfo1 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
+        val connectionInfo2 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
+
+        val hardwareDescription = HardwareDescription.extractFromSystem(10)
+
+        var id1: InstanceID = null
+        var id2: InstanceID = null
+
+        // task manager 1
+        within(1 second) {
+          jm ! RegisterTaskManager(tmDummy1, connectionInfo1, 
hardwareDescription, 1)
+
+          val response = receiveOne(1 second)
+          response match {
+            case AcknowledgeRegistration(_, id, _) => id1 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
+
+        // task manager 2
+        within(1 second) {
+          jm ! RegisterTaskManager(tmDummy2, connectionInfo2, 
hardwareDescription, 1)
+
+          val response = receiveOne(1 second)
+          response match {
+            case AcknowledgeRegistration(_, id, _) => id2 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
+
+        assertNotNull(id1)
+        assertNotNull(id2)
+        assertNotEquals(id1, id2)
+      }
+      finally {
+        tmDummy1 ! Kill
+        tmDummy2 ! Kill
+        jm ! Kill
+      }
+    }
+
+    "handle repeated registration calls" in {
+
+      val jm = startTestingJobManager(_system)
+      val tmDummy = 
_system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
+
+      try {
+        val connectionInfo = new 
InstanceConnectionInfo(InetAddress.getLocalHost,1)
+        val hardwareDescription = HardwareDescription.extractFromSystem(10)
+        
+        within(1 second) {
+          jm ! RegisterTaskManager(tmDummy, connectionInfo, 
hardwareDescription, 1)
+          jm ! RegisterTaskManager(tmDummy, connectionInfo, 
hardwareDescription, 1)
+          jm ! RegisterTaskManager(tmDummy, connectionInfo, 
hardwareDescription, 1)
+
+          expectMsgType[AcknowledgeRegistration]
+          expectMsgType[AlreadyRegistered]
+          expectMsgType[AlreadyRegistered]
+        }
+      } finally {
+        tmDummy ! Kill
+        jm ! Kill
+      }
+    }
+  }
+
+  private def startTestingJobManager(system: ActorSystem): ActorRef = {
+    val (jm: ActorRef, _) = JobManager.startJobManagerActors(
+                                        new Configuration(), _system, None, 
None)
+    jm
+  }
+}
+
+object JobManagerRegistrationTest {
+
+  /** Simply dummy actor that swallows all messages */
+  class DummyActor extends Actor {
+    override def receive: Receive = {
+      case _ =>
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
deleted file mode 100644
index 409e98d..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import java.net.InetAddress
-
-import akka.actor._
-import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.{HardwareDescription, 
InstanceConnectionInfo, InstanceID}
-import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
-import org.junit.Assert.{assertNotEquals, assertNotNull}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/**
- * Tests for the JobManager's behavior when a TaskManager solicits 
registration.
- * It also tests the JobManager's response to heartbeats from TaskManagers it 
does
- * not know.
- */
-class TaskManagerRegistrationTest(_system: ActorSystem) extends 
TestKit(_system) with
-ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The JobManager" should {
-
-    "assign a TaskManager a unique instance ID" in {
-      val jm = startTestingJobManager(_system)
-
-      val tmDummy1 = 
_system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
-      val tmDummy2 = 
_system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
-
-      try {
-        val connectionInfo1 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
-        val connectionInfo2 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
-
-        val hardwareDescription = HardwareDescription.extractFromSystem(10)
-
-        var id1: InstanceID = null
-        var id2: InstanceID = null
-
-        // task manager 1
-        within(1 second) {
-          jm ! RegisterTaskManager(tmDummy1, connectionInfo1, 
hardwareDescription, 1)
-
-          val response = receiveOne(1 second)
-          response match {
-            case AcknowledgeRegistration(_, id, _) => id1 = id
-            case _ => fail("Wrong response message: " + response)
-          }
-        }
-
-        // task manager 2
-        within(1 second) {
-          jm ! RegisterTaskManager(tmDummy2, connectionInfo2, 
hardwareDescription, 1)
-
-          val response = receiveOne(1 second)
-          response match {
-            case AcknowledgeRegistration(_, id, _) => id2 = id
-            case _ => fail("Wrong response message: " + response)
-          }
-        }
-
-        assertNotNull(id1)
-        assertNotNull(id2)
-        assertNotEquals(id1, id2)
-      }
-      finally {
-        tmDummy1 ! Kill
-        tmDummy2 ! Kill
-        jm ! Kill
-      }
-    }
-
-    "handle repeated registration calls" in {
-
-      val jm = startTestingJobManager(_system)
-      val tmDummy = 
_system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
-
-      try {
-        val connectionInfo = new 
InstanceConnectionInfo(InetAddress.getLocalHost,1)
-        val hardwareDescription = HardwareDescription.extractFromSystem(10)
-        
-        within(1 second) {
-          jm ! RegisterTaskManager(tmDummy, connectionInfo, 
hardwareDescription, 1)
-          jm ! RegisterTaskManager(tmDummy, connectionInfo, 
hardwareDescription, 1)
-          jm ! RegisterTaskManager(tmDummy, connectionInfo, 
hardwareDescription, 1)
-
-          expectMsgType[AcknowledgeRegistration]
-          expectMsgType[AlreadyRegistered]
-          expectMsgType[AlreadyRegistered]
-        }
-      } finally {
-        tmDummy ! Kill
-        jm ! Kill
-      }
-    }
-  }
-
-  private def startTestingJobManager(system: ActorSystem): ActorRef = {
-    val (jm: ActorRef, _) = JobManager.startJobManagerActors(
-                                        new Configuration(), _system, None, 
None)
-    jm
-  }
-}
-
-object TaskManagerRegistrationTest {
-
-  /** Simply dummy actor that swallows all messages */
-  class DummyActor extends Actor {
-    override def receive: Receive = {
-      case _ =>
-    }
-  }
-}

Reply via email to