Our prototype application reads a 20 GB dataset from HDFS (nearly 180
partitions), groups it by key, sorts by rank, and writes the result back to
HDFS in that order. The job runs on two nodes, with 16 GB of memory and 24
cores per node available to it. I noticed that the execution plan results in
two sortByKey stages, followed by a groupByKey and a saveAsTextFile. The
shuffle write for the groupByKey is ~20 GB. During the saveAsTextFile stage,
however, after about 50% of the partitions have been written out, memory on
one of the executors shoots up to 16 GB and the executor spends all its time
in GC. Eventually the logs show an OOM (stacktrace [1], included at the end
of this mail), followed by another lost TID with the shuffle read errors
("FileSystem closed", "No such file or directory") indicated in stacktrace [2].
I noticed that the partitions may be skewed as a result of the sort, with
one or two partitions holding up to 10% of all rows. I also noticed that the
data written out before the 50% point (when memory shoots up) consists of a
large number of empty part- files followed by a few files around 200 MB in
size. It could hence be that the attempt to write out one large partition is
causing the OOM. The tuning documentation states that a higher level of
parallelism might mitigate the problem, but setting the default parallelism
to 2x the number of cores didn't help either (roughly what I changed is
sketched below).
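For reference, the parallelism change amounted to the following (the counts
are illustrative; 96 = 2x the 48 cores available across the two nodes):

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the default shuffle parallelism globally (96 = 2 x 2 nodes x 24 cores).
    val conf = new SparkConf()
      .setAppName("prototype")
      .set("spark.default.parallelism", "96")
    val sc = new SparkContext(conf)

    // The same could presumably be done per operation by passing an explicit
    // partition count, e.g.:
    //   pairs.groupByKey(numPartitions = 400)
    //   grouped.sortByKey(ascending = true, numPartitions = 400)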
While I could attempt to partition manually (choosing custom ranges for a
range partitioner, also sketched below), it appears that limiting the read
sizes (from the earlier shuffle) during the write to HDFS should allow even
the overloaded partitions to be written out successfully. Are there
parameters that might help achieve that?
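For completeness, the manual fallback mentioned above would be something
along these lines (the key type and split points are placeholders, not the
real schema):

    import org.apache.spark.Partitioner

    // Hypothetical partitioner that splits the known heavy key ranges more
    // finely than a sampled RangePartitioner would.
    class ManualRangePartitioner(upperBounds: Array[Long]) extends Partitioner {
      // One partition per bound, plus a final partition for keys past the last bound.
      def numPartitions: Int = upperBounds.length + 1

      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Long]
        // Index of the first bound greater than the key; -1 means the key
        // exceeds every bound and falls into the last partition.
        val i = upperBounds.indexWhere(k < _)
        if (i < 0) upperBounds.length else i
      }
    }

    // Usage would presumably be:
    //   keyedRdd.partitionBy(new ManualRangePartitioner(chosenBounds))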
(On a related note, any idea if using Kryo serialization would help in this
case?)
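If it would, I assume enabling it is just a matter of the following (the
registrator is a placeholder; we haven't registered any classes yet):

    import org.apache.spark.SparkConf

    // Use Kryo for shuffle data and cached RDDs instead of Java serialization.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Optionally register the record classes for a more compact encoding:
      // .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")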

Thanks,
Bharath

[1]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)



[2]
14/08/09 04:59:26 WARN TaskSetManager: Lost TID 774 (task 33.0:198)
14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /tmp/spark-local-20140809033328-e374/31/shuffle_0_5_198 (No such file or directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:236)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:203)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:203)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:234)
        at org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:537)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:76)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
