GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56855460
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
    @@ -312,59 +323,125 @@ class JobManager(
     
           leaderSessionID = None
     
    -    case RegisterTaskManager(
    -      connectionInfo,
    -      hardwareInformation,
    -      numberOfSlots) =>
    +    case msg: RegisterResourceManager =>
    +      log.debug(s"Resource manager registration: $msg")
    +
    +      // ditch current resource manager (if any)
    +      currentResourceManager = Option(msg.resourceManager())
    +
    +      val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
    +        instance => instance.getResourceId).toList.asJava
    +
    +      // confirm registration and send known task managers with their 
resource ids
    +      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
    +
    +    case msg: DisconnectResourceManager =>
    +      log.debug(s"Resource manager disconnect: $msg")
    +
    +      currentResourceManager match {
    +        case Some(rm) if rm.equals(msg.resourceManager()) =>
    +          // we should ditch the current resource manager
    +          log.debug(s"Disconnecting resource manager $rm.")
    +          // send the old one a disconnect message
    +          rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
    +          currentResourceManager = None
    +        case None =>
    +          // not connected, thus ignoring this message
    +          log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
    +      }
    +
    +    case msg @ RegisterTaskManager(
    +          resourceId,
    +          connectionInfo,
    +          hardwareInformation,
    +          numberOfSlots) =>
    +      // we are being informed by the ResourceManager that a new task 
manager is available
    +      log.debug(s"RegisterTaskManager: $msg")
     
           val taskManager = sender()
     
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
    +          future.onComplete {
    +            case scala.util.Success(response) =>
    +              // the resource manager is available and answered
    +              self ! response
    +            case scala.util.Failure(t) =>
    +              // slow or unreachable resource manager, register anyway and 
let the rm reconnect
    +              self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
    +              self ! decorateMessage(new DisconnectResourceManager(rm))
    +          }(context.dispatcher)
    +
    +        case None =>
    +          log.info("Task Manager Registration but not connected to 
ResourceManager")
    +          // ResourceManager not yet available
    +          // sending task manager information later upon ResourceManager 
registration
    +          self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
    +      }
    +
    +    case msg: RegisterResourceSuccessful =>
    +
    +      val originalMsg = msg.getRegistrationMessage
    +      val taskManager = msg.getTaskManager
    +
    +      // ResourceManager knows about the resource, now let's try to 
register TaskManager
           if (instanceManager.isRegistered(taskManager)) {
             val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
     
    -        // IMPORTANT: Send the response to the "sender", which is not the
    -        //            TaskManager actor, but the ask future!
    -        sender() ! decorateMessage(
    +        taskManager ! decorateMessage(
               AlreadyRegistered(
                 instanceID,
    -            libraryCacheManager.getBlobServerPort)
    -        )
    -      }
    -      else {
    +            libraryCacheManager.getBlobServerPort))
    +      } else {
             try {
               val instanceID = instanceManager.registerTaskManager(
                 taskManager,
    -            connectionInfo,
    -            hardwareInformation,
    -            numberOfSlots,
    +            originalMsg.resourceId,
    +            originalMsg.connectionInfo,
    +            originalMsg.resources,
    +            originalMsg.numberOfSlots,
                 leaderSessionID.orNull)
     
    -          // IMPORTANT: Send the response to the "sender", which is not the
    -          //            TaskManager actor, but the ask future!
    -          sender() ! decorateMessage(
    -            AcknowledgeRegistration(
    -              instanceID,
    -              libraryCacheManager.getBlobServerPort)
    -          )
    +          taskManager ! decorateMessage(
    +            AcknowledgeRegistration(instanceID, 
libraryCacheManager.getBlobServerPort))
     
               // to be notified when the taskManager is no longer reachable
               context.watch(taskManager)
    -        }
    -        catch {
    +        } catch {
               // registerTaskManager throws an IllegalStateException if it is 
already shut down
               // let the actor crash and restart itself in this case
               case e: Exception =>
                 log.error("Failed to register TaskManager at instance 
manager", e)
     
    -            // IMPORTANT: Send the response to the "sender", which is not 
the
    -            //            TaskManager actor, but the ask future!
    -            sender() ! decorateMessage(
    +            taskManager ! decorateMessage(
                   RefuseRegistration(
    -                ExceptionUtils.stringifyException(e))
    -            )
    +                ExceptionUtils.stringifyException(e)))
             }
           }
     
    +    case msg: RegisterResourceFailed =>
    +
    +      val taskManager = msg.getTaskManager
    +      val resourceId = msg.getResourceID
    +      log.warn(s"TaskManager's resource id $resourceId is not registered 
with ResourceManager. " +
    +        s"Refusing registration.")
    +
    +      taskManager ! decorateMessage(
    +        RefuseRegistration(
    +          ExceptionUtils.stringifyException(new IllegalStateException(
    --- End diff --
    
    Can we change `RefuseRegistration` so that it simply takes a `Throwable`
    as a parameter, instead of converting the exception to a string with the
    `ExceptionUtils.stringifyException` utility?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to