[ https://issues.apache.org/jira/browse/SPARK-25951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-25951:
------------------------------------

    Assignee:     (was: Apache Spark)

> Redundant shuffle if column is renamed
> --------------------------------------
>
>                 Key: SPARK-25951
>                 URL: https://issues.apache.org/jira/browse/SPARK-25951
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Ohad Raviv
>            Priority: Minor
>
> We've noticed that a column rename sometimes causes an extra shuffle:
> {code:java}
> import org.apache.spark.sql.functions._
>
> val N = 1 << 12
> // Keep the planner from choosing a broadcast join.
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
> val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
> val t2 = spark.range(N).selectExpr("floor(id/4) as key2")
>
> t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
>   .join(
>     t2.groupBy("key2").agg(count(lit("1")).as("cnt2"))
>       .withColumnRenamed("key2", "key3"), // rename only, same data
>     col("key1") === col("key3"))
>   .explain()
> {code}
> This results in:
> {noformat}
> == Physical Plan ==
> *(6) SortMergeJoin [key1#6L], [key3#22L], Inner
> :- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
> :  +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, cnt1#14L])
> :     +- Exchange hashpartitioning(key1#6L, 2)
> :        +- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], output=[key1#6L, count#39L])
> :           +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
> :              +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
> :                 +- *(1) Range (0, 4096, step=1, splits=1)
> +- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(key3#22L, 2)
>       +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], output=[key3#22L, cnt2#19L])
>          +- Exchange hashpartitioning(key2#10L, 2)
>             +- *(3) HashAggregate(keys=[key2#10L], functions=[partial_count(1)], output=[key2#10L, count#41L])
>                +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS key2#10L]
>                   +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
>                      +- *(3) Range (0, 4096, step=1, splits=1)
> {noformat}
> The Exchange hashpartitioning(key3#22L, 2) on the right side is redundant: the
> data below it is already hash-partitioned on key2#10L.
>
> I was able to track it down to this code in class HashPartitioning:
> {code:java}
> case h: HashClusteredDistribution =>
>   expressions.length == h.expressions.length &&
>     expressions.zip(h.expressions).forall {
>       case (l, r) => l.semanticEquals(r)
>     }
> {code}
> semanticEquals returns false because it compares key2 with key3, even though
> key3 is just a rename of key2.
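
To illustrate the comparison described in the issue without a Spark dependency, here is a minimal toy model in plain Scala. `Attr`, `satisfies`, `satisfiesWithAliases`, and the `aliases` map are hypothetical names made up for this sketch, not Spark classes or APIs. The one assumption carried over from Spark is that attribute equality is decided by expression ID (the number after `#` in the plan) rather than by name. The alias-aware variant is only one possible direction, not necessarily how Spark resolves or should resolve this.

```scala
object RenameShuffleSketch {
  // Toy stand-in for an attribute reference: a name plus an expression ID.
  final case class Attr(name: String, exprId: Long) {
    // Assumption: equality is decided by expression ID, not by column name.
    def semanticEquals(other: Attr): Boolean = exprId == other.exprId
  }

  // Same shape as the check quoted from HashPartitioning.
  def satisfies(partitioning: Seq[Attr], required: Seq[Attr]): Boolean =
    partitioning.length == required.length &&
      partitioning.zip(required).forall { case (l, r) => l.semanticEquals(r) }

  // Hypothetical alias-aware variant: map each renamed attribute back to
  // the attribute it was derived from before comparing.
  def satisfiesWithAliases(
      partitioning: Seq[Attr],
      required: Seq[Attr],
      aliases: Map[Long, Attr]): Boolean =
    satisfies(partitioning, required.map(a => aliases.getOrElse(a.exprId, a)))

  val key2 = Attr("key2", 10L) // the child is partitioned on key2#10
  val key3 = Attr("key3", 22L) // the rename yields a new ID, key3#22
  val aliases = Map(key3.exprId -> key2)

  // The plain check fails, so the planner would add another Exchange.
  val plain = satisfies(Seq(key2), Seq(key3))
  // With the rename resolved, the existing partitioning is recognised.
  val aliasAware = satisfiesWithAliases(Seq(key2), Seq(key3), aliases)
}
```

In this model `plain` is false and `aliasAware` is true, which matches the plan in the issue: the join requires a distribution on key3#22L, the aggregate only reports a partitioning on key2#10L, and nothing connects the two.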