Jurriaan Pruis created SPARK-15326:
--------------------------------------

             Summary: Doing multiple unions on a DataFrame results in a very 
inefficient query plan
                 Key: SPARK-15326
                 URL: https://issues.apache.org/jira/browse/SPARK-15326
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.1, 2.0.0
            Reporter: Jurriaan Pruis


While working with a very skewed dataset I noticed that performing repeated 
unions on a dataframe produces a query plan containing 2^(number of unions) - 1 
union operators. With large datasets this is very inefficient.

I tried to replicate this behaviour using a PySpark example:
{code}
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

sc = SparkContext()
sqlCtx = SQLContext(sc)

df = sqlCtx.range(10000000)

def r(name, max_val=100):
    return F.round(F.lit(max_val) * F.pow(F.rand(), 
4)).cast('integer').alias(name)

# Create a skewed dataset
skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))
# Find the skewed values in the dataset
top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 
0.10).collect()[0]

def skewjoin(skewed, right, column, freqItems):
    freqItems = freqItems[column + '_freqItems']
    skewed = skewed.alias('skewed')
    cond = F.col(column).isin(freqItems)
    # First broadcast join the frequent (skewed) values
    filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), 
column, 'left_outer')
    # Use a regular join for the non skewed values (with big tables this will 
use a SortMergeJoin)
    non_skewed = skewed.filter(cond == False).join(right.filter(cond == False), 
column, 'left_outer')
    # join them together and replace the column with the column found in the 
right DataFrame
    return filtered.unionAll(non_skewed).select('skewed.*', 
right['id'].alias(column + '_key')).drop(column)


# Create the dataframes that will be joined to the skewed dataframe
right_size = 100

df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))

# Join everything together
df = skewed
df = skewjoin(df, df_a, 'a', top_10_percent)
df = skewjoin(df, df_b, 'b', top_10_percent)
df = skewjoin(df, df_c, 'c', top_10_percent)
df = skewjoin(df, df_d, 'd', top_10_percent)
df = skewjoin(df, df_e, 'e', top_10_percent)
df = skewjoin(df, df_f, 'f', top_10_percent)


# df.explain() shows the plan where it does 63 unions (2^(number_of_skewjoins) 
- 1)
# which will be very inefficient and slow
df.explain(True)

# Evaluate the plan
# You'd expect this to return 10000000, but it does not, it returned 10000140 
on my system
# (probably because it will recalculate the random columns? Not sure though)
print(df.count())
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to