Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue
I'm seeing a similar thing with a slightly different stack trace. Ideas? org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150) org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205) org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:64) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) On Tue, May 12, 2015 at 5:55 AM, Reynold Xin r...@databricks.com wrote: Looks like it is spending a lot of time doing hash probing. It could be a number of the following: 1. hash probing itself is inherently expensive compared with rest of your workload 2. murmur3 doesn't work well with this key distribution 3. quadratic probing (triangular sequence) with a power-of-2 hash table works really badly for this workload. One way to test this is to instrument changeValue function to store the number of probes in total, and then log it. We added this probing capability to the new Bytes2Bytes hash map we built. We should consider just having it being reported as some built-in metrics to facilitate debugging. https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214 On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com wrote: This is the stack trace of the worker thread: org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150) org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60) org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46) org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) org.apache.spark.rdd.RDD.iterator(RDD.scala:242) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) org.apache.spark.scheduler.Task.run(Task.scala:64) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote: Do you have any more specific profiling data that you can share? I'm curious to know where AppendOnlyMap.changeValue is being called from. On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com wrote: +dev On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote: Just wanted to check if somebody has seen similar behaviour or knows what we might be doing wrong. We have a relatively complex spark application which processes half a terabyte of data at various stages. We have profiled it in several ways and everything seems to point to one place where 90% of the time is spent: AppendOnlyMap.changeValue. The job scales and is relatively faster than its map-reduce alternative but it still feels slower than it should be. I am suspecting too much spill but I haven't seen any improvement by increasing number of partitions to 10k. Any idea would be appreciated. -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033, -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033,
Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Seeing similar issues, did you find a solution? One would be to increase the number of partitions if you're doing lots of object creation. On Thu, Feb 12, 2015 at 7:26 PM, fightf...@163.com fightf...@163.com wrote: Hi, patrick Really glad to get your reply. Yes, we are doing group by operations for our work. We know that this is common for growTable when processing large data sets. The problem actually goes to : Do we have any possible chance to self-modify the initialCapacity using specifically for our application? Does spark provide such configs for achieving that goal? We know that this is trickle to get it working. Just want to know that how could this be resolved, or from other possible channel for we did not cover. Expecting for your kind advice. Thanks, Sun. -- fightf...@163.com *From:* Patrick Wendell pwend...@gmail.com *Date:* 2015-02-12 16:12 *To:* fightf...@163.com *CC:* user u...@spark.apache.org; dev dev@spark.apache.org *Subject:* Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets The map will start with a capacity of 64, but will grow to accommodate new data. Are you using the groupBy operator in Spark or are you using Spark SQL's group by? This usually happens if you are grouping or aggregating in a way that doesn't sufficiently condense the data created from each input partition. - Patrick On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com fightf...@163.com wrote: Hi, Really have no adequate solution got for this issue. Expecting any available analytical rules or hints. Thanks, Sun. fightf...@163.com From: fightf...@163.com Date: 2015-02-09 11:56 To: user; dev Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Hi, Problem still exists. Any experts would take a look at this? Thanks, Sun. fightf...@163.com From: fightf...@163.com Date: 2015-02-06 17:54 To: user; dev Subject: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Hi, all Recently we had caught performance issues when using spark 1.2.0 to read data from hbase and do some summary work. Our scenario means to : read large data sets from hbase (maybe 100G+ file) , form hbaseRDD, transform to schemardd, groupby and aggregate the data while got fewer new summary data sets, loading data into hbase (phoenix). Our major issue lead to : aggregate large datasets to get summary data sets would consume too long time (1 hour +) , while that should be supposed not so bad performance. We got the dump file attached and stacktrace from jstack like the following: From the stacktrace and dump file we can identify that processing large datasets would cause frequent AppendOnlyMap growing, and leading to huge map entrysize. We had referenced the source code of org.apache.spark.util.collection.AppendOnlyMap and found that the map had been initialized with capacity of 64. That would be too small for our use case. So the question is : Does anyone had encounted such issues before? How did that be resolved? I cannot find any jira issues for such problems and if someone had seen, please kindly let us know. More specified solution would goes to : Does any possibility exists for user defining the map capacity releatively in spark? If so, please tell how to achieve that. Best Thanks, Sun. Thread 22432: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) -