colin fang created SPARK-28148:
----------------------------------

             Summary: repartition after join is not optimized away
                 Key: SPARK-28148
                 URL: https://issues.apache.org/jira/browse/SPARK-28148
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.3
            Reporter: colin fang


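Partitioning and sort order are usually retained after a join. In the plan below, the second join consumes the output of the first join directly, with no new Exchange or Sort in between.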

{code}
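from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # reuses the pyspark shell's session if one exists
spark.conf.set('spark.sql.shuffle.partitions', '42')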

df1 = spark.range(5000000, numPartitions=5)
df2 = spark.range(10000000, numPartitions=5)
df3 = spark.range(20000000, numPartitions=5)

# The second join reuses the first join's partitioning and sort order.
df1.join(df2, on='id').join(df3, on='id').explain()
# == Physical Plan ==
# *(8) Project [id#367L]
# +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
#    :- *(5) Project [id#367L]
#    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#    :     :  +- Exchange hashpartitioning(id#367L, 42)
#    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
#    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#    :        +- Exchange hashpartitioning(id#369L, 42)
#    :           +- *(3) Range (0, 10000000, step=1, splits=5)
#    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#374L, 42)
#          +- *(6) Range (0, 20000000, step=1, splits=5)

{code}
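A toy, pure-Python model (not Spark code) of what the plan above relies on: once both inputs are hash-partitioned on id with the same partition count, joining partition p only with partition p keeps every output row in the partition that a fresh shuffle on id would choose, so neither a later join on id nor a repartition('id') needs to move data. Python's built-in hash() stands in for Spark's Murmur3-based hash, and the row counts are scaled down; both are assumptions for illustration only.

{code}
# Toy model (not Spark code) of why a join on `id` keeps hash partitioning.
# Assumption: Python's built-in hash() stands in for Spark's Murmur3-based hash;
# what matters is that both sides use the same function and partition count.
from collections import defaultdict

NUM_PARTITIONS = 42  # mirrors spark.sql.shuffle.partitions in the report


def hash_partition(keys, num_partitions=NUM_PARTITIONS):
    """Place each key in partition hash(key) % num_partitions."""
    parts = defaultdict(list)
    for key in keys:
        parts[hash(key) % num_partitions].append(key)
    return dict(parts)


def partitionwise_inner_join(left_parts, right_parts):
    """Join partition p of the left side only with partition p of the right side.

    This is a correct inner join only because both sides were partitioned
    with the same function and partition count.
    """
    out = {}
    for p, left_keys in left_parts.items():
        right_keys = set(right_parts.get(p, []))
        out[p] = [k for k in left_keys if k in right_keys]
    return out


def is_hash_partitioned(parts, num_partitions=NUM_PARTITIONS):
    """True if every key already sits where a fresh shuffle on the key would put it."""
    return all(hash(k) % num_partitions == p for p, keys in parts.items() for k in keys)


parts1 = hash_partition(range(5_000))   # scaled-down stand-ins for the report's ranges
parts2 = hash_partition(range(10_000))
joined = partitionwise_inner_join(parts1, parts2)

# The join output is still partitioned by id, so a repartition on id with the
# same partition count would move no rows.
print(is_hash_partitioned(joined))
print(sum(len(v) for v in joined.values()))
{code}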

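However, with a left join followed by repartition('id').sortWithinPartitions('id'), the partitioning persists through the join (no extra Exchange is added) but the sort order does not (an extra Sort is added on top of the join).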

{code}
df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(5) Sort [id#367L ASC NULLS FIRST], false, 0
# +- *(5) Project [id#367L]
#    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
#       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#367L, 42)
#       :     +- *(1) Range (0, 5000000, step=1, splits=5)
#       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#369L, 42)
#             +- *(3) Range (0, 10000000, step=1, splits=5)
{code}
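The extra Sort looks redundant because, within each partition, a sort-merge join reads both inputs in ascending key order and emits rows in the order of the left (streamed) side's keys. The minimal pure-Python sketch below models a left outer sort-merge join to illustrate this. It is a simplification assumed for illustration, not Spark's SortMergeJoinExec, and the sample keys are made up.

{code}
# Toy model (not Spark's SortMergeJoinExec): a left outer sort-merge join over
# two key lists that are already sorted ascending, as each join input is after
# the per-partition Sort in the plan.


def sort_merge_left_join(left, right):
    """Return (left_key, right_key or None) pairs, emitted in left-key order."""
    out = []
    j = 0  # right cursor; never moves backwards
    for k in left:
        # Skip right keys smaller than the current left key.
        while j < len(right) and right[j] < k:
            j += 1
        # Emit one row per equal right key, or a null-extended row if none match.
        m = j
        while m < len(right) and right[m] == k:
            out.append((k, right[m]))
            m += 1
        if m == j:
            out.append((k, None))
    return out


left = sorted([3, 1, 4, 1, 5, 9, 2, 6])
right = sorted([1, 2, 2, 5, 7])
rows = sort_merge_left_join(left, right)
keys = [k for k, _ in rows]
print(rows)
# The output is already ordered by the left key, so sorting it again by that
# key within the partition would not reorder anything.
print(keys == sorted(keys))
{code}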

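With an inner join, even the partitioning does not persist: repartition('id') adds a new Exchange hashpartitioning(id#367L, 42) after the join, followed by another Sort, although both join inputs are already hash-partitioned on id into 42 partitions.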


{code}
df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(6) Sort [id#367L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#367L, 42)
#    +- *(5) Project [id#367L]
#       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#          :  +- Exchange hashpartitioning(id#367L, 42)
#          :     +- *(1) Range (0, 5000000, step=1, splits=5)
#          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#             +- Exchange hashpartitioning(id#369L, 42)
#                +- *(3) Range (0, 10000000, step=1, splits=5)
{code}
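To check for this regression without reading plans by eye, a small helper could list the operators that sit above the join in the text printed by explain(). The function ops_above is hypothetical (not part of PySpark) and assumes Spark 2.4's plan layout, where the leading ' ', ':', '+', '-' characters encode tree depth and '*(n) ' marks whole-stage codegen. Applied to copies of the two plans from this report, it finds an Exchange above the inner join but not above the left join.

{code}
# Hypothetical helper (not part of PySpark): list the operators on the path
# from the root down to the first SortMergeJoin in DataFrame.explain() text.
# Assumes Spark 2.4's layout: leading ' ', ':', '+', '-' characters encode
# tree depth, and '*(n) ' marks whole-stage codegen.

INNER_PLAN = """== Physical Plan ==
*(6) Sort [id#367L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#367L, 42)
   +- *(5) Project [id#367L]
      +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
         :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#367L, 42)
         :     +- *(1) Range (0, 5000000, step=1, splits=5)
         +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#369L, 42)
               +- *(3) Range (0, 10000000, step=1, splits=5)
"""

LEFT_PLAN = """== Physical Plan ==
*(5) Sort [id#367L ASC NULLS FIRST], false, 0
+- *(5) Project [id#367L]
   +- SortMergeJoin [id#367L], [id#369L], LeftOuter
      :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#367L, 42)
      :     +- *(1) Range (0, 5000000, step=1, splits=5)
      +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#369L, 42)
            +- *(3) Range (0, 10000000, step=1, splits=5)
"""


def ops_above(plan_text, target="SortMergeJoin"):
    """Return operator names on the path from the root down to the first `target`."""
    nodes = []
    for line in plan_text.splitlines():
        if not line.strip() or line.startswith("=="):
            continue  # skip blank lines and the "== Physical Plan ==" header
        body = line.lstrip(" :+-")
        depth = len(line) - len(body)
        if body.startswith("*("):
            body = body.split(") ", 1)[1]  # drop the codegen stage marker
        nodes.append((depth, body.split(" ", 1)[0]))
    hits = [i for i, (_, name) in enumerate(nodes) if name == target]
    if not hits:
        raise ValueError(f"{target} not found in plan")
    # Walk backwards: each earlier line with a smaller depth is the next ancestor.
    ancestors, depth = [], nodes[hits[0]][0]
    for d, name in reversed(nodes[:hits[0]]):
        if d < depth:
            ancestors.append(name)
            depth = d
    return ancestors[::-1]


print(ops_above(INNER_PLAN))
print(ops_above(LEFT_PLAN))
{code}

In a live session, the plan text should be capturable with contextlib.redirect_stdout around df.explain(), since PySpark's explain() prints from the Python side.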



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)