[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291703#comment-14291703 ]
ASF GitHub Bot commented on FLINK-1352:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/328#discussion_r23523589

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
       }

       private def tryJobManagerRegistration(): Unit = {
    -    registrationAttempts = 0
    -    import context.dispatcher
    -    registrationScheduler = Some(context.system.scheduler.schedule(
    -      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
    -      self, RegisterAtJobManager))
    +    registrationDuration = 0 seconds
    +
    +    registered = false
    +
    +    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
       }

       override def receiveWithLogMessages: Receive = {
         case RegisterAtJobManager => {
    -      registrationAttempts += 1
    +      if(!registered) {
    +        registrationDuration += registrationDelay
    +        // double delay for exponential backoff
    +        registrationDelay *= 2

    -      if (registered) {
    -        registrationScheduler.foreach(_.cancel())
    -      }
    -      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
    +        if (registrationDuration > maxRegistrationDuration) {
    +          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,

    -        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
    -          registrationAttempts)
    -        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +            maxRegistrationDuration)

    -        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    -      }
    -      else {
    -        log.error("TaskManager could not register at JobManager.");
    -        self ! PoisonPill
    +          self ! PoisonPill
    +        } else if (!registered) {
    +          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
    +            s"Attempt")
    +          val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    +
    +          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
    +        }
           }
         }

         case AcknowledgeRegistration(id, blobPort) => {
    -      if (!registered) {
    +      if(!registered) {
    +        finishRegistration(id, blobPort)
             registered = true
    -        currentJobManager = sender
    -        instanceID = id
    -
    -        context.watch(currentJobManager)
    -
    -        log.info("TaskManager successfully registered at JobManager {}.",
    -          currentJobManager.path.toString)
    -
    -        setupNetworkEnvironment()
    -        setupLibraryCacheManager(blobPort)
    +      } else {
    +        if (log.isDebugEnabled) {
    --- End diff --

    You're right Henry, I'll remove it.


> Buggy registration from TaskManager to JobManager
> -------------------------------------------------
>
>                 Key: FLINK-1352
>                 URL: https://issues.apache.org/jira/browse/FLINK-1352
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager, TaskManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Till Rohrmann
>             Fix For: 0.9
>
>
> The JobManager's InstanceManager may refuse the registration attempt from a
> TaskManager, because it has this TaskManager already connected, or, in the
> future, because the TaskManager has been blacklisted as unreliable.
> Upon refused registration, the instance ID is null, to signal that refused
> registration. TaskManager reacts incorrectly to such methods, assuming
> successful registration.
> Possible solution: JobManager sends back a dedicated "RegistrationRefused"
> message, if the instance manager returns null as the registration result. If
> the TaskManager receives that before being registered, it knows that the
> registration response was lost (which should not happen on TCP and it would
> indicate a corrupt connection).
> Followup question: Does it make sense to have the TaskManager try
> indefinitely to connect to the JobManager, with an increasing interval (from
> seconds to minutes)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
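
For readers following the backoff logic in the diff above, here is a minimal sketch of the retry schedule it appears to produce. It is not code from the pull request: the object `RegistrationBackoff` and the function `attemptDelays` are hypothetical names, and the sketch leaves out Akka entirely. It only assumes what the diff shows, namely that each wait is added to the accumulated duration, the delay is doubled, and registration is abandoned once the accumulated duration exceeds the maximum.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of Flink: models the schedule implied by the diff.
object RegistrationBackoff {

  /** Returns the delay waited before each registration attempt.
    * The delay doubles after every attempt; no further attempt is made once
    * the accumulated waiting time would exceed `maxDuration`.
    */
  def attemptDelays(
      initialDelay: FiniteDuration,
      maxDuration: FiniteDuration): List[FiniteDuration] = {

    @annotation.tailrec
    def loop(
        delay: FiniteDuration,
        elapsed: FiniteDuration,
        acc: List[FiniteDuration]): List[FiniteDuration] = {
      // Corresponds to `registrationDuration += registrationDelay` in the diff.
      val total = elapsed + delay
      if (total > maxDuration) acc.reverse   // the TaskManager would give up here
      else loop(delay * 2L, total, delay :: acc) // `registrationDelay *= 2`
    }

    loop(initialDelay, Duration.Zero, Nil)
  }
}
```

With an assumed initial delay of 500 ms and an assumed maximum of 5 s, `RegistrationBackoff.attemptDelays(500.millis, 5.seconds)` yields attempts after waits of 500 ms, 1 s and 2 s; the next wait of 4 s would bring the total to 7.5 s, so the sketch stops there.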