[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206850#comment-15206850 ]
ASF GitHub Bot commented on FLINK-3544: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57031901 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,121 @@ class JobManager( leaderSessionID = None - case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => + case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( + instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + + case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { + case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None + case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + + case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { + case Some(rm) => + val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { + case scala.util.Success(response) => + // the resource manager is available and answered + self ! response + case scala.util.Failure(t) => --- End diff -- Done (doesn't change the diff). > ResourceManager runtime components > ---------------------------------- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Affects Versions: 1.1.0 > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)