[ https://issues.apache.org/jira/browse/SPARK-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15488528#comment-15488528 ]

Dhruve Ashar commented on SPARK-16441:
--------------------------------------

All of these issues are related in one way or another. I will share my 
observations so far: 

I have taken multiple thread dumps and analyzed what is causing the 
SparkListenerBus to block. The ExecutorAllocationManager is heavily 
synchronized, and it is a hotspot for contention, especially with schedule 
being invoked every 100ms to balance the executors. For a Spark job running 
thousands of executors with dynamic allocation enabled, any change in the 
status of these executors fires events frequently. This causes contention and 
leads to blocking, particularly when the calls are remote RPCs.
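To illustrate the pattern (a minimal standalone sketch with hypothetical class and method names, not Spark code): a thread that performs a blocking remote call while holding the allocation manager's monitor forces any listener callback that needs the same monitor to block for the full duration of the call.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: a "schedule" thread holds a monitor across a slow
// remote call, so the listener-bus thread blocks on the same monitor.
public class AllocationManagerSketch {
    private final Object lock = new Object();
    private final CountDownLatch scheduleStarted = new CountDownLatch(1);
    volatile long listenerWaitMillis;

    // Stand-in for the periodic schedule(): blocking RPC under the lock.
    void schedule() throws InterruptedException {
        synchronized (lock) {
            scheduleStarted.countDown();
            Thread.sleep(300);              // simulated remote RPC latency
        }
    }

    // Stand-in for a listener callback (e.g. onTaskEnd) needing the lock.
    void onTaskEnd() throws InterruptedException {
        scheduleStarted.await();            // ensure schedule() got there first
        long start = System.nanoTime();
        synchronized (lock) {               // BLOCKED until the "RPC" returns
            listenerWaitMillis = (System.nanoTime() - start) / 1_000_000;
        }
    }

    // Runs both threads and returns how long the listener was blocked.
    long demo() throws InterruptedException {
        Thread alloc = new Thread(() -> {
            try { schedule(); } catch (InterruptedException e) { }
        });
        Thread bus = new Thread(() -> {
            try { onTaskEnd(); } catch (InterruptedException e) { }
        });
        alloc.start();
        bus.start();
        alloc.join();
        bus.join();
        return listenerWaitMillis;
    }
}
```

The listener thread's wait time is roughly the full latency of the simulated RPC, which is exactly what the thread dumps below show on a larger scale.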

Also, the current design of the listener bus is such that it waits for every 
listener to process an event before it can deliver the next event from the 
queue. Any wait to acquire the locks causes the event queue to fill up 
quickly and leads to dropped events. 
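The drop behavior follows from combining a single dispatch thread with a bounded queue. A minimal sketch of that pattern (hypothetical class names, not the actual LiveListenerBus code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: producers post events with a non-blocking offer;
// when the single consumer stalls, the bounded queue fills and events drop.
public class BoundedEventBus {
    private final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
    final AtomicInteger dropped = new AtomicInteger();

    // Producer side: never block the posting thread; drop on overflow.
    void post(String event) {
        if (!queue.offer(event)) {
            dropped.incrementAndGet();
        }
    }

    // With no consumer draining (the dispatch thread is "blocked"),
    // posting more events than the capacity drops the excess.
    int demo() {
        for (int i = 0; i < 10; i++) {
            post("event-" + i);
        }
        return dropped.get();          // 10 posted - 4 capacity = 6 dropped
    }
}
```

In this toy setup, 10 events against a capacity of 4 with a stalled consumer drops 6; in a real job the queue is much larger, but a listener blocked on the allocation manager's lock produces the same overflow-and-drop outcome.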

Logging the individual execution times of these calls does not conclusively 
show updateAndSync consuming the majority of the time, so we are working on 
reducing the lock contention and minimizing the RPC calls. Another parameter 
to look at is the heartbeat interval: a small interval with a very large 
number of executors will aggravate the problem, because you will receive 
frequent ExecutorMetricsUpdate events from the running executors. 
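For reference, the interval in question is controlled by spark.executor.heartbeatInterval. A sketch of how it might be tuned in spark-defaults.conf (the values shown are illustrative only; check the Spark docs for your version before changing them):

```
# spark-defaults.conf (illustrative values)
spark.executor.heartbeatInterval   30s    # default 10s; raising it reduces
                                          # ExecutorMetricsUpdate traffic
spark.dynamicAllocation.enabled    true
```

A longer interval trades slower metrics/liveness reporting for fewer events hitting the listener bus, which matters most at thousands of executors.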



> Spark application hang when dynamic allocation is enabled
> ---------------------------------------------------------
>
>                 Key: SPARK-16441
>                 URL: https://issues.apache.org/jira/browse/SPARK-16441
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.2, 2.0.0
>         Environment: hadoop 2.7.2  spark1.6.2
>            Reporter: cen yuhai
>
> The Spark application is waiting for an RPC response all the time, and the 
> Spark listener is blocked by dynamic allocation. Executors cannot connect 
> to the driver and are lost.
> "spark-dynamic-executor-allocation" #239 daemon prio=5 os_prio=0 tid=0x00007fa304438000 nid=0xcec6 waiting on condition [0x00007fa2b81e4000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x000000070fdb94f8> (a scala.concurrent.impl.Promise$CompletionLatch)
>       at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>       at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
>       at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
>       at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>       at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>       at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>       at scala.concurrent.Await$.result(package.scala:107)
>       at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>       at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>       at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
>       at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:59)
>       at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:436)
>       - locked <0x00000000828a8960> (a org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend)
>       at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1438)
>       at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:359)
>       at org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:310)
>       - locked <0x00000000880e6308> (a org.apache.spark.ExecutorAllocationManager)
>       at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:264)
>       - locked <0x00000000880e6308> (a org.apache.spark.ExecutorAllocationManager)
>       at org.apache.spark.ExecutorAllocationManager$$anon$2.run(ExecutorAllocationManager.scala:223)
>
> "SparkListenerBus" #161 daemon prio=5 os_prio=0 tid=0x00007fa3053be000 nid=0xcec9 waiting for monitor entry [0x00007fa2b3dfc000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at org.apache.spark.ExecutorAllocationManager$ExecutorAllocationListener.onTaskEnd(ExecutorAllocationManager.scala:618)
>       - waiting to lock <0x00000000880e6308> (a org.apache.spark.ExecutorAllocationManager)
>       at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
>       at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>       at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>       at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
>       at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
>       at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
>       at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>       at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>       at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
>       at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1182)
>       at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
