Our prototype application reads a 20 GB dataset from HDFS (nearly 180 partitions), groups it by key, sorts by rank and writes it out to HDFS in that order. The job runs against two nodes (16 GB of memory and 24 cores per node available to the job). I noticed that the execution plan results in two sortByKey stages, followed by groupByKey and a saveAsTextFile. The shuffle write for groupByKey is ~20 GB in size.

During the saveAsTextFile stage, however, after writing out 50% of the partitions, the memory on one of the executors shoots up to 16 GB and the executor spends all its time in GC. Eventually, the logs show an OOM (stack trace [1], included at the end of this mail), followed by the loss of another TID to "FileSystem closed" errors, as indicated in stack trace [2].

I noticed that the partitions may be skewed as a result of the sort, with one or two partitions having up to 10% of all rows. I also noticed that the data written out up to the 50% mark (when memory shoots up) consisted of a large number of empty part- files followed by a few files of about 200 MB each. It could hence be that the attempt to write out one large partition is causing the OOM.

The tuning documentation states that a higher level of parallelism might help mitigate the problem, but setting the default parallelism to 2x the number of cores didn't help either. While I could attempt to partition manually (choosing custom ranges for a range partitioner), it appears that limiting the read size (from the earlier shuffle) during the write to HDFS should make it possible to write out even overloaded partitions successfully. Are there parameters that might help achieve that? (On a related note, any idea whether using Kryo serialization would help in this case?)
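To make the manual-partitioning option concrete, here is a minimal sketch in plain Python. It is not Spark code and not what the job currently does; choose_boundaries and partition_for are hypothetical helper names, and the skewed keys in the demo are made up. The idea is to take range boundaries from the quantiles of a sample of the sort keys, so that dense key ranges get narrower ranges.

```python
import bisect
import random
from collections import Counter


def choose_boundaries(sample_keys, num_partitions):
    """Pick upper bounds so each range holds roughly the same number of sampled keys."""
    ordered = sorted(sample_keys)
    bounds = []
    for i in range(1, num_partitions):
        # Take the key sitting at the i-th quantile of the sorted sample.
        candidate = ordered[i * len(ordered) // num_partitions]
        # Skip repeated values so the bounds stay strictly increasing.
        if not bounds or candidate > bounds[-1]:
            bounds.append(candidate)
    return bounds


def partition_for(key, bounds):
    """Index of the range that holds key; a key equal to a bound goes to the lower range."""
    return bisect.bisect_left(bounds, key)


if __name__ == "__main__":
    rng = random.Random(0)  # fixed seed so repeated runs give the same numbers
    # Hypothetical skewed sort keys: most values are small, a few are large.
    keys = [int(rng.paretovariate(1.5)) for _ in range(100_000)]
    sample = rng.sample(keys, 1_000)
    bounds = choose_boundaries(sample, 8)
    sizes = Counter(partition_for(k, bounds) for k in keys)
    print("bounds:", bounds)
    for index in sorted(sizes):
        print("partition", index, "rows", sizes[index])
```

One caveat with this approach: rows that share a single key always land in the same range, so if one key alone accounts for a large share of the rows, no choice of boundaries can split it, and the sketch then returns fewer boundaries than requested.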
Thanks,
Bharath

[1]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)

[2]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 774 (task 33.0:198)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /tmp/spark-local-20140809033328-e374/31/shuffle_0_5_198 (No such file or directory)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:236)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
    at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
    at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:203)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:203)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:234)
    at org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:537)
    at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:76)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)