Did a quick check, and the result is correct.
In our case (date, user_id) is unique, so it seems it was just our code that
triggered an unexpected and poor-performing query plan; there is no potential
correctness issue.
Thanks for the help.
---
from pyspark.sql import functions as F

# `spark` is an active SparkSession (e.g. the pyspark shell).
# Note that (date, user_id) is non-unique in this sample: each pair appears twice.
df = spark.createDataFrame([
    ("2025-01-01", 1, 0),
    ("2025-01-01", 1, 1),
    ("2025-01-01", 2, 2),
    ("2025-01-01", 2, 3),
    ("2025-01-01", 3, 4),
    ("2025-01-01", 3, 5),
    ("2025-01-01", 4, 6),
    ("2025-01-01", 4, 7),
    ("2025-01-01", 5, 8),
    ("2025-01-01", 5, 9),
    ("2025-01-02", 1, 0),
    ("2025-01-02", 1, 1),
    ("2025-01-02", 2, 2),
    ("2025-01-02", 2, 3),
    ("2025-01-02", 3, 4),
    ("2025-01-02", 3, 5),
    ("2025-01-02", 4, 6),
    ("2025-01-02", 4, 7),
    ("2025-01-02", 5, 8),
    ("2025-01-02", 5, 9),
    ("2025-01-03", 1, 0),
    ("2025-01-03", 1, 1),
    ("2025-01-03", 2, 2),
    ("2025-01-03", 2, 3),
    ("2025-01-03", 3, 4),
    ("2025-01-03", 3, 5),
    ("2025-01-03", 4, 6),
    ("2025-01-03", 4, 7),
    ("2025-01-03", 5, 8),
    ("2025-01-03", 5, 9),
], schema="date string, user_id long, duration long")
---
---
df.groupBy("date").agg(
    F.count_distinct('user_id').alias('n_users'),
    F.sum("duration")
).toPandas()
         date  n_users  sum(duration)
0  2025-01-03        5             45
1  2025-01-01        5             45
2  2025-01-02        5             45
---
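For completeness, the percentile variant from my original mail also checks out on this data (same session as above); since percentile() is exact with linear interpolation, p95 of the values 0..9 should come out as 8.55 for every date:

---
df.groupBy("date").agg(
    F.count_distinct('user_id').alias('n_users'),
    F.percentile('duration', 0.95).alias('p95')
).toPandas()
# expected: n_users = 5 and p95 = 8.55 for each of the three dates
---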
On Fri, Oct 17, 2025 at 00:56, Herman van Hovell <[email protected]> wrote:
> Hi FengYu,
>
> This is because of count_distinct() on user_id. We rewrite this into a
> multi-step aggregation where we first aggregate on the grouping keys
> (date) plus the distinct values (user_id), and then we aggregate on the
> grouping keys only. The first step is needed to get the distinct values.
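
To make sure I follow: in DataFrame terms, the rewrite for the count_distinct + sum case is roughly equivalent to the hand-written two-step aggregation below (a sketch of my understanding, not the actual internal plan):

---
# Step 1: group on (date, user_id) so user_id is deduplicated per date,
# pre-aggregating duration at the same time.
step1 = df.groupBy("date", "user_id").agg(
    F.sum("duration").alias("duration_sum")
)
# Step 2: group on date only; user_id is now unique within each date,
# so a plain count() gives the distinct count.
step1.groupBy("date").agg(
    F.count("user_id").alias("n_users"),
    F.sum("duration_sum").alias("sum_duration")
).toPandas()
---

For percentile, the plan instead carries the percentile buffer through both steps (partial_percentile, then merge_percentile), which, as far as I can tell, is why duplicate (date, user_id) pairs would not be lost either.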
>
> I hope this helps.
>
> Cheers,
> Herman
>
> On Thu, Oct 16, 2025 at 6:08 AM FengYu Cao <[email protected]> wrote:
>
>> Hi Spark community,
>>
>> I encountered an unexpected query plan when using `percentile()` and
>> `count_distinct()` in a simple groupBy aggregation, and I’d like to confirm
>> whether this is expected behavior or a potential correctness issue.
>>
>> Environment:
>> - Apache Spark 3.5.7
>> - Data source: Parquet
>> - Schema:
>> root
>>  |-- user_id: long (nullable = true)
>>  |-- duration: long (nullable = true)
>>  |-- date: date (nullable = true)
>>
>> Reproduction code:
>> ------------------------------------------------------------
>> df.groupBy("date").agg(
>>     F.count_distinct('user_id').alias('n_users'),
>>     F.percentile('duration', 0.95).alias('p95')
>> ).explain()
>> ------------------------------------------------------------
>>
>> Physical plan:
>> ------------------------------------------------------------
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- ObjectHashAggregate(keys=[date#561], functions=[percentile(duration#559L, 0.95, 1, 0, 0, false), count(distinct user_id#558L)])
>>    +- Exchange hashpartitioning(date#561, 200), ENSURE_REQUIREMENTS, [plan_id=717]
>>       +- ObjectHashAggregate(keys=[date#561], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false), partial_count(distinct user_id#558L)])
>>          +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>>             +- Exchange hashpartitioning(date#561, user_id#558L, 200), ENSURE_REQUIREMENTS, [plan_id=713]
>>                +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[partial_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>>                   +- FileScan parquet [user_id#558L,duration#559L,date#561] ...
>> ------------------------------------------------------------
>>
>> Question:
>> Why is there an additional `ObjectHashAggregate` on `(date, user_id)`
>> when the logical aggregation only groups by `date`?
>>
>> For groupBy("date") + count_distinct + sum this seems fine, but for
>> percentile, could this lead to a correctness issue when (date, user_id)
>> is not unique?
>>
>> --
>> *camper42 (曹丰宇)*
>> Douban, Inc.
>>
>
--
*camper42 (曹丰宇)*
Douban, Inc.