spark git commit: [SPARK-5529] [CORE] Add expireDeadHosts in HeartbeatReceiver
Repository: spark Updated Branches: refs/heads/branch-1.3 3bce87ebd -> ec196ab1c [SPARK-5529] [CORE] Add expireDeadHosts in HeartbeatReceiver If a BlockManager has not sent a heartbeat for more than 120s, BlockManagerMasterActor will remove it. But CoarseGrainedSchedulerBackend can only remove an executor after a DisassociatedEvent. We should expire dead hosts in HeartbeatReceiver. Author: Hong Shen hongs...@tencent.com Closes #4363 from shenh062326/my_change3 and squashes the following commits: 2c9a46a [Hong Shen] Change some code style. 1a042ff [Hong Shen] Change some code style. 2dc456e [Hong Shen] Change some code style. d221493 [Hong Shen] Fix test failed 7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver b904aed [Hong Shen] Fix failed test 52725af [Hong Shen] Remove assert in SparkContext.killExecutors 5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors a858fb5 [Hong Shen] A minor change in HeartbeatReceiver 3e221d9 [Hong Shen] A minor change in HeartbeatReceiver 6bab7aa [Hong Shen] Change a code style. 07952f3 [Hong Shen] Change configs name and code style. 
ce9257e [Hong Shen] Fix test failed bccd515 [Hong Shen] Fix test failed 8e77408 [Hong Shen] Fix test failed c1dfda1 [Hong Shen] Fix test failed e197e20 [Hong Shen] Fix test failed fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver Author: Hong Shen hongs...@tencent.com Closes #5793 from alexrovner/SPARK-5529-backport-1.3-v2 and squashes the following commits: f238f94 [Hong Shen] [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec196ab1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec196ab1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec196ab1 Branch: refs/heads/branch-1.3 Commit: ec196ab1c7569d7ab0a50c9d7338c2835f2c84d5 Parents: 3bce87e Author: Hong Shen hongs...@tencent.com Authored: Thu Apr 30 17:05:27 2015 +0100 Committer: Sean Owen so...@cloudera.com Committed: Thu Apr 30 17:05:27 2015 +0100 -- .../org/apache/spark/HeartbeatReceiver.scala| 65 ++-- .../scala/org/apache/spark/SparkContext.scala | 15 +++-- .../apache/spark/scheduler/TaskScheduler.scala | 6 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/storage/BlockManagerMasterActor.scala | 36 +-- .../spark/storage/BlockManagerMessages.scala| 2 - .../spark/scheduler/DAGSchedulerSuite.scala | 2 + 7 files changed, 79 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala -- diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 83ae57b..69178da 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -17,33 +17,86 @@ package 
org.apache.spark -import akka.actor.Actor +import scala.concurrent.duration._ +import scala.collection.mutable + +import akka.actor.{Actor, Cancellable} + import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.scheduler.TaskScheduler +import org.apache.spark.scheduler.{SlaveLost, TaskScheduler} import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal - * components to convey liveness or execution information for in-progress tasks. + * components to convey liveness or execution information for in-progress tasks. It will also + * expire the hosts that have not heartbeated for more than spark.network.timeout. */ private[spark] case class Heartbeat( executorId: String, taskMetrics: Array[(Long, TaskMetrics)], // taskId - TaskMetrics blockManagerId: BlockManagerId) +private[spark] case object ExpireDeadHosts + private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) /** * Lives in the driver to receive heartbeats from executors.. */ -private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) +private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { + // executor ID - timestamp of when the last heartbeat from this executor was received + private val executorLastSeen = new mutable.HashMap[String, Long] + + private val executorTimeoutMs = sc.conf.getLong(spark.network.timeout, +
spark git commit: [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver
Repository: spark Updated Branches: refs/heads/master fbc469473 -> 18f209843 [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver If a BlockManager has not sent a heartbeat for more than 120s, BlockManagerMasterActor will remove it. But CoarseGrainedSchedulerBackend can only remove an executor after a DisassociatedEvent. We should expire dead hosts in HeartbeatReceiver. Author: Hong Shen hongs...@tencent.com Closes #4363 from shenh062326/my_change3 and squashes the following commits: 2c9a46a [Hong Shen] Change some code style. 1a042ff [Hong Shen] Change some code style. 2dc456e [Hong Shen] Change some code style. d221493 [Hong Shen] Fix test failed 7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver b904aed [Hong Shen] Fix failed test 52725af [Hong Shen] Remove assert in SparkContext.killExecutors 5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors a858fb5 [Hong Shen] A minor change in HeartbeatReceiver 3e221d9 [Hong Shen] A minor change in HeartbeatReceiver 6bab7aa [Hong Shen] Change a code style. 07952f3 [Hong Shen] Change configs name and code style. 
ce9257e [Hong Shen] Fix test failed bccd515 [Hong Shen] Fix test failed 8e77408 [Hong Shen] Fix test failed c1dfda1 [Hong Shen] Fix test failed e197e20 [Hong Shen] Fix test failed fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18f20984 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18f20984 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18f20984 Branch: refs/heads/master Commit: 18f2098433e0bfef9497bacd601fdf098ed03eab Parents: fbc4694 Author: Hong Shen hongs...@tencent.com Authored: Thu Feb 26 18:43:23 2015 -0800 Committer: Andrew Or and...@databricks.com Committed: Thu Feb 26 18:43:23 2015 -0800 -- .../org/apache/spark/HeartbeatReceiver.scala| 65 ++-- .../scala/org/apache/spark/SparkContext.scala | 15 +++-- .../apache/spark/scheduler/TaskScheduler.scala | 6 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/storage/BlockManagerMasterActor.scala | 36 +-- .../spark/storage/BlockManagerMessages.scala| 2 - .../spark/scheduler/DAGSchedulerSuite.scala | 2 + 7 files changed, 79 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/18f20984/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala -- diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 83ae57b..69178da 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -17,33 +17,86 @@ package org.apache.spark -import akka.actor.Actor +import scala.concurrent.duration._ +import scala.collection.mutable + +import akka.actor.{Actor, Cancellable} + import org.apache.spark.executor.TaskMetrics import 
org.apache.spark.storage.BlockManagerId -import org.apache.spark.scheduler.TaskScheduler +import org.apache.spark.scheduler.{SlaveLost, TaskScheduler} import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal - * components to convey liveness or execution information for in-progress tasks. + * components to convey liveness or execution information for in-progress tasks. It will also + * expire the hosts that have not heartbeated for more than spark.network.timeout. */ private[spark] case class Heartbeat( executorId: String, taskMetrics: Array[(Long, TaskMetrics)], // taskId - TaskMetrics blockManagerId: BlockManagerId) +private[spark] case object ExpireDeadHosts + private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) /** * Lives in the driver to receive heartbeats from executors.. */ -private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) +private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { + // executor ID - timestamp of when the last heartbeat from this executor was received + private val executorLastSeen = new mutable.HashMap[String, Long] + + private val executorTimeoutMs = sc.conf.getLong(spark.network.timeout, +sc.conf.getLong(spark.storage.blockManagerSlaveTimeoutMs, 120)) * 1000 + + private val checkTimeoutIntervalMs = sc.conf.getLong(spark.network.timeoutInterval, +sc.conf.getLong(spark.storage.blockManagerTimeoutIntervalMs, 60)) *