Repository: spark
Updated Branches:
  refs/heads/master 4d9e560b5 -> 874a2ca93


[SPARK-7174][Core] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread

`HeartbeatReceiver` calls `TaskScheduler.executorHeartbeatReceived`, which is a
blocking operation because `TaskScheduler.executorHeartbeatReceived` eventually
calls

```Scala
    blockManagerMaster.driverEndpoint.askWithReply[Boolean](
      BlockManagerHeartbeat(blockManagerId), 600 seconds)
```

Even when the ask targets a local actor, it can block the current Akka thread.
For example, the reply may be dispatched to the same thread that is blocked
waiting for it, so the reply can never be processed. An extreme case is setting
the size of the Akka dispatcher thread pool to 1.

jstack log:

```
"sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
        at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
        at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
        at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
        at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
        at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
        at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This PR moves this blocking operation to a separate thread.

Author: zsxwing <zsxw...@gmail.com>

Closes #5723 from zsxwing/SPARK-7174 and squashes the following commits:

98bfe48 [zsxwing] Use a single thread for checking timeout and reporting executorHeartbeatReceived
5b3b545 [zsxwing] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/874a2ca9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/874a2ca9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/874a2ca9

Branch: refs/heads/master
Commit: 874a2ca93d095a0dfa1acfdacf0e9d80388c4422
Parents: 4d9e560
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon Apr 27 21:45:40 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Apr 27 21:45:40 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/HeartbeatReceiver.scala    | 26 +++++++++++++-------
 1 file changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/874a2ca9/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 68d05d5..f2b024f 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
   
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
-  private val timeoutCheckingThread =
-    
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
+  // "eventLoopThread" is used to run some pretty fast actions. The actions 
running in it should not
+  // block the thread for a long time.
+  private val eventLoopThread =
+    
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
 
   private val killExecutorThread = 
ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
 
   override def onStart(): Unit = {
-    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new 
Runnable {
+    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = Utils.tryLogNonFatalError {
         Option(self).foreach(_.send(ExpireDeadHosts))
       }
@@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
     case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
       if (scheduler != null) {
-        val unknownExecutor = !scheduler.executorHeartbeatReceived(
-          executorId, taskMetrics, blockManagerId)
-        val response = HeartbeatResponse(reregisterBlockManager = 
unknownExecutor)
         executorLastSeen(executorId) = System.currentTimeMillis()
-        context.reply(response)
+        eventLoopThread.submit(new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            val unknownExecutor = !scheduler.executorHeartbeatReceived(
+              executorId, taskMetrics, blockManagerId)
+            val response = HeartbeatResponse(reregisterBlockManager = 
unknownExecutor)
+            context.reply(response)
+          }
+        })
       } else {
         // Because Executor will sleep several seconds before sending the 
first "Heartbeat", this
         // case rarely happens. However, if it really happens, log it and ask 
the executor to
@@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
         if (sc.supportDynamicAllocation) {
           // Asynchronously kill the executor to avoid blocking the current 
thread
           killExecutorThread.submit(new Runnable {
-            override def run(): Unit = sc.killExecutor(executorId)
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              sc.killExecutor(executorId)
+            }
           })
         }
         executorLastSeen.remove(executorId)
@@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
     if (timeoutCheckingTask != null) {
       timeoutCheckingTask.cancel(true)
     }
-    timeoutCheckingThread.shutdownNow()
+    eventLoopThread.shutdownNow()
     killExecutorThread.shutdownNow()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to