[ https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruce Robbins updated SPARK-23560:
----------------------------------
    Summary: Group by on struct field can add extra shuffle  (was: A joinWith followed by groupBy requires extra shuffle)

> Group by on struct field can add extra shuffle
> ----------------------------------------------
>
>                 Key: SPARK-23560
>                 URL: https://issues.apache.org/jira/browse/SPARK-23560
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>   Affects Versions: 2.3.0
>         Environment: debian 8.9, macos x high sierra
>            Reporter: Bruce Robbins
>            Priority: Major
>
> Depending on the size of the input, a joinWith followed by a groupBy requires more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
>    +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
>       +- *(5) Project [_1#8]
>          +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
>             :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(_1#8.id1, 200)
>             :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
>             :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
>             +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(_2#9.id2, 200)
>                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
>                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one fewer shuffle:
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
>    +- *(5) Project [id1#0L]
>       +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
>          :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id1#0L, 200)
>          :     +- *(1) Project [id1#0L]
>          :        +- *(1) Filter isnotnull(id1#0L)
>          :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
>          +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id2#5L, 200)
>                +- *(3) Project [id2#5L]
>                   +- *(3) Filter isnotnull(id2#5L)
>                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> -The extra exchange is reflected in the run time of the query.- Actually, I recant this bit. In my particular tests, the extra exchange has negligible impact on run time. All the difference is in stage 2.
> My tests were on inputs with more than 2 million records.
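
To make the comparison between the two plans mechanical, one option is to count the Exchange operators in the explain output. The following is only a sketch: ExchangeCount and countExchanges are hypothetical names, not part of Spark, and the plan strings are abbreviated excerpts of the two plans quoted above (FileScan lines and the id2 Project/Filter lines are omitted). It works on plain text, so it runs without a Spark session.

```scala
// Hypothetical helper (not part of Spark): counts shuffle exchanges in the
// text of a physical plan by looking at the operator name on each line.
object ExchangeCount {
  def countExchanges(planText: String): Int =
    planText.split("\n").count { line =>
      // Drop the tree-drawing characters (':', '+', '-', ' ') at the start
      // of the line, then any whole-stage-codegen prefix such as "*(5) ".
      val op = line
        .dropWhile(c => c == ':' || c == '+' || c == '-' || c == ' ')
        .replaceFirst("""^\*\(\d+\)\s*""", "")
      op.startsWith("Exchange ")
    }

  // Abbreviated excerpt of the joinWith plan from this issue.
  val joinWithPlan: String = Seq(
    "*(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])",
    "+- Exchange hashpartitioning(_1#8.id1#19L, 200)",
    "   +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])",
    "      +- *(5) Project [_1#8]",
    "         +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner",
    "            :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0",
    "            :  +- Exchange hashpartitioning(_1#8.id1, 200)",
    "            +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0",
    "               +- Exchange hashpartitioning(_2#9.id2, 200)"
  ).mkString("\n")

  // Abbreviated excerpt of the join plan from this issue.
  val joinPlan: String = Seq(
    "*(5) HashAggregate(keys=[id1#0L], functions=[count(1)])",
    "+- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])",
    "   +- *(5) Project [id1#0L]",
    "      +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner",
    "         :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0",
    "         :  +- Exchange hashpartitioning(id1#0L, 200)",
    "         +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0",
    "            +- Exchange hashpartitioning(id2#5L, 200)"
  ).mkString("\n")

  def main(args: Array[String]): Unit = {
    println(countExchanges(joinWithPlan)) // prints 3
    println(countExchanges(joinPlan))     // prints 2
  }
}
```

In a live session, the plan text could presumably be taken from result1.queryExecution.executedPlan.toString rather than pasted in. That is an assumption about how one would wire this up, not something tested as part of this issue.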