[ https://issues.apache.org/jira/browse/SPARK-31350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-31350:
------------------------------------

    Assignee: Apache Spark

> Coalesce bucketed tables for join if applicable
> -----------------------------------------------
>
>                 Key: SPARK-31350
>                 URL: https://issues.apache.org/jira/browse/SPARK-31350
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Terry Kim
>            Assignee: Apache Spark
>            Priority: Major
>
> The following example of joining two bucketed tables introduces a full shuffle:
> {code:java}
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
> val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
> val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
> df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
> df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
> val t1 = spark.table("t1")
> val t2 = spark.table("t2")
> val joined = t1.join(t2, t1("i") === t2("i"))
> joined.explain(true)
> == Physical Plan ==
> *(5) SortMergeJoin [i#44], [i#50], Inner
> :- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
> :     +- *(1) Project [i#44, j#45, k#46]
> :        +- *(1) Filter isnotnull(i#44)
> :           +- *(1) ColumnarToRow
> :              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
> +- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
>       +- *(3) Project [i#50, j#51, k#52]
>          +- *(3) Filter isnotnull(i#50)
>             +- *(3) ColumnarToRow
>                +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
> {code}
> But one side can be coalesced to eliminate the shuffle.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
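
Why coalescing one side can remove the shuffle: a rough sketch in plain Scala (no Spark dependency) of the arithmetic behind the idea. It assumes a bucket id is computed as a non-negative modulo of a key hash, which is how Spark's hash partitioning is generally understood to work (Spark uses a Murmur3-based hash; the identity hash below is only a stand-in). The names BucketCoalesceSketch, pmod, bucketId and coalescedBucket are hypothetical and are not taken from the Spark code base or from this issue. Under that assumption, when 8 is a multiple of 4, every row in bucket b of the 8-bucket table belongs to bucket b % 4 of a 4-bucket layout, so reading buckets b and b + 4 together lines t1 up with t2.

```scala
object BucketCoalesceSketch {
  // Non-negative modulo, so negative hash values still map to a valid bucket.
  def pmod(a: Int, n: Int): Int = ((a % n) + n) % n

  // Hypothetical stand-in for the bucket hash: Int.hashCode is the value itself.
  // Spark's real hash differs, but the modulo argument below does not depend on it.
  def bucketId(key: Int, numBuckets: Int): Int = pmod(key.hashCode, numBuckets)

  // Maps a bucket of the larger table to the partition it joins after coalescing.
  // Only valid when the larger bucket count is a multiple of targetBuckets.
  def coalescedBucket(bucket: Int, targetBuckets: Int): Int = bucket % targetBuckets

  def main(args: Array[String]): Unit = {
    val keys = -50 to 50

    // For every key, the coalesced 8-bucket id equals the native 4-bucket id.
    val consistent = keys.forall { k =>
      coalescedBucket(bucketId(k, 8), 4) == bucketId(k, 4)
    }
    println(s"coalesced t1 buckets line up with t2 buckets: $consistent")

    // Show which of the 8 original buckets are read together in each partition.
    val groups = (0 until 8).groupBy(b => coalescedBucket(b, 4))
    groups.toSeq.sortBy(_._1).foreach { case (target, sources) =>
      println(s"partition $target reads t1 buckets ${sources.mkString(", ")}")
    }
  }
}
```

The same argument does not hold when the bucket counts are not multiples of one another (for example 8 and 3), which is presumably why the title says "if applicable".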