Repository: spark
Updated Branches:
  refs/heads/branch-1.3 3bce87ebd -> ec196ab1c


[SPARK-5529] [CORE] Add expireDeadHosts in HeartbeatReceiver

If a BlockManager has not sent a heartbeat for more than 120s, 
BlockManagerMasterActor will remove it. But CoarseGrainedSchedulerBackend can 
only remove an executor after a DisassociatedEvent. We should expire dead 
hosts in HeartbeatReceiver.
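
A minimal configuration sketch (illustrative values, not part of this patch):
the new expiry check reads spark.network.timeout and
spark.network.timeoutInterval, both interpreted in seconds here, falling back
to the legacy block-manager settings when they are unset.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.network.timeout", "120")         // expire executors after 120s of silence
      .set("spark.network.timeoutInterval", "60")  // run the expiry check every 60s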

Author: Hong Shen <hongshen@tencent.com>

Closes #4363 from shenh062326/my_change3 and squashes the following commits:

2c9a46a [Hong Shen] Change some code style.
1a042ff [Hong Shen] Change some code style.
2dc456e [Hong Shen] Change some code style.
d221493 [Hong Shen] Fix test failed
7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver
b904aed [Hong Shen] Fix failed test
52725af [Hong Shen] Remove assert in SparkContext.killExecutors
5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors
a858fb5 [Hong Shen] A minor change in HeartbeatReceiver
3e221d9 [Hong Shen] A minor change in HeartbeatReceiver
6bab7aa [Hong Shen] Change a code style.
07952f3 [Hong Shen] Change configs name and code style.
ce9257e [Hong Shen] Fix test failed
bccd515 [Hong Shen] Fix test failed
8e77408 [Hong Shen] Fix test failed
c1dfda1 [Hong Shen] Fix test failed
e197e20 [Hong Shen] Fix test failed
fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages
b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor
c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver

Author: Hong Shen <hongs...@tencent.com>

Closes #5793 from alexrovner/SPARK-5529-backport-1.3-v2 and squashes the following commits:

f238f94 [Hong Shen]  [SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec196ab1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec196ab1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec196ab1

Branch: refs/heads/branch-1.3
Commit: ec196ab1c7569d7ab0a50c9d7338c2835f2c84d5
Parents: 3bce87e
Author: Hong Shen <hongs...@tencent.com>
Authored: Thu Apr 30 17:05:27 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Apr 30 17:05:27 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/HeartbeatReceiver.scala    | 65 ++++++++++++++++++--
 .../scala/org/apache/spark/SparkContext.scala   | 15 +++--
 .../apache/spark/scheduler/TaskScheduler.scala  |  6 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../spark/storage/BlockManagerMasterActor.scala | 36 +----------
 .../spark/storage/BlockManagerMessages.scala    |  2 -
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +
 7 files changed, 79 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 83ae57b..69178da 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts 
+    
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors.
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+  
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout", 
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+  
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+  
+  private var timeoutCheckingTask: Cancellable = null
+  
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+  
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+  }
+
+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in 
HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: 
" +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
+          s"timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }
+  
+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
   }
 }
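
For context, a self-contained sketch of the periodic self-message pattern the
new receiver relies on (actor and message names here are illustrative, not
Spark's):

    import akka.actor.{Actor, Cancellable}
    import scala.concurrent.duration._

    case object Sweep

    class Expirer extends Actor {
      private var task: Cancellable = null
      override def preStart(): Unit = {
        import context.dispatcher
        // re-send Sweep to ourselves every 60s, starting immediately
        task = context.system.scheduler.schedule(0.seconds, 60.seconds, self, Sweep)
      }
      def receive = {
        case Sweep => // scan last-seen timestamps and drop expired entries
      }
      override def postStop(): Unit = {
        if (task != null) task.cancel()
      }
    }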

http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 98da676..d5e9168 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -362,7 +362,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
-    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
+    Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)
@@ -409,7 +409,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
   private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
     if (dynamicAllocationEnabled) {
-      assert(master.contains("yarn") || dynamicAllocationTesting,
+      assert(supportDynamicAllocation,
         "Dynamic allocation of executors is currently only supported in YARN 
mode")
       Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {
@@ -1122,6 +1122,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /**
+   * Return whether dynamically adjusting the amount of resources allocated to
+   * this application is supported. This is currently only available for YARN.
+   */
+  private[spark] def supportDynamicAllocation = 
+    master.contains("yarn") || dynamicAllocationTesting
+
+  /**
    * :: DeveloperApi ::
    * Register a listener to receive up-calls from events that happen during execution.
    */
@@ -1154,7 +1161,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
@@ -1172,7 +1179,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Killing executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
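
With the assertions funneled through supportDynamicAllocation, the existing
DeveloperApi calls behave as before for YARN users; a hedged usage sketch
(sc is an existing SparkContext, executor IDs are illustrative):

    // Both return false if the scheduler backend cannot honor the request.
    val granted = sc.requestExecutors(2)
    val killed = sc.killExecutors(Seq("3", "7"))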

http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index f095915..ed34186 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,5 +73,9 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
-
+  
+  /**
+   * Process a lost executor
+   */
+  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 }
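
Because executorLost is now abstract on the trait, any TaskScheduler
implementation outside Spark core must add an override when it recompiles; a
minimal no-op sketch, mirroring the test stubs in DAGSchedulerSuite below:

    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
      // no-op: acceptable for schedulers that track executor state elsewhere
    }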

http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 54f8fcf..7a9cf1c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -436,7 +436,7 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  def executorLost(executorId: String, reason: ExecutorLossReason) {
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
 
     synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 6413346..787b0f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
-import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.actor.{Actor, ActorRef}
 import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
 
   private val akkaTimeout = AkkaUtils.askTimeout(conf)
 
-  val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)
-
-  val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
-
-  var timeoutCheckingTask: Cancellable = null
-
-  override def preStart() {
-    import context.dispatcher
-    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
-      checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
-    super.preStart()
-  }
-
   override def receiveWithLogging = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
       register(blockManagerId, maxMemSize, slaveActor)
@@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
 
     case StopBlockManagerMaster =>
       sender ! true
-      if (timeoutCheckingTask != null) {
-        timeoutCheckingTask.cancel()
-      }
       context.stop(self)
 
-    case ExpireDeadHosts =>
-      expireDeadHosts()
-
     case BlockManagerHeartbeat(blockManagerId) =>
       sender ! heartbeatReceived(blockManagerId)
 
@@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     logInfo(s"Removing block manager $blockManagerId")
   }
 
-  private def expireDeadHosts() {
-    logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
-    val now = System.currentTimeMillis()
-    val minSeenTime = now - slaveTimeout
-    val toRemove = new mutable.HashSet[BlockManagerId]
-    for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
-        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
-          + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
-        toRemove += info.blockManagerId
-      }
-    }
-    toRemove.foreach(removeBlockManager)
-  }
-
   private def removeExecutor(execId: String) {
     logInfo("Trying to remove executor " + execId + " from 
BlockManagerMaster.")
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)

http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3f32099..4824745 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -109,6 +109,4 @@ private[spark] object BlockManagerMessages {
     extends ToBlockManagerMaster
 
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-  case object ExpireDeadHosts extends ToBlockManagerMaster
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ec196ab1/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index febfe08..06586aa 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
+    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   }
 
   /** Length of time to wait while draining listener events. */
@@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
       override def defaultParallelism() = 2
       override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
         blockManagerId: BlockManagerId): Boolean = true
+      override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     }
     val noKillScheduler = new DAGScheduler(
       sc,

