Rakesh Kumar created SPARK-52245:
------------------------------------
Summary: SPJ: Support auto-shuffle one side + less join keys than partition keys produces incorrect results
Key: SPARK-52245
URL: https://issues.apache.org/jira/browse/SPARK-52245
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 4.0.0
Reporter: Rakesh Kumar
Spark's one-side auto-shuffle can produce incorrect results when there are fewer join keys than partition keys.
*How to reproduce*
test("SPARK-48012: one-side shuffle with partition transforms " +
    "with join key tail part of the partition keys ") {
  // items is bucketed on id; purchases is unpartitioned, so only it should be shuffled.
  val items_partitions = Array(bucket(2, "id"))
  createTable(items, itemsColumns, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
    "(1, 'aa', 30.0, cast('2020-01-02' as timestamp)), " +
    "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
    "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
  createTable(purchases, purchasesColumns, Array.empty)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    "(1, 42.0, cast('2020-01-01' as timestamp)), " +
    "(1, 89.0, cast('2020-01-03' as timestamp)), " +
    "(3, 19.5, cast('2020-02-01' as timestamp)), " +
    "(5, 26.0, cast('2023-01-01' as timestamp)), " +
    "(6, 50.0, cast('2023-02-01' as timestamp))")
  withSQLConf(
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
    SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
    SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
    // Join on (arrive_time = time) AND (id = item_id).
    val df = createJoinTestDF(Seq("arrive_time" -> "time", "id" -> "item_id"))
    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(shuffles.size == 1, "SPJ should be triggered")
    // Only (id = 1, arrive_time = 2020-01-01) matches a purchase on both keys.
    checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
  }
}
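In the test above, `items` is bucketed on `id`, which is the second of the two join keys (`arrive_time -> time`, `id -> item_id`). For a one-side shuffle to line up with the existing buckets, the purchases side presumably has to be bucketed on the key paired with `id` by position, i.e. `item_id`. The sketch below makes that positional mapping explicit under that assumption; `ShuffleKeyMapping` and `shuffleKeysFor` are hypothetical names for illustration only and are not Spark's internal API.

```scala
object ShuffleKeyMapping {
  // Hypothetical helper (not Spark code): for each partition column of the
  // bucketed side, find its position among that side's join keys and return
  // the other side's join key at the same position.
  def shuffleKeysFor(
      partitionCols: Seq[String],
      bucketedSideKeys: Seq[String],
      shuffledSideKeys: Seq[String]): Seq[String] =
    partitionCols.map { col =>
      val pos = bucketedSideKeys.indexOf(col)
      require(pos >= 0, s"partition column $col is not a join key")
      shuffledSideKeys(pos)
    }

  def main(args: Array[String]): Unit = {
    // items is bucketed on id; join keys are (arrive_time, id) vs (time, item_id).
    val keys = shuffleKeysFor(Seq("id"), Seq("arrive_time", "id"), Seq("time", "item_id"))
    println(keys)
  }
}
```

Under this mapping the purchases side would be bucketed on `item_id`, not on `time`.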
Issue:
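The generated physical plan shuffles the purchases side with the wrong KeyGroupedPartitioning partitioning function, as highlighted below: it applies bucket(2, _) to time#43 instead of item_id#41L, the join key paired with the items table's partition key id. The final plan then collapses to an empty LocalTableScan, whereas the expected result is Row(1, "aa", 40.0, 42.0).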
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   LocalTableScan <empty>, [id#37L, name#38, purchase_price#35, sale_price#36]
+- == Initial Plan ==
   Sort [id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(id#37L ASC NULLS FIRST, purchase_price#35 ASC NULLS FIRST, sale_price#36 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=79]
      +- Project [id#37L, name#38, price#39 AS purchase_price#35, price#42 AS sale_price#36]
         +- SortMergeJoin [arrive_time#40, id#37L], [time#43, item_id#41L], Inner
            :- Sort [arrive_time#40 ASC NULLS FIRST, id#37L ASC NULLS FIRST], false, 0
            :  +- Project [id#37L, name#38, price#39, arrive_time#40]
            :     +- Filter (isnotnull(arrive_time#40) AND isnotnull(id#37L))
            :        +- BatchScan testcat.ns.items[id#37L, name#38, price#39, arrive_time#40] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
            +- Sort [time#43 ASC NULLS FIRST, item_id#41L ASC NULLS FIRST], false, 0
               *+- Exchange keygroupedpartitioning(transformexpression(bucket, time#43, Some(2)), 2, [0], [1]), ENSURE_REQUIREMENTS, [plan_id=73]*
                  +- Project [item_id#41L, price#42, time#43]
                     +- Filter (isnotnull(time#43) AND isnotnull(item_id#41L))
                        +- BatchScan testcat.ns.purchases[item_id#41L, price#42, time#43] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
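To illustrate why bucketing the shuffled side on the wrong key can drop rows, here is a minimal plain-Scala model of a partition-wise join over the same data. It assumes a simplified, hypothetical `bucket` function (`Math.floorMod` of the value's `hashCode`), not the transform used by Spark's in-memory test catalog, and it models timestamps as strings. Rows are joined only when they land in the same bucket id, as in a storage-partitioned join.

```scala
object SpjShuffleKeySketch {
  // Hypothetical stand-in for bucket(numBuckets, v); NOT Spark's implementation.
  def bucket(numBuckets: Int, v: Any): Int = Math.floorMod(v.hashCode, numBuckets)

  final case class Item(id: Long, name: String, price: Double, arriveTime: String)
  final case class Purchase(itemId: Long, price: Double, time: String)

  val items: Seq[Item] = Seq(
    Item(1L, "aa", 40.0, "2020-01-01"),
    Item(1L, "aa", 30.0, "2020-01-02"),
    Item(3L, "bb", 10.0, "2020-01-01"),
    Item(4L, "cc", 15.5, "2020-02-01"))

  val purchases: Seq[Purchase] = Seq(
    Purchase(1L, 42.0, "2020-01-01"),
    Purchase(1L, 89.0, "2020-01-03"),
    Purchase(3L, 19.5, "2020-02-01"),
    Purchase(5L, 26.0, "2023-01-01"),
    Purchase(6L, 50.0, "2023-02-01"))

  // Purchase-side join keys in query order: position 0 = time, position 1 = item_id.
  val purchaseJoinKeys: IndexedSeq[Purchase => Any] =
    IndexedSeq((p: Purchase) => p.time, (p: Purchase) => p.itemId)

  // items stays bucketed on id; purchases is "shuffled" by bucketing the join key
  // at shufflePos. Rows are only compared within the same bucket id.
  def join(shufflePos: Int): Seq[(Long, String, Double, Double)] = {
    val itemBuckets = items.groupBy(i => bucket(2, i.id))
    val purchaseBuckets = purchases.groupBy(p => bucket(2, purchaseJoinKeys(shufflePos)(p)))
    for {
      b <- 0 until 2
      i <- itemBuckets.getOrElse(b, Seq.empty)
      p <- purchaseBuckets.getOrElse(b, Seq.empty)
      if i.arriveTime == p.time && i.id == p.itemId
    } yield (i.id, i.name, i.price, p.price)
  }

  def main(args: Array[String]): Unit = {
    println(s"bucket on item_id (position 1): ${join(1)}")
    println(s"bucket on time (position 0): ${join(0)}")
  }
}
```

In this model, `join(1)` returns only the expected row (1, "aa", 40.0, 42.0), while `join(0)` returns nothing: the purchase for item 1 is placed in a different bucket from the matching item row, which is analogous to the empty LocalTableScan in the final plan.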