This may be related: https://github.com/Parquet/parquet-mr/issues/211

Perhaps changing our Parquet configuration settings would improve things, but
the performance characteristics of Snappy here are pretty bad under some
circumstances.
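
For anyone who hits this in the meantime, explicitly switching the codec back
is a reasonable workaround.  A minimal sketch from the spark-shell (this
assumes the spark.sql.parquet.compression.codec setting that the commit
mentioned further down made configurable, and an existing SparkContext `sc`):

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  // Write new Parquet files with gzip instead of the snappy default.
  sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")

  // Equivalently, from SQL:
  sqlContext.sql("SET spark.sql.parquet.compression.codec=gzip")

Note this only affects files written after the setting is changed; files
already written with snappy still get decompressed with snappy on read.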

On Tue, Sep 23, 2014 at 10:13 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Cool, that's pretty much what I was thinking as far as configuration goes.
>
> Running on Mesos.  Worker nodes are Amazon xlarge, so 4 cores / 15g.  I've
> tried executor memory sizes as high as 6G.
> Default HDFS block size is 64m, with about 25G of total data written by a
> job with 128 partitions.  The exception comes when trying to read the data
> (all columns).
>
> Schema looks like this:
>
> case class A(
>   a: Long,
>   b: Long,
>   c: Byte,
>   d: Option[Long],
>   e: Option[Long],
>   f: Option[Long],
>   g: Option[Long],
>   h: Option[Int],
>   i: Long,
>   j: Option[Int],
>   k: Seq[Int],
>   l: Seq[Int],
>   m: Seq[Int]
> )
>
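> Roughly, the job looks like the following (a simplified sketch with made-up
> paths and toy data, using the 1.1 SchemaRDD API); the final read of all
> columns is what throws the OOM in my earlier mail:
>
>   val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>   import sqlContext.createSchemaRDD
>
>   // Toy records just to show the shape; the real job produces ~25G.
>   val records = sc.parallelize(1L to 1000L).map { n =>
>     A(n, n, 0.toByte, Some(n), None, None, None, Some(1), n, None,
>       Seq(1, 2), Seq(3), Seq(4))
>   }
>
>   records.repartition(128).saveAsParquetFile("hdfs:///tmp/a.parquet")
>
>   val readBack = sqlContext.parquetFile("hdfs:///tmp/a.parquet")
>   readBack.registerTempTable("a")
>   sqlContext.sql("SELECT * FROM a").count()
>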
> We're just going back to gzip for now, but it might be nice to help someone
> else avoid running into this.
>
> On Tue, Sep 23, 2014 at 11:18 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> > I actually submitted a patch to do this yesterday:
> > https://github.com/apache/spark/pull/2493
> >
> > Can you tell us more about your configuration?  In particular, how much
> > memory and how many cores do the executors have, and what does the schema
> > of your data look like?
> >
> > On Tue, Sep 23, 2014 at 7:39 AM, Cody Koeninger <c...@koeninger.org>
> > wrote:
> >
> >> So as a related question, is there any reason the settings in SQLConf
> >> aren't read from the spark context's conf?  I understand why the SQL conf
> >> is mutable, but it's not particularly user friendly to have most Spark
> >> configuration set via e.g. defaults.conf or --properties-file, only for
> >> Spark SQL to ignore those settings.
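> >>
> >> For example, what I'd expect to be able to do is set it once at submit
> >> time and have Spark SQL pick it up, e.g. (hypothetical entry, using the
> >> parquet codec setting from this thread):
> >>
> >>   # spark-defaults.conf, or --conf on spark-submit
> >>   spark.sql.parquet.compression.codec   gzip
> >>
> >> whereas today it has to be done programmatically in each job:
> >>
> >>   sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")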
> >>
> >> On Mon, Sep 22, 2014 at 4:34 PM, Cody Koeninger <c...@koeninger.org>
> >> wrote:
> >>
> >> > After commit 8856c3d8 switched from gzip to snappy as the default
> >> > Parquet compression codec, I'm seeing the following when trying to read
> >> > Parquet files saved using the new default (same schema and roughly the
> >> > same size as files that were previously working):
> >> >
> >> > java.lang.OutOfMemoryError: Direct buffer memory
> >> >         java.nio.Bits.reserveMemory(Bits.java:658)
> >> >         java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >> >         java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> >> >         parquet.hadoop.codec.SnappyDecompressor.setInput(SnappyDecompressor.java:99)
> >> >         parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:43)
> >> >         java.io.DataInputStream.readFully(DataInputStream.java:195)
> >> >         java.io.DataInputStream.readFully(DataInputStream.java:169)
> >> >         parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:201)
> >> >         parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:521)
> >> >         parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:493)
> >> >         parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:546)
> >> >         parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:339)
> >> >         parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63)
> >> >         parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58)
> >> >         parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:265)
> >> >         parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
> >> >         parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
> >> >         parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
> >> >         parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
> >> >         parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
> >> >         org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
> >> >         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >> >         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >> >         scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> >> >         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >> >         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >> >         scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
> >> >         scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
> >> >         org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:220)
> >> >         org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:219)
> >> >         org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
> >> >         org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
> >> >         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >> >         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> >> >         org.apache.spark.scheduler.Task.run(Task.scala:54)
> >> >         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> >> >         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >> >         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >> >         java.lang.Thread.run(Thread.java:722)
> >> >
> >> >
> >> >
> >>
> >
> >
>
