Hi Spark community,
I encountered unexpected behavior when combining `percentile()` and
`count_distinct()` in a simple groupBy aggregation, and I'd like to confirm
whether this is expected or a potential correctness issue.
Environment:
- Apache Spark 3.5.7
- Data source: Parquet
- Schema:
root
 |-- user_id: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- date: date (nullable = true)
Reproduction code:
------------------------------------------------------------
from pyspark.sql import functions as F

# df is read from the Parquet source with the schema above
df.groupBy("date").agg(
    F.count_distinct("user_id").alias("n_users"),
    F.percentile("duration", 0.95).alias("p95"),
).explain()
------------------------------------------------------------
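For reference, my understanding of the `percentile(duration, 0.95, ...)` calls
in the plan below is that the aggregation buffer is a value -> count map per
group, finalized by linear interpolation at rank p * (n - 1). A minimal
pure-Python sketch of that semantics (my assumption about the exact algorithm,
not taken from Spark source):

```python
import math
from collections import Counter

def percentile_exact(values, p):
    """Exact percentile: value -> count buffer, then linear interpolation
    at rank p * (n - 1). Assumed to match Spark's `percentile` semantics."""
    counts = Counter(values)            # the aggregation buffer
    items = sorted(counts.items())      # distinct values in order
    n = sum(counts.values())
    rank = p * (n - 1)
    lo, hi = math.floor(rank), math.ceil(rank)

    def value_at(pos):
        # Walk cumulative counts to find the value at 0-based position `pos`.
        cum = 0
        for v, c in items:
            cum += c
            if pos < cum:
                return v
        return items[-1][0]

    v_lo, v_hi = value_at(lo), value_at(hi)
    return v_lo + (v_hi - v_lo) * (rank - lo)

# e.g. for [10, 20, 30, 40] at p=0.95, this interpolates between 30 and 40
print(percentile_exact([10, 20, 30, 40], 0.95))
```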
Physical plan:
------------------------------------------------------------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ObjectHashAggregate(keys=[date#561], functions=[percentile(duration#559L, 0.95, 1, 0, 0, false), count(distinct user_id#558L)])
   +- Exchange hashpartitioning(date#561, 200), ENSURE_REQUIREMENTS, [plan_id=717]
      +- ObjectHashAggregate(keys=[date#561], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false), partial_count(distinct user_id#558L)])
         +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false)])
            +- Exchange hashpartitioning(date#561, user_id#558L, 200), ENSURE_REQUIREMENTS, [plan_id=713]
               +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[partial_percentile(duration#559L, 0.95, 1, 0, 0, false)])
                  +- FileScan parquet [user_id#558L,duration#559L,date#561]
                     ...
------------------------------------------------------------
Question:
Why does the physical plan contain an additional `ObjectHashAggregate` keyed
on `(date, user_id)` when the logical aggregation only groups by `date`?
For `groupBy("date")` + `count_distinct` + `sum`, this pattern appears to
produce correct results. But for `percentile`, could the extra grouping on
`(date, user_id)` lead to a correctness issue when `(date, user_id)` is not
unique, i.e. when a user has multiple rows on the same date?
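
To make the concern concrete, here is a pure-Python sketch of the two-level
aggregation in the plan, assuming (as above) that the percentile buffer is a
value -> count map and that `merge_percentile` sums counts. Under that
assumption the inner `(date, user_id)` grouping is lossless even with
duplicate `(date, user_id)` rows; if that reading is wrong, that is exactly
what I'd like to confirm:

```python
from collections import Counter

rows = [  # (date, user_id, duration); (date, user_id) intentionally not unique
    ("2024-01-01", 1, 10),
    ("2024-01-01", 1, 20),
    ("2024-01-01", 2, 30),
    ("2024-01-01", 2, 40),
    ("2024-01-02", 1, 50),
]

# Inner aggregate: partial percentile buffer per (date, user_id).
partial = {}
for d, u, dur in rows:
    partial.setdefault((d, u), Counter())[dur] += 1

# Outer aggregate: merge buffers per date by summing counts, which preserves
# duplicate durations even when a user has several rows on the same date.
merged = {}
for (d, u), cnt in partial.items():
    merged.setdefault(d, Counter()).update(cnt)

# Direct single-level buffer per date, for comparison.
direct = {}
for d, u, dur in rows:
    direct.setdefault(d, Counter())[dur] += 1

assert merged == direct  # the two-level plan sees the same multiset per date
```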
--
*camper42 (曹丰宇)*
Douban, Inc.