[ https://issues.apache.org/jira/browse/SPARK-4073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111148#comment-15111148 ]

Juliet Hougland commented on SPARK-4073:
----------------------------------------

I have run into a related problem. I am reading a snappy-compressed Parquet 
file via PySpark and get the following OOM error:

16/01/21 11:43:10 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/anaconda2/bin/python,5,main]
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
        at parquet.hadoop.codec.SnappyDecompressor.setInput(SnappyDecompressor.java:102)
        at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:46)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
        at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
        at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
        at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
        at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
        at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
        at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
        at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
        at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
        at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
        at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
        at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
        at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:205)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
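For context on this failure mode (editor's sketch, not part of the original report): the trace tops out in java.nio.Bits.reserveMemory, which is where ByteBuffer.allocateDirect accounts for off-heap memory. Direct buffers are capped by -XX:MaxDirectMemorySize rather than the heap size, and "java.lang.OutOfMemoryError: Direct buffer memory" is thrown when that cap is exceeded. A minimal standalone illustration of the direct-vs-heap distinction:

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // allocateDirect reserves memory outside the JVM heap; the
        // reservation is enforced in java.nio.Bits.reserveMemory (top of
        // the trace above) against the -XX:MaxDirectMemorySize limit.
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);

        // allocate returns an ordinary on-heap buffer backed by a byte[],
        // which is governed by the normal heap limit instead.
        ByteBuffer heap = ByteBuffer.allocate(1024);

        System.out.println(direct.isDirect()); // true
        System.out.println(heap.isDirect());   // false
    }
}
```

The off-heap reservation is only released when the buffer object itself is garbage collected, which is why these allocations can accumulate when there is little heap pressure.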



> Parquet+Snappy can cause significant off-heap memory usage
> ----------------------------------------------------------
>
>                 Key: SPARK-4073
>                 URL: https://issues.apache.org/jira/browse/SPARK-4073
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.1.0
>            Reporter: Patrick Wendell
>            Priority: Critical
>
> The parquet snappy codec allocates off-heap buffers for decompression[1]. In 
> one case the observed size of these buffers was high enough to add several 
> GB of data to the overall virtual memory usage of the Spark executor process. 
> I don't understand enough about our use of Snappy to fully grok how much data 
> we would _expect_ to be present in these buffers at any given time, but I can 
> say a few things.
> 1. The dataset had individual rows that were fairly large, e.g. megabytes.
> 2. Direct buffers are not cleaned up until GC events, and overall there was 
> not much heap contention, so maybe they just weren't being cleaned.
> I opened PARQUET-118 to see if they can provide an option to use on-heap 
> buffers for decompression. In the meantime, we could consider changing the 
> default back to gzip, or we could do nothing (not sure how many other users 
> will hit this).
> [1] 
> https://github.com/apache/incubator-parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java#L28
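A sketch of the gzip fallback the description mentions (editor's illustration, not from the original thread): "spark.sql.parquet.compression.codec" is the Spark SQL property controlling the Parquet write codec, and SQLContext.setConf is the Spark 1.x way to set it per session. This fragment assumes an existing SparkContext `sc` and will not run without a Spark deployment:

```java
import org.apache.spark.sql.SQLContext;

// Illustrative fragment: write Parquet with gzip instead of the snappy
// default, avoiding snappy's off-heap decompression buffers on read.
// Valid codec values in this era were uncompressed, snappy, gzip, and lzo.
SQLContext sqlContext = new SQLContext(sc); // sc: an existing SparkContext
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip");
```

This only affects data written after the setting is applied; existing snappy-compressed files still go through SnappyDecompressor when read.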



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
