Yuming Wang created SPARK-34966:
-----------------------------------

             Summary: Avoid shuffle if join key data types do not match
                 Key: SPARK-34966
                 URL: https://issues.apache.org/jira/browse/SPARK-34966
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Yuming Wang


How to reproduce this issue:
{code:scala}
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
spark.sql(
  "CREATE TABLE t1 USING parquet CLUSTERED BY (id) INTO 200 BUCKETS " +
  "AS SELECT CAST(id AS bigint) FROM range(1000)")
spark.sql(
  "CREATE TABLE t2 USING parquet CLUSTERED BY (id) INTO 200 BUCKETS " +
  "AS SELECT CAST(id AS int) FROM range(500)")
spark.sql("SELECT * FROM t1 JOIN t2 ON (t1.id = t2.id)").explain()
{code}
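
The shuffle appears because t2 is bucketed on an int key while the join compares it as bigint. A possible workaround at table-creation time (a sketch only, not this ticket's proposed fix) is to align the key types up front so both sides are bucketed on bigint:

{code:scala}
// Same repro as above, but t2's key is created as bigint, so its
// 200 buckets line up with t1's and no Exchange should be required.
spark.sql(
  "CREATE TABLE t2 USING parquet CLUSTERED BY (id) INTO 200 BUCKETS " +
  "AS SELECT CAST(id AS bigint) FROM range(500)")
{code}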

Current: the t2 side is shuffled because the join key cast(id#15 as bigint) does not match t2's int bucket column:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#14L], [cast(id#15 as bigint)], Inner
   :- Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#14L)
   :     +- FileScan parquet default.t1[id#14L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 200 out of 200
   +- Sort [cast(id#15 as bigint) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cast(id#15 as bigint), 200), ENSURE_REQUIREMENTS, [id=#58]
         +- Filter isnotnull(id#15)
         +- FileScan parquet default.t2[id#15] Batched: true, DataFilters: [isnotnull(id#15)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>
{noformat}

Expected: no Exchange; both sides read their 200 buckets directly:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#14L], [cast(id#15 as bigint)], Inner
   :- Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#14L)
   :     +- FileScan parquet default.t1[id#14L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 200 out of 200
   +- Sort [cast(id#15 as bigint) ASC NULLS FIRST], false, 0
      +- Filter isnotnull(id#15)
         +- FileScan parquet default.t2[id#15] Batched: true, DataFilters: [isnotnull(id#15)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>, SelectedBucketsCount: 200 out of 200
{noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
