Jurriaan Pruis created SPARK-15326:
--------------------------------------

             Summary: Doing multiple unions on a DataFrame results in a very inefficient query plan
                 Key: SPARK-15326
                 URL: https://issues.apache.org/jira/browse/SPARK-15326
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.1, 2.0.0
            Reporter: Jurriaan Pruis
While working with a very skewed dataset I noticed that repeated unions on a DataFrame result in a query plan containing 2^n - 1 union operators for n unions. With large datasets this is very inefficient. I replicated this behaviour with the following PySpark example:

{code}
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

sc = SparkContext()
sqlCtx = SQLContext(sc)

df = sqlCtx.range(10000000)

def r(name, max_val=100):
    return F.round(F.lit(max_val) * F.pow(F.rand(), 4)).cast('integer').alias(name)

# Create a skewed dataset
skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))

# Find the skewed values in the dataset
top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 0.10).collect()[0]

def skewjoin(skewed, right, column, freqItems):
    freqItems = freqItems[column + '_freqItems']
    skewed = skewed.alias('skewed')
    cond = F.col(column).isin(freqItems)

    # First broadcast join the frequent (skewed) values
    filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), column, 'left_outer')

    # Use a regular join for the non-skewed values
    # (with big tables this will use a SortMergeJoin)
    non_skewed = skewed.filter(cond == False).join(right.filter(cond == False), column, 'left_outer')

    # Union them together and replace the column with the column found in the right DataFrame
    return filtered.unionAll(non_skewed).select('skewed.*', right['id'].alias(column + '_key')).drop(column)

# Create the dataframes that will be joined to the skewed dataframe
right_size = 100
df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))

# Join everything together
df = skewed
df = skewjoin(df, df_a, 'a', top_10_percent)
df = skewjoin(df, df_b, 'b', top_10_percent)
df = skewjoin(df, df_c, 'c', top_10_percent)
df = skewjoin(df, df_d, 'd', top_10_percent)
df = skewjoin(df, df_e, 'e', top_10_percent)
df = skewjoin(df, df_f, 'f', top_10_percent)

# df.explain() shows a plan with 63 unions (2^(number_of_skewjoins) - 1),
# which will be very inefficient and slow
df.explain(True)

# Evaluate the plan.
# You'd expect this to return 10000000, but it does not; it returned 10000140 on my system
# (probably because it recalculates the random columns? Not sure though)
print(df.count())
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
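The 2^n - 1 figure is consistent with each {{skewjoin}} referencing its input DataFrame twice (once for the frequent-value branch, once for the non-frequent branch), so every step embeds two copies of the previous plan under a new union. A minimal, Spark-free sketch of that recurrence (a toy model of the plan growth, not Spark's actual plan classes):

```python
# Toy model: after each skewjoin the plan becomes
#   Union(copy of previous plan, copy of previous plan)
# because the input DataFrame appears in both the skewed and non-skewed
# branches. The union count therefore follows U(n) = 2*U(n-1) + 1.
def count_unions(num_skewjoins):
    unions = 0  # the initial table scan contains no unions
    for _ in range(num_skewjoins):
        # new plan duplicates the old plan and adds one union on top
        unions = 2 * unions + 1
    return unions

for n in range(1, 7):
    print(n, count_unions(n))
```

With 6 skewjoins this gives 2^6 - 1 = 63 unions, matching the count reported by {{df.explain()}} above.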