[FLINK-6341] [jm] Add test case to guard against RM registration loop
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/591841f3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/591841f3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/591841f3 Branch: refs/heads/table-retraction Commit: 591841f30f7a7652b6f418abde17c7f23becf1c0 Parents: 2383839 Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Apr 28 15:21:56 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri Apr 28 17:59:28 2017 +0200 ---------------------------------------------------------------------- .../flink/configuration/JobManagerOptions.java | 10 ++ .../messages/ReconnectResourceManager.java | 18 ++-- .../flink/runtime/jobmanager/JobManager.scala | 24 +++-- .../runtime/jobmanager/JobManagerTest.java | 105 ++++++++++++++++++- 4 files changed, 138 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index d129405..10d9e16 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -72,6 +72,16 @@ public class JobManagerOptions { .defaultValue(16) .withDeprecatedKeys("job-manager.max-attempts-history-size"); + /** + * This option specifies the interval in order to trigger a resource manager reconnection if the connection + * to the resource manager has been lost. + * + * This option is only intended for internal use. 
+ */ + public static final ConfigOption<Long> RESOURCE_MANAGER_RECONNECT_INTERVAL = + key("jobmanager.resourcemanager.reconnect-interval") + .defaultValue(2000L); + // ------------------------------------------------------------------------ // JobManager web UI // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java index d02193e..1f852e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java @@ -22,7 +22,7 @@ import akka.actor.ActorRef; import org.apache.flink.runtime.messages.RequiresLeaderSessionID; import org.apache.flink.util.Preconditions; -import java.util.UUID; +import java.io.Serializable; /** * This message signals that the ResourceManager should reconnect to the JobManager. It is processed @@ -30,28 +30,30 @@ import java.util.UUID; * the ResourceManager to go through the reconciliation phase to sync up with the JobManager bookkeeping. * This is done by forcing the ResourceManager to reconnect. 
*/ -public class ReconnectResourceManager implements RequiresLeaderSessionID, java.io.Serializable { +public class ReconnectResourceManager implements RequiresLeaderSessionID, Serializable { private static final long serialVersionUID = 1L; private final ActorRef resourceManager; - private final UUID currentConnID; + private final long connectionId; - public ReconnectResourceManager(ActorRef resourceManager, UUID currentConnID) { + public ReconnectResourceManager(ActorRef resourceManager, long connectionId) { this.resourceManager = Preconditions.checkNotNull(resourceManager); - this.currentConnID = Preconditions.checkNotNull(currentConnID); + this.connectionId = Preconditions.checkNotNull(connectionId); } public ActorRef resourceManager() { return resourceManager; } - public UUID connID() { - return currentConnID; + public long getConnectionId() { + return connectionId; } @Override public String toString() { - return "ReconnectResourceManager " + resourceManager.path(); + return "ReconnectResourceManager(" + + resourceManager.path() + ", " + + connectionId + ')'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index da9df2b..b01ddc0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -178,10 +178,14 @@ class JobManager( /** The resource manager actor responsible for allocating and managing task manager resources. 
*/ var currentResourceManager: Option[ActorRef] = None - var currentRMConnID: UUID = null + var currentResourceManagerConnectionId: Long = 0 val taskManagerMap = mutable.Map[ActorRef, InstanceID]() + val triggerResourceManagerReconnectInterval = new FiniteDuration( + flinkConfiguration.getLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL), + TimeUnit.MILLISECONDS) + /** * Run when the job manager is started. Simply logs an informational message. * The method also starts the leader election service. @@ -339,7 +343,7 @@ class JobManager( // ditch current resource manager (if any) currentResourceManager = Option(msg.resourceManager()) - currentRMConnID = UUID.randomUUID() + currentResourceManagerConnectionId += 1 val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( instance => instance.getTaskManagerID).toList.asJava @@ -358,24 +362,25 @@ class JobManager( def reconnectRepeatedly(): Unit = { msg.resourceManager() ! decorateMessage(new TriggerRegistrationAtJobManager(self)) // try again after some delay - context.system.scheduler.scheduleOnce(2 seconds) { + context.system.scheduler.scheduleOnce(triggerResourceManagerReconnectInterval) { self ! decorateMessage(msg) }(context.dispatcher) } currentResourceManager match { - case Some(rm) if rm.equals(msg.resourceManager()) && currentRMConnID.equals(msg.connID()) => + case Some(rm) if rm.equals(msg.resourceManager()) && + currentResourceManagerConnectionId == msg.getConnectionId => // we should ditch the current resource manager log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.") currentResourceManager = None reconnectRepeatedly() - case Some(rm) => - // we have registered with another ResourceManager in the meantime, stop sending - // TriggerRegistrationAtJobManager messages to the old ResourceManager case None => log.warn(s"No resource manager ${msg.resourceManager()} connected. 
" + s"Telling old ResourceManager to register again.") reconnectRepeatedly() + case _ => + // we have established a new connection to a ResourceManager in the meantime, stop sending + // TriggerRegistrationAtJobManager messages to the old ResourceManager } case msg @ RegisterTaskManager( @@ -399,7 +404,10 @@ class JobManager( case _ => log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t) } - self ! decorateMessage(new ReconnectResourceManager(rm, currentRMConnID)) + self ! decorateMessage( + new ReconnectResourceManager( + rm, + currentResourceManagerConnectionId)) }(context.dispatcher) case None => http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index d7fc71d..6316bfd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -18,18 +18,22 @@ package org.apache.flink.runtime.jobmanager; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; +import akka.actor.*; import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; import com.typesafe.config.Config; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; +import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -40,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -64,6 +69,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; +import org.apache.flink.runtime.messages.RegistrationMessages; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation; @@ -73,6 +79,7 @@ import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import 
org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManager; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; @@ -89,11 +96,15 @@ import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.util.TestLogger; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.ArgumentCaptor; import scala.Option; import scala.Some; import scala.Tuple2; @@ -107,6 +118,7 @@ import java.io.File; import java.net.InetAddress; import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess; @@ -121,6 +133,9 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; public class JobManagerTest extends TestLogger { @@ -1261,4 +1276,88 @@ public class JobManagerTest extends TestLogger { } } } + + /** + * This test makes sure that triggering a reconnection from the ResourceManager will stop after a new + * ResourceManager has connected. Furthermore, it makes sure that there is no endless loop of reconnection + * commands (see FLINK-6341). 
+ */ + @Test + public void testResourceManagerConnection() throws TimeoutException, InterruptedException { + FiniteDuration testTimeout = new FiniteDuration(30L, TimeUnit.SECONDS); + final long reconnectionInterval = 200L; + + final Configuration configuration = new Configuration(); + configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, reconnectionInterval); + + + final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(configuration); + + try { + final ActorGateway jmGateway = TestingUtils.createJobManager( + actorSystem, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + configuration); + + final TestProbe probe = TestProbe.apply(actorSystem); + final AkkaActorGateway rmGateway = new AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID); + + // wait for the JobManager to become the leader + Future<?> leaderFuture = jmGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), testTimeout); + Await.ready(leaderFuture, testTimeout); + + jmGateway.tell(new RegisterResourceManager(probe.ref()), rmGateway); + + JobManagerMessages.LeaderSessionMessage leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class); + + assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, leaderSessionMessage.leaderSessionID()); + assertTrue(leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful); + + jmGateway.tell( + new RegistrationMessages.RegisterTaskManager( + ResourceID.generate(), + mock(TaskManagerLocation.class), + new HardwareDescription(1, 1L, 1L, 1L), + 1)); + leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class); + + assertTrue(leaderSessionMessage.message() instanceof NotifyResourceStarted); + + // fail the NotifyResourceStarted so that we trigger the reconnection process on the JobManager's side + probe.lastSender().tell(new Status.Failure(new Exception("Test exception")), ActorRef.noSender()); + + Deadline 
reconnectionDeadline = new FiniteDuration(5L * reconnectionInterval, TimeUnit.MILLISECONDS).fromNow(); + boolean registered = false; + + while (reconnectionDeadline.hasTimeLeft()) { + try { + leaderSessionMessage = probe.expectMsgClass(reconnectionDeadline.timeLeft(), JobManagerMessages.LeaderSessionMessage.class); + } catch (AssertionError ignored) { + // expected timeout after the reconnectionDeadline has been exceeded + continue; + } + + if (leaderSessionMessage.message() instanceof TriggerRegistrationAtJobManager) { + if (registered) { + fail("A successful registration should not be followed by another TriggerRegistrationAtJobManager message."); + } + + jmGateway.tell(new RegisterResourceManager(probe.ref()), rmGateway); + } else if (leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful) { + // now we should no longer receive TriggerRegistrationAtJobManager messages + registered = true; + } else { + fail("Received unknown message: " + leaderSessionMessage.message() + '.'); + } + } + + assertTrue(registered); + + } finally { + // cleanup the actor system and with it all of the started actors if not already terminated + actorSystem.shutdown(); + actorSystem.awaitTermination(); + } + } }