Repository: flink Updated Branches: refs/heads/master 90ca43810 -> 926547582
[hotfix] [tests] Harden JobManagerRegistrationTest. The problem is that the test did not wait until the JobManager had become the leader, so the RegisterTaskManager messages it sent could get dropped. This commit fixes the problem by asking the JobManager with a NotifyWhenLeader message and waiting for that request to complete before any TaskManager is registered. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92654758 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92654758 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92654758 Branch: refs/heads/master Commit: 926547582d62733bdedb1cdcc277653b2ff7b4e7 Parents: 90ca438 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Apr 27 12:32:25 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Apr 27 12:43:50 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../jobmanager/JobManagerRegistrationTest.scala | 91 +++++++++++--------- 2 files changed, 52 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/92654758/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 40e2c2a..2fc3ef4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -216,7 +216,7 @@ class JobManager( case Some(group) => instantiateMetrics(group) case None => - log.warn("Could not instantiate JobManager metrics.") + log.warn("Could not instantiate JobManager metric group.") } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/92654758/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 6be8bcc..b2e8005 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -33,7 +33,8 @@ import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwa import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} import org.apache.flink.runtime.taskmanager.TaskManagerLocation -import org.apache.flink.runtime.testingUtils.TestingUtils +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader +import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils} import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.util.LeaderRetrievalUtils import org.junit.Assert.{assertNotEquals, assertNotNull} @@ -41,6 +42,7 @@ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps @@ -94,39 +96,44 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { var id1: InstanceID = null var id2: InstanceID = null + // wait until the JobManager becomes the leader, otherwise the RegisterTaskManager messages + // are dropped + val leaderFuture = 
jm.ask(NotifyWhenLeader, TestingUtils.TESTING_TIMEOUT) + Await.ready(leaderFuture, TestingUtils.TESTING_TIMEOUT) + // task manager 1 within(10 seconds) { - jm.tell( - RegisterTaskManager( - resourceId1, - connectionInfo1, - hardwareDescription, - 1), - new AkkaActorGateway(tm1, HighAvailabilityServices.DEFAULT_LEADER_ID)) - - val response = probe.expectMsgType[LeaderSessionMessage] - response match { - case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 = id - case _ => fail("Wrong response message: " + response) - } - } + jm.tell( + RegisterTaskManager( + resourceId1, + connectionInfo1, + hardwareDescription, + 1), + new AkkaActorGateway(tm1, HighAvailabilityServices.DEFAULT_LEADER_ID)) + + val response = probe.expectMsgType[LeaderSessionMessage] + response match { + case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 = id + case _ => fail("Wrong response message: " + response) + } + } // task manager 2 within(10 seconds) { - jm.tell( - RegisterTaskManager( - resourceId2, - connectionInfo2, - hardwareDescription, - 1), - new AkkaActorGateway(tm2, HighAvailabilityServices.DEFAULT_LEADER_ID)) - - val response = probe.expectMsgType[LeaderSessionMessage] - response match { - case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id - case _ => fail("Wrong response message: " + response) - } - } + jm.tell( + RegisterTaskManager( + resourceId2, + connectionInfo2, + hardwareDescription, + 1), + new AkkaActorGateway(tm2, HighAvailabilityServices.DEFAULT_LEADER_ID)) + + val response = probe.expectMsgType[LeaderSessionMessage] + response match { + case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id + case _ => fail("Wrong response message: " + response) + } + } assertNotNull(id1) assertNotNull(id2) @@ -160,6 +167,11 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost, 1) val 
hardwareDescription = HardwareDescription.extractFromSystem(10) + // wait until the JobManager becomes the leader, otherwise the RegisterTaskManager messages + // are dropped + val leaderFuture = jm.ask(NotifyWhenLeader, TestingUtils.TESTING_TIMEOUT) + Await.ready(leaderFuture, TestingUtils.TESTING_TIMEOUT) + within(20 seconds) { jm.tell( RegisterTaskManager( @@ -167,7 +179,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { connectionInfo, hardwareDescription, 1), - selfGateway) + selfGateway) jm.tell( RegisterTaskManager( @@ -175,7 +187,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { connectionInfo, hardwareDescription, 1), - selfGateway) + selfGateway) jm.tell( RegisterTaskManager( @@ -183,26 +195,26 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { connectionInfo, hardwareDescription, 1), - selfGateway) + selfGateway) probe.expectMsgType[LeaderSessionMessage] match { case LeaderSessionMessage( - HighAvailabilityServices.DEFAULT_LEADER_ID, - AcknowledgeRegistration(_, _)) => + HighAvailabilityServices.DEFAULT_LEADER_ID, + AcknowledgeRegistration(_, _)) => case m => fail("Wrong message type: " + m) } probe.expectMsgType[LeaderSessionMessage] match { case LeaderSessionMessage( - HighAvailabilityServices.DEFAULT_LEADER_ID, - AlreadyRegistered(_, _)) => + HighAvailabilityServices.DEFAULT_LEADER_ID, + AlreadyRegistered(_, _)) => case m => fail("Wrong message type: " + m) } probe.expectMsgType[LeaderSessionMessage] match { case LeaderSessionMessage( - HighAvailabilityServices.DEFAULT_LEADER_ID, - AlreadyRegistered(_, _)) => + HighAvailabilityServices.DEFAULT_LEADER_ID, + AlreadyRegistered(_, _)) => case m => fail("Wrong message type: " + m) } } @@ -227,7 +239,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { // if there exists already one of these actors (e.g. 
JobManager has not been properly shutdown), // then this will fail the JobManager creation val props = JobManager.getJobManagerProps( - classOf[JobManager], + classOf[TestingJobManager], config, executor, executor, @@ -249,7 +261,6 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { } private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): ActorGateway = { - val jobManagerURL = AkkaUtils.getAkkaURL(system, jm) val config = new Configuration() val rm: ActorRef = FlinkResourceManager.startResourceManagerActors( config,