[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206128#comment-15206128 ]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56963220

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -312,59 +323,125 @@ class JobManager(
           leaderSessionID = None
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +      resourceId,
    +      connectionInfo,
    +      hardwareInformation,
    +      numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
           val taskManager = sender()
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    +              // slow or unreachable resource manager, register anyway and let the rm reconnect
    +              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +              self ! decorateMessage(new DisconnectResourceManager(rm))
    +          }(context.dispatcher)
    +
    +        case None =>
    +          log.info("Task Manager Registration but not connected to ResourceManager")
    +          // ResourceManager not yet available
    +          // sending task manager information later upon ResourceManager registration
    +          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
    +      }
    +
    +    case msg: RegisterResourceSuccessful =>
    +
    +      val originalMsg = msg.getRegistrationMessage
    +      val taskManager = msg.getTaskManager
    +
    +      // ResourceManager knows about the resource, now let's try to register TaskManager
           if (instanceManager.isRegistered(taskManager)) {
             val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
    -        // IMPORTANT: Send the response to the "sender", which is not the
    -        // TaskManager actor, but the ask future!
    -        sender() ! decorateMessage(
    +        taskManager ! decorateMessage(
               AlreadyRegistered(
                 instanceID,
    -            libraryCacheManager.getBlobServerPort)
    -        )
    -      }
    -      else {
    +            libraryCacheManager.getBlobServerPort))
    +      } else {
             try {
               val instanceID = instanceManager.registerTaskManager(
                 taskManager,
    -            connectionInfo,
    -            hardwareInformation,
    -            numberOfSlots,
    +            originalMsg.resourceId,
    +            originalMsg.connectionInfo,
    +            originalMsg.resources,
    +            originalMsg.numberOfSlots,
                 leaderSessionID.orNull)
    -          // IMPORTANT: Send the response to the "sender", which is not the
    -          // TaskManager actor, but the ask future!
    -          sender() ! decorateMessage(
    -            AcknowledgeRegistration(
    -              instanceID,
    -              libraryCacheManager.getBlobServerPort)
    -          )
    +          taskManager ! decorateMessage(
    +            AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort))
               // to be notified when the taskManager is no longer reachable
               context.watch(taskManager)
    -        }
    -        catch {
    +        } catch {
               // registerTaskManager throws an IllegalStateException if it is already shut down
               // let the actor crash and restart itself in this case
               case e: Exception =>
                 log.error("Failed to register TaskManager at instance manager", e)
    -            // IMPORTANT: Send the response to the "sender", which is not the
    -            // TaskManager actor, but the ask future!
    -            sender() ! decorateMessage(
    +            taskManager ! decorateMessage(
                   RefuseRegistration(
    -                ExceptionUtils.stringifyException(e))
    -            )
    +                ExceptionUtils.stringifyException(e)))
             }
           }
    +    case msg: RegisterResourceFailed =>
    +
    +      val taskManager = msg.getTaskManager
    +      val resourceId = msg.getResourceID
    +      log.warn(s"TaskManager's resource id $resourceId is not registered with ResourceManager. " +
    +        s"Refusing registration.")
    +
    +      taskManager ! decorateMessage(
    +        RefuseRegistration(
    +          ExceptionUtils.stringifyException(new IllegalStateException(
    --- End diff --

    All I'm saying is that I'm not fully aware of the subtle bugs this change could introduce. It seems that String was deliberately chosen to avoid ClassNotFoundExceptions (think of different Flink versions talking to each other), so I'd rather not change it.


> ResourceManager runtime components
> ----------------------------------
>
>                 Key: FLINK-3544
>                 URL: https://issues.apache.org/jira/browse/FLINK-3544
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>    Affects Versions: 1.1.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
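The sketch below illustrates the review comment's compatibility argument for sending a stringified exception in RefuseRegistration instead of the exception object. It is a minimal Scala example and assumes the messages travel through plain Java serialization, which is Akka's default for arbitrary serializable objects. SenderOnlyException, RefuseWithThrowable, RefuseWithString and StringifiedReasonSketch are hypothetical names, not Flink classes. A peer running a different Flink version is simulated by overriding ObjectInputStream.resolveClass so that it rejects one class name.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
  ObjectOutputStream, ObjectStreamClass, PrintWriter, StringWriter}

// Hypothetical exception type that exists only on the sender's classpath.
class SenderOnlyException(msg: String) extends Exception(msg)

// Hypothetical message shapes; these are not Flink's RefuseRegistration.
case class RefuseWithThrowable(cause: Throwable)
case class RefuseWithString(reason: String)

object StringifiedReasonSketch {

  // Renders the full stack trace as text, roughly what a stringifyException helper does.
  def stringify(t: Throwable): String = {
    val sw = new StringWriter()
    t.printStackTrace(new PrintWriter(sw, true))
    sw.toString
  }

  // Java-serializes a message, as a remote actor system might before sending it.
  private def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  // Simulates a receiver (e.g. another Flink version) whose classpath lacks `missing`.
  private def deserializeWithout(bytes: Array[Byte], missing: String): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        if (desc.getName == missing) throw new ClassNotFoundException(desc.getName)
        else super.resolveClass(desc)
    }
    try in.readObject() finally in.close()
  }

  // Ships `msg` to the simulated receiver; Left(className) signals a ClassNotFoundException.
  def send(msg: AnyRef): Either[String, AnyRef] =
    try Right(deserializeWithout(serialize(msg), classOf[SenderOnlyException].getName))
    catch { case e: ClassNotFoundException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    val cause = new SenderOnlyException("resource id not registered")

    // The exception object cannot be decoded on the receiving side.
    println(send(RefuseWithThrowable(cause)))

    // The stringified form only needs java.lang.String on the receiving side.
    send(RefuseWithString(stringify(cause))) match {
      case Right(RefuseWithString(reason)) =>
        println(reason.takeWhile(c => c != '\n' && c != '\r'))
      case other =>
        println(s"unexpected: $other")
    }
  }
}
```

Running `main` prints the following:

- For the object payload: a `Left` carrying the name of the class the receiver could not load.
- For the string payload: the first line of the stack trace.

The trade-off is that the receiver gets only text, not a typed exception it could pattern-match on.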