Repository: spark Updated Branches: refs/heads/branch-1.3 3bce87ebd -> ec196ab1c
[SPARK-5529] [CORE] Add expireDeadHosts in HeartbeatReceiver If a blockManager has not send heartBeat more than 120s, BlockManagerMasterActor will remove it. But coarseGrainedSchedulerBackend can only remove executor after an DisassociatedEvent. We should expireDeadHosts at HeartbeatReceiver. Author: Hong Shen <hongshentencent.com> Closes #4363 from shenh062326/my_change3 and squashes the following commits: 2c9a46a [Hong Shen] Change some code style. 1a042ff [Hong Shen] Change some code style. 2dc456e [Hong Shen] Change some code style. d221493 [Hong Shen] Fix test failed 7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver b904aed [Hong Shen] Fix failed test 52725af [Hong Shen] Remove assert in SparkContext.killExecutors 5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors a858fb5 [Hong Shen] A minor change in HeartbeatReceiver 3e221d9 [Hong Shen] A minor change in HeartbeatReceiver 6bab7aa [Hong Shen] Change a code style. 07952f3 [Hong Shen] Change configs name and code style. ce9257e [Hong Shen] Fix test failed bccd515 [Hong Shen] Fix test failed 8e77408 [Hong Shen] Fix test failed c1dfda1 [Hong Shen] Fix test failed e197e20 [Hong Shen] Fix test failed fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver Author: Hong Shen <hongs...@tencent.com> Closes #5793 from alexrovner/SPARK-5529-backport-1.3-v2 and squashes the following commits: f238f94 [Hong Shen] [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec196ab1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec196ab1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec196ab1 Branch: refs/heads/branch-1.3 Commit: ec196ab1c7569d7ab0a50c9d7338c2835f2c84d5 Parents: 3bce87e Author: Hong Shen <hongs...@tencent.com> Authored: Thu Apr 30 17:05:27 2015 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Thu Apr 30 17:05:27 2015 +0100 ---------------------------------------------------------------------- .../org/apache/spark/HeartbeatReceiver.scala | 65 ++++++++++++++++++-- .../scala/org/apache/spark/SparkContext.scala | 15 +++-- .../apache/spark/scheduler/TaskScheduler.scala | 6 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/storage/BlockManagerMasterActor.scala | 36 +---------- .../spark/storage/BlockManagerMessages.scala | 2 - .../spark/scheduler/DAGSchedulerSuite.scala | 2 + 7 files changed, 79 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 83ae57b..69178da 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -17,33 +17,86 @@ package org.apache.spark -import akka.actor.Actor +import scala.concurrent.duration._ +import scala.collection.mutable + +import akka.actor.{Actor, Cancellable} + import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.scheduler.TaskScheduler +import org.apache.spark.scheduler.{SlaveLost, TaskScheduler} import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal - * components to convey liveness or execution information for in-progress tasks. + * components to convey liveness or execution information for in-progress tasks. It will also + * expire the hosts that have not heartbeated for more than spark.network.timeout. */ private[spark] case class Heartbeat( executorId: String, taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId) +private[spark] case object ExpireDeadHosts + private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) /** * Lives in the driver to receive heartbeats from executors.. */ -private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) +private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { + // executor ID -> timestamp of when the last heartbeat from this executor was received + private val executorLastSeen = new mutable.HashMap[String, Long] + + private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout", + sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000 + + private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval", + sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000 + + private var timeoutCheckingTask: Cancellable = null + + override def preStart(): Unit = { + import context.dispatcher + timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, + checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts) + super.preStart() + } + override def receiveWithLogging = { case Heartbeat(executorId, taskMetrics, blockManagerId) => - val response = HeartbeatResponse( - !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) + val unknownExecutor = !scheduler.executorHeartbeatReceived( + executorId, taskMetrics, blockManagerId) + val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) + executorLastSeen(executorId) = System.currentTimeMillis() sender ! response + case ExpireDeadHosts => + expireDeadHosts() + } + + private def expireDeadHosts(): Unit = { + logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.") + val now = System.currentTimeMillis() + for ((executorId, lastSeenMs) <- executorLastSeen) { + if (now - lastSeenMs > executorTimeoutMs) { + logWarning(s"Removing executor $executorId with no recent heartbeats: " + + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") + scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + + "timed out after ${now - lastSeenMs} ms")) + if (sc.supportDynamicAllocation) { + sc.killExecutor(executorId) + } + executorLastSeen.remove(executorId) + } + } + } + + override def postStop(): Unit = { + if (timeoutCheckingTask != null) { + timeoutCheckingTask.cancel() + } + super.postStop() } } http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 98da676..d5e9168 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -362,7 +362,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) private val heartbeatReceiver = env.actorSystem.actorOf( - Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver") + Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver") @volatile private[spark] var dagScheduler: DAGScheduler = _ try { dagScheduler = new DAGScheduler(this) @@ -409,7 +409,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false) private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = if (dynamicAllocationEnabled) { - assert(master.contains("yarn") || dynamicAllocationTesting, + assert(supportDynamicAllocation, "Dynamic allocation of executors is currently only supported in YARN mode") Some(new ExecutorAllocationManager(this, listenerBus, conf)) } else { @@ -1122,6 +1122,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** + * Return whether dynamically adjusting the amount of resources allocated to + * this application is supported. This is currently only available for YARN. + */ + private[spark] def supportDynamicAllocation = + master.contains("yarn") || dynamicAllocationTesting + + /** * :: DeveloperApi :: * Register a listener to receive up-calls from events that happen during execution. */ @@ -1154,7 +1161,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, + assert(supportDynamicAllocation, "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => @@ -1172,7 +1179,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, + assert(supportDynamicAllocation, "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index f095915..ed34186 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -73,5 +73,9 @@ private[spark] trait TaskScheduler { * @return An application ID */ def applicationId(): String = appId - + + /** + * Process a lost executor + */ + def executorLost(executorId: String, reason: ExecutorLossReason): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 54f8fcf..7a9cf1c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -436,7 +436,7 @@ private[spark] class TaskSchedulerImpl( } } - def executorLost(executorId: String, reason: ExecutorLossReason) { + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 6413346..787b0f9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.concurrent.duration._ -import akka.actor.{Actor, ActorRef, Cancellable} +import akka.actor.{Actor, ActorRef} import akka.pattern.ask import org.apache.spark.{Logging, SparkConf, SparkException} @@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private val akkaTimeout = AkkaUtils.askTimeout(conf) - val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000) - - val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) - - var timeoutCheckingTask: Cancellable = null - - override def preStart() { - import context.dispatcher - timeoutCheckingTask = context.system.scheduler.schedule(0.seconds, - checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) - super.preStart() - } - override def receiveWithLogging = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => register(blockManagerId, maxMemSize, slaveActor) @@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case StopBlockManagerMaster => sender ! true - if (timeoutCheckingTask != null) { - timeoutCheckingTask.cancel() - } context.stop(self) - case ExpireDeadHosts => - expireDeadHosts() - case BlockManagerHeartbeat(blockManagerId) => sender ! heartbeatReceived(blockManagerId) @@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus logInfo(s"Removing block manager $blockManagerId") } - private def expireDeadHosts() { - logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.") - val now = System.currentTimeMillis() - val minSeenTime = now - slaveTimeout - val toRemove = new mutable.HashSet[BlockManagerId] - for (info <- blockManagerInfo.values) { - if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) { - logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " - + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms") - toRemove += info.blockManagerId - } - } - toRemove.foreach(removeBlockManager) - } - private def removeExecutor(execId: String) { logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 3f32099..4824745 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -109,6 +109,4 @@ private[spark] object BlockManagerMessages { extends ToBlockManagerMaster case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster - - case object ExpireDeadHosts extends ToBlockManagerMaster } http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index febfe08..06586aa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} } /** Length of time to wait while draining listener events. */ @@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} } val noKillScheduler = new DAGScheduler( sc, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org