randomSplit instead of randomSample
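In 1.5.1, DataFrame has randomSplit(weights: Array[Double]) but no randomSample, so something like this inside your map should work (untested sketch; an optional seed argument is also available):

  val samples = filteredDf.randomSplit(Array(0.7, 0.3))
  (samples(0), samples(1))
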
On Apr 30, 2016 1:51 PM, "Brandon White" <bwwintheho...@gmail.com> wrote:

> val df = globalDf
> val filteredDfs = filterExpressions.map { expr =>
>   val filteredDf = df.filter(expr)
>   val samples = filteredDf.randomSample(Array(0.7, 0.3))
>   (samples(0), samples(1))
> }
>
> val largeDfs = filteredDfs.map(_._1)
> val smallDfs = filteredDfs.map(_._2)
>
> val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head)
> val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head)
>
> unionedLargeDfs.write.parquet(output) // works fine
> unionedSmallDfs.write.parquet(output)  // breaks with OOM stack trace in
> first thread
>
> There is no skew here. I am using Spark 1.5.1 with 80 executors, each with
> 7g of memory.
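>
> For reference, tailRecursiveUnionAll is nothing fancy, roughly a
> tail-recursive fold over unionAll (sketch, assuming a plain Seq[DataFrame]):
>
> import org.apache.spark.sql.DataFrame
>
> @annotation.tailrec
> def tailRecursiveUnionAll(dfs: Seq[DataFrame], acc: DataFrame): DataFrame =
>   if (dfs.isEmpty) acc
>   else tailRecursiveUnionAll(dfs.tail, acc.unionAll(dfs.head))
>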
> On Apr 30, 2016 1:22 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> Can you provide a bit more information:
>>
>> Does the smaller dataset have skew ?
>>
>> Which release of Spark are you using ?
>>
>> How much memory did you specify ?
>>
>> Thanks
>>
>> On Sat, Apr 30, 2016 at 1:17 PM, Brandon White <bwwintheho...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I am writing two datasets. One dataset is 2x larger than the other. Both
>>> datasets are written to Parquet the exact same way using
>>>
>>> df.write.mode("Overwrite").parquet(outputFolder)
>>>
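>>> (The string mode is equivalent to the typed enum, i.e.
>>> df.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(outputFolder).)
>>>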
>>> The smaller dataset OOMs while the larger dataset writes perfectly fine.
>>> Any ideas what is going on here and how I can fix it? Here is the stack
>>> trace:
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>>> at java.util.Arrays.copyOf(Arrays.java:2367)
>>> at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>>> at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
>>> at java.lang.StringBuilder.append(StringBuilder.java:132)
>>> at scala.StringContext.standardInterpolator(StringContext.scala:123)
>>> at scala.StringContext.s(StringContext.scala:90)
>>> at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947)
>>> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
>>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
>>> at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>>> at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>>> at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>>> at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
>>> at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
>>> at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
>>> at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
>>>
>>
>>
