asolimando commented on code in PR #20926:
URL: https://github.com/apache/datafusion/pull/20926#discussion_r2942455195
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1083,14 +1097,32 @@ impl AggregateExec {
let num_rows = if let Some(value) =
child_statistics.num_rows.get_value()
{
if *value > 1 {
- child_statistics.num_rows.to_inexact()
+ let mut num_rows =
child_statistics.num_rows.to_inexact();
+
+ if !self.group_by.expr.is_empty() {
+ let ndv_product =
self.compute_group_ndv(child_statistics);
+ if let Some(ndv) = ndv_product {
+ let grouping_set_num =
self.group_by.groups.len();
+ let ndv_estimate =
ndv.saturating_mul(grouping_set_num);
+ num_rows = num_rows.map(|n|
n.min(ndv_estimate));
+ }
+ }
+
+ // If TopK mode is active, cap output rows by the limit
+ if let Some(limit_opts) = &self.limit_options {
+ num_rows = num_rows.map(|n|
n.min(limit_opts.limit));
+ }
+
+ num_rows
} else if *value == 0 {
child_statistics.num_rows
} else {
// num_rows = 1 case
let grouping_set_num = self.group_by.groups.len();
child_statistics.num_rows.map(|x| x * grouping_set_num)
}
+ } else if let Some(limit_opts) = &self.limit_options {
+ Precision::Inexact(limit_opts.limit)
Review Comment:
Here we might have `ndv` even if `num_rows` is unset, maybe we can return
`Inexact(min(ndv, limit))`?
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1083,14 +1097,32 @@ impl AggregateExec {
let num_rows = if let Some(value) =
child_statistics.num_rows.get_value()
{
if *value > 1 {
- child_statistics.num_rows.to_inexact()
+ let mut num_rows =
child_statistics.num_rows.to_inexact();
+
+ if !self.group_by.expr.is_empty() {
+ let ndv_product =
self.compute_group_ndv(child_statistics);
+ if let Some(ndv) = ndv_product {
+ let grouping_set_num =
self.group_by.groups.len();
+ let ndv_estimate =
ndv.saturating_mul(grouping_set_num);
Review Comment:
This doesn't look correct to me, grouping sets target different columns.
Consider `GROUPING SETS ((a), (b), (a, b))`, here you would compute `NDV(a)
* NDV(b) * 3`, while the number of distinct values should be `NDV(a) + NDV(b) +
NDV(a)*NDV(b)`.
[Trino](https://github.com/trinodb/trino/blob/43c8c3ba8bff814697c5926149ce13b9532f030b/core/trino-main/src/main/java/io/trino/cost/AggregationStatsRule.java#L53)
is bailing out for (multiple) grouping sets,
[Spark](https://github.com/apache/spark/blob/e8d8e6a8d040d26aae9571e968e0c64bda0875dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala#L38-L61)
does not take them into account as, AFAIU, it rewrites them as union of
aggregates at earlier phases.
It's fine to either bail out like Trino does, but if you want to support
this, the code would be something like this (pseudocode):
```
res = 0;
for each grouping set gs:
part_res = 0;
for each col in gs:
part_res *= ndv(col) + null_count(col(gs)) > 1 ? 1 : 0;
res += part_res;
```
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1113,6 +1145,24 @@ impl AggregateExec {
}
}
+ /// Computes `product(NDV_i + null_adjustment_i)` across all group-by
columns.
+ /// Returns `None` if any group-by column is not a direct column reference
+ /// or lacks `distinct_count` stats.
+ fn compute_group_ndv(&self, child_statistics: &Statistics) ->
Option<usize> {
+ let mut product: usize = 1;
+ for (expr, _) in self.group_by.expr.iter() {
+ let col = expr.as_any().downcast_ref::<Column>()?;
+ let col_stats = &child_statistics.column_statistics[col.index()];
+ let ndv = *col_stats.distinct_count.get_value()?;
Review Comment:
Since we multiply by ndv, we might end up with a total of zero if any ndv is
zero. If the column has only null values, you might have `num_rows > = 1` and
`ndv = 0`, so let's use `min(num_rows, ndv)` here to be more robust.
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1113,6 +1145,24 @@ impl AggregateExec {
}
}
+ /// Computes `product(NDV_i + null_adjustment_i)` across all group-by
columns.
+ /// Returns `None` if any group-by column is not a direct column reference
+ /// or lacks `distinct_count` stats.
+ fn compute_group_ndv(&self, child_statistics: &Statistics) ->
Option<usize> {
+ let mut product: usize = 1;
+ for (expr, _) in self.group_by.expr.iter() {
+ let col = expr.as_any().downcast_ref::<Column>()?;
+ let col_stats = &child_statistics.column_statistics[col.index()];
+ let ndv = *col_stats.distinct_count.get_value()?;
+ let null_adjustment = match col_stats.null_count.get_value() {
+ Some(&n) if n > 0 => 1usize,
+ _ => 0,
Review Comment:
Nit: it's a reasonable default but I'd add this to the comment of the
function explicitly
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]