[ https://issues.apache.org/jira/browse/SPARK-19122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16009146#comment-16009146 ]
Wenchen Fan commented on SPARK-19122:
-------------------------------------

Sorry, I forgot to set the broadcast threshold. Now I can reproduce this issue.

> Unnecessary shuffle+sort added if join predicates ordering differ from bucketing and sorting order
> --------------------------------------------------------------------------------------------------
>
> Key: SPARK-19122
> URL: https://issues.apache.org/jira/browse/SPARK-19122
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Tejas Patil
>
> `table1` and `table2` are sorted and bucketed on columns `j` and `k` (in that order).
> This is how they are generated:
> {code}
> val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
> df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table1")
> df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table2")
> {code}
> Now, if the join predicates are specified in the query in the *same* order as the bucketing and sort order, there is no shuffle or sort.
> {code}
> scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.j=b.j AND a.k=b.k").explain(true)
> == Physical Plan ==
> *SortMergeJoin [j#61, k#62], [j#100, k#101], Inner
> :- *Project [i#60, j#61, k#62]
> :  +- *Filter (isnotnull(k#62) && isnotnull(j#61))
> :     +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
> +- *Project [i#99, j#100, k#101]
>    +- *Filter (isnotnull(j#100) && isnotnull(k#101))
>       +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
> {code}
> The same query with the join predicates in a *different* order from the bucketing and sort order leads to an extra shuffle and sort being introduced:
> {code}
> scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.k=b.k AND a.j=b.j").explain(true)
> == Physical Plan ==
> *SortMergeJoin [k#62, j#61], [k#101, j#100], Inner
> :- *Sort [k#62 ASC NULLS FIRST, j#61 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(k#62, j#61, 200)
> :     +- *Project [i#60, j#61, k#62]
> :        +- *Filter (isnotnull(k#62) && isnotnull(j#61))
> :           +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
> +- *Sort [k#101 ASC NULLS FIRST, j#100 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(k#101, j#100, 200)
>       +- *Project [i#99, j#100, k#101]
>          +- *Filter (isnotnull(j#100) && isnotnull(k#101))
>             +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
> {code}
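
The two plans in the issue differ only in the order of the join keys, which suggests that a planner could avoid the extra Exchange and Sort by reordering the key pairs to follow the bucketing order before checking whether the children already satisfy the required distribution. The following is a minimal sketch of that idea only, not Spark's implementation: `JoinKeyReorder` and `reorder` are hypothetical names, and plain column-name strings stand in for Catalyst expressions.

```scala
// Hypothetical helper, not part of Spark's API.
// Column names (strings) stand in for Catalyst expressions.
object JoinKeyReorder {

  /** Reorders (leftKey, rightKey) pairs so that the left keys follow `bucketCols`.
    *
    * The pairs are returned unchanged unless the left keys are exactly a
    * permutation of `bucketCols`; in that case reordering is safe because an
    * equi-join on (k, j) is equivalent to an equi-join on (j, k).
    */
  def reorder(
      keyPairs: Seq[(String, String)],
      bucketCols: Seq[String]): Seq[(String, String)] = {
    val byLeftKey = keyPairs.toMap
    val isPermutation =
      keyPairs.size == bucketCols.size &&
        byLeftKey.size == keyPairs.size && // no duplicate left keys
        byLeftKey.keySet == bucketCols.toSet
    if (isPermutation) bucketCols.map(col => col -> byLeftKey(col))
    else keyPairs
  }
}
```

For the second query in the issue, `JoinKeyReorder.reorder(Seq("k" -> "k", "j" -> "j"), Seq("j", "k"))` yields the pairs in `j`, `k` order, matching the layout of `table1` and `table2`. A real fix would also have to confirm that the right-hand side is bucketed and sorted compatibly, which this sketch does not check.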