This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6cc73fa24d feat: Improve metrics for aggregate streams. (#18325)
6cc73fa24d is described below
commit 6cc73fa24d0d4d4006375e3bc20d6c267fc55c44
Author: Emily Matheys <[email protected]>
AuthorDate: Tue Oct 28 23:59:07 2025 +0200
feat: Improve metrics for aggregate streams. (#18325)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #18323.
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
Adds more detailed metrics, so it is easier to identify which parts of
the aggregate streams are actually slow.
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
Added a metrics struct, and used it in the functions common to the
aggregate streams.
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes, added some tests to verify the metrics are actually updated and can
be retrieved.
I've also run the groupby benchmarks to ensure we don't create timers in
a way that could impact performance, and it seems OK: all the changes
are within what I'd expect as standard variation on a local machine.
```
Comparing main and agg-metrics
--------------------
Benchmark h2o.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ agg-metrics ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 1252.42 ms │ 1196.62 ms │ no change │
│ QQuery 2 │ 3976.62 ms │ 3392.89 ms │ +1.17x faster │
│ QQuery 3 │ 3448.29 ms │ 2918.47 ms │ +1.18x faster │
│ QQuery 4 │ 1909.15 ms │ 1632.98 ms │ +1.17x faster │
│ QQuery 5 │ 3056.36 ms │ 2831.82 ms │ +1.08x faster │
│ QQuery 6 │ 2663.13 ms │ 2594.64 ms │ no change │
│ QQuery 7 │ 2802.28 ms │ 2592.43 ms │ +1.08x faster │
│ QQuery 8 │ 4489.29 ms │ 4199.00 ms │ +1.07x faster │
│ QQuery 9 │ 7001.75 ms │ 6622.98 ms │ +1.06x faster │
│ QQuery 10 │ 4725.80 ms │ 4619.37 ms │ no change │
└──────────────┴────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 35325.09ms │
│ Total Time (agg-metrics) │ 32601.19ms │
│ Average Time (main) │ 3532.51ms │
│ Average Time (agg-metrics) │ 3260.12ms │
│ Queries Faster │ 7 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 3 │
│ Queries with Failure │ 0 │
└────────────────────────────┴────────────┘
```
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
Nothing directly user-facing: additional metrics will now be available,
but there are no breaking changes.
---------
Co-authored-by: Raz Luvaton <[email protected]>
Co-authored-by: Eshed Schacham <[email protected]>
---
.../src/aggregates/group_values/metrics.rs | 214 +++++++++++++++++++++
.../src/aggregates/group_values/mod.rs | 3 +
.../physical-plan/src/aggregates/row_hash.rs | 35 +++-
.../physical-plan/src/aggregates/topk_stream.rs | 33 +++-
4 files changed, 278 insertions(+), 7 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs
b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs
new file mode 100644
index 0000000000..c4e29ea710
--- /dev/null
+++ b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for the various group-by implementations.
+
+use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
+
+pub(crate) struct GroupByMetrics {
+ /// Time spent calculating the group IDs from the evaluated grouping
columns.
+ pub(crate) time_calculating_group_ids: Time,
+ /// Time spent evaluating the inputs to the aggregate functions.
+ pub(crate) aggregate_arguments_time: Time,
+ /// Time spent evaluating the aggregate expressions themselves
+ /// (e.g. summing all elements and counting number of elements for `avg`
aggregate).
+ pub(crate) aggregation_time: Time,
+ /// Time spent emitting the final results and constructing the record batch
+ /// which includes finalizing the grouping expressions
+ /// (e.g. emit from the hash table in case of hash aggregation) and the
accumulators
+ pub(crate) emitting_time: Time,
+}
+
+impl GroupByMetrics {
+ pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) ->
Self {
+ Self {
+ time_calculating_group_ids: MetricBuilder::new(metrics)
+ .subset_time("time_calculating_group_ids", partition),
+ aggregate_arguments_time: MetricBuilder::new(metrics)
+ .subset_time("aggregate_arguments_time", partition),
+ aggregation_time: MetricBuilder::new(metrics)
+ .subset_time("aggregation_time", partition),
+ emitting_time: MetricBuilder::new(metrics)
+ .subset_time("emitting_time", partition),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
+ use crate::metrics::MetricsSet;
+ use crate::test::TestMemoryExec;
+ use crate::{collect, ExecutionPlan};
+ use arrow::array::{Float64Array, UInt32Array};
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::record_batch::RecordBatch;
+ use datafusion_common::Result;
+ use datafusion_execution::TaskContext;
+ use datafusion_functions_aggregate::count::count_udaf;
+ use datafusion_functions_aggregate::sum::sum_udaf;
+ use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+ use datafusion_physical_expr::expressions::col;
+ use std::sync::Arc;
+
+ /// Helper function to verify all three GroupBy metrics exist and have
non-zero values
+ fn assert_groupby_metrics(metrics: &MetricsSet) {
+ let agg_arguments_time =
metrics.sum_by_name("aggregate_arguments_time");
+ assert!(agg_arguments_time.is_some());
+ assert!(agg_arguments_time.unwrap().as_usize() > 0);
+
+ let aggregation_time = metrics.sum_by_name("aggregation_time");
+ assert!(aggregation_time.is_some());
+ assert!(aggregation_time.unwrap().as_usize() > 0);
+
+ let emitting_time = metrics.sum_by_name("emitting_time");
+ assert!(emitting_time.is_some());
+ assert!(emitting_time.unwrap().as_usize() > 0);
+ }
+
+ #[tokio::test]
+ async fn test_groupby_metrics_partial_mode() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::Float64, false),
+ ]));
+
+ // Create multiple batches to ensure metrics accumulate
+ let batches = (0..5)
+ .map(|i| {
+ RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
+ Arc::new(Float64Array::from(vec![
+ i as f64,
+ (i + 1) as f64,
+ (i + 2) as f64,
+ (i + 3) as f64,
+ ])),
+ ],
+ )
+ .unwrap()
+ })
+ .collect::<Vec<_>>();
+
+ let input = TestMemoryExec::try_new_exec(&[batches],
Arc::clone(&schema), None)?;
+
+ let group_by =
+ PhysicalGroupBy::new_single(vec![(col("a", &schema)?,
"a".to_string())]);
+
+ let aggregates = vec![
+ Arc::new(
+ AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
+ .schema(Arc::clone(&schema))
+ .alias("SUM(b)")
+ .build()?,
+ ),
+ Arc::new(
+ AggregateExprBuilder::new(count_udaf(), vec![col("b",
&schema)?])
+ .schema(Arc::clone(&schema))
+ .alias("COUNT(b)")
+ .build()?,
+ ),
+ ];
+
+ let aggregate_exec = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ group_by,
+ aggregates,
+ vec![None, None],
+ input,
+ schema,
+ )?);
+
+ let task_ctx = Arc::new(TaskContext::default());
+ let _result =
+ collect(Arc::clone(&aggregate_exec) as _,
Arc::clone(&task_ctx)).await?;
+
+ let metrics = aggregate_exec.metrics().unwrap();
+ assert_groupby_metrics(&metrics);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_groupby_metrics_final_mode() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::Float64, false),
+ ]));
+
+ let batches = (0..3)
+ .map(|i| {
+ RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(UInt32Array::from(vec![1, 2, 3])),
+ Arc::new(Float64Array::from(vec![
+ i as f64,
+ (i + 1) as f64,
+ (i + 2) as f64,
+ ])),
+ ],
+ )
+ .unwrap()
+ })
+ .collect::<Vec<_>>();
+
+ let partial_input =
+ TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema),
None)?;
+
+ let group_by =
+ PhysicalGroupBy::new_single(vec![(col("a", &schema)?,
"a".to_string())]);
+
+ let aggregates = vec![Arc::new(
+ AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
+ .schema(Arc::clone(&schema))
+ .alias("SUM(b)")
+ .build()?,
+ )];
+
+ // Create partial aggregate
+ let partial_aggregate = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ group_by.clone(),
+ aggregates.clone(),
+ vec![None],
+ partial_input,
+ Arc::clone(&schema),
+ )?);
+
+ // Create final aggregate
+ let final_aggregate = Arc::new(AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by.as_final(),
+ aggregates,
+ vec![None],
+ partial_aggregate,
+ schema,
+ )?);
+
+ let task_ctx = Arc::new(TaskContext::default());
+ let _result =
+ collect(Arc::clone(&final_aggregate) as _,
Arc::clone(&task_ctx)).await?;
+
+ let metrics = final_aggregate.metrics().unwrap();
+ assert_groupby_metrics(&metrics);
+
+ Ok(())
+ }
+}
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index 316fbe11ae..5f2a2faa11 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -46,8 +46,11 @@ use crate::aggregates::{
order::GroupOrdering,
};
+mod metrics;
mod null_builder;
+pub(crate) use metrics::GroupByMetrics;
+
/// Stores the group values during hash aggregation.
///
/// # Background
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 6132a8b0ad..98c8cb235c 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -23,7 +23,7 @@ use std::vec;
use super::order::GroupOrdering;
use super::AggregateExec;
-use crate::aggregates::group_values::{new_group_values, GroupValues};
+use crate::aggregates::group_values::{new_group_values, GroupByMetrics,
GroupValues};
use crate::aggregates::order::GroupOrderingFull;
use crate::aggregates::{
create_schema, evaluate_group_by, evaluate_many, evaluate_optional,
AggregateMode,
@@ -49,6 +49,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_common::instant::Instant;
use futures::ready;
use futures::stream::{Stream, StreamExt};
use log::debug;
@@ -430,6 +431,9 @@ pub(crate) struct GroupedHashAggregateStream {
/// Execution metrics
baseline_metrics: BaselineMetrics,
+
+ /// Aggregation-specific metrics
+ group_by_metrics: GroupByMetrics,
}
impl GroupedHashAggregateStream {
@@ -447,6 +451,7 @@ impl GroupedHashAggregateStream {
let batch_size = context.session_config().batch_size();
let input = agg.input.execute(partition, Arc::clone(&context))?;
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
+ let group_by_metrics = GroupByMetrics::new(&agg.metrics, partition);
let timer = baseline_metrics.elapsed_compute().timer();
@@ -609,6 +614,7 @@ impl GroupedHashAggregateStream {
current_group_indices: Default::default(),
exec_state,
baseline_metrics,
+ group_by_metrics,
batch_size,
group_ordering,
input_done: false,
@@ -830,12 +836,25 @@ impl GroupedHashAggregateStream {
evaluate_group_by(&self.group_by, &batch)?
};
+ // Only create the timer if there are actual aggregate arguments to
evaluate
+ let timer = match (
+ self.spill_state.is_stream_merging,
+ self.spill_state.merging_aggregate_arguments.is_empty(),
+ self.aggregate_arguments.is_empty(),
+ ) {
+ (true, false, _) | (false, _, false) => {
+ Some(self.group_by_metrics.aggregate_arguments_time.timer())
+ }
+ _ => None,
+ };
+
// Evaluate the aggregation expressions.
let input_values = if self.spill_state.is_stream_merging {
evaluate_many(&self.spill_state.merging_aggregate_arguments,
&batch)?
} else {
evaluate_many(&self.aggregate_arguments, &batch)?
};
+ drop(timer);
// Evaluate the filter expressions, if any, against the inputs
let filter_values = if self.spill_state.is_stream_merging {
@@ -846,6 +865,8 @@ impl GroupedHashAggregateStream {
};
for group_values in &group_by_values {
+ let groups_start_time = Instant::now();
+
// calculate the group indices for each input row
let starting_num_groups = self.group_values.len();
self.group_values
@@ -862,6 +883,12 @@ impl GroupedHashAggregateStream {
)?;
}
+ // Use this instant for both measurements to save a syscall
+ let agg_start_time = Instant::now();
+ self.group_by_metrics
+ .time_calculating_group_ids
+ .add_duration(agg_start_time - groups_start_time);
+
// Gather the inputs to call the actual accumulator
let t = self
.accumulators
@@ -897,6 +924,9 @@ impl GroupedHashAggregateStream {
acc.merge_batch(values, group_indices, None,
total_num_groups)?;
}
}
+ self.group_by_metrics
+ .aggregation_time
+ .add_elapsed(agg_start_time);
}
}
@@ -941,6 +971,7 @@ impl GroupedHashAggregateStream {
return Ok(None);
}
+ let timer = self.group_by_metrics.emitting_time.timer();
let mut output = self.group_values.emit(emit_to)?;
if let EmitTo::First(n) = emit_to {
self.group_ordering.remove_groups(n);
@@ -961,12 +992,14 @@ impl GroupedHashAggregateStream {
| AggregateMode::SinglePartitioned =>
output.push(acc.evaluate(emit_to)?),
}
}
+ drop(timer);
// emit reduces the memory usage. Ignore Err from
update_memory_reservation. Even if it is
// over the target memory size after emission, we can emit again
rather than returning Err.
let _ = self.update_memory_reservation();
let batch = RecordBatch::try_new(schema, output)?;
debug_assert!(batch.num_rows() > 0);
+
Ok(Some(batch))
}
diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs
b/datafusion/physical-plan/src/aggregates/topk_stream.rs
index 9aaadfd52b..eb1b7543cb 100644
--- a/datafusion/physical-plan/src/aggregates/topk_stream.rs
+++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs
@@ -17,11 +17,13 @@
//! A memory-conscious aggregation implementation that limits group buckets to
a fixed number
+use crate::aggregates::group_values::GroupByMetrics;
use crate::aggregates::topk::priority_map::PriorityMap;
use crate::aggregates::{
aggregate_expressions, evaluate_group_by, evaluate_many, AggregateExec,
PhysicalGroupBy,
};
+use crate::metrics::BaselineMetrics;
use crate::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
@@ -42,6 +44,8 @@ pub struct GroupedTopKAggregateStream {
started: bool,
schema: SchemaRef,
input: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
+ group_by_metrics: GroupByMetrics,
aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
group_by: PhysicalGroupBy,
priority_map: PriorityMap,
@@ -57,6 +61,8 @@ impl GroupedTopKAggregateStream {
let agg_schema = Arc::clone(&aggr.schema);
let group_by = aggr.group_by.clone();
let input = aggr.input.execute(partition, Arc::clone(&context))?;
+ let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition);
+ let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition);
let aggregate_arguments =
aggregate_expressions(&aggr.aggr_expr, &aggr.mode,
group_by.expr.len())?;
let (val_field, desc) = aggr
@@ -75,6 +81,8 @@ impl GroupedTopKAggregateStream {
row_count: 0,
schema: agg_schema,
input,
+ baseline_metrics,
+ group_by_metrics,
aggregate_arguments,
group_by,
priority_map,
@@ -90,6 +98,8 @@ impl RecordBatchStream for GroupedTopKAggregateStream {
impl GroupedTopKAggregateStream {
fn intern(&mut self, ids: ArrayRef, vals: ArrayRef) -> Result<()> {
+ let _timer = self.group_by_metrics.time_calculating_group_ids.timer();
+
let len = ids.len();
self.priority_map.set_batch(ids, Arc::clone(&vals));
@@ -111,7 +121,10 @@ impl Stream for GroupedTopKAggregateStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
+ let emitting_time = self.group_by_metrics.emitting_time.clone();
while let Poll::Ready(res) = self.input.poll_next_unpin(cx) {
+ let _timer = elapsed_compute.timer();
match res {
// got a batch, convert to rows and append to our TreeMap
Some(Ok(batch)) => {
@@ -140,10 +153,15 @@ impl Stream for GroupedTopKAggregateStream {
"Exactly 1 group value required"
);
let group_by_values = Arc::clone(&group_by_values[0][0]);
- let input_values = evaluate_many(
- &self.aggregate_arguments,
- batches.first().unwrap(),
- )?;
+ let input_values = {
+ let _timer =
(!self.aggregate_arguments.is_empty()).then(|| {
+
self.group_by_metrics.aggregate_arguments_time.timer()
+ });
+ evaluate_many(
+ &self.aggregate_arguments,
+ batches.first().unwrap(),
+ )?
+ };
assert_eq!(input_values.len(), 1, "Exactly 1 input
required");
assert_eq!(input_values[0].len(), 1, "Exactly 1 input
required");
let input_values = Arc::clone(&input_values[0][0]);
@@ -157,8 +175,11 @@ impl Stream for GroupedTopKAggregateStream {
trace!("partition {} emit None", self.partition);
return Poll::Ready(None);
}
- let cols = self.priority_map.emit()?;
- let batch = RecordBatch::try_new(Arc::clone(&self.schema),
cols)?;
+ let batch = {
+ let _timer = emitting_time.timer();
+ let cols = self.priority_map.emit()?;
+ RecordBatch::try_new(Arc::clone(&self.schema), cols)?
+ };
trace!(
"partition {} emit batch with {} rows",
self.partition,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]