[ https://issues.apache.org/jira/browse/SPARK-4073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111148#comment-15111148 ]
Juliet Hougland commented on SPARK-4073:
----------------------------------------

I have run into a related problem. I am reading a snappy-compressed parquet file via pyspark and get the following OOM error:

16/01/21 11:43:10 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/anaconda2/bin/python,5,main]
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
        at parquet.hadoop.codec.SnappyDecompressor.setInput(SnappyDecompressor.java:102)
        at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:46)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
        at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
        at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
        at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
        at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
        at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
        at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
        at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
        at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
        at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
        at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
        at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
        at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:205)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)

> Parquet+Snappy can cause significant off-heap memory usage
> ----------------------------------------------------------
>
>                 Key: SPARK-4073
>                 URL: https://issues.apache.org/jira/browse/SPARK-4073
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.1.0
>            Reporter: Patrick Wendell
>            Priority: Critical
>
> The parquet snappy codec allocates off-heap buffers for decompression[1]. In
> one case the observed size of these buffers was high enough to add several
> GB of data to the overall virtual memory usage of the Spark executor process.
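For context on why these allocations bypass the heap: `ByteBuffer.allocateDirect` (the call visible in the stack trace above) reserves native memory that is not counted against the JVM heap limit (`-Xmx`) but against the separate direct-memory limit (`-XX:MaxDirectMemorySize`), which is why an executor can throw "Direct buffer memory" OOMs while the heap still has room. A minimal, self-contained illustration of the two kinds of buffers (plain JDK API, not Spark or parquet code):

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // Heap buffer: backed by a byte[] inside the Java heap, counted against -Xmx.
        ByteBuffer heap = ByteBuffer.allocate(1024);

        // Direct buffer: native memory outside the heap, counted against
        // -XX:MaxDirectMemorySize. This is the kind of buffer
        // SnappyDecompressor.setInput allocates in the trace above.
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);

        System.out.println("heap.isDirect()   = " + heap.isDirect());   // false
        System.out.println("direct.isDirect() = " + direct.isDirect()); // true
        System.out.println("heap.hasArray()   = " + heap.hasArray());   // true: byte[] on heap
        System.out.println("direct.hasArray() = " + direct.hasArray()); // false: no backing array
    }
}
```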
> I don't understand enough about our use of Snappy to fully grok how much data
> we would _expect_ to be present in these buffers at any given time, but I can
> say a few things.
> 1. The dataset had individual rows that were fairly large, e.g. megabytes.
> 2. Direct buffers are not cleaned up until GC events, and overall there was
> not much heap contention. So maybe they just weren't being cleaned.
> I opened PARQUET-118 to see if they can provide an option to use on-heap
> buffers for decompression. In the meantime, we could consider changing the
> default back to gzip, or we could do nothing (not sure how many other users
> will hit this).
> [1]
> https://github.com/apache/incubator-parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java#L28

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
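The two observations in the quoted description (large individual rows, and direct buffers that are only reclaimed when a GC happens to run) combine as follows: a grow-on-demand direct buffer ends up sized to the largest chunk ever seen, and each retired buffer's native memory lingers until GC. The sketch below is illustrative code written for this note, loosely modeled on the `setInput` pattern in the stack trace; the class name `GrowOnlyDirectBuffer` is made up and this is not the actual parquet `SnappyDecompressor` implementation:

```java
import java.nio.ByteBuffer;

// Illustrative sketch (not the real parquet code): a decompressor-style
// input buffer that is reallocated whenever a chunk larger than its
// current capacity arrives, and never shrinks.
public class GrowOnlyDirectBuffer {
    private ByteBuffer input = ByteBuffer.allocateDirect(0);

    void setInput(byte[] chunk) {
        if (input.capacity() < chunk.length) {
            // The old direct buffer becomes unreachable here, but its
            // native memory is freed only after a later GC cycle --
            // with little heap pressure, that can be a long time.
            input = ByteBuffer.allocateDirect(chunk.length);
        }
        input.clear();
        input.put(chunk);
        input.flip();
    }

    int capacity() { return input.capacity(); }

    public static void main(String[] args) {
        GrowOnlyDirectBuffer d = new GrowOnlyDirectBuffer();
        d.setInput(new byte[1 << 20]);    // one megabyte-sized row...
        System.out.println(d.capacity()); // ...grows the buffer to 1048576
        d.setInput(new byte[16]);         // small rows afterward
        System.out.println(d.capacity()); // still 1048576: it never shrinks
    }
}
```

With many tasks per executor, each holding such a buffer sized to its largest row, off-heap usage can plausibly reach the multi-GB figures described above.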