If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from the Spark Configuration page of the Spark 1.2.0 documentation (spark.apache.org):
| Property | Default | Meaning |
| spark.speculation | false | If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. |
| spark.speculation.interval | 100 | How often Spark will check for tasks to speculate, in milliseconds. |
| spark.speculation.quantile | 0.75 | Percentage of tasks which must be complete before speculation is enabled for a particular stage. |
| spark.speculation.multiplier | 1.5 | How many times slower a task is than the median to be considered for speculation. |

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava <a_k_srivast...@yahoo.com.INVALID> wrote:

Hi,

My Spark job is taking a long time. I see that some tasks take longer than others for the same amount of data and shuffle read/write. What could be the possible reasons for this?

The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace:

"Executor task launch worker-12" daemon prio=10 tid=0x00007fcd44276000 nid=0x3f85 waiting on condition [0x00007fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00007fd0aee82e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)

Any inputs/suggestions to improve job time will be appreciated.

Regards,
Ajay
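
To make the spark.speculation.quantile and spark.speculation.multiplier settings more concrete, here is a minimal Python sketch of the rule as the documentation wording describes it. It is only a simplified model, not Spark's scheduler code: the function name `speculation_candidates` and the task timings are hypothetical, and the real scheduler may apply further conditions.

```python
from statistics import median


def speculation_candidates(finished_ms, running_ms, quantile=0.75, multiplier=1.5):
    """Return indices of running tasks this simplified model would re-launch.

    finished_ms: durations (ms) of tasks in the stage that have completed
    running_ms:  elapsed times (ms) of tasks in the stage still running
    """
    total = len(finished_ms) + len(running_ms)
    # Speculation is only considered once enough of the stage has completed
    # (assumption: "quantile" is a fraction of all tasks in the stage).
    if not finished_ms or len(finished_ms) < quantile * total:
        return []
    # A running task is a candidate if it is more than `multiplier` times
    # slower than the median of the completed tasks.
    threshold = multiplier * median(finished_ms)
    return [i for i, elapsed in enumerate(running_ms) if elapsed > threshold]


# Hypothetical stage: 8 tasks, 6 finished in roughly 10 s, 2 still running.
finished = [9000, 10000, 10000, 10000, 11000, 12000]
running = [12000, 40000]
print(speculation_candidates(finished, running))  # prints [1]
```

With the defaults, the task that has been running for 40 s exceeds 1.5 times the 10 s median and would be a candidate for re-launch, while the 12 s task would not. The properties themselves can be supplied like any other Spark property, for example with `--conf spark.speculation=true` on `spark-submit` or in `conf/spark-defaults.conf`.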