Rakesh Kumar created SPARK-52246:
------------------------------------

             Summary: SPJ: Support auto-shuffle one side + less join keys than 
partition keys produces incorrect results
                 Key: SPARK-52246
                 URL: https://issues.apache.org/jira/browse/SPARK-52246
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
    Affects Versions: 4.0.0
            Reporter: Rakesh Kumar


Spark's auto-shuffle of one side, when there are fewer join keys than partition
keys, can produce incorrect results.

 

*How to reproduce*

test("SPARK-48012: one-side shuffle with partition transforms " +
    "with join key tail part of the partition keys") {
  val items_partitions = Array(bucket(2, "id"))
  createTable(items, itemsColumns, items_partitions)

  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
    "(1, 'aa', 30.0, cast('2020-01-02' as timestamp)), " +
    "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
    "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  createTable(purchases, purchasesColumns, Array.empty)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    "(1, 42.0, cast('2020-01-01' as timestamp)), " +
    "(1, 89.0, cast('2020-01-03' as timestamp)), " +
    "(3, 19.5, cast('2020-02-01' as timestamp)), " +
    "(5, 26.0, cast('2023-01-01' as timestamp)), " +
    "(6, 50.0, cast('2023-02-01' as timestamp))")

  withSQLConf(
      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
      SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
      SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
    val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(shuffles.size == 1, "SPJ should be triggered")
    checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
  }
}

 

 

Issue:

The generated physical plan auto-shuffles the purchases side with the wrong
KeyGroupedPartitioning function, as highlighted below: the added Exchange
buckets on time#43 even though the items table is partitioned by
bucket(2, id), so the shuffle should instead bucket the matching join key
item_id#41L. The join therefore returns an incorrect (empty) result.

 

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   LocalTableScan <empty>, [id#37L, name#38, purchase_price#35, sale_price#36]
+- == Initial Plan ==
   Sort [id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=79]
      +- Project [id#37L, name#38, price#39 AS purchase_price#35, price#42 AS sale_price#36]
         +- SortMergeJoin [arrive_time#40, id#37L], [time#43, item_id#41L], Inner
            :- Sort [arrive_time#40 ASC NULLS FIRST, id#37L ASC NULLS FIRST], false, 0
            :  +- Project [id#37L, name#38, price#39, arrive_time#40]
            :     +- Filter (isnotnull(arrive_time#40) AND isnotnull(id#37L))
            :        +- BatchScan testcat.ns.items[id#37L, name#38, price#39, arrive_time#40] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
            +- Sort [time#43 ASC NULLS FIRST, item_id#41L ASC NULLS FIRST], false, 0
               *+- Exchange keygroupedpartitioning(transformexpression(bucket, time#43, Some(2)), 2, [0], [1]), ENSURE_REQUIREMENTS, [plan_id=73]*
                  +- Project [item_id#41L, price#42, time#43]
                     +- Filter (isnotnull(time#43) AND isnotnull(item_id#41L))
                        +- BatchScan testcat.ns.purchases[item_id#41L, price#42, time#43] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
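For intuition, the misplacement can be sketched with a simplified bucket model. This is plain modulo rather than Spark's Murmur3-based hash, and all names below are illustrative, not Spark internals; the alignment argument is the same either way:

```scala
// Simplified stand-in for the bucket transform: bucket(n, v) assigns a
// value to one of n buckets. Real Spark hashes v before the modulo, so
// concrete bucket ids differ, but co-partitioning still requires both
// sides to be bucketed on the *matching* key.
def bucket(numBuckets: Int, value: Long): Int =
  (((value % numBuckets) + numBuckets) % numBuckets).toInt

// items is stored partitioned by bucket(2, id): the rows with id = 1
// live in partition bucket(2, 1).
val itemPartitionOfId1 = bucket(2, 1L)

// Correct auto-shuffle: bucket purchases by item_id, so the purchase
// with item_id = 1 lands in the same partition as items with id = 1.
val correctPartition = bucket(2, 1L)

// Buggy plan: the Exchange buckets by the time column instead. Taking
// the epoch day of '2020-01-01' (18262) as the bucketed value, the
// matching purchase lands in the other partition, so the co-partitioned
// sort-merge join never pairs it with the items rows and drops the row.
val buggyPartition = bucket(2, 18262L)

assert(correctPartition == itemPartitionOfId1)
assert(buggyPartition != itemPartitionOfId1)
```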

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)