Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/328#discussion_r23485615 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -175,62 +175,79 @@ import scala.collection.JavaConverters._ } private def tryJobManagerRegistration(): Unit = { - registrationAttempts = 0 - import context.dispatcher - registrationScheduler = Some(context.system.scheduler.schedule( - TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL, - self, RegisterAtJobManager)) + registrationDuration = 0 seconds + + registered = false + + context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager) } override def receiveWithLogMessages: Receive = { case RegisterAtJobManager => { - registrationAttempts += 1 + if(!registered) { + registrationDuration += registrationDelay + // double delay for exponential backoff + registrationDelay *= 2 - if (registered) { - registrationScheduler.foreach(_.cancel()) - } - else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) { + if (registrationDuration > maxRegistrationDuration) { + log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL, - log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL, - registrationAttempts) - val jobManager = context.actorSelection(jobManagerAkkaURL) + maxRegistrationDuration) - jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) - } - else { - log.error("TaskManager could not register at JobManager."); - self ! PoisonPill + self ! PoisonPill + } else if (!registered) { + log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " + + s"Attempt") + val jobManager = context.actorSelection(jobManagerAkkaURL) + + jobManager ! 
RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) + + context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager) + } } } case AcknowledgeRegistration(id, blobPort) => { - if (!registered) { + if(!registered) { + finishRegistration(id, blobPort) registered = true - currentJobManager = sender - instanceID = id - - context.watch(currentJobManager) - - log.info("TaskManager successfully registered at JobManager {}.", - currentJobManager.path.toString) - - setupNetworkEnvironment() - setupLibraryCacheManager(blobPort) + } else { + if (log.isDebugEnabled) { + log.debug("The TaskManager {} is already registered at the JobManager {}, but received " + + "another AcknowledgeRegistration message.", self.path, currentJobManager.path) + } + } + } - heartbeatScheduler = Some(context.system.scheduler.schedule( - TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)) + case AlreadyRegistered(id, blobPort) => + if(!registered) { + log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" + + "though it has not yet finished the registration process.", self.path, sender.path) - profiler foreach { - _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager)) + finishRegistration(id, blobPort) + registered = true + } else { + // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration + if(log.isDebugEnabled){ + log.debug("The TaskManager {} has already been registered at the JobManager {}.", + self.path, sender.path) } + } - for (listener <- waitForRegistration) { - listener ! RegisteredAtJobManager - } + case RefuseRegistration(reason) => + if(!registered) { + log.error("The registration of task manager {} was refused by the job manager {} " + + "because {}.", self.path, jobManagerAkkaURL, reason) - waitForRegistration.clear() + // Shut task manager down + self ! 
PoisonPill + } else { + // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration + if(log.isDebugEnabled) { --- End diff -- Should this be logged at info level instead of debug? Or would that be too noisy? I would assume this does not happen often.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---