[ https://issues.apache.org/jira/browse/SPARK-52245 ]
Rakesh Kumar resolved SPARK-52245.
----------------------------------
    Resolution: Duplicate

SPJ: Support auto-shuffle one side + less join keys than partition keys produces incorrect results
---------------------------------------------------------------------------------------------------

                Key: SPARK-52245
                URL: https://issues.apache.org/jira/browse/SPARK-52245
            Project: Spark
         Issue Type: Bug
         Components: SQL
   Affects Versions: 4.0.0
           Reporter: Rakesh Kumar
           Priority: Major

Spark's auto-shuffle of one side can produce incorrect results when the join uses fewer join keys than there are partition keys.

*How to reproduce*

  test("SPARK-48012: one-side shuffle with partition transforms " +
    "with join key tail part of the partition keys ") {
    val items_partitions = Array(bucket(2, "id"))
    createTable(items, itemsColumns, items_partitions)
    sql(s"INSERT INTO testcat.ns.$items VALUES " +
      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
      "(1, 'aa', 30.0, cast('2020-01-02' as timestamp)), " +
      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
      "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")

    createTable(purchases, purchasesColumns, Array.empty)
    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
      "(1, 89.0, cast('2020-01-03' as timestamp)), " +
      "(3, 19.5, cast('2020-02-01' as timestamp)), " +
      "(5, 26.0, cast('2023-01-01' as timestamp)), " +
      "(6, 50.0, cast('2023-02-01' as timestamp))")

    withSQLConf(
      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
      SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
      SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
      val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
      val shuffles = collectShuffles(df.queryExecution.executedPlan)
      assert(shuffles.size == 1, "SPJ should be triggered")
      checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
    }
  }

Issue:
The generated physical plan auto-shuffles the non-partitioned purchases side with the wrong keygroupedpartitioning partition function, highlighted below: it buckets on time#43 rather than on item_id#41L, even though the items side is bucketed by bucket(2, id). The two sides are therefore not co-partitioned on the join keys, and the query returns no rows (the final plan collapses to an empty LocalTableScan) instead of the expected Row(1, "aa", 40.0, 42.0).

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   LocalTableScan <empty>, [id#37L, name#38, purchase_price#35, sale_price#36]
+- == Initial Plan ==
   Sort [id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=79]
      +- Project [id#37L, name#38, price#39 AS purchase_price#35, price#42 AS sale_price#36]
         +- SortMergeJoin [arrive_time#40, id#37L], [time#43, item_id#41L], Inner
            :- Sort [arrive_time#40 ASC NULLS FIRST, id#37L ASC NULLS FIRST], false, 0
            :  +- Project [id#37L, name#38, price#39, arrive_time#40]
            :     +- Filter (isnotnull(arrive_time#40) AND isnotnull(id#37L))
            :        +- BatchScan testcat.ns.items[id#37L, name#38, price#39, arrive_time#40] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
            +- Sort [time#43 ASC NULLS FIRST, item_id#41L ASC NULLS FIRST], false, 0
               *+- Exchange keygroupedpartitioning(transformexpression(bucket, time#43, Some(2)), 2, [0], [1]), ENSURE_REQUIREMENTS, [plan_id=73]*
                  +- Project [item_id#41L, price#42, time#43]
                     +- Filter (isnotnull(time#43) AND isnotnull(item_id#41L))
                        +- BatchScan testcat.ns.purchases[item_id#41L, price#42, time#43] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
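For readers without the suite at hand, here is a minimal sketch of the query under test, reconstructed from the SortMergeJoin keys and the Project in the plan above; the exact DataFrame that createJoinTestDF builds may differ, and items/purchases are the table-name vals from the repro:

  // Sketch only: the join that createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
  // is assumed to build, based on the join keys and projection visible in the plan above.
  val df = sql(
    s"""SELECT i.id, i.name, i.price AS purchase_price, p.price AS sale_price
       |FROM testcat.ns.$items i
       |JOIN testcat.ns.$purchases p
       |  ON i.arrive_time = p.time AND i.id = p.item_id""".stripMargin)

Only the items row (1, 'aa', 40.0, '2020-01-01') has a purchase with matching item_id and time, namely (1, 42.0, '2020-01-01'), which is why the expected answer is the single row Row(1, "aa", 40.0, 42.0); with the mis-keyed exchange the repro instead returns an empty result.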
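As a sanity baseline, the following sketch (using the same suite helpers as the repro, and assuming the bug is confined to the one-side shuffle path) turns the one-side shuffle off, so the planner should fall back to ordinary exchanges on both sides and return the expected row:

  // Sketch only: same data and join as the repro, but with the v2 bucketing one-side
  // shuffle disabled, so the purchases side is no longer re-partitioned with the
  // mis-keyed keygroupedpartitioning. SPJ cannot apply here without it, so regular
  // exchanges are expected on both sides of the SortMergeJoin.
  withSQLConf(
    SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "false",
    SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
    val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
    checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
  }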