Does increasing executor memory fix the memory problem?

How many columns does the schema contain? Parquet can be super memory consuming when writing wide tables.

Cheng

On 6/15/15 5:48 AM, Bipin Nag wrote:
HI Davies,

I have tried recent 1.4 and 1.5-snapshot to 1) open the parquet and save it again or 2 apply schema to rdd and save dataframe as parquet but now I get this error (right in the beginning):

java.lang.OutOfMemoryError: Java heap space
at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94) at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111) at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244) at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386) at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org <http://org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org>$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at parquet.io.api.Binary.fromByteArray(Binary.java:159)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:94) at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:67)
    at parquet.column.Encoding$4.initDictionary(Encoding.java:131)
at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:325) at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63) at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58) at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org <http://org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org>$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)

I am not sure if this is related to your patch or some other bug. My error doesn't show up in newer versions, so this is the problem to fix now.

Thanks

On 13 June 2015 at 06:31, Davies Liu <dav...@databricks.com <mailto:dav...@databricks.com>> wrote:

    Maybe it's related to a bug, which is fixed by
    https://github.com/apache/spark/pull/6558 recently.

    On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag <bipin....@gmail.com
    <mailto:bipin....@gmail.com>> wrote:
    > Hi Cheng,
    >
    > Yes, some rows contain unit instead of decimal values. I believe
    some rows
    > from original source I had don't have any value i.e. it is null.
    And that
    > shows up as unit. How does the spark-sql or parquet handle null
    in place of
    > decimal values, assuming that field is nullable. I will have to
    change it
    > properly.
    >
    > Thanks for helping out.
    > Bipin
    >
    > On 12 June 2015 at 14:57, Cheng Lian <lian.cs....@gmail.com
    <mailto:lian.cs....@gmail.com>> wrote:
    >>
    >> On 6/10/15 8:53 PM, Bipin Nag wrote:
    >>
    >> Hi Cheng,
    >>
    >> I am using Spark 1.3.1 binary available for Hadoop 2.6. I am
    loading an
    >> existing parquet file, then repartitioning and saving it. Doing
    this gives
>> the error. The code for this doesn't look like causing problem. I have a
    >> feeling the source - the existing parquet is the culprit.
    >>
    >> I created that parquet using a jdbcrdd (pulled from microsoft
    sql server).
    >> First I saved jdbcrdd as an objectfile on disk. Then loaded it
    again, made a
    >> dataframe from it using a schema then saved it as a parquet.
    >>
    >> Following is the code :
    >> For saving jdbcrdd:
    >>  name - fullqualifiedtablename
    >>  pk - string for primarykey
    >>  pklast - last id to pull
    >>     val myRDD = new JdbcRDD( sc, () =>
    >>  DriverManager.getConnection(url,username,password) ,
    >>         "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <=
    "+pk+" and
    >> "+pk+" <= ?",
    >>         1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
    >>     myRDD.saveAsObjectFile("rawdata/"+name);
    >>
    >> For applying schema and saving the parquet:
    >>     val myschema = schemamap(name)
    >>     val myrdd =
    >> sc.objectFile[Array[Object]]("/home/bipin/rawdata/"+name).map(x =>
    >> org.apache.spark.sql.Row(x:_*))
    >>
    >> Have you tried to print out x here to check its contents? My
    guess is that
    >> x actually contains unit values. For example, the follow Spark
    shell code
    >> can reproduce a similar exception:
    >>
    >> import org.apache.spark.sql.types._
    >> import org.apache.spark.sql.Row
    >>
    >> val schema = StructType(StructField("dec", DecimalType(10, 0))
    :: Nil)
    >> val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr
    => Row(arr:
    >> _*))
    >> val df = sqlContext.createDataFrame(rdd, schema)
    >>
    >> df.saveAsParquetFile("file:///tmp/foo")
    >>
    >>     val actualdata = sqlContext.createDataFrame(myrdd, myschema)
    >>  actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)
    >>
    >> Schema structtype can be made manually, though I pull table's
    metadata and
    >> make one. It is a simple string translation (see sql docs
    and/or spark
    >> datatypes)
    >>
    >> That is how I created the parquet file. Any help to solve the
    issue is
    >> appreciated.
    >> Thanks
    >> Bipin
    >>
    >>
    >> On 9 June 2015 at 20:44, Cheng Lian <lian.cs....@gmail.com
    <mailto:lian.cs....@gmail.com>> wrote:
    >>>
    >>> Would you please provide a snippet that reproduce this issue? What
    >>> version of Spark were you using?
    >>>
    >>> Cheng
    >>>
    >>> On 6/9/15 8:18 PM, bipin wrote:
    >>>>
    >>>> Hi,
    >>>> When I try to save my data frame as a parquet file I get the
    following
    >>>> error:
    >>>>
    >>>> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot
    be cast to
    >>>> org.apache.spark.sql.types.Decimal
    >>>>         at
    >>>>
    >>>>
    
org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
    >>>>         at
    >>>>
    >>>>
    
org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
    >>>>         at
    >>>>
    >>>>
    
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
    >>>>         at
    >>>>
    >>>>
    
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
    >>>>         at
    >>>>
    >>>>
    
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
    >>>>         at
    >>>>
    parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    >>>>         at
    >>>>
    parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    >>>>         at
    >>>>
    >>>> org.apache.spark.sql.parquet.ParquetRelation2.org
    
<http://org.apache.spark.sql.parquet.ParquetRelation2.org>$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
    >>>>         at
    >>>>
    >>>>
    
org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
    >>>>         at
    >>>>
    >>>>
    
org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
    >>>>         at
    >>>>
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    >>>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
    >>>>         at
    >>>>
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    >>>>         at
    >>>>
    >>>>
    
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    >>>>         at
    >>>>
    >>>>
    
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    >>>>         at java.lang.Thread.run(Thread.java:745)
    >>>>
    >>>> How to fix this problem ?
    >>>>
    >>>>
    >>>>
    >>>> --
    >>>> View this message in context:
    >>>>
    
http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
    >>>> Sent from the Apache Spark User List mailing list archive at
    Nabble.com.
    >>>>
    >>>>
    ---------------------------------------------------------------------
    >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
    <mailto:user-unsubscr...@spark.apache.org>
    >>>> For additional commands, e-mail: user-h...@spark.apache.org
    <mailto:user-h...@spark.apache.org>
    >>>>
    >>>>
    >>>
    >>
    >



Reply via email to