[ https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390111#comment-16390111 ]
Bruce Robbins commented on SPARK-23560:
---------------------------------------

The main issue is that an AttributeReference instance never considers a GetStructField instance to be semantically equal, even when both refer to the same field. When I patched AttributeReference.semanticEquals to allow comparison with a GetStructField instance, the extra exchange dropped out of the plan. It's not a production-quality patch, as the check is rudimentary, and I am not sure two such expressions should ever match. But it does confirm how the extra Exchange is getting added to the plan.

In a nutshell, the HashAggregate gets its output partitioning from the Exchanges below the Sorts in the two branches under the SortMergeJoin, which contain the expressions for both table1.id1 and table2.id2 as GetStructField instances. However, the HashAggregate's distribution (held in its parent's requiredChildDistributions field) contains the expression for table1.id1 as an AttributeReference. When EnsureRequirements tries to determine whether to add an Exchange to the plan, the two expressions for table1.id1 don't match (using AttributeReference.semanticEquals), resulting in an Exchange getting added. In the "join" case, the two expressions for table1.id1 are both AttributeReference instances, so they match.

As for a fix, I need to explore the planner/optimizer code some more and try to determine whether two expressions with different implementations should be comparable, or whether these two expressions should have had the same implementation.
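The mismatch can be seen in miniature with a self-contained Scala sketch (a toy model, not Spark source; the class names mirror Spark's but the definitions here are simplified stand-ins):

{noformat}
// Toy illustration of the semantic-equality mismatch described above.
// Two expression shapes that denote the same column but never compare equal.
sealed trait Expr {
  // Simplified stand-in for Expression.semanticEquals: structural equality only.
  def semanticEquals(other: Expr): Boolean = this == other
}
case class AttributeReference(name: String) extends Expr
case class GetStructField(struct: String, field: String) extends Expr

object ExtraExchangeDemo extends App {
  // Output partitioning of the Exchange below the Sort: a struct-field access.
  val partitioningExpr: Expr = GetStructField("_1", "id1")

  // Required distribution recorded for the HashAggregate: a plain reference.
  val requiredExpr: Expr = AttributeReference("id1")

  // EnsureRequirements-style check: the two shapes never match, so an extra
  // Exchange is inserted even though both expressions denote table1.id1.
  val needsExchange = !requiredExpr.semanticEquals(partitioningExpr)
  println(s"extra exchange needed: $needsExchange") // prints "extra exchange needed: true"
}
{noformat}

In the plain "join" case both sides would be AttributeReference("id1") instances, the check would succeed, and no extra Exchange would be added.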
> A joinWith followed by groupBy requires extra shuffle
> -----------------------------------------------------
>
>                 Key: SPARK-23560
>                 URL: https://issues.apache.org/jira/browse/SPARK-23560
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>         Environment: debian 8.9, macos x high sierra
>            Reporter: Bruce Robbins
>            Priority: Major
>
> Depending on the size of the input, a joinWith followed by a groupBy requires
> more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
>    +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
>       +- *(5) Project [_1#8]
>          +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
>             :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(_1#8.id1, 200)
>             :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
>             :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
>             +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(_2#9.id2, 200)
>                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
>                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one less shuffle:
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
>    +- *(5) Project [id1#0L]
>       +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
>          :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id1#0L, 200)
>          :     +- *(1) Project [id1#0L]
>          :        +- *(1) Filter isnotnull(id1#0L)
>          :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
>          +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id2#5L, 200)
>                +- *(3) Project [id2#5L]
>                   +- *(3) Filter isnotnull(id2#5L)
>                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> The extra exchange is reflected in the run time of the query.
> My tests were on inputs with more than 2 million records.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)