[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204706#comment-15204706 ]

ASF GitHub Bot commented on FLINK-3544:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56863204
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -312,59 +323,125 @@ class JobManager(
     
           leaderSessionID = None
     
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their 
resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +          resourceId,
    +          connectionInfo,
    +          hardwareInformation,
    +          numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task 
manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
     
           val taskManager = sender()
     
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    +              // slow or unreachable resource manager, register anyway and 
let the rm reconnect
    +              self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
    +              self ! decorateMessage(new DisconnectResourceManager(rm))
    +          }(context.dispatcher)
    +
    +        case None =>
    +          log.info("Task Manager Registration but not connected to 
ResourceManager")
    +          // ResourceManager not yet available
    +          // sending task manager information later upon ResourceManager 
registration
    +          self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
    +      }
    +
    +    case msg: RegisterResourceSuccessful =>
    +
    +      val originalMsg = msg.getRegistrationMessage
    +      val taskManager = msg.getTaskManager
    +
    +      // ResourceManager knows about the resource, now let's try to 
register TaskManager
           if (instanceManager.isRegistered(taskManager)) {
             val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
     
    -        // IMPORTANT: Send the response to the "sender", which is not the
    -        //            TaskManager actor, but the ask future!
    -        sender() ! decorateMessage(
    +        taskManager ! decorateMessage(
               AlreadyRegistered(
                 instanceID,
    -            libraryCacheManager.getBlobServerPort)
    -        )
    -      }
    -      else {
    +            libraryCacheManager.getBlobServerPort))
    +      } else {
             try {
               val instanceID = instanceManager.registerTaskManager(
                 taskManager,
    -            connectionInfo,
    -            hardwareInformation,
    -            numberOfSlots,
    +            originalMsg.resourceId,
    +            originalMsg.connectionInfo,
    +            originalMsg.resources,
    +            originalMsg.numberOfSlots,
                 leaderSessionID.orNull)
     
    -          // IMPORTANT: Send the response to the "sender", which is not the
    -          //            TaskManager actor, but the ask future!
    -          sender() ! decorateMessage(
    -            AcknowledgeRegistration(
    -              instanceID,
    -              libraryCacheManager.getBlobServerPort)
    -          )
    +          taskManager ! decorateMessage(
    +            AcknowledgeRegistration(instanceID, 
libraryCacheManager.getBlobServerPort))
     
               // to be notified when the taskManager is no longer reachable
               context.watch(taskManager)
    -        }
    -        catch {
    +        } catch {
               // registerTaskManager throws an IllegalStateException if it is 
already shut down
               // let the actor crash and restart itself in this case
               case e: Exception =>
                 log.error("Failed to register TaskManager at instance 
manager", e)
     
    -            // IMPORTANT: Send the response to the "sender", which is not 
the
    -            //            TaskManager actor, but the ask future!
    -            sender() ! decorateMessage(
    +            taskManager ! decorateMessage(
                   RefuseRegistration(
    -                ExceptionUtils.stringifyException(e))
    -            )
    +                ExceptionUtils.stringifyException(e)))
             }
           }
     
    +    case msg: RegisterResourceFailed =>
    +
    +      val taskManager = msg.getTaskManager
    +      val resourceId = msg.getResourceID
    +      log.warn(s"TaskManager's resource id $resourceId is not registered 
with ResourceManager. " +
    +        s"Refusing registration.")
    +
    +      taskManager ! decorateMessage(
    +        RefuseRegistration(
    +          ExceptionUtils.stringifyException(new IllegalStateException(
    --- End diff --
    
    Sure. You want to stringify it in the `RefuseRegistration` constructor, right?


> ResourceManager runtime components
> ----------------------------------
>
>                 Key: FLINK-3544
>                 URL: https://issues.apache.org/jira/browse/FLINK-3544
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>    Affects Versions: 1.1.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to