df.groupBy("date").agg(
F.count_distinct('user_id').alias('n_users'),
F.percentile("duration", [0.5, 1])
).toPandas()
date n_users percentile(duration, array(0.5, 1), 1)
0 2025-01-03 5 [4.5, 9.0]
1 2025-01-01 5 [4.5, 9.0]
2 2025-01-02 5 [4.5, 9.0]
df.groupBy("date").agg(
F.percentile("duration", [0.5, 1])
).toPandas()
date percentile(duration, array(0.5, 1), 1)
0 2025-01-01 [4.5, 9.0]
1 2025-01-02 [4.5, 9.0]
2 2025-01-03 [4.5, 9.0]
FengYu Cao <[email protected]> 于2025年10月17日周五 12:23写道:
> did a quick check, the result is correct.
>
> In our case, (date, user_id) is unique, seems that it’s just our code that
> caused an unexpected and poor-performing query plan, no potential
> correctness issue,
> thanks for help.
>
> ---
> df = spark.createDataFrame([
> ("2025-01-01", 1, 0),
> ("2025-01-01", 1, 1),
> ("2025-01-01", 2, 2),
> ("2025-01-01", 2, 3),
> ("2025-01-01", 3, 4),
> ("2025-01-01", 3, 5),
> ("2025-01-01", 4, 6),
> ("2025-01-01", 4, 7),
> ("2025-01-01", 5, 8),
> ("2025-01-01", 5, 9),
>
> ("2025-01-02", 1, 0),
> ("2025-01-02", 1, 1),
> ("2025-01-02", 2, 2),
> ("2025-01-02", 2, 3),
> ("2025-01-02", 3, 4),
> ("2025-01-02", 3, 5),
> ("2025-01-02", 4, 6),
> ("2025-01-02", 4, 7),
> ("2025-01-02", 5, 8),
> ("2025-01-02", 5, 9),
>
> ("2025-01-03", 1, 0),
> ("2025-01-03", 1, 1),
> ("2025-01-03", 2, 2),
> ("2025-01-03", 2, 3),
> ("2025-01-03", 3, 4),
> ("2025-01-03", 3, 5),
> ("2025-01-03", 4, 6),
> ("2025-01-03", 4, 7),
> ("2025-01-03", 5, 8),
> ("2025-01-03", 5, 9),
> ], schema="date string, user_id long, duration long")
> ---
>
> ---
> df.groupBy("date").agg(
> F.count_distinct('user_id').alias('n_users'),
> F.sum("duration")
> ).toPandas()
>
> date n_users sum(duration)
> 0 2025-01-03 5 45
> 1 2025-01-01 5 45
> 2 2025-01-02 5 45
> ---
>
> Herman van Hovell <[email protected]> 于2025年10月17日周五 00:56写道:
>
>> Hi FengYu,
>>
>> This is because of count_distinct() on user id. We rewrite this into a
>> multi-step aggregation where we first aggregate per on the grouping keys
>> (date) and the distinct values (user_id), and then we aggregate on the
>> grouping keys only. The first step is needed to get the distinct values.
>>
>> I hope this helps.
>>
>> Cheers,
>> Herman
>>
>> On Thu, Oct 16, 2025 at 6:08 AM FengYu Cao <[email protected]> wrote:
>>
>>> Hi Spark community,
>>>
>>> I encountered an unexpected behavior when using `percentile()` and
>>> `count_distinct()` in a simple groupBy aggregation, and I’d like to confirm
>>> whether this is expected behavior or a potential correctness issue.
>>>
>>> Environment:
>>> - Apache Spark 3.5.7
>>> - Data source: Parquet
>>> - Schema:
>>> root
>>> |-- user_id: long (nullable = true)
>>> |-- duration: long (nullable = true)
>>> |-- date: date (nullable = true)
>>>
>>> Reproduction code:
>>> ------------------------------------------------------------
>>> df.groupBy("date").agg(
>>> F.count_distinct('user_id').alias('n_users'),
>>> F.percentile('duration', 0.95).alias('p95')
>>> ).explain()
>>> ------------------------------------------------------------
>>>
>>> Physical plan:
>>> ------------------------------------------------------------
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- ObjectHashAggregate(keys=[date#561],
>>> functions=[percentile(duration#559L, 0.95, 1, 0, 0, false), count(distinct
>>> user_id#558L)])
>>> +- Exchange hashpartitioning(date#561, 200), ENSURE_REQUIREMENTS,
>>> [plan_id=717]
>>> +- ObjectHashAggregate(keys=[date#561],
>>> functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false),
>>> partial_count(distinct user_id#558L)])
>>> +- ObjectHashAggregate(keys=[date#561, user_id#558L],
>>> functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>>> +- Exchange hashpartitioning(date#561, user_id#558L, 200),
>>> ENSURE_REQUIREMENTS, [plan_id=713]
>>> +- ObjectHashAggregate(keys=[date#561, user_id#558L],
>>> functions=[partial_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>>> +- FileScan parquet
>>> [user_id#558L,duration#559L,date#561] ...
>>> ------------------------------------------------------------
>>>
>>> Question:
>>> Why is there an additional `ObjectHashAggregate` on `(date, user_id)`
>>> when the logical aggregation only groups by `date`?
>>>
>>> For the case of groupBy("date") + count_distinct + sum, it seems fine.
>>> But for percentile, could this lead to a potential correctness issue?
>>> (when [date, user_id] is not unique)
>>>
>>> --
>>> *camper42 (曹丰宇)*
>>> Douban, Inc.
>>>
>>
>
> --
> *camper42 (曹丰宇)*
> Douban, Inc.
>
--
*camper42 (曹丰宇)*
Douban, Inc.
Mobile: +86 15691996359
E-mail: [email protected]