colin fang created SPARK-28148:
----------------------------------

Summary: repartition after join is not optimized away
Key: SPARK-28148
URL: https://issues.apache.org/jira/browse/SPARK-28148
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.4.3
Reporter: colin fang
Partitioning and sorting are usually retained after a join:

{code}
from pyspark.sql import SparkSession

# In the pyspark shell `spark` already exists; getOrCreate() returns it.
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', '42')

df1 = spark.range(5000000, numPartitions=5)
df2 = spark.range(10000000, numPartitions=5)
df3 = spark.range(20000000, numPartitions=5)

# Reuse previous partitions & sort.
df1.join(df2, on='id').join(df3, on='id').explain()
# == Physical Plan ==
# *(8) Project [id#367L]
# +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
#    :- *(5) Project [id#367L]
#    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#    :     :  +- Exchange hashpartitioning(id#367L, 42)
#    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
#    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#    :        +- Exchange hashpartitioning(id#369L, 42)
#    :           +- *(3) Range (0, 10000000, step=1, splits=5)
#    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#374L, 42)
#          +- *(6) Range (0, 20000000, step=1, splits=5)
{code}

However, here the partitioning persists through a left join (no extra Exchange), but the sort does not (an extra Sort is added):

{code}
df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(5) Sort [id#367L ASC NULLS FIRST], false, 0
# +- *(5) Project [id#367L]
#    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
#       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#367L, 42)
#       :     +- *(1) Range (0, 5000000, step=1, splits=5)
#       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#369L, 42)
#             +- *(3) Range (0, 10000000, step=1, splits=5)
{code}

And here, the partitioning does not persist through an inner join (an extra Exchange is added on top of the join):
{code}
df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(6) Sort [id#367L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#367L, 42)
#    +- *(5) Project [id#367L]
#       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#          :  +- Exchange hashpartitioning(id#367L, 42)
#          :     +- *(1) Range (0, 5000000, step=1, splits=5)
#          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#             +- Exchange hashpartitioning(id#369L, 42)
#                +- *(3) Range (0, 10000000, step=1, splits=5)
{code}
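To compare these plans mechanically, one option is to count the physical operators in the text that explain() prints. The sketch below is a hypothetical helper, not a Spark API: `operator_counts` and `explain_text` are illustrative names, and the parser assumes the Spark 2.4 tree layout shown above ("+- ", ":- ", "*(n)" codegen tags). Counting Exchange nodes makes the difference visible: the inner-join plan has 3, the left-join plan 2.

{code}
import contextlib
import io
import re
from collections import Counter

# Tree-drawing prefix printed by explain() ("+- ", ":- ", ":  ", spaces),
# an optional "# " comment marker, and an optional codegen tag like "*(5) ".
_PREFIX = re.compile(r"^[\s:+\-|#]*(?:\*\(\d+\)\s*)?")
_NAME = re.compile(r"[A-Za-z]+")


def operator_counts(plan_text: str) -> Counter:
    """Count physical operators by name in explain() text (hypothetical helper)."""
    counts = Counter()
    for line in plan_text.splitlines():
        body = _PREFIX.sub("", line, count=1)
        if not body or body.startswith("=="):
            continue  # skip blank lines and the "== Physical Plan ==" header
        match = _NAME.match(body)
        if match:
            counts[match.group(0)] += 1
    return counts


def explain_text(df) -> str:
    """Capture what df.explain() prints; PySpark prints the plan to stdout."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


# Plans copied from this issue (repartition('id').sortWithinPartitions('id')
# applied after an inner join and after a left join, respectively).
INNER_JOIN_PLAN = """
== Physical Plan ==
*(6) Sort [id#367L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#367L, 42)
   +- *(5) Project [id#367L]
      +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
         :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#367L, 42)
         :     +- *(1) Range (0, 5000000, step=1, splits=5)
         +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#369L, 42)
               +- *(3) Range (0, 10000000, step=1, splits=5)
"""

LEFT_JOIN_PLAN = """
== Physical Plan ==
*(5) Sort [id#367L ASC NULLS FIRST], false, 0
+- *(5) Project [id#367L]
   +- SortMergeJoin [id#367L], [id#369L], LeftOuter
      :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#367L, 42)
      :     +- *(1) Range (0, 5000000, step=1, splits=5)
      +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#369L, 42)
            +- *(3) Range (0, 10000000, step=1, splits=5)
"""

if __name__ == "__main__":
    for label, plan in (("inner", INNER_JOIN_PLAN), ("left", LEFT_JOIN_PLAN)):
        print(label, operator_counts(plan)["Exchange"], "Exchange node(s)")
{code}

With a live session, something like operator_counts(explain_text(df1.join(df2, on='id').repartition('id')))['Exchange'] could serve as a quick regression check for the extra shuffle; the parsing is string-based, so it may need adjusting if the explain() format changes between Spark versions.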