[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291708#comment-14291708
 ] 

ASF GitHub Bot commented on FLINK-1352:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/328#discussion_r23523674
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
    @@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
       }
     
       private def tryJobManagerRegistration(): Unit = {
    -    registrationAttempts = 0
    -    import context.dispatcher
    -    registrationScheduler = Some(context.system.scheduler.schedule(
    -      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
    -      self, RegisterAtJobManager))
    +    registrationDuration = 0 seconds
    +
    +    registered = false
    +
    +    context.system.scheduler.scheduleOnce(registrationDelay, self, 
RegisterAtJobManager)
       }
     
       override def receiveWithLogMessages: Receive = {
         case RegisterAtJobManager => {
    -      registrationAttempts += 1
    +      if(!registered) {
    +        registrationDuration += registrationDelay
    +        // double delay for exponential backoff
    +        registrationDelay *= 2
     
    -      if (registered) {
    -        registrationScheduler.foreach(_.cancel())
    -      }
    -      else if (registrationAttempts <= 
TaskManager.MAX_REGISTRATION_ATTEMPTS) {
    +        if (registrationDuration > maxRegistrationDuration) {
    +          log.warning("TaskManager could not register at JobManager {} 
after {}.", jobManagerAkkaURL,
     
    -        log.info("Try to register at master {}. Attempt #{}", 
jobManagerAkkaURL,
    -          registrationAttempts)
    -        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +            maxRegistrationDuration)
     
    -        jobManager ! RegisterTaskManager(connectionInfo, 
hardwareDescription, numberOfSlots)
    -      }
    -      else {
    -        log.error("TaskManager could not register at JobManager.");
    -        self ! PoisonPill
    +          self ! PoisonPill
    +        } else if (!registered) {
    +          log.info(s"Try to register at master ${jobManagerAkkaURL}. 
${registrationAttempts}. " +
    +            s"Attempt")
    +          val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +          jobManager ! RegisterTaskManager(connectionInfo, 
hardwareDescription, numberOfSlots)
    +
    +          context.system.scheduler.scheduleOnce(registrationDelay, self, 
RegisterAtJobManager)
    +        }
           }
         }
     
         case AcknowledgeRegistration(id, blobPort) => {
    -      if (!registered) {
    +      if(!registered) {
    +        finishRegistration(id, blobPort)
             registered = true
    -        currentJobManager = sender
    -        instanceID = id
    -
    -        context.watch(currentJobManager)
    -
    -        log.info("TaskManager successfully registered at JobManager {}.",
    -          currentJobManager.path.toString)
    -
    -        setupNetworkEnvironment()
    -        setupLibraryCacheManager(blobPort)
    +      } else {
    +        if (log.isDebugEnabled) {
    +          log.debug("The TaskManager {} is already registered at the 
JobManager {}, but received " +
    +            "another AcknowledgeRegistration message.", self.path, 
currentJobManager.path)
    +        }
    +      }
    +    }
     
    -        heartbeatScheduler = Some(context.system.scheduler.schedule(
    -          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, 
self, SendHeartbeat))
    +    case AlreadyRegistered(id, blobPort) =>
    +      if(!registered) {
    +        log.warning("The TaskManager {} seems to be already registered at 
the JobManager {} even" +
    +          "though it has not yet finished the registration process.", 
self.path, sender.path)
     
    -        profiler foreach {
    -          _.tell(RegisterProfilingListener, 
JobManager.getProfiler(currentJobManager))
    +        finishRegistration(id, blobPort)
    +        registered = true
    +      } else {
    +        // ignore AlreadyRegistered messages which arrived after 
AcknowledgeRegistration
    +        if(log.isDebugEnabled){
    +          log.debug("The TaskManager {} has already been registered at the 
JobManager {}.",
    +            self.path, sender.path)
             }
    +      }
     
    -        for (listener <- waitForRegistration) {
    -          listener ! RegisteredAtJobManager
    -        }
    +    case RefuseRegistration(reason) =>
    +      if(!registered) {
    +        log.error("The registration of task manager {} was refused by the 
job manager {} " +
    +          "because {}.", self.path, jobManagerAkkaURL, reason)
     
    -        waitForRegistration.clear()
    +        // Shut task manager down
    +        self ! PoisonPill
    +      } else {
    +        // ignore RefuseRegistration messages which arrived after 
AcknowledgeRegistration
    +        if(log.isDebugEnabled) {
    --- End diff --
    
    I was wondering whether this gives us any valuable information for 
bug-tracking purposes. You're right that it should not happen too often and 
thus it probably won't hurt too much. 


> Buggy registration from TaskManager to JobManager
> -------------------------------------------------
>
>                 Key: FLINK-1352
>                 URL: https://issues.apache.org/jira/browse/FLINK-1352
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager, TaskManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Till Rohrmann
>             Fix For: 0.9
>
>
> The JobManager's InstanceManager may refuse the registration attempt from a 
> TaskManager, because it has this TaskManager already connected, or, in the 
> future, because the TaskManager has been blacklisted as unreliable.
> Upon refused registration, the instance ID is null, to signal the refused 
> registration. The TaskManager reacts incorrectly to such messages, assuming 
> successful registration.
> Possible solution: The JobManager sends back a dedicated "RegistrationRefused" 
> message, if the instance manager returns null as the registration result. If 
> the TaskManager receives that before being registered, it knows that the 
> registration response was lost (which should not happen on TCP and it would 
> indicate a corrupt connection).
> Follow-up question: Does it make sense to have the TaskManager try 
> indefinitely to connect to the JobManager, with an increasing interval (from 
> seconds to minutes)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to