[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-26 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71476676
  
I updated the PR with the exponential backoff registration strategy. Along 
the way, I fixed the flaky RecoveryIT case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71596308
  
Looks good to me

+1




[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/328#discussion_r23523674
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
 
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
 
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
 
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
+          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
+            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
+        }
+      }
+    }
 
-        heartbeatScheduler = Some(context.system.scheduler.schedule(
-          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+    case AlreadyRegistered(id, blobPort) =>
+      if(!registered) {
+        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
+          "though it has not yet finished the registration process.", self.path, sender.path)
 
-        profiler foreach {
-          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+        finishRegistration(id, blobPort)
+        registered = true
+      } else {
+        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled){
+          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
+            self.path, sender.path)
         }
+      }
 
-        for (listener <- waitForRegistration) {
-          listener ! RegisteredAtJobManager
-        }
+    case RefuseRegistration(reason) =>
+      if(!registered) {
+        log.error("The registration of task manager {} was refused by the job manager {} " +
+          "because {}.", self.path, jobManagerAkkaURL, reason)
 
-        waitForRegistration.clear()
+        // Shut task manager down
+        self ! PoisonPill
+      } else {
+        // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled) {
--- End diff --

I was 

[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/328#discussion_r23523589
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
 
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
 
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
 
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
--- End diff --

You're right Henry, I'll remove it.




[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-23 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/328#discussion_r23485615
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
 
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
 
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
 
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
+          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
+            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
+        }
+      }
+    }
 
-        heartbeatScheduler = Some(context.system.scheduler.schedule(
-          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+    case AlreadyRegistered(id, blobPort) =>
+      if(!registered) {
+        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
+          "though it has not yet finished the registration process.", self.path, sender.path)
 
-        profiler foreach {
-          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+        finishRegistration(id, blobPort)
+        registered = true
+      } else {
+        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled){
+          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
+            self.path, sender.path)
         }
+      }
 
-        for (listener <- waitForRegistration) {
-          listener ! RegisteredAtJobManager
-        }
+    case RefuseRegistration(reason) =>
+      if(!registered) {
+        log.error("The registration of task manager {} was refused by the job manager {} " +
+          "because {}.", self.path, jobManagerAkkaURL, reason)
 
-        waitForRegistration.clear()
+        // Shut task manager down
+        self ! PoisonPill
+      } else {
+        // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled) {
--- End diff --

This is 

[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-23 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/328#discussion_r23485570
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
 
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
 
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
 
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
--- End diff --

Small nit: with slf4j formatting we do not need to check isDebugEnabled 
anymore, because its parameterized messages feature checks the log level 
before materializing the string. 
Removing the check will keep the code cleaner =)
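
To illustrate the point about parameterized messages, here is a minimal sketch. SketchLogger is a hypothetical stand-in written only for this example; it is not the slf4j or Akka logging API. It shows that the message string is built only when the debug level is enabled, so an explicit isDebugEnabled guard adds nothing for plain placeholder arguments.

```scala
// Hypothetical stand-in for a logger with {} placeholders (not a real library API).
object SketchLogger {
  /** Substitutes each {} placeholder with the next argument, in order. */
  def render(template: String, args: Seq[Any]): String =
    args.foldLeft(template) { (text, arg) =>
      text.replaceFirst("\\{\\}", java.util.regex.Matcher.quoteReplacement(String.valueOf(arg)))
    }
}

final class SketchLogger(val debugEnabled: Boolean) {
  // Counts how often a message string was actually built.
  var formatted: Int = 0

  def debug(template: String, args: Any*): Unit =
    if (debugEnabled) {
      formatted += 1
      println(SketchLogger.render(template, args))
    }
}

object SketchLoggerDemo {
  def main(argv: Array[String]): Unit = {
    val log = new SketchLogger(debugEnabled = false)
    // No guard needed: with debug disabled, the template is never rendered.
    log.debug("The TaskManager {} is already registered at the JobManager {}.", "tm-1", "jm-1")
  }
}
```

Note that the arguments themselves are still evaluated at the call site, so a guard can remain useful when computing an argument is expensive.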




[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-22 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71002640
  
You are right @hsaputra, because I'm not sure which approach is the best. 
In the corresponding JIRA issue I have tried to summarize what I think 
are the pros and cons of indefinitely many registration tries vs. a limited 
number of tries, and of a constant pause between tries vs. an increasing pause.

Indefinitely many registration tries:
Pros: If the JobManager becomes available at some point in time, then the 
TaskManager will definitely connect to it
Cons: If the JobManager dies for some reason, then the TaskManager will 
linger around for all eternity or until it is stopped manually

Limited number of tries:
Pros: Will terminate itself after some time
Cons: The time interval might be too short for the JobManager to get started

Constant pause:
Pros: Relatively quick response time
Cons: Causes network traffic until the JobManager has been started

Increasing pause:
Pros: Reduction of network traffic if the JobManager takes a little bit 
longer to start
Cons: Might delay the registration process if one interval was just missed
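
The trade-off between a constant and an increasing pause can be made concrete with a small sketch. It is not taken from the PR; the object name RetrySchedules and its methods are assumptions made for illustration. It lists the pauses that fit into a total time budget, which is the limit the patch tracks with registrationDuration and maxRegistrationDuration.

```scala
import scala.concurrent.duration._

// Illustrative helper (hypothetical names), not part of Flink.
object RetrySchedules {
  /** Pauses of successive tries, stopping before their sum would exceed `budget`. */
  private def schedule(
      initial: FiniteDuration,
      budget: FiniteDuration,
      next: FiniteDuration => FiniteDuration): List[FiniteDuration] = {
    @scala.annotation.tailrec
    def loop(
        delay: FiniteDuration,
        elapsed: FiniteDuration,
        acc: List[FiniteDuration]): List[FiniteDuration] =
      if (elapsed + delay > budget) acc.reverse
      else loop(next(delay), elapsed + delay, delay :: acc)

    loop(initial, Duration.Zero, Nil)
  }

  /** Constant pause: every try waits `initial`. */
  def constant(initial: FiniteDuration, budget: FiniteDuration): List[FiniteDuration] =
    schedule(initial, budget, d => d)

  /** Increasing pause: the wait doubles after every try (exponential backoff). */
  def doubling(initial: FiniteDuration, budget: FiniteDuration): List[FiniteDuration] =
    schedule(initial, budget, d => d * 2L)
}
```

With an assumed initial pause of 500 ms and a budget of 30 s, the constant schedule allows 60 tries, while the doubling schedule allows 5 (0.5 s, 1 s, 2 s, 4 s, 8 s). That is the reduction in network traffic, paid for with a longer worst-case wait after a missed interval.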




[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-22 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71148024
  
Thanks for the explanation @tillrohrmann 

+1 for the exponential backoff approach. We can expose the max retries and 
the max delay for each try as configuration properties.
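
A rough sketch of what such settings could look like follows. The case class RegistrationBackoff and its field names are hypothetical; they are not existing Flink configuration keys. The pause doubles per try, is capped at maxDelay, and no further try is allowed once maxRetries is reached.

```scala
import scala.concurrent.duration._

// Hypothetical settings holder; names are illustrative, not Flink config keys.
final case class RegistrationBackoff(
    initialDelay: FiniteDuration,
    maxDelay: FiniteDuration,
    maxRetries: Int) {

  /** Pause before the given try (0-based), or None once the retries are used up. */
  def delayFor(attempt: Int): Option[FiniteDuration] =
    if (attempt < 0 || attempt >= maxRetries) None
    else {
      val delays = Iterator.iterate(initialDelay.min(maxDelay)) { d =>
        // Compare against half the cap so doubling never overshoots maxDelay.
        if (d > maxDelay / 2L) maxDelay else d * 2L
      }
      Some(delays.drop(attempt).next())
    }
}
```

For example, with an initial delay of 500 ms, a cap of 30 s and 10 retries, the sixth try (attempt 5) waits 16 s, every later try waits 30 s, and attempt 10 returns None.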





[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-21 Thread Humbedooh
Github user Humbedooh commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-70851722
  
Please ignore this, I am trying to get our JIRA integration working again, 
so I am posting a couple of bogus replies here.




[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...

2015-01-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-70848126
  
Well, I wanted to get some feedback/discussion on the retry strategy before 
merging this PR. At the moment the strategy is a limited number of retries with 
a constant pause in between.  

