spark git commit: [SPARK-5529] [CORE] Add expireDeadHosts in HeartbeatReceiver

2015-04-30 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 3bce87ebd -> ec196ab1c


[SPARK-5529] [CORE] Add expireDeadHosts in HeartbeatReceiver

If a BlockManager has not sent a heartbeat for more than 120s, 
BlockManagerMasterActor will remove it. But CoarseGrainedSchedulerBackend can 
only remove an executor after a DisassociatedEvent. We should expire dead hosts 
in HeartbeatReceiver.
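
As a rough illustration of the idea (not the code from this commit), the sketch
below tracks the last heartbeat time per executor and expires any executor that
has been silent for longer than a timeout. The names HeartbeatTracker,
recordHeartbeat and the explicit nowMs argument are assumptions made for this
example; the actual change lives in the HeartbeatReceiver actor shown in the
diff further down.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the expiry logic; not the Spark class.
final class HeartbeatTracker(timeoutMs: Long) {
  // executor ID -> timestamp (ms) of the last heartbeat received
  private val lastSeen = mutable.HashMap.empty[String, Long]

  /** Records that a heartbeat from the given executor arrived at nowMs. */
  def recordHeartbeat(executorId: String, nowMs: Long): Unit =
    lastSeen(executorId) = nowMs

  /** Removes and returns the executors whose last heartbeat is older than timeoutMs. */
  def expireDeadHosts(nowMs: Long): Seq[String] = {
    // Collect first, then remove, so the map is not mutated while iterating.
    val dead = lastSeen.collect {
      case (id, seenMs) if nowMs - seenMs > timeoutMs => id
    }.toSeq.sorted
    lastSeen --= dead
    dead
  }
}
```

In this sketch the caller is expected to invoke expireDeadHosts periodically and
to hand the returned executor IDs to whatever component removes executors.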

Author: Hong Shen hongs...@tencent.com

Closes #4363 from shenh062326/my_change3 and squashes the following commits:

2c9a46a [Hong Shen] Change some code style.
1a042ff [Hong Shen] Change some code style.
2dc456e [Hong Shen] Change some code style.
d221493 [Hong Shen] Fix test failed
7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver
b904aed [Hong Shen] Fix failed test
52725af [Hong Shen] Remove assert in SparkContext.killExecutors
5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors
a858fb5 [Hong Shen] A minor change in HeartbeatReceiver
3e221d9 [Hong Shen] A minor change in HeartbeatReceiver
6bab7aa [Hong Shen] Change a code style.
07952f3 [Hong Shen] Change configs name and code style.
ce9257e [Hong Shen] Fix test failed
bccd515 [Hong Shen] Fix test failed
8e77408 [Hong Shen] Fix test failed
c1dfda1 [Hong Shen] Fix test failed
e197e20 [Hong Shen] Fix test failed
fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages
b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor
c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver

Author: Hong Shen hongs...@tencent.com

Closes #5793 from alexrovner/SPARK-5529-backport-1.3-v2 and squashes the 
following commits:

f238f94 [Hong Shen]  [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec196ab1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec196ab1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec196ab1

Branch: refs/heads/branch-1.3
Commit: ec196ab1c7569d7ab0a50c9d7338c2835f2c84d5
Parents: 3bce87e
Author: Hong Shen hongs...@tencent.com
Authored: Thu Apr 30 17:05:27 2015 +0100
Committer: Sean Owen so...@cloudera.com
Committed: Thu Apr 30 17:05:27 2015 +0100

--
 .../org/apache/spark/HeartbeatReceiver.scala    | 65 ++--
 .../scala/org/apache/spark/SparkContext.scala   | 15 +++--
 .../apache/spark/scheduler/TaskScheduler.scala  |  6 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../spark/storage/BlockManagerMasterActor.scala | 36 +--
 .../spark/storage/BlockManagerMessages.scala    |  2 -
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +
 7 files changed, 79 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 83ae57b..69178da 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts 
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+  
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
+

spark git commit: [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver

2015-02-26 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master fbc469473 -> 18f209843


 [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver

If a BlockManager has not sent a heartbeat for more than 120s, 
BlockManagerMasterActor will remove it. But CoarseGrainedSchedulerBackend can 
only remove an executor after a DisassociatedEvent. We should expire dead hosts 
in HeartbeatReceiver.

Author: Hong Shen hongs...@tencent.com

Closes #4363 from shenh062326/my_change3 and squashes the following commits:

2c9a46a [Hong Shen] Change some code style.
1a042ff [Hong Shen] Change some code style.
2dc456e [Hong Shen] Change some code style.
d221493 [Hong Shen] Fix test failed
7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver
b904aed [Hong Shen] Fix failed test
52725af [Hong Shen] Remove assert in SparkContext.killExecutors
5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors
a858fb5 [Hong Shen] A minor change in HeartbeatReceiver
3e221d9 [Hong Shen] A minor change in HeartbeatReceiver
6bab7aa [Hong Shen] Change a code style.
07952f3 [Hong Shen] Change configs name and code style.
ce9257e [Hong Shen] Fix test failed
bccd515 [Hong Shen] Fix test failed
8e77408 [Hong Shen] Fix test failed
c1dfda1 [Hong Shen] Fix test failed
e197e20 [Hong Shen] Fix test failed
fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages
b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor
c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18f20984
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18f20984
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18f20984

Branch: refs/heads/master
Commit: 18f2098433e0bfef9497bacd601fdf098ed03eab
Parents: fbc4694
Author: Hong Shen hongs...@tencent.com
Authored: Thu Feb 26 18:43:23 2015 -0800
Committer: Andrew Or and...@databricks.com
Committed: Thu Feb 26 18:43:23 2015 -0800

--
 .../org/apache/spark/HeartbeatReceiver.scala    | 65 ++--
 .../scala/org/apache/spark/SparkContext.scala   | 15 +++--
 .../apache/spark/scheduler/TaskScheduler.scala  |  6 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../spark/storage/BlockManagerMasterActor.scala | 36 +--
 .../spark/storage/BlockManagerMessages.scala    |  2 -
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +
 7 files changed, 79 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 83ae57b..69178da 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts 
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+  
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+  
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) *
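
The nested getLong calls for executorTimeoutMs resolve a timeout by preferring
the newer key, falling back to the older one, and finally to a hard-coded
default. A minimal standalone sketch of that pattern follows, assuming a plain
Map in place of SparkConf; TimeoutConfigSketch and its getLong helper are
hypothetical and exist only for this example.

```scala
object TimeoutConfigSketch {
  // Hypothetical helper: a plain Map stands in for SparkConf here.
  def getLong(conf: Map[String, String], key: String, default: Long): Long =
    conf.get(key).map(_.toLong).getOrElse(default)

  // Prefer the newer key, fall back to the older key, then to 120,
  // and scale the result by 1000 as the executorTimeoutMs line in the diff does.
  def executorTimeoutMs(conf: Map[String, String]): Long =
    getLong(conf, "spark.network.timeout",
      getLong(conf, "spark.storage.blockManagerSlaveTimeoutMs", 120L)) * 1000L
}
```

In this sketch the multiplication by 1000 is applied whichever key supplied the
value, mirroring the shape of the expression in the diff.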