Looks like it is spending a lot of time doing hash probing. It could be one
(or a combination) of the following:

1. Hash probing itself is inherently expensive compared with the rest of
your workload.

2. Murmur3 doesn't work well with this key distribution.

3. Quadratic probing (triangular sequence) with a power-of-2 hash table
interacts badly with this workload (see the sketch below).
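
For reference, here is a minimal, self-contained sketch (Scala, not the
actual Spark code) of the triangular-sequence quadratic probing scheme
referred to in (3), over a power-of-2 table. Counting how many iterations
this loop takes for a sample of your real hash codes is a quick way to see
whether the probe chains are getting long:

// Sketch only: quadratic probing with a triangular increment sequence over
// a power-of-2 table. Returns the number of probes needed to reach a free
// or matching slot. Assumes the table is not completely full.
object ProbeSketch {
  def probes(table: Array[AnyRef], key: AnyRef, hash: Int): Int = {
    val mask = table.length - 1      // only valid because the size is a power of 2
    var pos = hash & mask
    var delta = 1
    var n = 1
    while (table(pos) != null && table(pos) != key) {
      pos = (pos + delta) & mask     // cumulative offsets 1, 3, 6, 10, ... (triangular numbers)
      delta += 1
      n += 1
    }
    n
  }
}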

One way to test this is to instrument the changeValue function to record
the total number of probes and then log it. We added this probe-counting
capability to the new BytesToBytesMap we built (see below). We should
consider having it reported as a built-in metric to facilitate debugging:

https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214
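
As a rough illustration (the class and method names below are made up, not
an existing Spark API), the per-map instrumentation could be as small as a
counter that the probe loop in changeValue bumps on every lookup, plus a
summary line that gets logged when the map is done:

// Hypothetical helper, one instance per map: accumulates probe counts so
// an average can be logged at the end of the task.
class ProbeStats {
  private var lookups = 0L
  private var totalProbes = 0L

  // Call once per changeValue invocation with the number of probes it took.
  def record(probes: Int): Unit = {
    lookups += 1
    totalProbes += probes
  }

  def summary: String = {
    val avg = if (lookups == 0) 0.0 else totalProbes.toDouble / lookups
    f"lookups=$lookups totalProbes=$totalProbes avgProbes=$avg%.2f"
  }
}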






On Mon, May 11, 2015 at 4:21 AM, Michal Haris <michal.ha...@visualdna.com>
wrote:

> This is the stack trace of the worker thread:
>
>
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
>
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> org.apache.spark.scheduler.Task.run(Task.scala:64)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
> On 8 May 2015 at 22:12, Josh Rosen <rosenvi...@gmail.com> wrote:
>
>> Do you have any more specific profiling data that you can share?  I'm
>> curious to know where AppendOnlyMap.changeValue is being called from.
>>
>> On Fri, May 8, 2015 at 1:26 PM, Michal Haris <michal.ha...@visualdna.com>
>> wrote:
>>
>>> +dev
>>> On 6 May 2015 10:45, "Michal Haris" <michal.ha...@visualdna.com> wrote:
>>>
>>> > Just wanted to check if somebody has seen similar behaviour or knows
>>> what
>>> > we might be doing wrong. We have a relatively complex spark application
>>> > which processes half a terabyte of data at various stages. We have
>>> profiled
>>> > it in several ways and everything seems to point to one place where
>>> 90% of
>>> > the time is spent:  AppendOnlyMap.changeValue. The job scales and is
>>> > relatively faster than its map-reduce alternative but it still feels
>>> slower
>>> > than it should be. I am suspecting too much spill but I haven't seen
>>> any
>>> > improvement by increasing number of partitions to 10k. Any idea would
>>> be
>>> > appreciated.
>>> >
>>> > --
>>> > Michal Haris
>>> > Technical Architect
>>> > direct line: +44 (0) 207 749 0229
>>> > www.visualdna.com | t: +44 (0) 207 734 7033,
>>> >
>>>
>>
>>
>
>
> --
> Michal Haris
> Technical Architect
> direct line: +44 (0) 207 749 0229
> www.visualdna.com | t: +44 (0) 207 734 7033,
>
