[FLINK-6341] [jm] Prevent the JobManager from falling into an infinite loop

This closes #3745.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23838392
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23838392
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23838392

Branch: refs/heads/table-retraction
Commit: 238383926b762c1d47159a2b4dabe8fd59777307
Parents: c36d6b8
Author: WangTaoTheTonic <wangtao...@huawei.com>
Authored: Thu Apr 20 20:28:10 2017 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 17:59:27 2017 +0200

----------------------------------------------------------------------
 .../messages/ReconnectResourceManager.java               | 11 ++++++++++-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala |  7 +++++--
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
index 6f6f878..d02193e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
@@ -22,6 +22,8 @@ import akka.actor.ActorRef;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 /**
  * This message signals that the ResourceManager should reconnect to the 
JobManager. It is processed
  * by the JobManager if it fails to register resources with the 
ResourceManager. The JobManager wants
@@ -33,14 +35,21 @@ public class ReconnectResourceManager implements 
RequiresLeaderSessionID, java.i
 
        private final ActorRef resourceManager;
 
-       public ReconnectResourceManager(ActorRef resourceManager) {
+       private final UUID currentConnID;
+
+       public ReconnectResourceManager(ActorRef resourceManager, UUID 
currentConnID) {
                this.resourceManager = 
Preconditions.checkNotNull(resourceManager);
+               this.currentConnID = Preconditions.checkNotNull(currentConnID);
        }
        
        public ActorRef resourceManager() {
                return resourceManager;
        }
 
+       public UUID connID() {
+               return currentConnID;
+       }
+
        @Override
        public String toString() {
                return "ReconnectResourceManager " + resourceManager.path();

http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2fc3ef4..da9df2b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -178,6 +178,8 @@ class JobManager(
   /** The resource manager actor responsible for allocating and managing task 
manager resources. */
   var currentResourceManager: Option[ActorRef] = None
 
+  var currentRMConnID: UUID = null
+
   val taskManagerMap = mutable.Map[ActorRef, InstanceID]()
 
   /**
@@ -337,6 +339,7 @@ class JobManager(
 
       // ditch current resource manager (if any)
       currentResourceManager = Option(msg.resourceManager())
+      currentRMConnID = UUID.randomUUID()
 
       val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
         instance => instance.getTaskManagerID).toList.asJava
@@ -361,7 +364,7 @@ class JobManager(
       }
 
       currentResourceManager match {
-        case Some(rm) if rm.equals(msg.resourceManager()) =>
+        case Some(rm) if rm.equals(msg.resourceManager()) && 
currentRMConnID.equals(msg.connID()) =>
           // we should ditch the current resource manager
           log.debug(s"Disconnecting resource manager $rm and forcing a 
reconnect.")
           currentResourceManager = None
@@ -396,7 +399,7 @@ class JobManager(
                 case _ =>
                   log.warn("Failure while asking ResourceManager for 
RegisterResource. Retrying", t)
               }
-              self ! decorateMessage(new ReconnectResourceManager(rm))
+              self ! decorateMessage(new ReconnectResourceManager(rm, 
currentRMConnID))
           }(context.dispatcher)
 
         case None =>

Reply via email to