Cheng Su created SPARK-32330:
--------------------------------

             Summary: Preserve shuffled hash join build side partitioning
                 Key: SPARK-32330
                 URL: https://issues.apache.org/jira/browse/SPARK-32330
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Cheng Su


Currently `ShuffledHashJoinExec.outputPartitioning` is inherited from 
`HashJoin.outputPartitioning`, which only preserves the stream side's partitioning:

`HashJoin.scala`
{code:java}
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
{code}
This discards the build side's partitioning information and causes an extra 
shuffle when another join or group-by on the build-side keys follows this join.
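A minimal Scala model of why this costs a shuffle, using the setup of the example (build side hash-partitioned on `k1`, stream side on `k2`, then a `groupBy` on `k1`). `HashPart`, `satisfiesClustering` and the other names are hypothetical simplifications so the sketch runs without Spark; the assumed rule is that a hash partitioning can serve a group-by when its keys are a non-empty subset of the grouping keys.

```scala
// Hypothetical, simplified model; not Spark's Partitioning API.
object StreamSideOnlySketch {
  // Rows are hash-partitioned on `keys` into `numPartitions` partitions.
  final case class HashPart(keys: Seq[String], numPartitions: Int)

  // Assumed rule: a hash partitioning can serve a group-by on `required`
  // when its keys are a non-empty subset of the grouping keys.
  def satisfiesClustering(p: HashPart, required: Set[String]): Boolean =
    p.keys.nonEmpty && p.keys.forall(required.contains)

  // The example's ShuffledHashJoin with BuildLeft: left (build) is
  // partitioned on k1, right (stream) on k2.
  val buildSide = HashPart(Seq("k1"), 2)
  val streamSide = HashPart(Seq("k2"), 2)

  // Current behaviour: only the stream side's partitioning is reported.
  val reportedOutput: HashPart = streamSide

  // groupBy(k1) requires clustering on k1; when the reported partitioning
  // does not provide it, an Exchange on k1 is added ahead of the final aggregate.
  def needsShuffleBeforeAggregate: Boolean =
    !satisfiesClustering(reportedOutput, Set("k1"))

  // The discarded build-side partitioning would have been sufficient.
  def buildSideWouldSuffice: Boolean =
    satisfiesClustering(buildSide, Set("k1"))

  def main(args: Array[String]): Unit = {
    println(s"extra shuffle before aggregate: $needsShuffleBeforeAggregate")
    println(s"build side alone would suffice: $buildSideWouldSuffice")
  }
}
```

The model only captures the planner's "is the requirement already met?" decision; in the real plan the partial aggregate still runs before the added `Exchange`.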

Example:

 
{code:java}
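// Assumes a Spark SQL test suite (e.g. a QueryTest with SharedSparkSession)
// that provides `spark`, `withSQLConf` and the `$"col"` syntax.
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.internal.SQLConf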
withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  val df1 = spark.range(10).select($"id".as("k1"))
  val df2 = spark.range(30).select($"id".as("k2"))
  Seq("inner", "cross").foreach(joinType => {
    val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
      .queryExecution.executedPlan
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // Expected after the fix: only the two shuffles feeding the join remain,
    // with no extra shuffle before the aggregate
    assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
  })
}{code}
 

Current physical plan (with an extra shuffle on `k1` before the aggregate):

 
{code:java}
*(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
   +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
      +- *(3) Project [k1#220L]
         +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
            :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
            :  +- *(1) Project [id#218L AS k1#220L]
            :     +- *(1) Range (0, 10, step=1, splits=2)
            +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
               +- *(2) Project [id#222L AS k2#224L]
                  +- *(2) Range (0, 30, step=1, splits=2){code}
 

Ideal physical plan (no shuffle on `k1` before the aggregate):
{code:java}
*(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
   +- *(3) Project [k1#220L]
      +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
         :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
         :  +- *(1) Project [id#218L AS k1#220L]
         :     +- *(1) Range (0, 10, step=1, splits=2)
         +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
            +- *(2) Project [id#222L AS k2#224L]
               +- *(2) Range (0, 30, step=1, splits=2){code}
 

This can be fixed by overriding the `outputPartitioning` method in 
`ShuffledHashJoinExec` so that it also reports the build side's partitioning, 
similar to `SortMergeJoinExec`.
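A minimal Scala sketch of what such an override could compute, assuming the per-join-type rule that, as we read it, `SortMergeJoinExec.outputPartitioning` applies: inner-like joins report both children's partitioning as a collection, outer joins keep only the preserved side, and full outer joins report unknown partitioning. All names here (`Part`, `HashPart`, `PartCollection`, `UnknownPart`, `JoinKind`, `outputPartitioning`, `satisfiesClustering`) are hypothetical stand-ins so the sketch runs without Spark; an actual fix would return Spark's own partitioning classes from `ShuffledHashJoinExec`.

```scala
// Hypothetical, simplified model of the proposed override. The types below
// are illustrative stand-ins, not Spark's Partitioning or JoinType classes.
object ProposedRuleSketch {
  sealed trait Part
  // Rows are hash-partitioned on `keys` into `numPartitions` partitions.
  final case class HashPart(keys: Seq[String], numPartitions: Int) extends Part
  // The output satisfies every member partitioning simultaneously.
  final case class PartCollection(parts: Seq[Part]) extends Part
  // Nothing is known about how rows are distributed.
  case object UnknownPart extends Part

  sealed trait JoinKind
  case object Inner extends JoinKind
  case object Cross extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind
  case object LeftSemi extends JoinKind
  case object LeftAnti extends JoinKind

  // Assumed rule: hash partitioning on a non-empty subset of the grouping
  // keys is enough; a collection qualifies if any member does.
  def satisfiesClustering(p: Part, required: Set[String]): Boolean = p match {
    case HashPart(keys, _)     => keys.nonEmpty && keys.forall(required.contains)
    case PartCollection(parts) => parts.exists(satisfiesClustering(_, required))
    case UnknownPart           => false
  }

  // Output partitioning per join type, modeled on SortMergeJoinExec.
  def outputPartitioning(kind: JoinKind, left: Part, right: Part): Part =
    kind match {
      // Every output row matched on the join keys, so both sides' partitioning holds.
      case Inner | Cross => PartCollection(Seq(left, right))
      // Unmatched rows are null-padded on the other side, so only the
      // preserved side's partitioning survives.
      case LeftOuter  => left
      case RightOuter => right
      // Null-padded rows on both sides break both partitionings.
      case FullOuter => UnknownPart
      // Semi and anti joins emit left rows only.
      case LeftSemi | LeftAnti => left
    }

  def main(args: Array[String]): Unit = {
    // The description's example: left on k1, right on k2, inner join.
    val out = outputPartitioning(Inner, HashPart(Seq("k1"), 2), HashPart(Seq("k2"), 2))
    println(s"groupBy(k1) needs no extra shuffle: ${satisfiesClustering(out, Set("k1"))}")
  }
}
```

Reporting a collection rather than just the build side lets a later operator keyed on either `k1` or `k2` reuse the join's shuffle, which is why the inner-join case keeps both children's partitioning.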




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
