Does increasing executor memory fix the memory problem?
How many columns does the schema contain? Parquet can be very memory-consuming
when writing wide tables, because it buffers a page per column for every open
file in memory before flushing a row group.
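To illustrate the two knobs above, here is a minimal sketch (values are illustrative, not recommendations) of raising executor heap and shrinking the Parquet page and row-group sizes so that a wide table holds less buffered data per open file; `parquet.page.size` and `parquet.block.size` are the Hadoop configuration keys read by the Parquet writer:

```scala
// Sketch only: bigger executors plus smaller Parquet pages/row groups reduce
// per-column write buffering for wide tables. Tune values to your cluster.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parquet-write")
  .set("spark.executor.memory", "8g") // more heap per executor

val sc = new SparkContext(conf)

// Parquet buffers one page per column per open file; smaller pages and
// row groups mean less memory held at once when the schema is wide.
sc.hadoopConfiguration.setInt("parquet.page.size", 512 * 1024)        // 512 KB pages
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024) // 64 MB row groups
```

This is a configuration fragment and needs a running Spark deployment; it is not a complete program.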
Cheng
On 6/15/15 5:48 AM, Bipin Nag wrote:
Hi Davies,
I have tried the recent 1.4 and a 1.5-SNAPSHOT to 1) open the Parquet file and
save it again, or 2) apply the schema to the RDD and save the DataFrame as
Parquet, but now I get this error (right at the beginning):
java.lang.OutOfMemoryError: Java heap space
    at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
    at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
    at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
    at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
    at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
    at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at parquet.io.api.Binary.fromByteArray(Binary.java:159)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:94)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:67)
    at parquet.column.Encoding$4.initDictionary(Encoding.java:131)
    at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:325)
    at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63)
    at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
    at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
I am not sure if this is related to your patch or some other bug. The earlier
ClassCastException doesn't show up in the newer versions, so this OOM is the
problem to fix now.
Thanks
On 13 June 2015 at 06:31, Davies Liu <dav...@databricks.com> wrote:
Maybe it's related to a bug that was recently fixed by
https://github.com/apache/spark/pull/6558.
On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag <bipin....@gmail.com> wrote:
> Hi Cheng,
>
> Yes, some rows contain unit instead of decimal values. I believe some rows
> from the original source I had don't have any value, i.e. they are null, and
> that shows up as unit. How do Spark SQL and Parquet handle null in place of
> decimal values, assuming the field is nullable? I will have to change it
> properly.
>
> Thanks for helping out.
> Bipin
>
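For the unit-vs-null question quoted above, one possible cleanup pass, sketched in plain Scala under Cheng's guess that SQL NULLs surfaced as boxed `()` values in the row arrays (the `RowCleanup` name and this assumption are mine, not from the thread): replace every boxed Unit with null before building Rows, since a nullable DecimalType column can store null but `BoxedUnit` cannot be cast to Decimal.

```scala
// Sketch: assuming NULLs came through as boxed Unit values in Array[Object]
// rows, replace them with null so a nullable column can accept them.
object RowCleanup {
  def cleanse(row: Array[Object]): Array[Object] =
    row.map {
      case _: scala.runtime.BoxedUnit => null // treat () as SQL NULL
      case other                      => other
    }

  def main(args: Array[String]): Unit = {
    val raw: Array[Object] = Array("abc", ().asInstanceOf[Object], Int.box(42))
    println(RowCleanup.cleanse(raw).mkString(", ")) // abc, null, 42
  }
}
```

In the pipeline from the quoted code, this would slot in as `.map(RowCleanup.cleanse)` before the rows are wrapped in `Row(x: _*)`.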
> On 12 June 2015 at 14:57, Cheng Lian <lian.cs....@gmail.com> wrote:
>>
>> On 6/10/15 8:53 PM, Bipin Nag wrote:
>>
>> Hi Cheng,
>>
>> I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading an
>> existing Parquet file, then repartitioning and saving it. Doing this gives
>> the error. The code for this doesn't look like it is causing the problem; I
>> have a feeling the source, the existing Parquet file, is the culprit.
>>
>> I created that Parquet file using a JdbcRDD (pulled from Microsoft SQL
>> Server). First I saved the JdbcRDD as an object file on disk, then loaded it
>> again, made a DataFrame from it using a schema, and saved that as Parquet.
>>
>> Following is the code:
>> For saving the JdbcRDD:
>> name - fully qualified table name
>> pk - string for primary key
>> lastpk - last id to pull
>> val myRDD = new JdbcRDD( sc, () =>
>>     DriverManager.getConnection(url, username, password),
>>     "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
>>     1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
>> myRDD.saveAsObjectFile("rawdata/" + name)
>>
>> For applying the schema and saving the Parquet file:
>> val myschema = schemamap(name)
>> val myrdd = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)
>>     .map(x => org.apache.spark.sql.Row(x: _*))
>>
>> Have you tried printing out x here to check its contents? My guess is that
>> x actually contains unit values. For example, the following Spark shell code
>> can reproduce a similar exception:
>>
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.Row
>>
>> val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
>> val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
>> val df = sqlContext.createDataFrame(rdd, schema)
>>
>> df.saveAsParquetFile("file:///tmp/foo")
>>
>> val actualdata = sqlContext.createDataFrame(myrdd, myschema)
>> actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)
>>
>> The schema StructType can be made manually, though I pull the table's
>> metadata and make one. It is a simple string translation (see the SQL docs
>> and/or the Spark data types).
>>
>> That is how I created the Parquet file. Any help to solve the issue is
>> appreciated.
>> Thanks
>> Bipin
>>
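The string translation described above might look roughly like the following sketch, which maps (column name, SQL type name) pairs pulled from table metadata to a Spark `StructType`; the type table and the chosen decimal precision/scale are illustrative assumptions, not Bipin's actual mapping:

```scala
// Illustrative only: build a StructType from (columnName, sqlTypeName) pairs,
// roughly the metadata-to-schema translation described above.
import org.apache.spark.sql.types._

def toStructType(columns: Seq[(String, String)]): StructType =
  StructType(columns.map { case (name, typeName) =>
    val dataType = typeName.toLowerCase match {
      case "int"      => IntegerType
      case "bigint"   => LongType
      case "decimal"  => DecimalType(10, 0) // precision/scale are a guess
      case "datetime" => TimestampType
      case _          => StringType         // fall back to string
    }
    StructField(name, dataType, nullable = true) // nullable so NULLs are legal
  })
```

This fragment needs the spark-sql artifact on the classpath and is a sketch rather than a complete program.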
>>
>> On 9 June 2015 at 20:44, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>
>>> Would you please provide a snippet that reproduces this issue? What
>>> version of Spark were you using?
>>>
>>> Cheng
>>>
>>> On 6/9/15 8:18 PM, bipin wrote:
>>>>
>>>> Hi,
>>>> When I try to save my data frame as a Parquet file I get the following
>>>> error:
>>>>
>>>> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal
>>>>     at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
>>>>     at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
>>>>     at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
>>>>     at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
>>>>     at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>>>>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>>>>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>>>>     at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
>>>>     at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>>>     at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> How do I fix this problem?
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>