GitHub user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/328#discussion_r23485615
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
    @@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
       }
     
       private def tryJobManagerRegistration(): Unit = {
    -    registrationAttempts = 0
    -    import context.dispatcher
    -    registrationScheduler = Some(context.system.scheduler.schedule(
    -      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
    -      self, RegisterAtJobManager))
    +    registrationDuration = 0 seconds
    +
    +    registered = false
    +
    +    context.system.scheduler.scheduleOnce(registrationDelay, self, 
RegisterAtJobManager)
       }
     
       override def receiveWithLogMessages: Receive = {
         case RegisterAtJobManager => {
    -      registrationAttempts += 1
    +      if(!registered) {
    +        registrationDuration += registrationDelay
    +        // double delay for exponential backoff
    +        registrationDelay *= 2
     
    -      if (registered) {
    -        registrationScheduler.foreach(_.cancel())
    -      }
    -      else if (registrationAttempts <= 
TaskManager.MAX_REGISTRATION_ATTEMPTS) {
    +        if (registrationDuration > maxRegistrationDuration) {
    +          log.warning("TaskManager could not register at JobManager {} 
after {}.", jobManagerAkkaURL,
     
    -        log.info("Try to register at master {}. Attempt #{}", 
jobManagerAkkaURL,
    -          registrationAttempts)
    -        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +            maxRegistrationDuration)
     
    -        jobManager ! RegisterTaskManager(connectionInfo, 
hardwareDescription, numberOfSlots)
    -      }
    -      else {
    -        log.error("TaskManager could not register at JobManager.");
    -        self ! PoisonPill
    +          self ! PoisonPill
    +        } else if (!registered) {
    +          log.info(s"Try to register at master ${jobManagerAkkaURL}. 
${registrationAttempts}. " +
    +            s"Attempt")
    +          val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +          jobManager ! RegisterTaskManager(connectionInfo, 
hardwareDescription, numberOfSlots)
    +
    +          context.system.scheduler.scheduleOnce(registrationDelay, self, 
RegisterAtJobManager)
    +        }
           }
         }
     
         case AcknowledgeRegistration(id, blobPort) => {
    -      if (!registered) {
    +      if(!registered) {
    +        finishRegistration(id, blobPort)
             registered = true
    -        currentJobManager = sender
    -        instanceID = id
    -
    -        context.watch(currentJobManager)
    -
    -        log.info("TaskManager successfully registered at JobManager {}.",
    -          currentJobManager.path.toString)
    -
    -        setupNetworkEnvironment()
    -        setupLibraryCacheManager(blobPort)
    +      } else {
    +        if (log.isDebugEnabled) {
    +          log.debug("The TaskManager {} is already registered at the 
JobManager {}, but received " +
    +            "another AcknowledgeRegistration message.", self.path, 
currentJobManager.path)
    +        }
    +      }
    +    }
     
    -        heartbeatScheduler = Some(context.system.scheduler.schedule(
    -          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, 
self, SendHeartbeat))
    +    case AlreadyRegistered(id, blobPort) =>
    +      if(!registered) {
    +        log.warning("The TaskManager {} seems to be already registered at 
the JobManager {} even" +
    +          "though it has not yet finished the registration process.", 
self.path, sender.path)
     
    -        profiler foreach {
    -          _.tell(RegisterProfilingListener, 
JobManager.getProfiler(currentJobManager))
    +        finishRegistration(id, blobPort)
    +        registered = true
    +      } else {
    +        // ignore AlreadyRegistered messages which arrived after 
AcknowledgeRegistration
    +        if(log.isDebugEnabled){
    +          log.debug("The TaskManager {} has already been registered at the 
JobManager {}.",
    +            self.path, sender.path)
             }
    +      }
     
    -        for (listener <- waitForRegistration) {
    -          listener ! RegisteredAtJobManager
    -        }
    +    case RefuseRegistration(reason) =>
    +      if(!registered) {
    +        log.error("The registration of task manager {} was refused by the 
job manager {} " +
    +          "because {}.", self.path, jobManagerAkkaURL, reason)
     
    -        waitForRegistration.clear()
    +        // Shut task manager down
    +        self ! PoisonPill
    +      } else {
    +        // ignore RefuseRegistration messages which arrived after 
AcknowledgeRegistration
    +        if(log.isDebugEnabled) {
    --- End diff --
    
    Should this probably be logged at info level instead? Or would that be too
noisy? I would assume not many of these messages will occur.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to