[ https://issues.apache.org/jira/browse/SPARK-15326 ]
Jurriaan Pruis updated SPARK-15326:
-----------------------------------
    Attachment: Query Plan.pdf

Also attached a PDF of the query plan as shown in the web interface. When doing these kinds of joins in production it gets even more complicated than shown here, because each skewjoin call results in both a BroadcastHashJoin and a SortMergeJoin, which are then duplicated several times by the nested unions. A reduced sketch of the union blow-up (without the joins) follows the quoted description below.

> Doing multiple unions on a DataFrame will result in a very inefficient query plan
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-15326
>                 URL: https://issues.apache.org/jira/browse/SPARK-15326
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Jurriaan Pruis
>         Attachments: Query Plan.pdf, skewed_join.py, skewed_join_plan.txt
>
>
> While working with a very skewed dataset I noticed that repeated unions on a
> DataFrame result in a query plan containing 2^(number of unions) - 1 Union
> operators. With large datasets this is very inefficient.
> I tried to replicate this behaviour using a PySpark example (I've attached
> the output of explain() to this JIRA):
> {code}
> from pyspark.sql import functions as F
>
> df = sqlCtx.range(10000000)
>
> def r(name, max_val=100):
>     # Skewed random integers in [0, max_val]: pow(rand(), 4) pushes most
>     # values towards zero
>     return F.round(F.lit(max_val) * F.pow(F.rand(), 4)).cast('integer').alias(name)
>
> # Create a skewed dataset
> skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))
>
> # Find the skewed values in the dataset
> top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 0.10).collect()[0]
>
> def skewjoin(skewed, right, column, freqItems):
>     freqItems = freqItems[column + '_freqItems']
>     skewed = skewed.alias('skewed')
>     cond = F.col(column).isin(freqItems)
>     # First broadcast join the frequent (skewed) values
>     filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), column, 'left_outer')
>     # Use a regular join for the non-skewed values (with big tables this will use a SortMergeJoin)
>     non_skewed = skewed.filter(~cond).join(right.filter(~cond), column, 'left_outer')
>     # Union them together and replace the column with the column found in the right DataFrame
>     return filtered.unionAll(non_skewed).select('skewed.*', right['id'].alias(column + '_key')).drop(column)
>
> # Create the DataFrames that will be joined to the skewed DataFrame
> right_size = 100
> df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
> df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
> df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
> df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
> df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
> df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))
>
> # Join everything together
> df = skewed
> df = skewjoin(df, df_a, 'a', top_10_percent)
> df = skewjoin(df, df_b, 'b', top_10_percent)
> df = skewjoin(df, df_c, 'c', top_10_percent)
> df = skewjoin(df, df_d, 'd', top_10_percent)
> df = skewjoin(df, df_e, 'e', top_10_percent)
> df = skewjoin(df, df_f, 'f', top_10_percent)
>
> # df.explain() shows a plan with 63 unions (2^(number_of_skewjoins) - 1),
> # which will be very inefficient and slow
> df.explain(True)
>
> # Evaluate the plan
> # You'd expect this to return 10000000, but it does not: it returned 10000140
> # on my system (probably because it recalculates the random columns? Not sure though)
> print(df.count())
> {code}
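Follow-up on the union count: the blow-up is independent of the join logic. Each unionAll references the previous DataFrame in both branches, so the number of Union operators follows U(n) = 2*U(n-1) + 1 with U(0) = 0, i.e. U(n) = 2^n - 1, which gives the 63 unions for the six skewjoin calls above. A minimal sketch of just that doubling (assuming the same sqlCtx as in the report; the filter predicates are only illustrative):

{code}
from pyspark.sql import functions as F

df = sqlCtx.range(1000)

for i in range(6):
    # Both branches reference the same `df`, so its entire plan is
    # inlined twice on every iteration
    evens = df.filter(F.col('id') % 2 == 0)
    odds = df.filter(F.col('id') % 2 != 0)
    df = evens.unionAll(odds)

# The analyzed plan now contains 2^6 - 1 = 63 Union operators
df.explain(True)
{code}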