This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 14f34f641e fix: preserve byte-size statistics in AggregateExec (#18885)
14f34f641e is described below
commit 14f34f641e6c11ca3719736db6cbd63a063ea8a5
Author: Tamar <[email protected]>
AuthorDate: Tue Nov 25 23:30:33 2025 +0200
fix: preserve byte-size statistics in AggregateExec (#18885)
Previously, AggregateExec dropped total_byte_size statistics (returning
Precision::Absent) across aggregation operations, preventing the
optimizer from making informed decisions about memory allocation and
execution strategies such as join-side selection and dynamic filters.
This commit implements proportional byte-size scaling based on row count
ratios:
- Added a calculate_scaled_byte_size helper (marked #[inline])
- Scales byte size to a single output row for Final/FinalPartitioned without GROUP BY
- Scales byte size proportionally for all other aggregation modes
- Always returns Precision::Inexact for scaled estimates, since the uniform-row-size heuristic can never be exact
- Returns Precision::Absent when input statistics are insufficient
Added test coverage for edge cases (absent statistics, zero rows).
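The heart of the change is sketched below; this is a simplified restatement of
the `calculate_scaled_byte_size` helper shown in the diff, written against the
public `datafusion_common` statistics API, and is illustrative rather than the
exact patch:

```rust
use datafusion_common::stats::Precision;
use datafusion_common::Statistics;

/// Sketch: derive bytes-per-row from the input statistics and scale it
/// to the estimated output row count. Row sizes are assumed uniform, so
/// the result is always reported as `Precision::Inexact`.
fn scaled_byte_size(input: &Statistics, output_rows: usize) -> Precision<usize> {
    match (
        input.num_rows.get_value(),
        input.total_byte_size.get_value(),
    ) {
        // Both statistics known and the input is non-empty: scale proportionally.
        (Some(&rows), Some(&bytes)) if rows > 0 => {
            let bytes_per_row = bytes as f64 / rows as f64;
            Precision::Inexact((bytes_per_row * output_rows as f64).ceil() as usize)
        }
        // Missing statistics or zero input rows: no meaningful estimate.
        _ => Precision::Absent,
    }
}
```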
## Which issue does this PR close?
https://github.com/apache/datafusion/issues/18850
- Closes #18850
## Rationale for this change
Without byte-size statistics, the optimizer cannot estimate memory
requirements for join-side selection, dynamic filter generation, and
memory allocation decisions. This preserves statistics using
proportional scaling (bytes_per_row × output_rows).
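As a worked example of that formula (hypothetical numbers, not taken from the
test suite):

```rust
// 1,000 input rows totaling 8,000 bytes -> 8 bytes per row; an
// aggregation estimated to emit 50 rows is then estimated at 400 bytes.
let (input_rows, input_bytes, output_rows) = (1000_f64, 8000_f64, 50_f64);
let bytes_per_row = input_bytes / input_rows;
let scaled = (bytes_per_row * output_rows).ceil() as usize;
assert_eq!(scaled, 400); // surfaced as Precision::Inexact(400)
```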
## What changes are included in this PR?
1. Modified `statistics_inner` to calculate proportional byte size
instead of returning `Precision::Absent`
2. Added `calculate_scaled_byte_size` helper (inline optimized, guards
against division by zero)
3. Updated test assertions and added edge case coverage
## Are these changes tested?
Yes:
- New `test_aggregate_statistics_edge_cases` covers edge-case scenarios
- Existing tests confirm stats propagate correctly through the
aggregation pipeline
## Are there any user-facing changes?
No breaking changes.
This is an internal optimization that may improve query planning and
provide more accurate memory estimates in EXPLAIN output.
Co-authored-by: Daniël Heres <[email protected]>
---
datafusion/core/tests/dataframe/mod.rs | 102 +++++++++---------
.../physical_optimizer/partition_statistics.rs | 4 +-
datafusion/physical-plan/src/aggregates/mod.rs | 118 ++++++++++++++++++++-
datafusion/sqllogictest/test_files/union.slt | 20 ++--
4 files changed, 182 insertions(+), 62 deletions(-)
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index fb6dc3bcba..0c6ccf1b07 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -3325,30 +3325,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
assert_snapshot!(
pretty_format_batches(&sql_results).unwrap(),
@r"
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | plan_type     | plan |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | logical_plan  | Projection: t1.a, t1.b |
- |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
- |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
- |               |       Left Join: t1.a = __scalar_sq_1.a |
- |               |         TableScan: t1 projection=[a, b] |
- |               |         SubqueryAlias: __scalar_sq_1 |
- |               |           Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true |
- |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] |
- |               |               TableScan: t2 projection=[a] |
- | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
- |               |   CoalesceBatchesExec: target_batch_size=8192 |
- |               |     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
- |               |       DataSourceExec: partitions=1, partition_sizes=[1] |
- |               |       ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
- |               |         AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
- |               |           CoalesceBatchesExec: target_batch_size=8192 |
- |               |             RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
- |               |               AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
- |               |                 DataSourceExec: partitions=1, partition_sizes=[1] |
- |               | |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type     | plan |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan  | Projection: t1.a, t1.b |
+ |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
+ |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
+ |               |       Left Join: t1.a = __scalar_sq_1.a |
+ |               |         TableScan: t1 projection=[a, b] |
+ |               |         SubqueryAlias: __scalar_sq_1 |
+ |               |           Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true |
+ |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] |
+ |               |               TableScan: t2 projection=[a] |
+ | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
+ |               |   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
+ |               |     ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true] |
+ |               |       CoalesceBatchesExec: target_batch_size=8192 |
+ |               |         HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+ |               |           CoalescePartitionsExec |
+ |               |             ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
+ |               |               AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
+ |               |                 CoalesceBatchesExec: target_batch_size=8192 |
+ |               |                   RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
+ |               |                     AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
+ |               |                       DataSourceExec: partitions=1, partition_sizes=[1] |
+ |               |           DataSourceExec: partitions=1, partition_sizes=[1] |
+ |               | |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
"
);
@@ -3380,30 +3383,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
assert_snapshot!(
pretty_format_batches(&df_results).unwrap(),
@r"
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | plan_type     | plan |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | logical_plan  | Projection: t1.a, t1.b |
- |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
- |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
- |               |       Left Join: t1.a = __scalar_sq_1.a |
- |               |         TableScan: t1 projection=[a, b] |
- |               |         SubqueryAlias: __scalar_sq_1 |
- |               |           Projection: count(*), t2.a, Boolean(true) AS __always_true |
- |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]] |
- |               |               TableScan: t2 projection=[a] |
- | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
- |               |   CoalesceBatchesExec: target_batch_size=8192 |
- |               |     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
- |               |       DataSourceExec: partitions=1, partition_sizes=[1] |
- |               |       ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
- |               |         AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
- |               |           CoalesceBatchesExec: target_batch_size=8192 |
- |               |             RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
- |               |               AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
- |               |                 DataSourceExec: partitions=1, partition_sizes=[1] |
- |               | |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type     | plan |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan  | Projection: t1.a, t1.b |
+ |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
+ |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
+ |               |       Left Join: t1.a = __scalar_sq_1.a |
+ |               |         TableScan: t1 projection=[a, b] |
+ |               |         SubqueryAlias: __scalar_sq_1 |
+ |               |           Projection: count(*), t2.a, Boolean(true) AS __always_true |
+ |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]] |
+ |               |               TableScan: t2 projection=[a] |
+ | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
+ |               |   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
+ |               |     ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true] |
+ |               |       CoalesceBatchesExec: target_batch_size=8192 |
+ |               |         HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+ |               |           CoalescePartitionsExec |
+ |               |             ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
+ |               |               AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
+ |               |                 CoalesceBatchesExec: target_batch_size=8192 |
+ |               |                   RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
+ |               |                     AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
+ |               |                       DataSourceExec: partitions=1, partition_sizes=[1] |
+ |               |           DataSourceExec: partitions=1, partition_sizes=[1] |
+ |               | |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
"
);
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 7034b71fd5..7045cb8ea1 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -627,7 +627,7 @@ mod test {
         let expected_p0_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Absent,
+            total_byte_size: Precision::Inexact(110),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
@@ -645,7 +645,7 @@ mod test {
         let expected_p1_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Absent,
+            total_byte_size: Precision::Inexact(110),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 6bf59fd3d3..f175fd4cdb 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -792,10 +792,13 @@ impl AggregateExec {
             AggregateMode::Final | AggregateMode::FinalPartitioned
                 if self.group_by.expr.is_empty() =>
             {
+                let total_byte_size =
+                    Self::calculate_scaled_byte_size(child_statistics, 1);
+
                 Ok(Statistics {
                     num_rows: Precision::Exact(1),
                     column_statistics,
-                    total_byte_size: Precision::Absent,
+                    total_byte_size,
                 })
             }
             _ => {
@@ -815,14 +818,48 @@ impl AggregateExec {
                 } else {
                     Precision::Absent
                 };
+
+                let total_byte_size = num_rows
+                    .get_value()
+                    .and_then(|&output_rows| {
+                        Self::calculate_scaled_byte_size(child_statistics, output_rows)
+                            .get_value()
+                            .map(|&bytes| Precision::Inexact(bytes))
+                    })
+                    .unwrap_or(Precision::Absent);
+
                 Ok(Statistics {
                     num_rows,
                     column_statistics,
-                    total_byte_size: Precision::Absent,
+                    total_byte_size,
                 })
             }
         }
     }
+
+    /// Calculate scaled byte size based on row count ratio.
+    /// Returns `Precision::Absent` if input statistics are insufficient.
+    /// Returns `Precision::Inexact` with the scaled value otherwise.
+    ///
+    /// This is a simple heuristic that assumes uniform row sizes.
+    #[inline]
+    fn calculate_scaled_byte_size(
+        input_stats: &Statistics,
+        target_row_count: usize,
+    ) -> Precision<usize> {
+        match (
+            input_stats.num_rows.get_value(),
+            input_stats.total_byte_size.get_value(),
+        ) {
+            (Some(&input_rows), Some(&input_bytes)) if input_rows > 0 => {
+                let bytes_per_row = input_bytes as f64 / input_rows as f64;
+                let scaled_bytes =
+                    (bytes_per_row * target_row_count as f64).ceil() as usize;
+                Precision::Inexact(scaled_bytes)
+            }
+            _ => Precision::Absent,
+        }
+    }
 }
impl DisplayAs for AggregateExec {
@@ -1921,6 +1958,10 @@ mod tests {
             input_schema,
         )?);

+        // Verify statistics are preserved proportionally through aggregation
+        let final_stats = merged_aggregate.partition_statistics(None)?;
+        assert!(final_stats.total_byte_size.get_value().is_some());
+
         let task_ctx = if spill {
             // enlarge memory limit to let the final aggregation finish
             new_spill_ctx(2, 2600)
@@ -3146,4 +3187,77 @@ mod tests {
         run_test_with_spill_pool_if_necessary(20_000, false).await?;
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_aggregate_statistics_edge_cases() -> Result<()> {
+        use crate::test::exec::StatisticsExec;
+        use datafusion_common::ColumnStatistics;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        // Test 1: Absent statistics remain absent
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(100),
+                total_byte_size: Precision::Absent,
+                column_statistics: vec![
+                    ColumnStatistics::new_unknown(),
+                    ColumnStatistics::new_unknown(),
+                ],
+            },
+            (*schema).clone(),
+        )) as Arc<dyn ExecutionPlan>;
+
+        let agg = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("COUNT(a)")
+                    .build()?,
+            )],
+            vec![None],
+            input,
+            Arc::clone(&schema),
+        )?);
+
+        let stats = agg.partition_statistics(None)?;
+        assert_eq!(stats.total_byte_size, Precision::Absent);
+
+        // Test 2: Zero rows returns Absent (can't estimate output size from zero input)
+        let input_zero = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(0),
+                total_byte_size: Precision::Exact(0),
+                column_statistics: vec![
+                    ColumnStatistics::new_unknown(),
+                    ColumnStatistics::new_unknown(),
+                ],
+            },
+            (*schema).clone(),
+        )) as Arc<dyn ExecutionPlan>;
+
+        let agg_zero = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("COUNT(a)")
+                    .build()?,
+            )],
+            vec![None],
+            input_zero,
+            Arc::clone(&schema),
+        )?);
+
+        let stats_zero = agg_zero.partition_statistics(None)?;
+        assert_eq!(stats_zero.total_byte_size, Precision::Absent);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index 9e63f79f45..c20598239c 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -307,17 +307,17 @@ logical_plan
 physical_plan
 01)UnionExec
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)], NullsEqual: true
+03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], NullsEqual: true
 04)------CoalescePartitionsExec
-05)--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
-12)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-13)----------DataSourceExec: partitions=1, partition_sizes=[1]
+05)--------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+09)--------CoalesceBatchesExec: target_batch_size=2
+10)----------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
+11)------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+12)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)----------------DataSourceExec: partitions=1, partition_sizes=[1]
 14)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
 15)----CoalesceBatchesExec: target_batch_size=2
 16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1], NullsEqual: true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]