It could also be that your hash function is expensive. What is the key class you're using for the reduceByKey / groupByKey?

Matei
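(For illustration, a minimal Scala sketch of the kind of key-class problem being asked about here; the class names and payloads are hypothetical, not from this thread. If hashCode recomputes over a large payload on every call, every probe in changeValue pays that cost; caching the hash at construction time makes probes cheap again.)

// Hypothetical key whose hashCode walks the whole payload on every
// call, so every hash-table probe during the shuffle pays O(n).
class SlowKey(val payload: Array[Byte]) {
  override def hashCode: Int = java.util.Arrays.hashCode(payload)
  override def equals(o: Any): Boolean = o match {
    case k: SlowKey => java.util.Arrays.equals(payload, k.payload)
    case _          => false
  }
}

// Same key with the hash computed once up front; probes become O(1)
// and only real collisions pay for the full equals().
class FastKey(val payload: Array[Byte]) {
  private val cachedHash = java.util.Arrays.hashCode(payload)
  override def hashCode: Int = cachedHash
  override def equals(o: Any): Boolean = o match {
    case k: FastKey => cachedHash == k.cachedHash &&
                       java.util.Arrays.equals(payload, k.payload)
    case _          => false
  }
}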
> On May 12, 2015, at 10:08 AM, Night Wolf <nightwolf...@gmail.com> wrote:
>
> I'm seeing a similar thing with a slightly different stack trace. Ideas?
>
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> org.apache.spark.scheduler.Task.run(Task.scala:64)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
> On Tue, May 12, 2015 at 5:55 AM, Reynold Xin <r...@databricks.com> wrote:
>
> Looks like it is spending a lot of time doing hash probing. It could be
> any of the following:
>
> 1. Hash probing itself is inherently expensive compared with the rest of
> your workload.
>
> 2. murmur3 doesn't work well with this key distribution.
>
> 3. Quadratic probing (triangular sequence) with a power-of-2 hash table
> works really badly for this workload.
>
> One way to test this is to instrument the changeValue function to record
> the total number of probes, and then log it. We added this probing
> capability to the new BytesToBytesMap we built; we should consider
> reporting it as a built-in metric to facilitate debugging:
>
> https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214
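(A rough, self-contained Scala sketch of the instrumentation Reynold suggests, modeled on AppendOnlyMap's probing loop. The class, counter and helper names here are approximate, not the actual Spark code; growth, spilling and null-key handling are omitted for brevity.)

// Stripped-down changeValue with a probe counter. Keys live at
// data(2*pos), values at data(2*pos+1); capacity is a power of two,
// so `& mask` wraps around the table.
class ProbeCountingMap[K <: AnyRef, V](capacity: Int = 64) {
  require((capacity & (capacity - 1)) == 0, "capacity must be a power of 2")
  private val mask = capacity - 1
  private val data = new Array[AnyRef](2 * capacity)
  var totalProbes = 0L   // hypothetical counter a task could log or report

  // Hash finalizer to scatter clustered hashCodes, as Spark's maps do.
  private def rehash(h: Int): Int = scala.util.hashing.byteswap32(h)

  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    var pos = rehash(key.hashCode) & mask
    var i = 1
    while (true) {
      totalProbes += 1                 // one probe per loop iteration
      val curKey = data(2 * pos)
      if (curKey == null) {            // empty slot: insert new value
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = key
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else if (curKey == key) {      // existing key: update in place
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        pos = (pos + i) & mask         // quadratic (triangular) probing
        i += 1
      }
    }
    throw new IllegalStateException("unreachable")
  }
}

Logging totalProbes per task, or its ratio to the number of changeValue calls, would distinguish a workload that probes once per key from one hitting long collision chains.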
> On Mon, May 11, 2015 at 4:21 AM, Michal Haris <michal.ha...@visualdna.com> wrote:
>
>> This is the stack trace of the worker thread:
>>
>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> org.apache.spark.scheduler.Task.run(Task.scala:64)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:745)
>>
>> On 8 May 2015 at 22:12, Josh Rosen <rosenvi...@gmail.com> wrote:
>>
>>> Do you have any more specific profiling data that you can share? I'm
>>> curious to know where AppendOnlyMap.changeValue is being called from.
>>>
>>> On Fri, May 8, 2015 at 1:26 PM, Michal Haris <michal.ha...@visualdna.com> wrote:
>>>
>>>> +dev
>>>>
>>>> On 6 May 2015 10:45, "Michal Haris" <michal.ha...@visualdna.com> wrote:
>>>>
>>>>> Just wanted to check if somebody has seen similar behaviour or knows
>>>>> what we might be doing wrong. We have a relatively complex Spark
>>>>> application which processes half a terabyte of data at various stages.
>>>>> We have profiled it in several ways and everything points to one place
>>>>> where 90% of the time is spent: AppendOnlyMap.changeValue. The job
>>>>> scales and is relatively faster than its map-reduce alternative, but
>>>>> it still feels slower than it should be. I suspect too much spill, but
>>>>> I haven't seen any improvement from increasing the number of
>>>>> partitions to 10k. Any ideas would be appreciated.
>>>>>
>>>>> --
>>>>> Michal Haris
>>>>> Technical Architect
>>>>> direct line: +44 (0) 207 749 0229
>>>>> www.visualdna.com | t: +44 (0) 207 734 7033
>>
>> --
>> Michal Haris
>> Technical Architect
>> direct line: +44 (0) 207 749 0229
>> www.visualdna.com | t: +44 (0) 207 734 7033
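(For reference, a minimal, hypothetical sketch of where the partition increase Michal describes is applied; the input path, key extraction and count of 10000 are illustrative only. Passing numPartitions to the shuffle operator spreads the aggregation over more, smaller AppendOnlyMap instances.)

import org.apache.spark.{SparkConf, SparkContext}

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repartition-sketch"))
    val pairs = sc.textFile("hdfs:///some/input")      // placeholder path
      .map(line => (line.split('\t')(0), 1L))
    // The second argument controls the number of shuffle partitions for
    // this operation; spark.default.parallelism sets the global default.
    val counts = pairs.reduceByKey(_ + _, 10000)
    counts.count()
    sc.stop()
  }
}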