[ https://issues.apache.org/jira/browse/SPARK-25951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ohad Raviv updated SPARK-25951:
-------------------------------
    Description: 
We've noticed that a column rename sometimes causes an extra shuffle:
{code}
import org.apache.spark.sql.functions.{col, count, lit}

val N = 1 << 12

spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")

val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
val t2 = spark.range(N).selectExpr("floor(id/4) as key2")

t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
  .join(
    t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2", "key3"),
    col("key1") === col("key3"))
  .explain()
{code}

results in:

{noformat}
== Physical Plan ==
*(6) SortMergeJoin [key1#6L], [key3#22L], Inner
:- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
:  +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, cnt1#14L])
:     +- Exchange hashpartitioning(key1#6L, 2)
:        +- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], output=[key1#6L, count#39L])
:           +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
:              +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
:                 +- *(1) Range (0, 4096, step=1, splits=1)
+- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key3#22L, 2)
      +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], output=[key3#22L, cnt2#19L])
         +- Exchange hashpartitioning(key2#10L, 2)
            +- *(3) HashAggregate(keys=[key2#10L], functions=[partial_count(1)], output=[key2#10L, count#41L])
               +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS key2#10L]
                  +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
                     +- *(3) Range (0, 4096, step=1, splits=1)
{noformat}
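Note that the second Exchange on key3#22L re-shuffles data that is already hash-partitioned on the same values. In this particular example the extra shuffle can be sidestepped by performing the rename after the join instead of before it, so the aggregate's output attribute is the same one the join keys on. A workaround sketch (not a fix for the underlying issue):
{code}
// Workaround sketch: join on the original column name and rename afterwards.
// The join key is then the very attribute the aggregate produced, so the
// planner can reuse the existing hash partitioning instead of re-shuffling.
t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
  .join(
    t2.groupBy("key2").agg(count(lit("1")).as("cnt2")),
    col("key1") === col("key2"))
  .withColumnRenamed("key2", "key3")
  .explain()
{code}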
I was able to track it down to this code in the HashPartitioning class:
{code}
case h: HashClusteredDistribution =>
  expressions.length == h.expressions.length &&
    expressions.zip(h.expressions).forall {
      case (l, r) => l.semanticEquals(r)
    }
{code}
Here, semanticEquals returns false when comparing key2 and key3, even though key3
is just a rename of key2.
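
For context, semanticEquals compares canonicalized expressions, which for an AttributeReference effectively means comparing exprIds; withColumnRenamed introduces an Alias that yields a fresh exprId, so the renamed column no longer matches the partitioning attribute. A minimal sketch against Catalyst internals (class names assumed from org.apache.spark.sql.catalyst.expressions; behavior may vary by version):
{code}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

val key2    = AttributeReference("key2", LongType)()  // fresh exprId
val sameRef = key2.withName("key3")                   // rename, same exprId
val key3    = AttributeReference("key3", LongType)()  // fresh exprId again

// Semantic equality is by exprId, not by name, so a same-exprId rename
// matches while an attribute minted by an Alias does not.
key2.semanticEquals(sameRef)  // true
key2.semanticEquals(key3)     // false -- hence the extra Exchange
{code}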


> Redundant shuffle if column is renamed
> --------------------------------------
>
>                 Key: SPARK-25951
>                 URL: https://issues.apache.org/jira/browse/SPARK-25951
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Ohad Raviv
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
