Szehon Ho created SPARK-44641:
---------------------------------
Summary: Results duplicated when SPJ partial-cluster and pushdown
enabled but conditions unmet
Key: SPARK-44641
URL: https://issues.apache.org/jira/browse/SPARK-44641
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.1
Reporter: Szehon Ho
Adding the following test case to KeyGroupedPartitioningSuite demonstrates the
problem.
{code:java}
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
    s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
    s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
    s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
    s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
    SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") {
    // The join key (arrive_time = time) covers only the second partition key.
    val df = sql("SELECT id, name, i.price as purchase_price, " +
      "p.item_id, p.price as sale_price " +
      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
      "ON i.arrive_time = p.time " +
      "ORDER BY id, purchase_price, p.item_id, sale_price")

    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(!shuffles.isEmpty,
      "should not perform SPJ as not all join keys are partition keys")

    checkAnswer(df,
      Seq(
        Row(1, "aa", 40.0, 1, 42.0),
        Row(1, "aa", 40.0, 2, 11.0),
        Row(1, "aa", 41.0, 1, 44.0),
        Row(1, "aa", 41.0, 1, 45.0),
        Row(2, "bb", 10.0, 1, 42.0),
        Row(2, "bb", 10.0, 2, 11.0),
        Row(2, "bb", 10.5, 1, 42.0),
        Row(2, "bb", 10.5, 2, 11.0),
        Row(3, "cc", 15.5, 3, 19.5)
      )
    )
  }
}{code}
This tests the case where the DataSource V2 source returns multiple splits for
the same partition. In this case SPJ is not triggered, but the following code
in the DSV2 scan (BatchScanExec):
https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194
which is intended to fill in empty partitions for 'pushdown-value', still
iterates over the non-grouped partitions and looks each one up in the grouped
partitions to fill the map. A partition with multiple splits can therefore be
picked up more than once, and some input data is fed into the join in duplicate.
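The effect can be illustrated with a small standalone model. This is only a sketch of the behavior described above, not the code in BatchScanExec: `Split`, `fillFromGrouped`, and the split ids are hypothetical names made up for the illustration, and partition values are reduced to plain strings.

```scala
// Simplified model of the duplication, NOT Spark's actual implementation.
// All names here (Split, fillFromGrouped, s1..s3) are hypothetical.
object DuplicateSplitSketch {
  // One input split, tagged with the partition value it belongs to.
  final case class Split(partitionValue: String, id: String)

  // For every partition value handed in, look up all splits grouped
  // under that value and concatenate the results.
  def fillFromGrouped(partitionValues: Seq[String], splits: Seq[Split]): Seq[Split] = {
    val grouped: Map[String, Seq[Split]] = splits.groupBy(_.partitionValue)
    partitionValues.flatMap(v => grouped.getOrElse(v, Seq.empty))
  }

  def main(args: Array[String]): Unit = {
    // The source returns two splits for the same partition value.
    val splits = Seq(
      Split("2020-01-01", "s1"),
      Split("2020-01-01", "s2"),
      Split("2020-01-15", "s3"))

    // Non-grouped view: one entry per split, so "2020-01-01" appears twice.
    val nonGrouped = splits.map(_.partitionValue)
    // Grouped view: one entry per distinct partition value.
    val grouped = nonGrouped.distinct

    // Iterating the non-grouped view returns the whole group once per split,
    // so s1 and s2 are each read twice (5 splits in total).
    println(fillFromGrouped(nonGrouped, splits).size)
    // Iterating the grouped view reads every split exactly once (3 splits).
    println(fillFromGrouped(grouped, splits).size)
  }
}
```

In this model the duplication disappears as soon as the lookup is driven by the grouped partition values, or is skipped entirely when SPJ is not triggered. Which of the two is the right fix for Spark is left open here.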