[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206091#comment-15206091 ]
ASF GitHub Bot commented on FLINK-3544: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56960595 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,125 @@ class JobManager( leaderSessionID = None - case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => + case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( + instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + + case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { + case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None + case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + + case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { + case Some(rm) => + val future = (rm ? 
decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { + case scala.util.Success(response) => + // the resource manager is available and answered + self ! response + case scala.util.Failure(t) => + // slow or unreachable resource manager, register anyway and let the rm reconnect + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + self ! decorateMessage(new DisconnectResourceManager(rm)) + }(context.dispatcher) + + case None => + log.info("Task Manager Registration but not connected to ResourceManager") + // ResourceManager not yet available + // sending task manager information later upon ResourceManager registration + self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) + } + + case msg: RegisterResourceSuccessful => + + val originalMsg = msg.getRegistrationMessage + val taskManager = msg.getTaskManager + + // ResourceManager knows about the resource, now let's try to register TaskManager if (instanceManager.isRegistered(taskManager)) { val instanceID = instanceManager.getRegisteredInstance(taskManager).getId - // IMPORTANT: Send the response to the "sender", which is not the - // TaskManager actor, but the ask future! - sender() ! decorateMessage( + taskManager ! decorateMessage( AlreadyRegistered( instanceID, - libraryCacheManager.getBlobServerPort) - ) - } - else { + libraryCacheManager.getBlobServerPort)) + } else { try { val instanceID = instanceManager.registerTaskManager( taskManager, - connectionInfo, - hardwareInformation, - numberOfSlots, + originalMsg.resourceId, + originalMsg.connectionInfo, + originalMsg.resources, + originalMsg.numberOfSlots, leaderSessionID.orNull) - // IMPORTANT: Send the response to the "sender", which is not the - // TaskManager actor, but the ask future! - sender() ! decorateMessage( - AcknowledgeRegistration( - instanceID, - libraryCacheManager.getBlobServerPort) - ) + taskManager ! 
decorateMessage( + AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) // to be notified when the taskManager is no longer reachable context.watch(taskManager) - } - catch { + } catch { // registerTaskManager throws an IllegalStateException if it is already shut down // let the actor crash and restart itself in this case case e: Exception => log.error("Failed to register TaskManager at instance manager", e) - // IMPORTANT: Send the response to the "sender", which is not the - // TaskManager actor, but the ask future! - sender() ! decorateMessage( + taskManager ! decorateMessage( RefuseRegistration( - ExceptionUtils.stringifyException(e)) - ) + ExceptionUtils.stringifyException(e))) } } + case msg: RegisterResourceFailed => + + val taskManager = msg.getTaskManager + val resourceId = msg.getResourceID + log.warn(s"TaskManager's resource id $resourceId is not registered with ResourceManager. " + + s"Refusing registration.") + + taskManager ! decorateMessage( + RefuseRegistration( + ExceptionUtils.stringifyException(new IllegalStateException( --- End diff -- What do you mean by different class loaders? I think it should be perfectly fine to have a ``` case class RefuseRegistration(t: Throwable) ``` because all `Throwable` instances inserted here should be known to the system class loader. > ResourceManager runtime components > ---------------------------------- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Affects Versions: 1.1.0 > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)