This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 679a85f808 Add metrics for skipped rows (#11706)
679a85f808 is described below
commit 679a85f808ce130ff354c762d29315eb2dc32b3f
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 7 12:59:06 2024 -0400
Add metrics for skipped rows (#11706)
---
.../physical-plan/src/aggregates/row_hash.rs | 47 ++++++++++++++++------
1 file changed, 34 insertions(+), 13 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 1b84befb02..ed3d6d49f9 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -28,12 +28,12 @@ use crate::aggregates::{
PhysicalGroupBy,
};
use crate::common::IPCWriter;
-use crate::metrics::{BaselineMetrics, RecordOutput};
+use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput};
use crate::sorts::sort::sort_batch;
use crate::sorts::streaming_merge;
use crate::spill::read_spill_as_stream;
use crate::stream::RecordBatchStreamAdapter;
-use crate::{aggregates, ExecutionPlan, PhysicalExpr};
+use crate::{aggregates, metrics, ExecutionPlan, PhysicalExpr};
use crate::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::*;
@@ -117,10 +117,22 @@ struct SkipAggregationProbe {
/// Flag indicating that further updates of `SkipAggregationProbe`
/// state won't make any effect
is_locked: bool,
+
+ /// Number of rows where state was output without aggregation.
+ ///
+ /// * If 0, all input rows were aggregated (should_skip was always false)
+ ///
+ /// * If greater than zero, the number of rows which were output directly
+ /// without aggregation
+ skipped_aggregation_rows: metrics::Count,
}
impl SkipAggregationProbe {
- fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self {
+ fn new(
+ probe_rows_threshold: usize,
+ probe_ratio_threshold: f64,
+ skipped_aggregation_rows: metrics::Count,
+ ) -> Self {
Self {
input_rows: 0,
num_groups: 0,
@@ -128,6 +140,7 @@ impl SkipAggregationProbe {
probe_ratio_threshold,
should_skip: false,
is_locked: false,
+ skipped_aggregation_rows,
}
}
@@ -160,6 +173,11 @@ impl SkipAggregationProbe {
self.should_skip = false;
self.is_locked = true;
}
+
+ /// Record the number of rows that were output directly without aggregation
+ fn record_skipped(&mut self, batch: &RecordBatch) {
+ self.skipped_aggregation_rows.add(batch.num_rows());
+ }
}
/// HashTable based Grouping Aggregator
@@ -473,17 +491,17 @@ impl GroupedHashAggregateStream {
.all(|acc| acc.supports_convert_to_state())
&& agg_group_by.is_single()
{
+ let options = &context.session_config().options().execution;
+ let probe_rows_threshold =
+ options.skip_partial_aggregation_probe_rows_threshold;
+ let probe_ratio_threshold =
+ options.skip_partial_aggregation_probe_ratio_threshold;
+ let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics)
+ .counter("skipped_aggregation_rows", partition);
Some(SkipAggregationProbe::new(
- context
- .session_config()
- .options()
- .execution
- .skip_partial_aggregation_probe_rows_threshold,
- context
- .session_config()
- .options()
- .execution
- .skip_partial_aggregation_probe_ratio_threshold,
+ probe_rows_threshold,
+ probe_ratio_threshold,
+ skipped_aggregation_rows,
))
} else {
None
@@ -611,6 +629,9 @@ impl Stream for GroupedHashAggregateStream {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let _timer = elapsed_compute.timer();
+ if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+ probe.record_skipped(&batch);
+ }
let states = self.transform_to_states(batch)?;
return Poll::Ready(Some(Ok(
states.record_output(&self.baseline_metrics)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]