Repository: flink
Updated Branches:
  refs/heads/master 90ca43810 -> 926547582


[hotfix] [tests] Harden JobManagerRegistrationTest

The problem is that the test does not wait until the JobManager becomes the leader.
Because of this, the RegisterTaskManager messages sent by the test might get dropped.

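This PR fixes the problem by waiting for the NotifyWhenLeader request to complete
before sending any RegisterTaskManager messages. To support this testing message,
the test now starts a TestingJobManager instead of a plain JobManager.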


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92654758
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92654758
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92654758

Branch: refs/heads/master
Commit: 926547582d62733bdedb1cdcc277653b2ff7b4e7
Parents: 90ca438
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Apr 27 12:32:25 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Apr 27 12:43:50 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala | 91 +++++++++++---------
 2 files changed, 52 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92654758/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 40e2c2a..2fc3ef4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -216,7 +216,7 @@ class JobManager(
       case Some(group) =>
         instantiateMetrics(group)
       case None =>
-        log.warn("Could not instantiate JobManager metrics.")
+        log.warn("Could not instantiate JobManager metric group.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92654758/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 6be8bcc..b2e8005 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -33,7 +33,8 @@ import 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwa
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
-import org.apache.flink.runtime.testingUtils.TestingUtils
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils}
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.junit.Assert.{assertNotEquals, assertNotNull}
@@ -41,6 +42,7 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
+import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -94,39 +96,44 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
         var id1: InstanceID = null
         var id2: InstanceID = null
 
+        // wait until the JobManager becomes the leader, otherwise the 
RegisterTaskManager messages
+        // are dropped
+        val leaderFuture = jm.ask(NotifyWhenLeader, 
TestingUtils.TESTING_TIMEOUT)
+        Await.ready(leaderFuture, TestingUtils.TESTING_TIMEOUT)
+
         // task manager 1
         within(10 seconds) {
-         jm.tell(
-           RegisterTaskManager(
-             resourceId1,
-             connectionInfo1,
-             hardwareDescription,
-             1),
-           new AkkaActorGateway(tm1, 
HighAvailabilityServices.DEFAULT_LEADER_ID))
-
-         val response = probe.expectMsgType[LeaderSessionMessage]
-         response match {
-           case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 
= id
-           case _ => fail("Wrong response message: " + response)
-         }
-       }
+          jm.tell(
+            RegisterTaskManager(
+              resourceId1,
+              connectionInfo1,
+              hardwareDescription,
+              1),
+            new AkkaActorGateway(tm1, 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+
+          val response = probe.expectMsgType[LeaderSessionMessage]
+          response match {
+            case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => 
id1 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
 
         // task manager 2
         within(10 seconds) {
-         jm.tell(
-           RegisterTaskManager(
-             resourceId2,
-             connectionInfo2,
-             hardwareDescription,
-             1),
-           new AkkaActorGateway(tm2, 
HighAvailabilityServices.DEFAULT_LEADER_ID))
-
-         val response = probe.expectMsgType[LeaderSessionMessage]
-         response match {
-           case LeaderSessionMessage(leaderSessionID, 
AcknowledgeRegistration(id, _)) => id2 = id
-           case _ => fail("Wrong response message: " + response)
-         }
-       }
+          jm.tell(
+            RegisterTaskManager(
+              resourceId2,
+              connectionInfo2,
+              hardwareDescription,
+              1),
+            new AkkaActorGateway(tm2, 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+
+          val response = probe.expectMsgType[LeaderSessionMessage]
+          response match {
+            case LeaderSessionMessage(leaderSessionID, 
AcknowledgeRegistration(id, _)) => id2 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
 
         assertNotNull(id1)
         assertNotNull(id2)
@@ -160,6 +167,11 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
         val connectionInfo = new TaskManagerLocation(resourceID, 
InetAddress.getLocalHost, 1)
         val hardwareDescription = HardwareDescription.extractFromSystem(10)
 
+        // wait until the JobManager becomes the leader, otherwise the 
RegisterTaskManager messages
+        // are dropped
+        val leaderFuture = jm.ask(NotifyWhenLeader, 
TestingUtils.TESTING_TIMEOUT)
+        Await.ready(leaderFuture, TestingUtils.TESTING_TIMEOUT)
+
         within(20 seconds) {
           jm.tell(
             RegisterTaskManager(
@@ -167,7 +179,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
               connectionInfo,
               hardwareDescription,
               1),
-              selfGateway)
+            selfGateway)
 
           jm.tell(
             RegisterTaskManager(
@@ -175,7 +187,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
               connectionInfo,
               hardwareDescription,
               1),
-              selfGateway)
+            selfGateway)
 
           jm.tell(
             RegisterTaskManager(
@@ -183,26 +195,26 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
               connectionInfo,
               hardwareDescription,
               1),
-              selfGateway)
+            selfGateway)
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-              HighAvailabilityServices.DEFAULT_LEADER_ID,
-              AcknowledgeRegistration(_, _)) =>
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AcknowledgeRegistration(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-              HighAvailabilityServices.DEFAULT_LEADER_ID,
-              AlreadyRegistered(_, _)) =>
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-              HighAvailabilityServices.DEFAULT_LEADER_ID,
-              AlreadyRegistered(_, _)) =>
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
         }
@@ -227,7 +239,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
     // if there exists already one of these actors (e.g. JobManager has not 
been properly shutdown),
     // then this will fail the JobManager creation
     val props = JobManager.getJobManagerProps(
-      classOf[JobManager],
+      classOf[TestingJobManager],
       config,
       executor,
       executor,
@@ -249,7 +261,6 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
   }
 
   private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): 
ActorGateway = {
-    val jobManagerURL = AkkaUtils.getAkkaURL(system, jm)
     val config = new Configuration()
     val rm: ActorRef = FlinkResourceManager.startResourceManagerActors(
       config,

Reply via email to