Update: as expected, switching to Kryo merely delays the inevitable. Does anyone have experience controlling memory consumption while processing (e.g., writing out) imbalanced partitions?
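(For reference, "switching to Kryo" was just the data-serializer config below; the buffer size shown is illustrative rather than tuned:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("skewed-group-write")
  // Use Kryo for shuffle/cache data instead of Java serialization.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "64")  // illustrative value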
On 09-Aug-2014 10:41 am, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote:

> Our prototype application reads a 20GB dataset from HDFS (nearly 180
> partitions), groups it by key, sorts by rank, and writes the result out to
> HDFS in that order. The job runs against two nodes (16G memory, 24 cores
> per node available to the job). I noticed that the execution plan results
> in two sortByKey stages, followed by a groupByKey and a saveAsTextFile.
> The shuffle write for groupByKey is ~20G in size. During the
> saveAsTextFile stage, however, after writing out 50% of the partitions,
> the memory on one of the executors shoots up to 16G and the executor
> spends all its time in GCs. Eventually, the logs show an OOM [1], included
> at the end of this mail, followed by another TID loss to "FileSystem
> closed" errors indicated in stacktrace [2].
>
> I noticed that the partitions may be skewed as a result of the sort, with
> one or two partitions having up to 10% of all rows. I also noticed that
> the data written out until the 50% stage (when memory shoots up) consisted
> of a large number of empty part-* files followed by a few files of ~200M
> in size. It could hence be that the attempt to write out one large
> partition is causing the OOM.
>
> The tuning documentation states that a higher level of parallelism might
> help mitigate the problem, but setting default parallelism to 2x the
> number of cores didn't help either. While I could attempt to partition
> manually (choosing custom ranges for a range partitioner), it appears that
> limiting read sizes (from the earlier shuffle) during the write to HDFS
> should make it possible to write out even overloaded partitions
> successfully. Are there parameters that might help achieve that?
>
> (On a related note, any idea if using Kryo serialization would help in
> this case?)
>
> Thanks,
> Bharath
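(For anyone trying to reproduce this, the shape of the job is roughly the sketch below. The input path, the parsing, and the (key, rank) extraction are placeholders, not our actual code.)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD / ordered-RDD implicits (Spark 1.x)

object SkewedGroupWrite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("skewed-group-write"))

    // Placeholder parser: pretend rows are tab-separated "key<TAB>rank<TAB>payload".
    val records = sc.textFile("hdfs:///data/input").map { line =>
      val f = line.split('\t')
      ((f(0), f(1).toDouble), line)  // composite key: (key, rank)
    }

    // Global sort by (key, rank) -- the source of the sortByKey stages.
    val sorted = records.sortByKey()

    // Re-key on the key alone and group. groupByKey materializes all values
    // of a key in a single task, which is what a heavily skewed key blows up.
    val grouped = sorted.map { case ((k, _), row) => (k, row) }.groupByKey()

    grouped.flatMap { case (_, rows) => rows }.saveAsTextFile("hdfs:///data/output")
  }
}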
> [1]
> 14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
> 14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
> java.lang.OutOfMemoryError: Java heap space
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>     at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
>
> [2]
> 14/08/09 04:59:26 WARN TaskSetManager: Lost TID 774 (task 33.0:198)
> 14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
> java.io.FileNotFoundException: /tmp/spark-local-20140809033328-e374/31/shuffle_0_5_198 (No such file or directory)
>     at java.io.RandomAccessFile.open(Native Method)
>     at java.io.RandomAccessFile.<init>(RandomAccessFile.java:236)
>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
>     at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
>     at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
>     at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
>     at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:203)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:203)
>     at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:234)
>     at org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:537)
>     at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:76)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
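In case it helps anyone else: the manual-partitioning fallback mentioned above would look roughly like the untested sketch below (heavyKeys and the partition count are placeholders, e.g. obtained from a countByKey() over a sample). Note that this only isolates a hot key into its own partition; it does not shrink the group itself, so a single task still has to materialize it.

import org.apache.spark.Partitioner

// Hypothetical skew-aware partitioner: known-heavy (String) keys each get a
// dedicated partition; all other keys are hash-partitioned as usual.
class SkewAwarePartitioner(normal: Int, heavyKeys: Seq[String]) extends Partitioner {
  private val heavyIndex = heavyKeys.zipWithIndex.toMap
  override def numPartitions: Int = normal + heavyKeys.size
  override def getPartition(key: Any): Int = heavyIndex.get(key.toString) match {
    case Some(i) => normal + i
    case None =>
      val h = key.hashCode % normal
      if (h < 0) h + normal else h  // keep the index non-negative
  }
}

// Usage: keyed.groupByKey(new SkewAwarePartitioner(360, heavyKeys))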