[ https://issues.apache.org/jira/browse/SPARK-55038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Feng Zhang updated SPARK-55038:
-------------------------------
Description:
h4. Summary
When `spark.sql.adaptive.enabled=true`
and `spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1`, queries
using `array_agg(DISTINCT)` in correlated subqueries produce incorrect results.
h4. Reproducer
{code:java}
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "1")
spark.sql("""
WITH t AS (SELECT explode(array(15, 16, 17)) AS v)
SELECT
(SELECT COUNT(*) FROM t WHERE array_contains(arr1, v)) AS array_agg_cnt,
(SELECT COUNT(*) FROM t WHERE array_contains(arr2, v)) AS collect_set_cnt,
arr1,
arr2
FROM (
SELECT
array_agg(DISTINCT v) AS arr1,
collect_set(v) AS arr2
FROM t
)
""").show()
{code}
Expected: both counts = 3
Actual: array_agg_cnt = 0, collect_set_cnt = 3
h4. Root Cause Analysis
The correlated subquery is decorrelated into a hash join using the array as the
join key.
When array_agg(DISTINCT) (which uses collect_list(distinct)) is computed
multiple times with sort-based aggregation + AQE, the computations produce
arrays with the same elements but different orderings. Since array equality is
element-by-element, the hash join fails to match.
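To make the mismatch concrete, here is a minimal plain-Python model of the failure. It is not Spark code: two lists stand in for two evaluations of array_agg(DISTINCT v), and a dictionary stands in for the build side of the hash join. The names (first_eval, second_eval, build_side, probe_key) and the specific orderings are assumptions chosen for illustration, not observed output.

```python
# Two hypothetical results of computing array_agg(DISTINCT v) twice:
# same elements, different order (orderings chosen for illustration).
first_eval = [15, 16, 17]
second_eval = [17, 15, 16]

# Order-sensitive, element-by-element equality: the arrays differ.
arrays_equal = first_eval == second_eval

# Model of a hash join keyed on the array: build from one evaluation,
# probe with the other. Tuples are used because lists are not hashable.
build_side = {tuple(first_eval): "row from aggregate"}
probe_key = tuple(second_eval)
join_matches = probe_key in build_side

# An order-insensitive comparison shows the contents are the same.
same_elements = sorted(first_eval) == sorted(second_eval)

print(arrays_equal, join_matches, same_elements)  # False False True
```

In this model the join finds no match even though both sides hold the same values, which mirrors the array_agg_cnt = 0 symptom.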
h4. Workarounds
- Set spark.sql.adaptive.enabled=false, OR
- Set spark.sql.objectHashAggregate.sortBased.fallbackThreshold to default
(128)
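As a rough sketch, either workaround can be applied at runtime through the session configuration. The helper name apply_workaround and its disable_aqe flag are hypothetical; the code assumes `spark` is an existing SparkSession and uses only `spark.conf.set`, the same call as in the reproducer.

```python
AQE_KEY = "spark.sql.adaptive.enabled"
THRESHOLD_KEY = "spark.sql.objectHashAggregate.sortBased.fallbackThreshold"


def apply_workaround(spark, disable_aqe=True):
    """Apply one of the two workarounds from the report (hypothetical helper).

    `spark` is assumed to be an existing SparkSession.
    """
    if disable_aqe:
        # Workaround 1: turn adaptive query execution off.
        spark.conf.set(AQE_KEY, "false")
    else:
        # Workaround 2: restore the default fallback threshold (128).
        spark.conf.set(THRESHOLD_KEY, "128")
```

Calling apply_workaround(spark) before running the query disables AQE; apply_workaround(spark, disable_aqe=False) resets the threshold instead.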
h4. Test Results
- AQE=true, sortBased.fallbackThreshold=1: FAIL (cnt=0, expected 3)
- AQE=false, sortBased.fallbackThreshold=1: PASS (cnt=3)
- AQE=true, sortBased.fallbackThreshold=128 (default): PASS (cnt=3)
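The three configurations above could be exercised with a small loop such as the following sketch. It assumes `spark` is an existing SparkSession; run_matrix and CASES are hypothetical names, and the query is the reproducer from this report. The sketch does not claim any particular outcome; the observed results are the ones listed above.

```python
AQE_KEY = "spark.sql.adaptive.enabled"
THRESHOLD_KEY = "spark.sql.objectHashAggregate.sortBased.fallbackThreshold"

# Reproducer query, copied from the report.
REPRO_SQL = """
WITH t AS (SELECT explode(array(15, 16, 17)) AS v)
SELECT
  (SELECT COUNT(*) FROM t WHERE array_contains(arr1, v)) AS array_agg_cnt,
  (SELECT COUNT(*) FROM t WHERE array_contains(arr2, v)) AS collect_set_cnt,
  arr1,
  arr2
FROM (
  SELECT
    array_agg(DISTINCT v) AS arr1,
    collect_set(v) AS arr2
  FROM t
)
"""

# (AQE setting, fallback threshold) pairs listed in the test results.
CASES = [("true", "1"), ("false", "1"), ("true", "128")]


def run_matrix(spark):
    """Run the reproducer under each configuration (hypothetical helper)."""
    results = []
    for aqe, threshold in CASES:
        spark.conf.set(AQE_KEY, aqe)
        spark.conf.set(THRESHOLD_KEY, threshold)
        row = spark.sql(REPRO_SQL).first()
        cnt = row["array_agg_cnt"]
        # The report expects a count of 3 in every configuration.
        status = "PASS" if cnt == 3 else "FAIL"
        results.append((aqe, threshold, cnt, status))
    return results
```

In a PySpark shell, `for r in run_matrix(spark): print(r)` prints one tuple per configuration.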
h4. Conclusion
Among the configurations tested, the bug occurs only when AQE is enabled and
sortBased.fallbackThreshold=1.
was:
h4. Summary
When `spark.sql.adaptive.enabled=true`
and `spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1`, queries
using `array_agg(DISTINCT)` in correlated subqueries produce incorrect results.
h4. Reproducer
{code:java}
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "1")
spark.sql("""
WITH t AS (SELECT explode(array(15, 16, 17)) AS v)
SELECT (SELECT COUNT(*) FROM t WHERE array_contains(arr, v)) AS cnt, arr
FROM (SELECT collect_set(v) AS arr FROM t)
""").show()
-- Expected: cnt = 3
-- Actual: cnt = 0
{code}
h4. Root Cause Analysis
The correlated subquery is decorrelated into a hash join using the array as the
join key.
When array_agg(DISTINCT) (which uses collect_list(distinct)) is computed
multiple times with sort-based aggregation + AQE, the computations produce
arrays with the same elements but different orderings. Since array equality is
element-by-element, the hash join fails to match.
h4. Workarounds
- Set spark.sql.adaptive.enabled=false, OR
- Set spark.sql.objectHashAggregate.sortBased.fallbackThreshold to default
(128)
h4. Test Results
- AQE=true, sortBased.fallbackThreshold=1: FAIL (cnt=0, expected 3)
- AQE=false, sortBased.fallbackThreshold=1: PASS (cnt=3)
- AQE=true, sortBased.fallbackThreshold=128 (default): PASS (cnt=3)
h4. Conclusion
The bug only occurs when both AQE is enabled and sortBased.fallbackThreshold=1.
> AQE + sortBased aggregation produces wrong results for array_agg(DISTINCT) in
> correlated subqueries
> ---------------------------------------------------------------------------------------------------
>
> Key: SPARK-55038
> URL: https://issues.apache.org/jira/browse/SPARK-55038
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0, 4.0.0
> Environment: - Spark 4.0.1 (also reproducible on 3.5.x)
> - Reproduced in both local[4] and distributed mode
>
> Reporter: Feng Zhang
> Priority: Critical