randomSplit instead of randomSample
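[Editor's note: a minimal sketch of the suggested change against the Spark 1.5-era DataFrame API. The function name, weights, and seed below are illustrative, not from the original code:

    import org.apache.spark.sql.DataFrame

    // randomSplit returns an Array[DataFrame], one per weight.
    // Passing a seed makes the split reproducible across runs.
    def splitLargeSmall(df: DataFrame): (DataFrame, DataFrame) = {
      val Array(large, small) = df.randomSplit(Array(0.7, 0.3), seed = 42L)
      (large, small)
    }

randomSplit is the supported way to partition a DataFrame by weights; DataFrame has no randomSample method, which is presumably why the quoted code does not compile as written.]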
On Apr 30, 2016 1:51 PM, "Brandon White" <bwwintheho...@gmail.com> wrote:

> val df = globalDf
> val filteredDfs = filterExpressions.map { expr =>
>   val filteredDf = df.filter(expr)
>   val samples = filteredDf.randomSample(Array(0.7, 0.3))
>   (samples(0), samples(1))
> }
>
> val largeDfs = filteredDfs.map(_._1)
> val smallDfs = filteredDfs.map(_._2)
>
> val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head)
> val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head)
>
> unionedLargeDfs.write.parquet(output) // works fine
> unionedSmallDfs.write.parquet(output) // breaks with OOM stack trace in first thread
>
> There is no skew here. I am using Spark 1.5.1 with 80 executors, each with 7g of memory.
>
> On Apr 30, 2016 1:22 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> Can you provide a bit more information:
>>
>> Does the smaller dataset have skew?
>>
>> Which release of Spark are you using?
>>
>> How much memory did you specify?
>>
>> Thanks
>>
>> On Sat, Apr 30, 2016 at 1:17 PM, Brandon White <bwwintheho...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am writing two datasets. One dataset is 2x larger than the other. Both
>>> datasets are written to Parquet in exactly the same way, using
>>>
>>> df.write.mode("Overwrite").parquet(outputFolder)
>>>
>>> The smaller dataset OOMs while the larger dataset writes perfectly fine.
>>> Here is the stack trace. Any ideas what is going on here and how I can fix it?
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>>>   at java.util.Arrays.copyOf(Arrays.java:2367)
>>>   at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>>>   at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>>>   at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
>>>   at java.lang.StringBuilder.append(StringBuilder.java:132)
>>>   at scala.StringContext.standardInterpolator(StringContext.scala:123)
>>>   at scala.StringContext.s(StringContext.scala:90)
>>>   at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947)
>>>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
>>>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
>>>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>>>   at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>>>   at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>>>   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
>>>   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
>>>   at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
>>>   at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
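[Editor's note: the tailRecursiveUnionAll helper referenced above never appears in the thread; a plausible reconstruction from its (remaining, accumulator) call sites, offered as an assumption rather than the poster's actual code:

    import scala.annotation.tailrec
    import org.apache.spark.sql.DataFrame

    // Left-fold a sequence of DataFrames into a single logical plan
    // using unionAll (renamed union in Spark 2.0). Hypothetical
    // reconstruction matching how the thread calls the helper.
    @tailrec
    def tailRecursiveUnionAll(dfs: Seq[DataFrame], acc: DataFrame): DataFrame =
      if (dfs.isEmpty) acc
      else tailRecursiveUnionAll(dfs.tail, acc.unionAll(dfs.head))

A fold like this produces a union tree as deep as the number of inputs, and the stack trace shows the driver exhausting its heap while rendering the query plan to a string in QueryExecution.toString, which is consistent with a very large plan rather than with data skew.]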