Re: Dataframe saves for a large set but throws OOM for a small dataset
Sorry for the confusion, this was supposed to be an answer for another thread.

Bijay

On Sat, Apr 30, 2016 at 2:37 PM, Bijay Kumar Pathak wrote:
> ...
Re: Dataframe saves for a large set but throws OOM for a small dataset
Hi,

I was facing the same issue on Spark 1.6. My data size was around 100 GB and I was writing into a partitioned Hive table.

I was able to solve this issue by starting from 6 GB of memory and going up to 15 GB of memory per executor, with an overhead of 2 GB, and by partitioning the DataFrame before doing the insert overwrite into the Hive table. In my experience Parquet puts a lot of memory pressure on the executors, so try increasing your executor memory.

Here are the relevant JIRA tickets:
https://issues.apache.org/jira/browse/SPARK-8890
https://issues.apache.org/jira/browse/PARQUET-222

Thanks,
Bijay

On Sat, Apr 30, 2016 at 1:52 PM, Brandon White wrote:
> ...
Re: Dataframe saves for a large set but throws OOM for a small dataset
randomSplit instead of randomSample

On Apr 30, 2016 1:51 PM, "Brandon White" wrote:
> val df = globalDf
> val filteredDfs = filterExpressions.map { expr =>
>   val filteredDf = df.filter(expr)
>   val samples = filteredDf.randomSample(Array(0.7, 0.3))
>   (samples(0), samples(1))
> }
>
> val largeDfs = filteredDfs.map(_._1)
> val smallDfs = filteredDfs.map(_._2)
>
> val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head)
> val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head)
>
> unionedLargeDfs.write.parquet(output) // works fine
> unionedSmallDfs.write.parquet(output) // breaks with the OOM stack trace in
> the first message of this thread
>
> There is no skew here. I am using Spark 1.5.1 with 80 executors with 7g
> of memory each.
>
> On Apr 30, 2016 1:22 PM, "Ted Yu" wrote:
> > ...
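Spelled out, the one-line correction above replaces the nonexistent `randomSample` call with `DataFrame.randomSplit(Array[Double])`, which does exist in Spark 1.5. A sketch of the corrected pipeline follows; `globalDf`, `filterExpressions`, and the output paths are assumed from the quoted snippet, and since the original `tailRecursiveUnionAll` was never posted in the thread, it is shown here as one plausible tail-recursive fold over `unionAll` (the Spark 1.5 union method).

```scala
import scala.annotation.tailrec
import org.apache.spark.sql.DataFrame

// Hypothetical shape of the helper referenced in the snippet above: fold the
// remaining DataFrames into the accumulator with unionAll, tail-recursively,
// so a long chain of unions does not grow the driver-side call stack.
@tailrec
def tailRecursiveUnionAll(rest: Seq[DataFrame], acc: DataFrame): DataFrame =
  rest match {
    case Seq()        => acc
    case head +: tail => tailRecursiveUnionAll(tail, acc.unionAll(head))
  }

val df = globalDf
val filteredDfs = filterExpressions.map { expr =>
  // randomSplit (not randomSample) is the real DataFrame API: it returns an
  // Array of DataFrames split by the given weights.
  val samples = df.filter(expr).randomSplit(Array(0.7, 0.3))
  (samples(0), samples(1))
}

val largeDfs = filteredDfs.map(_._1)
val smallDfs = filteredDfs.map(_._2)

val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head)
val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head)
```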
Re: Dataframe saves for a large set but throws OOM for a small dataset
Can you provide a bit more information:

Does the smaller dataset have skew?

Which release of Spark are you using?

How much memory did you specify?

Thanks

On Sat, Apr 30, 2016 at 1:17 PM, Brandon White wrote:
> ...
Dataframe saves for a large set but throws OOM for a small dataset
Hello,

I am writing two datasets. One dataset is 2x larger than the other. Both datasets are written to Parquet in the exact same way, using

df.write.mode("Overwrite").parquet(outputFolder)

The smaller dataset OOMs while the larger dataset writes perfectly fine. Here is the stack trace. Any ideas what is going on here and how I can fix it?

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2367)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
at java.lang.StringBuilder.append(StringBuilder.java:132)
at scala.StringContext.standardInterpolator(StringContext.scala:123)
at scala.StringContext.s(StringContext.scala:90)
at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
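Two details in the trace above are worth noting: the OOM is thrown in thread "main", i.e. on the driver rather than on an executor, and it happens inside `SQLContext$QueryExecution.toString` while Spark is rendering the query plan as a string (the behavior tracked in SPARK-8890, linked elsewhere in this thread). If the plan-string rendering is indeed the culprit, one workaround is simply to give the driver more heap. A config sketch, with `4g` and `your-job.jar` as placeholder values, not a definitive fix:

```shell
# Workaround sketch: raise driver heap so it can afford to render the
# (possibly huge) query plan string. Driver memory must be set at launch,
# not from inside the application.
spark-submit \
  --driver-memory 4g \
  your-job.jar
```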