Repository: flink Updated Branches: refs/heads/master 0ca1f0c7b -> 5cd9e9d94
[runtime] Improve robustness of test TaskManagerRegistrationTest. Also rename jobmanager.TaskManagerRegistrationTest to avoid name conflicts. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cd9e9d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cd9e9d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cd9e9d9 Branch: refs/heads/master Commit: 5cd9e9d94bb3af4a5a868789f663af262672cac8 Parents: 4ed009e Author: Stephan Ewen <se...@apache.org> Authored: Mon Apr 6 20:00:21 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Apr 7 10:14:13 2015 +0200 ---------------------------------------------------------------------- .../TaskManagerRegistrationTest.java | 20 ++- .../jobmanager/JobManagerRegistrationTest.scala | 139 +++++++++++++++++++ .../TaskManagerRegistrationTest.scala | 139 ------------------- 3 files changed, 156 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 7421837..69964ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.InvalidActorNameException; import akka.actor.Kill; import akka.actor.Props; import akka.actor.UntypedActor; @@ -308,9 +309,22 @@ public class TaskManagerRegistrationTest { // now start the second fake JobManager and expect that // the TaskManager registers again // the second fake JM needs to have the same actor URL - final ActorRef fakeJobManager2 = actorSystem.actorOf(fakeJmProps, jobManagerName); + ActorRef fakeJobManager2 = null; + + // since we cannot reliably wait until the actor is unregistered (name is + // available again) we loop with multiple tries for 20 seconds + long deadline = 20000000000L + System.nanoTime(); + do { + try { + fakeJobManager2 = actorSystem.actorOf(fakeJmProps, jobManagerName); + } catch (InvalidActorNameException e) { + // wait and retry + Thread.sleep(100); + } + } while (fakeJobManager2 == null && System.nanoTime() < deadline); // expect the next registration + final ActorRef jm2Closure = fakeJobManager2; new Within(new FiniteDuration(10, TimeUnit.SECONDS)) { @Override @@ -318,8 +332,8 @@ public class TaskManagerRegistrationTest { expectMsgClass(RegisterTaskManager.class); // we accept the registration - taskManager.tell(new AcknowledgeRegistration(fakeJobManager2, new InstanceID(), 45234), - fakeJobManager2); + taskManager.tell(new AcknowledgeRegistration(jm2Closure, new InstanceID(), 45234), + jm2Closure); } }; http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala new file mode 100644 index 0000000..5fde5ea --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager + +import java.net.InetAddress + +import akka.actor._ +import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID} +import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} +import org.junit.Assert.{assertNotEquals, assertNotNull} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.concurrent.duration._ +import scala.language.postfixOps + +/** + * Tests for the JobManager's behavior when a TaskManager solicits registration. + * It also tests the JobManager's response to heartbeats from TaskManagers it does + * not know. + */ +class JobManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with +ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { + + def this() = this(AkkaUtils.createLocalActorSystem(new Configuration())) + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + "The JobManager" should { + + "assign a TaskManager a unique instance ID" in { + val jm = startTestingJobManager(_system) + + val tmDummy1 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor])) + val tmDummy2 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor])) + + try { + val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000) + val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001) + + val hardwareDescription = HardwareDescription.extractFromSystem(10) + + var id1: InstanceID = null + var id2: InstanceID = null + + // task manager 1 + within(1 second) { + jm ! RegisterTaskManager(tmDummy1, connectionInfo1, hardwareDescription, 1) + + val response = receiveOne(1 second) + response match { + case AcknowledgeRegistration(_, id, _) => id1 = id + case _ => fail("Wrong response message: " + response) + } + } + + // task manager 2 + within(1 second) { + jm ! RegisterTaskManager(tmDummy2, connectionInfo2, hardwareDescription, 1) + + val response = receiveOne(1 second) + response match { + case AcknowledgeRegistration(_, id, _) => id2 = id + case _ => fail("Wrong response message: " + response) + } + } + + assertNotNull(id1) + assertNotNull(id2) + assertNotEquals(id1, id2) + } + finally { + tmDummy1 ! Kill + tmDummy2 ! Kill + jm ! Kill + } + } + + "handle repeated registration calls" in { + + val jm = startTestingJobManager(_system) + val tmDummy = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor])) + + try { + val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1) + val hardwareDescription = HardwareDescription.extractFromSystem(10) + + within(1 second) { + jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1) + jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1) + jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1) + + expectMsgType[AcknowledgeRegistration] + expectMsgType[AlreadyRegistered] + expectMsgType[AlreadyRegistered] + } + } finally { + tmDummy ! Kill + jm ! Kill + } + } + } + + private def startTestingJobManager(system: ActorSystem): ActorRef = { + val (jm: ActorRef, _) = JobManager.startJobManagerActors( + new Configuration(), _system, None, None) + jm + } +} + +object JobManagerRegistrationTest { + + /** Simply dummy actor that swallows all messages */ + class DummyActor extends Actor { + override def receive: Receive = { + case _ => + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cd9e9d9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala deleted file mode 100644 index 409e98d..0000000 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager - -import java.net.InetAddress - -import akka.actor._ -import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID} -import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} -import org.junit.Assert.{assertNotEquals, assertNotNull} -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import scala.concurrent.duration._ -import scala.language.postfixOps - -/** - * Tests for the JobManager's behavior when a TaskManager solicits registration. - * It also tests the JobManager's response to heartbeats from TaskManagers it does - * not know. - */ -class TaskManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with -ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { - - def this() = this(AkkaUtils.createLocalActorSystem(new Configuration())) - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - } - - "The JobManager" should { - - "assign a TaskManager a unique instance ID" in { - val jm = startTestingJobManager(_system) - - val tmDummy1 = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor])) - val tmDummy2 = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor])) - - try { - val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000) - val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001) - - val hardwareDescription = HardwareDescription.extractFromSystem(10) - - var id1: InstanceID = null - var id2: InstanceID = null - - // task manager 1 - within(1 second) { - jm ! RegisterTaskManager(tmDummy1, connectionInfo1, hardwareDescription, 1) - - val response = receiveOne(1 second) - response match { - case AcknowledgeRegistration(_, id, _) => id1 = id - case _ => fail("Wrong response message: " + response) - } - } - - // task manager 2 - within(1 second) { - jm ! RegisterTaskManager(tmDummy2, connectionInfo2, hardwareDescription, 1) - - val response = receiveOne(1 second) - response match { - case AcknowledgeRegistration(_, id, _) => id2 = id - case _ => fail("Wrong response message: " + response) - } - } - - assertNotNull(id1) - assertNotNull(id2) - assertNotEquals(id1, id2) - } - finally { - tmDummy1 ! Kill - tmDummy2 ! Kill - jm ! Kill - } - } - - "handle repeated registration calls" in { - - val jm = startTestingJobManager(_system) - val tmDummy = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor])) - - try { - val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1) - val hardwareDescription = HardwareDescription.extractFromSystem(10) - - within(1 second) { - jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1) - jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1) - jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1) - - expectMsgType[AcknowledgeRegistration] - expectMsgType[AlreadyRegistered] - expectMsgType[AlreadyRegistered] - } - } finally { - tmDummy ! Kill - jm ! Kill - } - } - } - - private def startTestingJobManager(system: ActorSystem): ActorRef = { - val (jm: ActorRef, _) = JobManager.startJobManagerActors( - new Configuration(), _system, None, None) - jm - } -} - -object TaskManagerRegistrationTest { - - /** Simply dummy actor that swallows all messages */ - class DummyActor extends Actor { - override def receive: Receive = { - case _ => - } - } -}