[FLINK-6341] [jm] Add test case to guard against RM registration loop

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/591841f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/591841f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/591841f3

Branch: refs/heads/table-retraction
Commit: 591841f30f7a7652b6f418abde17c7f23becf1c0
Parents: 2383839
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Apr 28 15:21:56 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 17:59:28 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/JobManagerOptions.java  |  10 ++
 .../messages/ReconnectResourceManager.java      |  18 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |  24 +++--
 .../runtime/jobmanager/JobManagerTest.java      | 105 ++++++++++++++++++-
 4 files changed, 138 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index d129405..10d9e16 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -72,6 +72,16 @@ public class JobManagerOptions {
                        .defaultValue(16)
                        
.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
+       /**
+        * This option specifies the interval at which the JobManager repeatedly triggers a resource
+        * manager reconnection after the connection to the resource manager has been lost.
+        *
+        * <p>This option is only intended for internal use.
+        */
+       public static final ConfigOption<Long> 
RESOURCE_MANAGER_RECONNECT_INTERVAL =
+               key("jobmanager.resourcemanager.reconnect-interval")
+               .defaultValue(2000L);
+
        // 
------------------------------------------------------------------------
        //  JobManager web UI
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
index d02193e..1f852e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
@@ -22,7 +22,7 @@ import akka.actor.ActorRef;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
 import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
+import java.io.Serializable;
 
 /**
  * This message signals that the ResourceManager should reconnect to the 
JobManager. It is processed
@@ -30,28 +30,30 @@ import java.util.UUID;
  * the ResourceManager to go through the reconciliation phase to sync up with 
the JobManager bookkeeping.
  * This is done by forcing the ResourceManager to reconnect.
  */
-public class ReconnectResourceManager implements RequiresLeaderSessionID, 
java.io.Serializable {
+public class ReconnectResourceManager implements RequiresLeaderSessionID, 
Serializable {
        private static final long serialVersionUID = 1L;
 
        private final ActorRef resourceManager;
 
-       private final UUID currentConnID;
+       private final long connectionId;
 
-       public ReconnectResourceManager(ActorRef resourceManager, UUID 
currentConnID) {
+       public ReconnectResourceManager(ActorRef resourceManager, long 
connectionId) {
                this.resourceManager = 
Preconditions.checkNotNull(resourceManager);
-               this.currentConnID = Preconditions.checkNotNull(currentConnID);
+               this.connectionId = Preconditions.checkNotNull(connectionId);
        }
        
        public ActorRef resourceManager() {
                return resourceManager;
        }
 
-       public UUID connID() {
-               return currentConnID;
+       public long getConnectionId() {
+               return connectionId;
        }
 
        @Override
        public String toString() {
-               return "ReconnectResourceManager " + resourceManager.path();
+               return "ReconnectResourceManager(" +
+                       resourceManager.path() + ", " +
+                       connectionId + ')';
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index da9df2b..b01ddc0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -178,10 +178,14 @@ class JobManager(
   /** The resource manager actor responsible for allocating and managing task 
manager resources. */
   var currentResourceManager: Option[ActorRef] = None
 
-  var currentRMConnID: UUID = null
+  var currentResourceManagerConnectionId: Long = 0
 
   val taskManagerMap = mutable.Map[ActorRef, InstanceID]()
 
+  val triggerResourceManagerReconnectInterval = new FiniteDuration(
+    
flinkConfiguration.getLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL),
+    TimeUnit.MILLISECONDS)
+
   /**
    * Run when the job manager is started. Simply logs an informational message.
    * The method also starts the leader election service.
@@ -339,7 +343,7 @@ class JobManager(
 
       // ditch current resource manager (if any)
       currentResourceManager = Option(msg.resourceManager())
-      currentRMConnID = UUID.randomUUID()
+      currentResourceManagerConnectionId += 1
 
       val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
         instance => instance.getTaskManagerID).toList.asJava
@@ -358,24 +362,25 @@ class JobManager(
       def reconnectRepeatedly(): Unit = {
         msg.resourceManager() ! decorateMessage(new 
TriggerRegistrationAtJobManager(self))
         // try again after some delay
-        context.system.scheduler.scheduleOnce(2 seconds) {
+        
context.system.scheduler.scheduleOnce(triggerResourceManagerReconnectInterval) {
           self ! decorateMessage(msg)
         }(context.dispatcher)
       }
 
       currentResourceManager match {
-        case Some(rm) if rm.equals(msg.resourceManager()) && 
currentRMConnID.equals(msg.connID()) =>
+        case Some(rm) if rm.equals(msg.resourceManager()) &&
+          currentResourceManagerConnectionId == msg.getConnectionId =>
           // we should ditch the current resource manager
           log.debug(s"Disconnecting resource manager $rm and forcing a 
reconnect.")
           currentResourceManager = None
           reconnectRepeatedly()
-        case Some(rm) =>
-          // we have registered with another ResourceManager in the meantime, 
stop sending
-          // TriggerRegistrationAtJobManager messages to the old 
ResourceManager
         case None =>
           log.warn(s"No resource manager ${msg.resourceManager()} connected. " 
+
             s"Telling old ResourceManager to register again.")
           reconnectRepeatedly()
+        case _ =>
+          // the message refers to an outdated connection (a new connection to a ResourceManager was established
+          // in the meantime), so stop sending TriggerRegistrationAtJobManager messages to the old ResourceManager
       }
 
     case msg @ RegisterTaskManager(
@@ -399,7 +404,10 @@ class JobManager(
                 case _ =>
                   log.warn("Failure while asking ResourceManager for 
RegisterResource. Retrying", t)
               }
-              self ! decorateMessage(new ReconnectResourceManager(rm, 
currentRMConnID))
+              self ! decorateMessage(
+                new ReconnectResourceManager(
+                  rm,
+                  currentResourceManagerConnectionId))
           }(context.dispatcher)
 
         case None =>

http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index d7fc71d..6316bfd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,18 +18,22 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
+import akka.actor.*;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
+import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import 
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -40,6 +44,7 @@ import 
org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -64,6 +69,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -73,6 +79,7 @@ import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -89,11 +96,15 @@ import 
org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.util.TestLogger;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
@@ -107,6 +118,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess;
@@ -121,6 +133,9 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
 
 public class JobManagerTest extends TestLogger {
 
@@ -1261,4 +1276,88 @@ public class JobManagerTest extends TestLogger {
                        }
                }
        }
+
+       /**
+        * This test makes sure that the JobManager stops triggering a ResourceManager reconnection once a
+        * ResourceManager has successfully registered again. Furthermore, it makes sure that there is no endless
+        * loop of reconnection commands (see FLINK-6341).
+        */
+       @Test
+       public void testResourceManagerConnection() throws TimeoutException, 
InterruptedException {
+               FiniteDuration testTimeout = new FiniteDuration(30L, 
TimeUnit.SECONDS);
+               final long reconnectionInterval = 200L;
+
+               final Configuration configuration = new Configuration();
+               
configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, 
reconnectionInterval);
+
+
+               final ActorSystem actorSystem = 
AkkaUtils.createLocalActorSystem(configuration);
+
+               try {
+                       final ActorGateway jmGateway = 
TestingUtils.createJobManager(
+                               actorSystem,
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
+                               configuration);
+
+                       final TestProbe probe = TestProbe.apply(actorSystem);
+                       final AkkaActorGateway rmGateway = new 
AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+                       // wait for the JobManager to become the leader
+                       Future<?> leaderFuture = 
jmGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), testTimeout);
+                       Await.ready(leaderFuture, testTimeout);
+
+                       jmGateway.tell(new 
RegisterResourceManager(probe.ref()), rmGateway);
+
+                       JobManagerMessages.LeaderSessionMessage 
leaderSessionMessage = 
probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
+
+                       
assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, 
leaderSessionMessage.leaderSessionID());
+                       assertTrue(leaderSessionMessage.message() instanceof 
RegisterResourceManagerSuccessful);
+
+                       jmGateway.tell(
+                               new RegistrationMessages.RegisterTaskManager(
+                                       ResourceID.generate(),
+                                       mock(TaskManagerLocation.class),
+                                       new HardwareDescription(1, 1L, 1L, 1L),
+                                       1));
+                       leaderSessionMessage = 
probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
+
+                       assertTrue(leaderSessionMessage.message() instanceof 
NotifyResourceStarted);
+
+                       // fail the NotifyResourceStarted request so that we trigger the reconnection process on the JobManager's side
+                       probe.lastSender().tell(new Status.Failure(new 
Exception("Test exception")), ActorRef.noSender());
+
+                       Deadline reconnectionDeadline = new FiniteDuration(5L * 
reconnectionInterval, TimeUnit.MILLISECONDS).fromNow();
+                       boolean registered = false;
+
+                       while (reconnectionDeadline.hasTimeLeft()) {
+                               try {
+                                       leaderSessionMessage = 
probe.expectMsgClass(reconnectionDeadline.timeLeft(), 
JobManagerMessages.LeaderSessionMessage.class);
+                               } catch (AssertionError ignored) {
+                                       // expected timeout once the reconnectionDeadline has been exceeded
+                                       continue;
+                               }
+
+                               if (leaderSessionMessage.message() instanceof 
TriggerRegistrationAtJobManager) {
+                                       if (registered) {
+                                               fail("A successful registration 
should not be followed by another TriggerRegistrationAtJobManager message.");
+                                       }
+
+                                       jmGateway.tell(new 
RegisterResourceManager(probe.ref()), rmGateway);
+                               } else if (leaderSessionMessage.message() 
instanceof RegisterResourceManagerSuccessful) {
+                                       // from now on we should no longer receive TriggerRegistrationAtJobManager messages
+                                       registered = true;
+                               } else {
+                                       fail("Received unknown message: " + 
leaderSessionMessage.message() + '.');
+                               }
+                       }
+
+                       assertTrue(registered);
+
+               } finally {
+                       // clean up the actor system and with it all of the started actors if not already terminated
+                       actorSystem.shutdown();
+                       actorSystem.awaitTermination();
+               }
+       }
 }

Reply via email to