Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-12 Thread Night Wolf
I'm seeing a similar thing with a slightly different stack trace. Ideas?

org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:64)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)


On Tue, May 12, 2015 at 5:55 AM, Reynold Xin r...@databricks.com wrote:

 Looks like it is spending a lot of time doing hash probing. It could be one
 or more of the following:

 1. hash probing itself is inherently expensive compared with the rest of your
 workload

 2. murmur3 doesn't work well with this key distribution

 3. quadratic probing (triangular sequence) with a power-of-2 hash table
 works really badly for this workload.

 One way to test this is to instrument the changeValue function to store the
 total number of probes, and then log it. We added this probing capability to
 the new BytesToBytesMap hash map we built. We should consider having it
 reported as a built-in metric to facilitate debugging.


 https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214


Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-12 Thread Matei Zaharia
It could also be that your hash function is expensive. What key class are you
using for the reduceByKey / groupByKey?
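
For illustration, a minimal sketch of what is worth checking (the key types
below are invented, not from the job): a plain case class hashes cheaply,
whereas a key whose hashCode rebuilds a string or walks a collection pays that
cost on every record that goes through changeValue.

// Hypothetical key types, for illustration only.
// A case class gets a cheap structural hashCode/equals for free.
case class EventKey(userId: Long, eventType: Int)

// The kind of key to watch out for: hashCode allocates and scans on every
// call, and reduceByKey/groupByKey invoke it at least once per record.
class SlowKey(val parts: Seq[String]) {
  override def hashCode: Int = parts.mkString("|").hashCode
  override def equals(other: Any): Boolean = other match {
    case k: SlowKey => k.parts == parts
    case _          => false
  }
}

If the key is already a primitive or a simple case class, the hash itself is
unlikely to be the cost, and the probing behaviour Reynold describes becomes
the more likely suspect.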

Matei

 On May 12, 2015, at 10:08 AM, Night Wolf nightwolf...@gmail.com wrote:
 
 I'm seeing a similar thing with a slightly different stack trace. Ideas?
 
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 org.apache.spark.scheduler.Task.run(Task.scala:64)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 

Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-11 Thread Michal Haris
This is the stack trace of the worker thread:

org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
org.apache.spark.scheduler.Task.run(Task.scala:64)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:

 Do you have any more specific profiling data that you can share?  I'm
 curious to know where AppendOnlyMap.changeValue is being called from.

-- 
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033,


Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-11 Thread Reynold Xin
Looks like it is spending a lot of time doing hash probing. It could be one
or more of the following:

1. hash probing itself is inherently expensive compared with the rest of your
workload

2. murmur3 doesn't work well with this key distribution

3. quadratic probing (triangular sequence) with a power-of-2 hash table
works really badly for this workload.

One way to test this is to instrument the changeValue function to store the
total number of probes, and then log it. We added this probing capability to
the new BytesToBytesMap hash map we built. We should consider having it
reported as a built-in metric to facilitate debugging.

https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214
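
For concreteness, a rough sketch of that kind of instrumentation (an
illustration only, not the actual AppendOnlyMap code; the class below is
invented): an open-addressed map with a power-of-2 table and triangular probe
increments, keeping a running probe count that a patched changeValue could log
per task or report as a metric.

// Illustrative sketch only -- not the real AppendOnlyMap implementation.
// It mimics the quadratic (triangular) probe sequence over a power-of-2
// table and keeps a probe counter that could be logged or exported.
class ProbeCountingMap[K <: AnyRef, V](capacity: Int = 1 << 16) {
  require((capacity & (capacity - 1)) == 0, "capacity must be a power of 2")
  private val mask   = capacity - 1
  private val keys   = new Array[AnyRef](capacity)
  private val values = new Array[AnyRef](capacity)
  var totalProbes = 0L   // expose for logging, e.g. once per task or per spill

  def changeValue(key: K, update: Option[V] => V): V = {
    var pos   = key.hashCode & mask
    var delta = 1
    while (true) {
      totalProbes += 1
      val cur = keys(pos)
      if (cur eq null) {                    // empty slot: first value for this key
        val v = update(None)
        keys(pos) = key
        values(pos) = v.asInstanceOf[AnyRef]
        return v
      } else if (cur == key) {              // existing key: merge the new value in
        val v = update(Some(values(pos).asInstanceOf[V]))
        values(pos) = v.asInstanceOf[AnyRef]
        return v
      }
      pos = (pos + delta) & mask            // offsets 1, 3, 6, 10, ... (triangular)
      delta += 1
    }
    sys.error("unreachable")
  }
}

Comparing the average probes per update on the real key distribution against
the same data with random keys would separate a bad hash/probe interaction
(2 or 3 above) from hashing simply being the dominant work (1).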






On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com wrote:

 This is the stack trace of the worker thread:

 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
 org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 org.apache.spark.scheduler.Task.run(Task.scala:64)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)




Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-08 Thread Josh Rosen
Do you have any more specific profiling data that you can share?  I'm
curious to know where AppendOnlyMap.changeValue is being called from.
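
If attaching a full profiler is awkward, periodic jstack dumps of an executor,
or even a crude in-process sampler along these lines (a sketch, not Spark
functionality; the object name is made up), would show which frame sits
directly above AppendOnlyMap.changeValue.

// Minimal in-JVM stack sampler -- illustrative sketch only, not part of Spark.
// Run it from a side thread on an executor; it tallies the frame directly
// above AppendOnlyMap.changeValue so the dominant caller stands out.
import scala.collection.JavaConverters._
import scala.collection.mutable

object ChangeValueSampler {
  def sample(samples: Int = 1000, intervalMs: Long = 10): Map[String, Int] = {
    val callers = mutable.Map.empty[String, Int].withDefaultValue(0)
    for (_ <- 1 to samples) {
      for (stack <- Thread.getAllStackTraces.values.asScala) {
        val i = stack.indexWhere(f =>
          f.getClassName.endsWith("AppendOnlyMap") && f.getMethodName == "changeValue")
        if (i >= 0 && i + 1 < stack.length) {
          val caller = stack(i + 1)
          callers(caller.getClassName + "." + caller.getMethodName) += 1
        }
      }
      Thread.sleep(intervalMs)
    }
    callers.toMap
  }
}

Printing the resulting map after a few seconds of the hot stage answers the
"called from where" question without any external tooling.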

On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com
wrote:

 +dev



large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-06 Thread Michal Haris
Just wanted to check if somebody has seen similar behaviour or knows what
we might be doing wrong. We have a relatively complex Spark application
which processes half a terabyte of data at various stages. We have profiled
it in several ways and everything seems to point to one place where 90% of
the time is spent: AppendOnlyMap.changeValue. The job scales and is
relatively faster than its map-reduce alternative, but it still feels slower
than it should be. I suspect too much spill, but I haven't seen any
improvement from increasing the number of partitions to 10k. Any ideas would
be appreciated.
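
For reference, a minimal sketch of the shape of the aggregation in question
(placeholder types and names, not the real application); the partition count
is passed to the combine step itself, which is what the 10k experiment
changes:

// Placeholder types and names -- a sketch of the job shape being described,
// not the actual application.
import org.apache.spark.rdd.RDD

def aggregate(pairs: RDD[(String, Long)]): RDD[(String, Long)] = {
  // The partition count sizes the shuffle feeding AppendOnlyMap, but the
  // per-record hash/probe cost is unchanged -- consistent with 10k
  // partitions not helping when changeValue itself is the bottleneck.
  pairs.reduceByKey(_ + _, 10000)
}

The shuffle spill columns on the stage detail page are the quickest way to
confirm or rule out the spill theory.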

-- 
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033,