It could also be that your hash function is expensive. What key class are you
using for the reduceByKey / groupByKey?
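
A rough, standalone sketch (plain Scala, not Spark code) for checking this: it
just times hashCode and equals over a batch of keys, since both are hit on
every probe. MyKey, the synthetic key values and the sizes are placeholders
for whatever the real key class and data look like.

// Placeholder key class -- substitute the real reduceByKey / groupByKey key type.
case class MyKey(userId: Long, segment: String)

object KeyHashCost {
  def main(args: Array[String]): Unit = {
    // Synthetic keys standing in for a sample of the real key distribution.
    val keys = (1 to 1000000).map(i => MyKey(i.toLong, s"seg-${i % 64}"))

    // Time a full pass of hashCode calls (called once per map lookup).
    var acc = 0
    val t0 = System.nanoTime()
    keys.foreach(k => acc ^= k.hashCode)
    println(s"hashCode pass: ${(System.nanoTime() - t0) / 1e6} ms")

    // Time equals against a fixed key (called on every collision while probing).
    val probe = keys(keys.length / 2)
    var hits = 0
    val t1 = System.nanoTime()
    keys.foreach(k => if (k == probe) hits += 1)
    println(s"equals pass:   ${(System.nanoTime() - t1) / 1e6} ms ($hits hits, acc=$acc)")
  }
}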

Matei

> On May 12, 2015, at 10:08 AM, Night Wolf <nightwolf...@gmail.com> wrote:
> 
> I'm seeing a similar thing with a slightly different stack trace. Ideas?
> 
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> org.apache.spark.scheduler.Task.run(Task.scala:64)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> 
> On Tue, May 12, 2015 at 5:55 AM, Reynold Xin <r...@databricks.com> wrote:
> Looks like it is spending a lot of time doing hash probing. It could be any
> of the following:
> 
> 1. hash probing itself is inherently expensive compared with the rest of your
> workload
> 
> 2. murmur3 doesn't work well with this key distribution (see the sketch after
> this list)
> 
> 3. quadratic probing (triangular sequence) with a power-of-2 hash table
> works really badly for this workload.
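> 
> To get a rough feel for 2. without touching Spark, something like the sketch
> below pushes a sample of keys through a murmur3-style mix (Scala's
> MurmurHash3 here, which only approximates the rehash AppendOnlyMap applies)
> and reports how unevenly a power-of-2 table fills up. The key values, seed
> and table size are made up for illustration.
> 
> import scala.util.hashing.MurmurHash3
> 
> object KeyDistributionCheck {
>   def main(args: Array[String]): Unit = {
>     // Stand-in keys; replace with a sample of the real keys.
>     val keys = (1 to 1000000).map(i => s"user-${i * 64}")
> 
>     val mask = (1 << 16) - 1  // 65536-slot power-of-2 table
>     val bucketLoads = keys
>       .map(k => MurmurHash3.finalizeHash(MurmurHash3.mix(0x3c074a61, k.hashCode), 1) & mask)
>       .groupBy(identity)
>       .map { case (_, hits) => hits.size }
> 
>     println(s"buckets used: ${bucketLoads.size} / ${mask + 1}")
>     println(s"max bucket load: ${bucketLoads.max}, avg load: ${keys.size.toDouble / bucketLoads.size}")
>   }
> }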
> 
> One way to test this is to instrument the changeValue function to store the
> total number of probes and then log it. We added this probing capability to
> the new BytesToBytesMap we built. We should consider reporting it as a
> built-in metric to facilitate debugging.
> 
> https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214
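> 
> As a starting point, here is a minimal standalone version of that idea: it
> re-implements the quadratic (triangular) probing over a power-of-2 table and
> counts probes per insert, rather than patching changeValue itself, so the
> rehash, seed and sizes are only rough approximations of what AppendOnlyMap
> actually does.
> 
> import scala.util.hashing.MurmurHash3
> 
> object ProbeCounter {
>   def main(args: Array[String]): Unit = {
>     val capacity = 1 << 20              // power-of-2 table size
>     val mask = capacity - 1
>     val table = new Array[String](capacity)
>     var totalProbes = 0L
> 
>     // Stand-in keys; replace with a sample of the real keys.
>     val keys = (1 to 500000).map(i => s"user-${i * 64}")
> 
>     // Approximation of the murmur3 rehash applied to the key's hashCode.
>     def rehash(h: Int): Int = MurmurHash3.finalizeHash(MurmurHash3.mix(0x3c074a61, h), 1)
> 
>     keys.foreach { k =>
>       var pos = rehash(k.hashCode) & mask
>       var i = 1
>       var done = false
>       while (!done) {
>         totalProbes += 1
>         val cur = table(pos)
>         if (cur == null) { table(pos) = k; done = true }
>         else if (cur == k) { done = true }
>         else { pos = (pos + i) & mask; i += 1 }  // triangular steps: +1, +2, +3, ...
>       }
>     }
>     println(s"total probes: $totalProbes, avg per insert: ${totalProbes.toDouble / keys.size}")
>   }
> }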
> 
> On Mon, May 11, 2015 at 4:21 AM, Michal Haris <michal.ha...@visualdna.com> wrote:
> 
> > This is the stack trace of the worker thread:
> >
> > org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
> > org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> > org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
> > org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
> > org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> > org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> > org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > org.apache.spark.scheduler.Task.run(Task.scala:64)
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > java.lang.Thread.run(Thread.java:745)
> >
> > On 8 May 2015 at 22:12, Josh Rosen <rosenvi...@gmail.com> wrote:
> >
> >> Do you have any more specific profiling data that you can share?  I'm
> >> curious to know where AppendOnlyMap.changeValue is being called from.
> >>
> >> On Fri, May 8, 2015 at 1:26 PM, Michal Haris <michal.ha...@visualdna.com> wrote:
> >>
> >>> +dev
> >>> On 6 May 2015 10:45, "Michal Haris" <michal.ha...@visualdna.com> wrote:
> >>>
> >>> > Just wanted to check if somebody has seen similar behaviour or knows
> >>> > what we might be doing wrong. We have a relatively complex Spark
> >>> > application which processes half a terabyte of data at various stages.
> >>> > We have profiled it in several ways and everything seems to point to
> >>> > one place where 90% of the time is spent: AppendOnlyMap.changeValue.
> >>> > The job scales and is relatively faster than its map-reduce
> >>> > alternative, but it still feels slower than it should be. I suspect too
> >>> > much spill, but I haven't seen any improvement from increasing the
> >>> > number of partitions to 10k. Any ideas would be appreciated.
> >>> >
> >>> > --
> >>> > Michal Haris
> >>> > Technical Architect
> >>> > direct line: +44 (0) 207 749 0229
> >>> > www.visualdna.com | t: +44 (0) 207 734 7033
> >>> >
> >>>
> >>
> >>
> >
> >
> > --
> > Michal Haris
> > Technical Architect
> > direct line: +44 (0) 207 749 0229
> > www.visualdna.com | t: +44 (0) 207 734 7033
> >
> 
