[ https://issues.apache.org/jira/browse/SPARK-52245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rakesh Kumar resolved SPARK-52245.
----------------------------------
    Resolution: Duplicate

> SPJ: Support auto-shuffle one side + less join keys than partition keys 
> produces incorrect results
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-52245
>                 URL: https://issues.apache.org/jira/browse/SPARK-52245
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Rakesh Kumar
>            Priority: Major
>
> Spark's auto-shuffle of one side (SPJ) can produce incorrect results when there
> are fewer join keys than partition keys.
>  
> *How to reproduce*
> test("SPARK-48012: one-side shuffle with partition transforms " +
>     "with join key tail part of the partition keys ") {
>     val items_partitions = Array(bucket(2, "id"))
>     createTable(items, itemsColumns, items_partitions)
>     sql(s"INSERT INTO testcat.ns.$items VALUES " +
>       "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
>       "(1, 'aa', 30.0, cast('2020-01-02' as timestamp)), " +
>       "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
>       "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>     createTable(purchases, purchasesColumns, Array.empty)
>     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
>       "(1, 42.0, cast('2020-01-01' as timestamp)), " +
>       "(1, 89.0, cast('2020-01-03' as timestamp)), " +
>       "(3, 19.5, cast('2020-02-01' as timestamp)), " +
>       "(5, 26.0, cast('2023-01-01' as timestamp)), " +
>       "(6, 50.0, cast('2023-02-01' as timestamp))")
>     withSQLConf(
>       SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
>       SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
>       SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
>       SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
>       SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
>       val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
>       val shuffles = collectShuffles(df.queryExecution.executedPlan)
>       assert(shuffles.size == 1, "SPJ should be triggered")
>       checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
>     }
>   }
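>  
> For readers without the test suite at hand, the DataFrame built by
> createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id")) corresponds
> roughly to the query below (a sketch reconstructed from the physical plan
> further down; the helper's exact SQL text and ordering may differ):
>     // Rough SQL equivalent of the joined DataFrame, inferred from the plan below.
>     val df = sql(
>       s"""SELECT i.id, i.name, i.price AS purchase_price, p.price AS sale_price
>          |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p
>          |ON i.arrive_time = p.time AND i.id = p.item_id
>          |ORDER BY id, purchase_price, sale_price""".stripMargin)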
>  
>  
> Issue:
> The generated physical plan auto-shuffles the purchases side with the wrong
> KeyGroupedPartitioning partitioning function, as highlighted below: the Exchange
> applies the bucket transform to time#43, while the items side is partitioned by
> bucket(2, id), so the shuffled side should instead be bucketed on item_id#41L.
>  
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    LocalTableScan <empty>, [id#37L, name#38, purchase_price#35, sale_price#36]
> +- == Initial Plan ==
>    Sort [id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST], true, 0
>    +- Exchange rangepartitioning(id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=79]
>       +- Project [id#37L, name#38, price#39 AS purchase_price#35, price#42 AS sale_price#36]
>          +- SortMergeJoin [arrive_time#40, id#37L], [time#43, item_id#41L], Inner
>             :- Sort [arrive_time#40 ASC NULLS FIRST, id#37L ASC NULLS FIRST], false, 0
>             :  +- Project [id#37L, name#38, price#39, arrive_time#40]
>             :     +- Filter (isnotnull(arrive_time#40) AND isnotnull(id#37L))
>             :        +- BatchScan testcat.ns.items[id#37L, name#38, price#39, arrive_time#40] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
>             +- Sort [time#43 ASC NULLS FIRST, item_id#41L ASC NULLS FIRST], false, 0
>                *+- Exchange keygroupedpartitioning(transformexpression(bucket, time#43, Some(2)), 2, [0], [1]), ENSURE_REQUIREMENTS, [plan_id=73]*
>                   +- Project [item_id#41L, price#42, time#43]
>                      +- Filter (isnotnull(time#43) AND isnotnull(item_id#41L))
>                         +- BatchScan testcat.ns.purchases[item_id#41L, price#42, time#43] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
>  
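> As a cross-check (not part of the original report, just a minimal sketch in the
> style of the test above): re-running the same join with the one-side V2 bucketing
> shuffle turned off should make both sides fall back to an ordinary hash exchange
> and return the expected Row(1, "aa", 40.0, 42.0).
>     // Hypothetical cross-check: disable the storage-partitioned one-side shuffle
>     // so the join is planned with regular exchanges on both sides.
>     withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "false") {
>       val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
>       checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
>     }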


