[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/328#issuecomment-71476676 I updated the PR with the exponential backoff registration strategy. Along the way, I fixed the flaky RecoveryIT case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
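For readers following the exponential backoff strategy mentioned here: in the diff quoted further down in this thread, the `RegisterAtJobManager` case adds the current delay to `registrationDuration`, doubles `registrationDelay`, and gives up once the accumulated duration exceeds `maxRegistrationDuration`. The sketch below reproduces only that arithmetic in plain Scala (no actors) so the resulting attempt times can be inspected. `BackoffSchedule` and `attemptTimes` are hypothetical names, and the initial delay and limit are whatever the caller passes in, not Flink's defaults.

```scala
import scala.concurrent.duration._

object BackoffSchedule {
  /** Times (relative to the start of registration) at which a registration
    * message would be sent, following the doubling logic of the diff.
    * Hypothetical helper for illustration only. */
  def attemptTimes(initialDelay: FiniteDuration,
                   maxDuration: FiniteDuration): Vector[FiniteDuration] = {
    var delay = initialDelay    // plays the role of registrationDelay
    var elapsed = Duration.Zero // plays the role of registrationDuration
    val times = Vector.newBuilder[FiniteDuration]
    var givenUp = false
    while (!givenUp) {
      elapsed += delay          // the timer that just fired waited `delay`
      delay *= 2                // double the pause for the next attempt
      if (elapsed > maxDuration) givenUp = true // the actor would PoisonPill itself here
      else times += elapsed     // a RegisterTaskManager message goes out now
    }
    times.result()
  }
}
```

With a 500 ms initial delay and a 10 s limit, attempts go out at 0.5 s, 1.5 s, 3.5 s and 7.5 s; the next timer fires at 15.5 s, past the limit, so the TaskManager shuts itself down at that point.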
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/328#issuecomment-71596308 Looks good to me +1
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/328#discussion_r23523674
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
+          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
+            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
+        }
+      }
+    }
-        heartbeatScheduler = Some(context.system.scheduler.schedule(
-          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+    case AlreadyRegistered(id, blobPort) =>
+      if(!registered) {
+        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even " +
+          "though it has not yet finished the registration process.", self.path, sender.path)
-        profiler foreach {
-          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+        finishRegistration(id, blobPort)
+        registered = true
+      } else {
+        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled){
+          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
+            self.path, sender.path)
         }
+      }
-        for (listener <- waitForRegistration) {
-          listener ! RegisteredAtJobManager
-        }
+    case RefuseRegistration(reason) =>
+      if(!registered) {
+        log.error("The registration of task manager {} was refused by the job manager {} " +
+          "because {}.", self.path, jobManagerAkkaURL, reason)
-        waitForRegistration.clear()
+        // Shut task manager down
+        self ! PoisonPill
+      } else {
+        // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled) {
--- End diff --
I was
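The `AcknowledgeRegistration`, `AlreadyRegistered` and `RefuseRegistration` cases in the diff above all hinge on the `registered` flag: the first positive answer completes the registration, a refusal before that shuts the TaskManager down, and anything arriving afterwards is only logged. A minimal pure-Scala model of that decision makes it easy to check that late or duplicate replies are harmless. The message and action types here are hypothetical, simplified stand-ins, not Flink's classes; the real messages' field types may differ from the plain `Int`s used here.

```scala
object Registration {
  // Hypothetical stand-ins for the registration replies handled in the diff.
  sealed trait Msg
  final case class AcknowledgeRegistration(id: Int, blobPort: Int) extends Msg
  final case class AlreadyRegistered(id: Int, blobPort: Int) extends Msg
  final case class RefuseRegistration(reason: String) extends Msg

  // What the TaskManager does in response.
  sealed trait Action
  final case class Finish(id: Int, blobPort: Int) extends Action // ~ finishRegistration(id, blobPort)
  case object Shutdown extends Action                            // ~ self ! PoisonPill
  case object Ignore extends Action                              // ~ at most a debug log line

  /** Returns the new value of the `registered` flag and the action to take. */
  def handle(registered: Boolean, msg: Msg): (Boolean, Action) = msg match {
    case AcknowledgeRegistration(id, port) if !registered => (true, Finish(id, port))
    case AlreadyRegistered(id, port) if !registered       => (true, Finish(id, port))
    case RefuseRegistration(_) if !registered             => (false, Shutdown)
    case _                                                => (registered, Ignore) // late reply
  }

  /** Feeds a sequence of replies through `handle`, starting unregistered. */
  def run(msgs: Seq[Msg]): List[Action] =
    msgs.foldLeft((false, List.empty[Action])) { case ((reg, acc), m) =>
      val (next, action) = handle(reg, m)
      (next, action :: acc)
    }._2.reverse
}
```

The design choice worth noting is that `AlreadyRegistered` is treated like a successful acknowledgement rather than an error, so a TaskManager that the JobManager already knows about (for example because an earlier acknowledgement was lost) still completes its registration on a retry.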
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/328#discussion_r23523589
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
[Same hunk (@@ -175,62 +175,79 @@) as quoted above, cut off at the `if (log.isDebugEnabled) {` line in the `AcknowledgeRegistration` case.]
--- End diff --
You're right Henry, I'll remove it.
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/328#discussion_r23485615
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
[Same hunk (@@ -175,62 +175,79 @@) as quoted above, cut off at the `if(log.isDebugEnabled) {` line in the `RefuseRegistration` case.]
--- End diff --
This is
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/328#discussion_r23485570
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
[Same hunk (@@ -175,62 +175,79 @@) as quoted above, cut off at the `if (log.isDebugEnabled) {` line in the `AcknowledgeRegistration` case.]
--- End diff --
Small nit: with slf4j-style formatting we do not need to check isDebugEnabled anymore, because the parameterized-messages feature checks the log level before materializing the string. It will keep the code cleaner =)
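The point of the nit is that template-style logging calls such as `log.debug("... {} ...", a, b)` check the log level before building the message, so an explicit `isDebugEnabled` guard saves nothing when the arguments are cheap to compute, as `self.path` is here. The sketch below illustrates the idea with a hypothetical `MiniLogger` (not the SLF4J or Akka API) that counts how often it actually formats a string. Note that the arguments themselves are still evaluated at the call site; only the string formatting is skipped, so an explicit guard remains worthwhile when computing an argument is expensive.

```scala
object LazyFormatting {
  /** Hypothetical minimal logger illustrating parameterized messages. */
  final class MiniLogger(var debugEnabled: Boolean) {
    var formatCalls = 0 // how many message strings were actually built
    val lines = scala.collection.mutable.ArrayBuffer.empty[String]

    // Replaces each "{}" placeholder with the next argument, left to right.
    private def format(template: String, args: Seq[Any]): String = {
      formatCalls += 1
      val it = args.iterator
      val sb = new StringBuilder
      var i = 0
      while (i < template.length) {
        if (template.startsWith("{}", i) && it.hasNext) {
          sb.append(it.next())
          i += 2
        } else {
          sb.append(template.charAt(i))
          i += 1
        }
      }
      sb.toString
    }

    /** The level check happens first, so a disabled call never formats. */
    def debug(template: String, args: Any*): Unit =
      if (debugEnabled) lines += format(template, args)
  }
}
```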
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/328#issuecomment-71002640

You are right @hsaputra, because I'm not sure which approach is best. In the corresponding JIRA issue I have tried to summarize what I think are the pros and cons of indefinitely many registration tries vs. a limited number of tries, and of a constant pause between tries vs. an increasing pause.

Indefinitely many registration tries:
- Pros: If the JobManager becomes available at some point in time, then the TaskManager will definitely connect to it.
- Cons: If the JobManager dies for some reason, then the TaskManager will linger around for all eternity or until it is stopped manually.

Limited number of tries:
- Pros: The TaskManager terminates itself after some time.
- Cons: The time interval might be too short for the JobManager to get started.

Constant pause:
- Pros: Relatively quick response time.
- Cons: Causes network traffic until the JobManager has been started.

Increasing pause:
- Pros: Reduces network traffic if the JobManager takes a little longer to start.
- Cons: Might delay the registration process if one interval was just missed.
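To put rough numbers on the constant vs. increasing pause trade-off listed above, the following sketch compares attempt times for a constant 1 s pause with a pause that starts at 1 s and doubles (attempts at d, 3d, 7d, ...). The 1 s starting value, the 60 s budget and the object name `PauseComparison` are illustrative assumptions, not values or code from the PR. It counts how many registration messages each strategy sends within the budget (network traffic) and how long the TaskManager keeps waiting after the JobManager becomes reachable (the cost of just missing an interval).

```scala
import scala.concurrent.duration._

object PauseComparison {
  /** Attempt times with a constant pause: p, 2p, 3p, ... */
  def constant(pause: FiniteDuration, n: Int): Vector[FiniteDuration] =
    Vector.tabulate(n)(i => pause * (i + 1).toLong)

  /** Attempt times when the pause doubles after each try: d, 3d, 7d, 15d, ... */
  def doubling(initial: FiniteDuration, n: Int): Vector[FiniteDuration] =
    Vector.tabulate(n)(i => initial * ((1L << (i + 1)) - 1))

  /** Number of attempts sent no later than `budget`. */
  def attemptsWithin(times: Seq[FiniteDuration], budget: FiniteDuration): Int =
    times.count(_ <= budget)

  /** Extra wait between the JobManager becoming reachable and the next attempt. */
  def extraWait(times: Seq[FiniteDuration], availableAt: FiniteDuration): Option[FiniteDuration] =
    times.find(_ >= availableAt).map(_ - availableAt)
}
```

Within 60 s the constant pause sends 60 registration messages while the doubling pause sends 5, but a JobManager that becomes reachable at 31.5 s is contacted 0.5 s later with the constant pause and 31.5 s later with the doubling pause.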
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/328#issuecomment-71148024 Thanks for the explanation @tillrohrmann. +1 for the exponential backoff approach. We can make the max retries and the max delay for each try configurable properties.
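A configurable maximum number of retries combined with a maximum delay per try amounts to capped exponential backoff. A minimal sketch, assuming a hypothetical `BackoffSettings` class whose field names are illustrative and not actual Flink configuration keys: the pause doubles from `initialDelay` until it reaches `maxDelay` and then stays there, and `maxRetries` bounds the number of attempts. This differs from the diff quoted earlier in the thread, which bounds the total elapsed time (`maxRegistrationDuration`) rather than the attempt count.

```scala
import scala.concurrent.duration._

/** Hypothetical retry settings; not actual Flink configuration keys. */
final case class BackoffSettings(initialDelay: FiniteDuration,
                                 maxDelay: FiniteDuration,
                                 maxRetries: Int) {
  require(initialDelay > Duration.Zero, "initialDelay must be positive")
  require(maxDelay >= initialDelay, "maxDelay must be at least initialDelay")
  require(maxRetries >= 0, "maxRetries must be non-negative")

  /** Pause before the given 0-based attempt: doubles, but never exceeds maxDelay. */
  def delayFor(attempt: Int): FiniteDuration = {
    var d = initialDelay
    var i = 0
    // stop doubling once the cap is reached, so large attempt numbers cannot overflow
    while (i < attempt && d < maxDelay) {
      d = d * 2
      i += 1
    }
    if (d > maxDelay) maxDelay else d
  }

  /** One pause per permitted attempt. */
  def delays: Vector[FiniteDuration] = Vector.tabulate(maxRetries)(delayFor)
}
```

With a 500 ms initial delay, a 4 s cap and 6 retries, the pauses are 0.5 s, 1 s, 2 s, 4 s, 4 s and 4 s, i.e. 15.5 s of cumulative pauses. Capping the delay bounds the cost of a just-missed interval to `maxDelay`, at the price of sending more messages than uncapped doubling would.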
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user Humbedooh commented on the pull request: https://github.com/apache/flink/pull/328#issuecomment-70851722 Please ignore this, I am trying to get our JIRA integration working again, so I am posting a couple of bogus replies here.
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/328#issuecomment-70848126 Well, I wanted to get some feedback/discussion on the retry strategy before merging this PR. At the moment the strategy is a limited number of retries with a constant pause in between.