[ https://issues.apache.org/jira/browse/SPARK-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343775#comment-15343775 ]

Jurriaan Pruis commented on SPARK-15326:
----------------------------------------

[~hvanhovell] unfortunately that doesn't work. The reason I'm doing this UNION 
is that the data in the `left` table is skewed on the column used in the join 
condition. You don't want Spark to repartition on that column while the skewed 
values are still included (which is what happens with a regular join), because 
that produces heavily skewed partitions and sends the executors OOM.
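
To make the skew concrete, here's a quick check (a sketch against the `skewed` 
frame from the reproduction below; the exact fractions vary per run):
{code}
# Row count per value of one join column; with rand()**4 the value 0 holds
# a large share of all rows, so a hash repartition on 'a' funnels that
# share into a single partition on a single executor.
skewed.groupBy('a').count().orderBy(F.desc('count')).show(5)
{code}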

I hope there'll be a good solution to this soon, because Spark doesn't really 
play nice with those kinds of datasets right now.
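
A possible stopgap (a sketch only, reusing the names from the reproduction 
below, and untested against the attached plan) is to cut the plan lineage 
between skewjoins by rebuilding the DataFrame from its RDD:
{code}
def cut_lineage(df):
    # Rebuilding from the RDD gives the new DataFrame a flat logical plan,
    # so each skewjoin starts from a single scan instead of doubling the
    # previous plan (roughly 2^n - 1 unions down to n).
    return sqlCtx.createDataFrame(df.rdd, df.schema)

df = skewed
for right, column in [(df_a, 'a'), (df_b, 'b'), (df_c, 'c'),
                      (df_d, 'd'), (df_e, 'e'), (df_f, 'f')]:
    df = cut_lineage(skewjoin(df, right, column, top_10_percent))
{code}
This trades the exponential plan for a per-join analysis barrier, so it's a 
workaround rather than a fix.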

> Doing multiple unions on a Dataframe will result in a very inefficient query plan
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-15326
>                 URL: https://issues.apache.org/jira/browse/SPARK-15326
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Jurriaan Pruis
>         Attachments: Query Plan.pdf, skewed_join.py, skewed_join_plan.txt
>
>
> While working with a very skewed dataset I noticed that repeated unions on a 
> DataFrame result in a query plan containing 2^n - 1 unions, where n is the 
> number of unions applied (a minimal sketch of the doubling follows the code 
> below). With large datasets this is very inefficient.
> I tried to replicate this behaviour with a PySpark example (the output of 
> explain() is attached to this JIRA):
> {code}
> from pyspark.sql import functions as F
>
> df = sqlCtx.range(10000000)
>
> def r(name, max_val=100):
>     # Rounded rand()**4 scaled to [0, max_val]: heavily skewed towards 0
>     return F.round(F.lit(max_val) * F.pow(F.rand(), 4)).cast('integer').alias(name)
>
> # Create a skewed dataset
> skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))
>
> # Find the skewed values in the dataset
> top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 0.10).collect()[0]
>
> def skewjoin(skewed, right, column, freqItems):
>     freqItems = freqItems[column + '_freqItems']
>     skewed = skewed.alias('skewed')
>     cond = F.col(column).isin(freqItems)
>     # First broadcast join the frequent (skewed) values
>     filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), column, 'left_outer')
>     # Use a regular join for the non-skewed values (with big tables this will use a SortMergeJoin)
>     non_skewed = skewed.filter(cond == False).join(right.filter(cond == False), column, 'left_outer')
>     # Union them together and replace the column with the column found in the right DataFrame
>     return filtered.unionAll(non_skewed).select('skewed.*', right['id'].alias(column + '_key')).drop(column)
>
> # Create the dataframes that will be joined to the skewed dataframe
> right_size = 100
> df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
> df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
> df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
> df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
> df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
> df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))
>
> # Join everything together
> df = skewed
> df = skewjoin(df, df_a, 'a', top_10_percent)
> df = skewjoin(df, df_b, 'b', top_10_percent)
> df = skewjoin(df, df_c, 'c', top_10_percent)
> df = skewjoin(df, df_d, 'd', top_10_percent)
> df = skewjoin(df, df_e, 'e', top_10_percent)
> df = skewjoin(df, df_f, 'f', top_10_percent)
>
> # df.explain() shows a plan with 63 unions (2^(number_of_skewjoins) - 1),
> # which will be very inefficient and slow
> df.explain(True)
>
> # Evaluate the plan
> # You'd expect this to return 10000000, but on my system it returned 10000140
> # (probably because the random columns are recalculated? Not sure though)
> print(df.count())
> {code}
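>
> A minimal sketch of where the 2^n - 1 comes from, isolated from the join 
> logic (assuming only that, like skewjoin above, each step unions two filters 
> of its own input):
> {code}
> df = sqlCtx.range(10)
> for _ in range(6):
>     # Each step references the previous plan twice, so the plan tree doubles
>     df = df.filter('id % 2 = 0').unionAll(df.filter('id % 2 = 1'))
> df.explain(True)  # the analyzed plan now contains 2**6 - 1 = 63 Union nodes
> {code}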


