Update: as expected, switching to Kryo merely delays the inevitable OOM. Does
anyone have experience controlling memory consumption while processing
(e.g. writing out) heavily imbalanced partitions?
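For concreteness, here is a minimal sketch (not the actual job; the input
path, the key extraction and the 720-partition figure are made-up
assumptions, and it assumes the Spark 1.0-era Scala API) of the kind of
change in question: asking the sort for far more output partitions than
input splits, so that no single partition has to be materialized in one
piece at write time.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair/ordered RDD implicits (needed before Spark 1.3)

object SkewedWriteSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("skewed-write-sketch")
      // Kryo changes serialization cost, not the size of any one partition,
      // which is why it only postpones the OOM here.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Hypothetical input and key extraction; the real job's schema differs.
    val pairs = sc.textFile("hdfs:///data/input")
      .map(line => (line.split("\t")(0), line))

    // Request far more output partitions than input splits so hot key ranges
    // are spread over several smaller files at write time.
    // 720 = 2 nodes * 24 cores * 15, a purely illustrative figure.
    pairs
      .sortByKey(ascending = true, numPartitions = 720)
      .map { case (_, line) => line }
      .saveAsTextFile("hdfs:///data/output")

    sc.stop()
  }
}

The obvious caveat is that a range partitioner still cannot split a single
extremely hot key across partitions, so skew caused by one dominant key
would survive this change.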
On 09-Aug-2014 10:41 am, "Bharath Ravi Kumar" <reachb...@gmail.com> wrote:

> Our prototype application reads a 20GB dataset from HDFS (nearly 180
> partitions), groups it by key, sorts by rank and writes it out to HDFS in
> that order. The job runs against two nodes (16G, 24 cores per node
> available to the job). I noticed that the execution plan results in two
> sortByKey stages, followed by groupByKey and a saveAsTextFile. The shuffle
> write for groupByKey is ~20G in size.
>
> During the saveAsTextFile stage, however, after writing out 50% of the
> partitions, the memory on one of the executors shoots up to 16G and the
> executor spends all its time in GCs. Eventually, the logs show an OOM [1]
> (included at the end of this mail), followed by another TID loss to
> "FileSystem closed" errors, as indicated in stacktrace [2]. I noticed that
> the partitions may be skewed as a result of the sort, with one or two
> partitions having up to 10% of all rows. I also noticed that the data
> written out until the 50% mark (when memory shoots up) contained a large
> number of empty part-* files followed by a few files around 200M in size.
> It could hence be that the attempt to write out one large partition is
> causing the OOM.
>
> The tuning documentation states that a larger level of parallelism might
> help mitigate the problem, but setting default parallelism to 2x the
> number of cores didn't help either. While I could attempt to partition
> manually (choosing custom ranges for a range partitioner), it appears that
> limiting the read size (from the earlier shuffle) during the write to HDFS
> should allow even overloaded partitions to be written out successfully.
> Are there parameters that might help achieve that? (On a related note, any
> idea whether using Kryo serialization would help in this case?)
>
> Thanks,
> Bharath
>
> [1]
> 14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
> 14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
> java.lang.OutOfMemoryError: Java heap space
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         at org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>         at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
>
>
>
> [2]
> 14/08/09 04:59:26 WARN TaskSetManager: Lost TID 774 (task 33.0:198)
> 14/08/09 04:59:26 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
> java.io.FileNotFoundException: /tmp/spark-local-20140809033328-e374/31/shuffle_0_5_198 (No such file or directory)
>         at java.io.RandomAccessFile.open(Native Method)
>         at java.io.RandomAccessFile.<init>(RandomAccessFile.java:236)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
>         at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
>         at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
>         at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
>         at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:203)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:203)
>         at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:234)
>         at org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:537)
>         at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:76)
>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
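On the manual partitioning option mentioned in the quoted mail (custom
ranges for a range partitioner), a hand-rolled partitioner with explicitly
chosen split points would look roughly like the sketch below. The String
key type and the split points are hypothetical; the real key distribution
would dictate the ranges.

import org.apache.spark.Partitioner

// Hypothetical hand-chosen split points: one partition per range delimited
// by the sorted splits, plus one for keys above the last split, so a
// known-hot key range can be isolated into its own smaller partition.
class ManualRangePartitioner(splits: Array[String]) extends Partitioner {
  override def numPartitions: Int = splits.length + 1

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[String]
    // A linear scan is fine for a handful of splits; switch to a binary
    // search if the split list grows large.
    var i = 0
    while (i < splits.length && k > splits(i)) i += 1
    i
  }
}

// Usage with made-up splits:
//   pairs.partitionBy(new ManualRangePartitioner(Array("g", "n", "t")))

Note that partitionBy alone only controls which partition each key lands
in; the per-rank ordering within a partition would still need its own sort.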
