Re: Dataframe saves for a large set but throws OOM for a small dataset

2016-04-30 Thread Bijay Pathak
Sorry for the confusion, this was supposed to be an answer for another thread.

Bijay


Re: Dataframe saves for a large set but throws OOM for a small dataset

2016-04-30 Thread Bijay Kumar Pathak
Hi,

I was facing the same issue on Spark 1.6. My data size was around 100 GB,
and I was writing into a partitioned Hive table.

I was able to solve this issue by increasing executor memory, starting from
6 GB and going up to 15 GB per executor with a 2 GB overhead, and by
partitioning the DataFrame before doing the insert overwrite into the Hive
table. In my experience Parquet puts a lot of memory pressure on the
executors, so try increasing your executor memory.
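
For concreteness, here is a minimal sketch of that setup, assuming YARN, a
HiveContext, and a Hive table partitioned on an event_date column; all table,
column, and sizing values are illustrative, not taken from this thread:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Illustrative executor sizing; tune for your own cluster.
    val conf = new SparkConf()
      .set("spark.executor.memory", "15g")
      .set("spark.yarn.executor.memoryOverhead", "2048")   // in MB
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // Allow a dynamic-partition insert overwrite into the Hive table.
    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    val df = hiveContext.table("mydb.staging_events")   // hypothetical source

    // Repartition on the Hive partition column before the insert overwrite so
    // each task writes a bounded slice of each partition. Column order must
    // match the target table, with the partition column last.
    df.repartition(200, df("event_date"))
      .write
      .mode("overwrite")
      .insertInto("mydb.events_partitioned")             // hypothetical target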

Here are the relevant JIRA tickets:
https://issues.apache.org/jira/browse/SPARK-8890
https://issues.apache.org/jira/browse/PARQUET-222

Thanks,
Bijay




Re: Dataframe saves for a large set but throws OOM for a small dataset

2016-04-30 Thread Brandon White
randomSplit instead of randomSample
On Apr 30, 2016 1:51 PM, "Brandon White"  wrote:

> val df = globalDf
> val filteredDfs = filterExpressions.map { expr =>
>   val filteredDf = df.filter(expr)
>   val samples = filteredDf.randomSample(Array(0.7, 0.3))
>   (samples(0), samples(1))
> }
>
> val largeDfs = filteredDfs.map(_._1)
> val smallDfs = filteredDfs.map(_._2)
>
> val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head)
> val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head)
>
> unionedLargeDfs.write.parquet(output) // works fine
> unionedSmallDfs.write.parquet(output)  // breaks with OOM stack trace in
> first thread
>
> There is no skew here. I am using Spark 1.5.1 with 80 executors and 7g of
> executor memory.
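
For reference, a minimal, syntactically complete sketch of the randomSplit
version that the correction at the top of this message refers to (Spark 1.5+
DataFrame API; globalDf and filterExpressions are from the snippet above,
everything else is illustrative). The reduce here plays the same role as the
tailRecursiveUnionAll helper in the snippet:

    // randomSplit takes an Array of weights and returns an Array[DataFrame].
    val splitPairs = filterExpressions.map { expr =>
      val filteredDf = globalDf.filter(expr)
      val Array(large, small) = filteredDf.randomSplit(Array(0.7, 0.3))
      (large, small)
    }

    val largeDfs = splitPairs.map(_._1)
    val smallDfs = splitPairs.map(_._2)

    // unionAll is the DataFrame union operator in Spark 1.x.
    val unionedLargeDfs = largeDfs.reduce(_ unionAll _)
    val unionedSmallDfs = smallDfs.reduce(_ unionAll _)

    unionedLargeDfs.write.mode("overwrite").parquet("/tmp/large_output")  // illustrative paths
    unionedSmallDfs.write.mode("overwrite").parquet("/tmp/small_output")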


Re: Dataframe saves for a large set but throws OOM for a small dataset

2016-04-30 Thread Ted Yu
Can you provide a bit more information:

Does the smaller dataset have skew ?

Which release of Spark are you using ?

How much memory did you specify ?

Thanks



Dataframe saves for a large set but throws OOM for a small dataset

2016-04-30 Thread Brandon White
Hello,

I am writing two datasets. One dataset is 2x larger than the other. Both
datasets are written to Parquet in exactly the same way, using

df.write.mode("Overwrite").parquet(outputFolder)

The smaller dataset OOMs while the larger dataset writes perfectly fine.
Here is the stack trace. Any ideas what is going on here and how I can fix it?

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2367)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
at java.lang.StringBuilder.append(StringBuilder.java:132)
at scala.StringContext.standardInterpolator(StringContext.scala:123)
at scala.StringContext.s(StringContext.scala:90)
at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)