Re: df.partitionBy().parquet() java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-12-02 Thread Cheng Lian
You may try to set Hadoop conf "parquet.enable.summary-metadata" to 
false to disable writing Parquet summary files (_metadata and 
_common_metadata).
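
For example, in spark-shell (1.5.x) this could look roughly like the following; sc and sqlContext are the ones the shell defines, and the DataFrame name, partition columns, and output path are just placeholders:

// Disable writing the Parquet summary files (_metadata / _common_metadata).
// Note this is a Hadoop conf on the SparkContext, not a Spark SQL conf.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

// The write itself stays the same; only the job-commit phase stops collecting
// all part-file footers on the driver. Columns and path here are made up.
df.write.partitionBy("year", "month").parquet("hdfs:///tmp/parquet_out")

If you'd rather not touch the code, passing --conf spark.hadoop.parquet.enable.summary-metadata=false to spark-submit should have the same effect, since spark.hadoop.* properties are copied into the Hadoop configuration.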


By default Parquet writes the summary files by collecting footers of all 
part-files in the dataset while committing the job. Spark also follows 
this convention. However, it turned out that the summary files aren't 
very useful in practice now, unless you have other downstream tools that 
strictly depend on the summary files. For example, if you don't need 
schema merging, Spark simply picks a random part-file to discover the
dataset schema. If you need schema merging, Spark has to read footers of 
all part-files anyway (but in a distributed, parallel way).
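
To make the read side concrete, the difference is roughly this (again a sketch in spark-shell, with a placeholder path):

// Default: the schema is taken from a single part-file footer, so no summary
// files are needed.
val df1 = sqlContext.read.parquet("hdfs:///tmp/parquet_out")

// Opt-in schema merging: footers of all part-files are read, but in a
// distributed job rather than on the driver.
val df2 = sqlContext.read.option("mergeSchema", "true").parquet("hdfs:///tmp/parquet_out")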


Cheng

On 12/3/15 6:11 AM, Don Drake wrote:
Does anyone have any suggestions on creating a large number of Parquet
files? Especially regarding the last phase, where it creates the
_metadata.


Thanks.

-Don

On Sat, Nov 28, 2015 at 9:02 AM, Don Drake wrote:


I have a 2TB dataset in a DataFrame that I am
attempting to partition by 2 fields and my YARN job seems to write
the partitioned dataset successfully.  I can see the output in
HDFS once all Spark tasks are done.

After the spark tasks are done, the job appears to be running for
over an hour, until I get the following (full stack trace below):

java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)

I had set the driver memory to be 20GB.

I attempted to read in the partitioned dataset and got another
error saying the /_metadata directory was not a Parquet file. I
removed the _metadata directory and was able to query the data,
but it appeared not to use the partition directories when I
attempted to filter the data (it read every directory).

This is Spark 1.5.2 and I saw the same problem when running the
code in both Scala and Python.

Any suggestions are appreciated.

-Don

15/11/25 00:00:19 ERROR datasources.InsertIntoHadoopFsRelation: Aborting job.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)
at org.apache.parquet.format.converter.ParquetMetadataConverter.addRowGroup(ParquetMetadataConverter.java:167)
at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetMetadata(ParquetMetadataConverter.java:79)
at org.apache.parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:405)
at org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:433)
at org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:423)
at org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:208)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at


Re: df.partitionBy().parquet() java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-12-02 Thread Adrien Mogenet
Very interested in that topic too, thanks Cheng for the direction!

We'll give it a try as well.

On 3 December 2015 at 01:40, Cheng Lian  wrote:

> You may try to set Hadoop conf "parquet.enable.summary-metadata" to false
> to disable writing Parquet summary files (_metadata and _common_metadata).
>
> By default Parquet writes the summary files by collecting footers of all
> part-files in the dataset while committing the job. Spark also follows this
> convention. However, it turned out that the summary files aren't very
> useful in practice now, unless you have other downstream tools that
> strictly depend on the summary files. For example, if you don't need schema
> merging, Spark simply picks a random part-file to discover the dataset
> schema. If you need schema merging, Spark has to read footers of all
> part-files anyway (but in a distributed, parallel way).
>
> Cheng
>
> On 12/3/15 6:11 AM, Don Drake wrote:
>
> Does anyone have any suggestions on creating a large number of Parquet
> files? Especially regarding the last phase, where it creates the
> _metadata.
>
> Thanks.
>
> -Don
>
> On Sat, Nov 28, 2015 at 9:02 AM, Don Drake  wrote:
>
>> I have a 2TB dataset in a DataFrame that I am attempting to
>> partition by 2 fields and my YARN job seems to write the partitioned
>> dataset successfully.  I can see the output in HDFS once all Spark tasks
>> are done.
>>
>> After the spark tasks are done, the job appears to be running for over an
>> hour, until I get the following (full stack trace below):
>>
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)
>>
>> I had set the driver memory to be 20GB.
>>
>> I attempted to read in the partitioned dataset and got another error
>> saying the /_metadata directory was not a Parquet file. I removed the
>> _metadata directory and was able to query the data, but it appeared not
>> to use the partition directories when I attempted to filter the data (it
>> read every directory).
>>
>> This is Spark 1.5.2 and I saw the same problem when running the code in
>> both Scala and Python.
>>
>> Any suggestions are appreciated.
>>
>> -Don
>>
>> 15/11/25 00:00:19 ERROR datasources.InsertIntoHadoopFsRelation: Aborting job.
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)
>> at org.apache.parquet.format.converter.ParquetMetadataConverter.addRowGroup(ParquetMetadataConverter.java:167)
>> at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetMetadata(ParquetMetadataConverter.java:79)
>> at org.apache.parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:405)
>> at org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:433)
>> at org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:423)
>> at org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
>> at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
>> at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:208)
>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
>> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
>> at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>> at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>> at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>> at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
>> at
>>

df.partitionBy().parquet() java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-11-28 Thread Don Drake
I have a 2TB dataset in a DataFrame that I am attempting to
partition by 2 fields and my YARN job seems to write the partitioned
dataset successfully.  I can see the output in HDFS once all Spark tasks
are done.
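
The write is essentially of this shape (a simplified sketch; the actual column names and output path are different):

// Partition the DataFrame by two columns and write Parquet to HDFS.
// "colA"/"colB" and the path are placeholders for the real ones.
df.write.partitionBy("colA", "colB").parquet("hdfs:///data/output/parquet")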

After the spark tasks are done, the job appears to be running for over an
hour, until I get the following (full stack trace below):

java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)

I had set the driver memory to be 20GB.

I attempted to read in the partitioned dataset and got another error saying
the /_metadata directory was not a Parquet file. I removed the _metadata
directory and was able to query the data, but it appeared not to use the
partition directories when I attempted to filter the data (it read every
directory).
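
For reference, the read-back test was along these lines (again with placeholder names; the real partition columns differ):

// Read the partitioned output and filter on a partition column; the hope was
// that only the matching partition directories would be scanned.
val readBack = sqlContext.read.parquet("hdfs:///data/output/parquet")
readBack.filter(readBack("colA") === "someValue").count()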

This is Spark 1.5.2 and I saw the same problem when running the code in
both Scala and Python.

Any suggestions are appreciated.

-Don

15/11/25 00:00:19 ERROR datasources.InsertIntoHadoopFsRelation: Aborting job.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)
at org.apache.parquet.format.converter.ParquetMetadataConverter.addRowGroup(ParquetMetadataConverter.java:167)
at org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetMetadata(ParquetMetadataConverter.java:79)
at org.apache.parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:405)
at org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:433)
at org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:423)
at org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:208)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
at com.dondrake.qra.ScalaApp$.main(ScalaApp.scala:53)
at com.dondrake.qra.ScalaApp.main(ScalaApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
15/11/25 00:00:20 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
15/11/25 00:00:20 ERROR akka.ErrorMonitor: exception on LARS' timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
at