Rakesh Kumar created SPARK-52246:
------------------------------------

             Summary: SPJ: Support auto-shuffle one side + fewer join keys than partition keys produces incorrect results
                 Key: SPARK-52246
                 URL: https://issues.apache.org/jira/browse/SPARK-52246
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
    Affects Versions: 4.0.0
            Reporter: Rakesh Kumar
Spark's SPJ auto-shuffle of one side can produce incorrect results when there are fewer join keys than partition keys.

*How to reproduce*

  test("SPARK-48012: one-side shuffle with partition transforms " +
    "with join key tail part of the partition keys ") {
    val items_partitions = Array(bucket(2, "id"))
    createTable(items, itemsColumns, items_partitions)
    sql(s"INSERT INTO testcat.ns.$items VALUES " +
      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
      "(1, 'aa', 30.0, cast('2020-01-02' as timestamp)), " +
      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
      "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")

    createTable(purchases, purchasesColumns, Array.empty)
    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
      "(1, 89.0, cast('2020-01-03' as timestamp)), " +
      "(3, 19.5, cast('2020-02-01' as timestamp)), " +
      "(5, 26.0, cast('2023-01-01' as timestamp)), " +
      "(6, 50.0, cast('2023-02-01' as timestamp))")

    withSQLConf(
      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
      SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
      SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
      val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
      val shuffles = collectShuffles(df.queryExecution.executedPlan)
      assert(shuffles.size == 1, "SPJ should be triggered")
      checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
    }
  }

The checkAnswer fails: the join returns an empty result (note the LocalTableScan <empty> in the final plan below) instead of Row(1, "aa", 40.0, 42.0).

Issue: the generated physical plan auto-shuffles the unpartitioned side with the wrong keygroupedpartitioning partition function, as highlighted below. The Exchange buckets on time#43, the first join key, instead of on item_id#41L, the join key that corresponds to the items table's partition transform bucket(2, "id").

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   LocalTableScan <empty>, [id#37L, name#38, purchase_price#35, sale_price#36]
+- == Initial Plan ==
   Sort [id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=79]
      +- Project [id#37L, name#38, price#39 AS purchase_price#35, price#42 AS sale_price#36]
         +- SortMergeJoin [arrive_time#40, id#37L], [time#43, item_id#41L], Inner
            :- Sort [arrive_time#40 ASC NULLS FIRST, id#37L ASC NULLS FIRST], false, 0
            :  +- Project [id#37L, name#38, price#39, arrive_time#40]
            :     +- Filter (isnotnull(arrive_time#40) AND isnotnull(id#37L))
            :        +- BatchScan testcat.ns.items[id#37L, name#38, price#39, arrive_time#40] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
            +- Sort [time#43 ASC NULLS FIRST, item_id#41L ASC NULLS FIRST], false, 0
               *+- Exchange keygroupedpartitioning(transformexpression(bucket, time#43, Some(2)), 2, [0], [1]), ENSURE_REQUIREMENTS, [plan_id=73]*
                  +- Project [item_id#41L, price#42, time#43]
                     +- Filter (isnotnull(time#43) AND isnotnull(item_id#41L))
                        +- BatchScan testcat.ns.purchases[item_id#41L, price#42, time#43] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
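For illustration, here is a minimal, self-contained sketch of the suspected mismatch. This is not Spark's internal API; PartitionTransform and the remapping logic below are made-up names used only to show the idea. One plausible reading of the highlighted Exchange is that the partitioned side's transforms are applied to the other side's join keys by position, so bucket(2, "id") ends up as bucket(2, time); remapping through the join-key correspondence instead would yield bucket(2, item_id), which is what co-partitioning with items requires.

  object SpjKeyRemapSketch extends App {
    // Hypothetical model of a partition transform; not a Spark class.
    case class PartitionTransform(name: String, column: String, numBuckets: Option[Int])

    // items is partitioned by bucket(2, "id"); the join condition is
    //   items.arrive_time = purchases.time AND items.id = purchases.item_id
    val itemsTransforms = Seq(PartitionTransform("bucket", "id", Some(2)))
    val joinKeys: Seq[(String, String)] = Seq("arrive_time" -> "time", "id" -> "item_id")

    // What the plan above shows: the transform is paired with the other side's
    // join keys positionally, so bucket(2, _) lands on "time".
    val positional = itemsTransforms.zip(joinKeys).map {
      case (t, (_, purchasesKey)) => t.copy(column = purchasesKey)
    }
    println(positional) // List(PartitionTransform(bucket,time,Some(2)))

    // What co-partitioning with items needs: remap each transform through the
    // join-key correspondence, so bucket(2, "id") becomes bucket(2, "item_id").
    val remapped = itemsTransforms.flatMap { t =>
      joinKeys.collectFirst {
        case (itemsKey, purchasesKey) if itemsKey == t.column => t.copy(column = purchasesKey)
      }
    }
    println(remapped) // List(PartitionTransform(bucket,item_id,Some(2)))
  }

Under that reading, shuffling purchases by bucket(2, item_id) would place its rows in the same buckets as the matching items rows (bucketed by bucket(2, id)), and the join would return Row(1, "aa", 40.0, 42.0) instead of an empty result.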