[FLINK-6341] [jm] Don't let JM fall into an infinite loop. This closes #3745.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23838392 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23838392 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23838392 Branch: refs/heads/table-retraction Commit: 238383926b762c1d47159a2b4dabe8fd59777307 Parents: c36d6b8 Author: WangTaoTheTonic <wangtao...@huawei.com> Authored: Thu Apr 20 20:28:10 2017 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri Apr 28 17:59:27 2017 +0200 ---------------------------------------------------------------------- .../messages/ReconnectResourceManager.java | 11 ++++++++++- .../org/apache/flink/runtime/jobmanager/JobManager.scala | 7 +++++-- 2 files changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java index 6f6f878..d02193e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java @@ -22,6 +22,8 @@ import akka.actor.ActorRef; import org.apache.flink.runtime.messages.RequiresLeaderSessionID; import org.apache.flink.util.Preconditions; +import java.util.UUID; + /** * This message signals that the ResourceManager should reconnect to the JobManager. It is processed * by the JobManager if it fails to register resources with the ResourceManager. 
The JobManager wants @@ -33,14 +35,21 @@ public class ReconnectResourceManager implements RequiresLeaderSessionID, java.i private final ActorRef resourceManager; - public ReconnectResourceManager(ActorRef resourceManager) { + private final UUID currentConnID; + + public ReconnectResourceManager(ActorRef resourceManager, UUID currentConnID) { this.resourceManager = Preconditions.checkNotNull(resourceManager); + this.currentConnID = Preconditions.checkNotNull(currentConnID); } public ActorRef resourceManager() { return resourceManager; } + public UUID connID() { + return currentConnID; + } + @Override public String toString() { return "ReconnectResourceManager " + resourceManager.path(); http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 2fc3ef4..da9df2b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -178,6 +178,8 @@ class JobManager( /** The resource manager actor responsible for allocating and managing task manager resources. 
*/ var currentResourceManager: Option[ActorRef] = None + var currentRMConnID: UUID = null + val taskManagerMap = mutable.Map[ActorRef, InstanceID]() /** @@ -337,6 +339,7 @@ class JobManager( // ditch current resource manager (if any) currentResourceManager = Option(msg.resourceManager()) + currentRMConnID = UUID.randomUUID() val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( instance => instance.getTaskManagerID).toList.asJava @@ -361,7 +364,7 @@ class JobManager( } currentResourceManager match { - case Some(rm) if rm.equals(msg.resourceManager()) => + case Some(rm) if rm.equals(msg.resourceManager()) && currentRMConnID.equals(msg.connID()) => // we should ditch the current resource manager log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.") currentResourceManager = None @@ -396,7 +399,7 @@ class JobManager( case _ => log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t) } - self ! decorateMessage(new ReconnectResourceManager(rm)) + self ! decorateMessage(new ReconnectResourceManager(rm, currentRMConnID)) }(context.dispatcher) case None =>