Ajay,
        Unless we are dealing with some synchronization/condition-variable 
bug in Spark, try this per the tuning guide:
Cache Size Tuning

One important configuration parameter for GC is the amount of memory that 
should be used for caching RDDs. By default, Spark uses 60% of the configured 
executor memory (spark.executor.memory) to cache RDDs. This means that 40% of 
memory is available for any objects created during task execution.

If your tasks slow down and you find that the JVM is garbage-collecting 
frequently or running out of memory, lowering this value will help reduce 
memory consumption. To change it to, say, 50%, call 
conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined 
with serialized caching, a smaller cache should be sufficient to mitigate most 
garbage collection problems. If you are interested in further tuning the Java 
GC, see the GC tuning section of the guide linked below.
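
For example, a minimal sketch combining the smaller cache fraction with 
serialized caching (Spark 1.x property names; the app name, input path, and 
choice of Kryo are only illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("cache-tuning-example")
      // Shrink the RDD cache from the default 60% to 50% of executor memory.
      .set("spark.storage.memoryFraction", "0.5")
      // Kryo usually produces smaller serialized objects than Java serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Cache in serialized form so the cached data takes less heap and leaves
    // fewer long-lived objects for the GC to scan.
    val rdd = sc.textFile("hdfs:///some/input")
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)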


Complete list of tips here:
https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

Cheers,
- Nicos

> On Jan 15, 2015, at 6:49 AM, Ajay Srivastava 
> <a_k_srivast...@yahoo.com.INVALID> wrote:
> 
> Thanks RK. I can turn on speculative execution, but I am trying to find out 
> the actual reason for the delay, as it happens on any node. Any idea about 
> the stack trace in my previous mail?
> 
> Regards,
> Ajay
> 
> 
> On Thursday, January 15, 2015 8:02 PM, RK <prk...@yahoo.com.INVALID> wrote:
> 
> 
> If you don't want a few slow tasks to slow down the entire job, you can turn 
> on speculation. 
> 
> Here are the speculation settings from Spark Configuration - Spark 1.2.0 
> Documentation <http://spark.apache.org/docs/1.2.0/configuration.html>.
> 
> spark.speculation (default: false)
>     If set to "true", performs speculative execution of tasks. This means 
>     that if one or more tasks are running slowly in a stage, they will be 
>     re-launched.
> spark.speculation.interval (default: 100)
>     How often Spark will check for tasks to speculate, in milliseconds.
> spark.speculation.quantile (default: 0.75)
>     Percentage of tasks which must be complete before speculation is 
>     enabled for a particular stage.
> spark.speculation.multiplier (default: 1.5)
>     How many times slower a task is than the median to be considered for 
>     speculation.
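> 
> For example, a minimal sketch of enabling it from a SparkConf (property 
> names are from the table above; the interval, quantile, and multiplier 
> values shown are just the defaults):
> 
>     import org.apache.spark.{SparkConf, SparkContext}
> 
>     val conf = new SparkConf()
>       .setAppName("speculation-example")
>       // Re-launch tasks that are running much slower than their peers.
>       .set("spark.speculation", "true")
>       // Check for stragglers every 100 ms.
>       .set("spark.speculation.interval", "100")
>       // Only consider a stage once 75% of its tasks have completed.
>       .set("spark.speculation.quantile", "0.75")
>       // A task must be 1.5x slower than the median to be speculated.
>       .set("spark.speculation.multiplier", "1.5")
>     val sc = new SparkContext(conf)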
> 
> On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava 
> <a_k_srivast...@yahoo.com.INVALID> wrote:
> 
> 
> Hi,
> 
> My Spark job is taking a long time. I see that some tasks take longer than 
> others for the same amount of data and shuffle read/write. What could be the 
> possible reasons for this?
> 
> The thread dump sometimes shows that all the tasks in an executor are waiting 
> with the following stack trace -
> 
> "Executor task launch worker-12" daemon prio=10 tid=0x00007fcd44276000 
> nid=0x3f85 waiting on condition [0x00007fcce3ddc000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00007fd0aee82e00> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(Unknown Source)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
>  Source)
>     at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
>     at 
> org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
>     at 
> org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>     at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>     at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>     at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>     at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>     
> Any inputs/suggestions to improve job time will be appreciated.
> 
> Regards,
> Ajay
> 
