This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f162fd3255 refactor: include metric output_batches into BaselineMetrics (#18491)
f162fd3255 is described below

commit f162fd325565e14be8e4cace17d8a3a8b2764cc8
Author: Suhail <[email protected]>
AuthorDate: Mon Nov 10 05:08:13 2025 +0530

    refactor: include metric output_batches into BaselineMetrics (#18491)
    
    ## Which issue does this PR close?
    
    - Closes #17027
    
    ## Rationale for this change
    
    `output_batches` should be a common metric across all operators, so it
    should ideally be part of `BaselineMetrics`:
    ```
    > explain analyze select * from generate_series(1, 1000000) as t1(v1) order by v1 desc;

    Plan with Metrics:
    SortExec: expr=[v1@0 DESC], preserve_partitioning=[false], metrics=[output_rows=1000000, elapsed_compute=535.320324ms, output_bytes=7.6 MB, output_batches=123, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batches_split=0]
      ProjectionExec: expr=[value@0 as v1], metrics=[output_rows=1000000, elapsed_compute=208.379µs, output_bytes=7.7 MB, output_batches=123]
        LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=1000000, batch_size=8192], metrics=[output_rows=1000000, elapsed_compute=15.924291ms, output_bytes=7.7 MB, output_batches=123]

    1 row(s) fetched.
    Elapsed 0.492 second
    ```
    
    ## What changes are included in this PR?
    
    - Added `output_batches` into `BaselineMetrics` with `DEV` MetricType
    - Tracked through `record_poll()` API
    - Changes are similar to https://github.com/apache/datafusion/pull/18268
    - Refactored `assert_metrics` macro to take multiple metrics strings for
    substring check
    - Added `output_bytes` and `output_batches` tracking in `TopK` operator
    - Added `baseline` metrics for `RepartitionExec`
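
    As a rough, self-contained sketch of the bookkeeping this adds (the `Count` and
    `BaselineMetrics` types below are hand-rolled stand-ins, not DataFusion's real
    ones from `datafusion/physical-plan/src/metrics/`; the real tracking goes
    through `record_poll()` / `RecordOutput`):

    ```rust
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Stand-in for DataFusion's `Count` metric: a shared atomic counter.
    #[derive(Clone, Default)]
    struct Count(Arc<AtomicUsize>);

    impl Count {
        fn add(&self, n: usize) {
            self.0.fetch_add(n, Ordering::Relaxed);
        }
        fn value(&self) -> usize {
            self.0.load(Ordering::Relaxed)
        }
    }

    // Stand-in for `BaselineMetrics`: after this change it owns
    // `output_batches` next to `output_rows`.
    #[derive(Default)]
    struct BaselineMetrics {
        output_rows: Count,
        output_batches: Count,
    }

    struct RecordBatch {
        num_rows: usize,
    }

    impl BaselineMetrics {
        // Every batch an operator yields bumps the row count and,
        // with this PR, the batch count in one place.
        fn record_output(&self, batch: &RecordBatch) {
            self.output_rows.add(batch.num_rows);
            self.output_batches.add(1);
        }
    }

    fn main() {
        let metrics = BaselineMetrics::default();
        // Three batches of 8192, 8192 and 1616 rows.
        for rows in [8192, 8192, 1616] {
            metrics.record_output(&RecordBatch { num_rows: rows });
        }
        println!("output_rows={}", metrics.output_rows.value());
        println!("output_batches={}", metrics.output_batches.value());
    }
    ```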
    
    ## Are these changes tested?
    
    Added unit tests.
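
    The refactored macro can be sketched standalone as follows (a simplified
    stand-in; the real `assert_metrics!` lives in `datafusion/core/tests/sql/mod.rs`
    and builds a more detailed failure message):

    ```rust
    // Simplified sketch: assert that a single line contains the operator
    // name plus every given metric substring.
    macro_rules! assert_metrics {
        ($ACTUAL: expr, $OPERATOR_NAME: expr, $($METRICS: expr),+) => {
            let found = $ACTUAL
                .lines()
                .any(|line| line.contains($OPERATOR_NAME) $(&& line.contains($METRICS))+);
            assert!(
                found,
                "Cannot find a line with operator name '{}' and all given metrics in:\n\n{}",
                $OPERATOR_NAME, $ACTUAL
            );
        };
    }

    fn main() {
        let formatted = "SortExec: metrics=[output_rows=1000000, output_bytes=7.6 MB, output_batches=123]\n\
                         ProjectionExec: metrics=[output_rows=1000000, output_batches=123]";
        // One invocation now checks several substrings against the same line.
        assert_metrics!(formatted, "SortExec", "output_rows=1000000", "output_batches=123");
        println!("ok");
    }
    ```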
    
    ## Are there any user-facing changes?
    
    The `EXPLAIN ANALYZE` output changes: `output_batches` is now included
    in the `metrics=[...]` list.
---
 datafusion/core/tests/sql/explain_analyze.rs       | 102 ++++++++++++---------
 datafusion/core/tests/sql/mod.rs                   |  18 ++--
 datafusion/physical-plan/src/joins/cross_join.rs   |   1 -
 .../physical-plan/src/joins/hash_join/stream.rs    |   4 -
 .../physical-plan/src/joins/nested_loop_join.rs    |   4 -
 .../src/joins/sort_merge_join/metrics.rs           |   8 --
 .../src/joins/sort_merge_join/stream.rs            |   6 +-
 .../physical-plan/src/joins/stream_join_utils.rs   |   6 --
 .../physical-plan/src/joins/symmetric_hash_join.rs |   1 -
 datafusion/physical-plan/src/joins/utils.rs        |   6 --
 datafusion/physical-plan/src/metrics/baseline.rs   |  14 +++
 datafusion/physical-plan/src/metrics/builder.rs    |   8 ++
 datafusion/physical-plan/src/metrics/mod.rs        |   1 +
 datafusion/physical-plan/src/metrics/value.rs      |  43 +++++----
 datafusion/physical-plan/src/repartition/mod.rs    |  31 +++++--
 datafusion/physical-plan/src/sorts/sort.rs         |   5 +-
 datafusion/physical-plan/src/topk/mod.rs           |   6 +-
 datafusion/physical-plan/src/unnest.rs             |   9 +-
 docs/source/user-guide/metrics.md                  |   1 +
 19 files changed, 157 insertions(+), 117 deletions(-)

diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 26b71b5496..e56d4e6d8b 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -61,12 +61,9 @@ async fn explain_analyze_baseline_metrics() {
     assert_metrics!(
         &formatted,
         "AggregateExec: mode=Partial, gby=[]",
-        "metrics=[output_rows=3, elapsed_compute="
-    );
-    assert_metrics!(
-        &formatted,
-        "AggregateExec: mode=Partial, gby=[]",
-        "output_bytes="
+        "metrics=[output_rows=3, elapsed_compute=",
+        "output_bytes=",
+        "output_batches=3"
     );
 
     assert_metrics!(
@@ -75,59 +72,76 @@ async fn explain_analyze_baseline_metrics() {
         "reduction_factor=5.1% (5/99)"
     );
 
-    assert_metrics!(
-        &formatted,
-        "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
-        "metrics=[output_rows=5, elapsed_compute="
-    );
-    assert_metrics!(
-        &formatted,
-        "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
-        "output_bytes="
-    );
-    assert_metrics!(
-        &formatted,
-        "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
-        "metrics=[output_rows=99, elapsed_compute="
-    );
+    {
+        let expected_batch_count_after_repartition =
+            if cfg!(not(feature = "force_hash_collisions")) {
+                "output_batches=3"
+            } else {
+                "output_batches=1"
+            };
+
+        assert_metrics!(
+            &formatted,
+            "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
+            "metrics=[output_rows=5, elapsed_compute=",
+            "output_bytes=",
+            expected_batch_count_after_repartition
+        );
+
+        assert_metrics!(
+            &formatted,
+            "RepartitionExec: partitioning=Hash([c1@0], 3), input_partitions=3",
+            "metrics=[output_rows=5, elapsed_compute=",
+            "output_bytes=",
+            expected_batch_count_after_repartition
+        );
+
+        assert_metrics!(
+            &formatted,
+            "ProjectionExec: expr=[]",
+            "metrics=[output_rows=5, elapsed_compute=",
+            "output_bytes=",
+            expected_batch_count_after_repartition
+        );
+
+        assert_metrics!(
+            &formatted,
+            "CoalesceBatchesExec: target_batch_size=4096",
+            "metrics=[output_rows=5, elapsed_compute",
+            "output_bytes=",
+            expected_batch_count_after_repartition
+        );
+    }
+
     assert_metrics!(
         &formatted,
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
-        "output_bytes="
+        "metrics=[output_rows=99, elapsed_compute=",
+        "output_bytes=",
+        "output_batches=1"
     );
+
     assert_metrics!(
         &formatted,
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
         "selectivity=99% (99/100)"
     );
-    assert_metrics!(
-        &formatted,
-        "ProjectionExec: expr=[]",
-        "metrics=[output_rows=5, elapsed_compute="
-    );
-    assert_metrics!(&formatted, "ProjectionExec: expr=[]", "output_bytes=");
-    assert_metrics!(
-        &formatted,
-        "CoalesceBatchesExec: target_batch_size=4096",
-        "metrics=[output_rows=5, elapsed_compute"
-    );
-    assert_metrics!(
-        &formatted,
-        "CoalesceBatchesExec: target_batch_size=4096",
-        "output_bytes="
-    );
+
     assert_metrics!(
         &formatted,
         "UnionExec",
-        "metrics=[output_rows=3, elapsed_compute="
+        "metrics=[output_rows=3, elapsed_compute=",
+        "output_bytes=",
+        "output_batches=3"
     );
-    assert_metrics!(&formatted, "UnionExec", "output_bytes=");
+
     assert_metrics!(
         &formatted,
         "WindowAggExec",
-        "metrics=[output_rows=1, elapsed_compute="
+        "metrics=[output_rows=1, elapsed_compute=",
+        "output_bytes=",
+        "output_batches=1"
     );
-    assert_metrics!(&formatted, "WindowAggExec", "output_bytes=");
 
     fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
         use datafusion::physical_plan;
@@ -228,9 +242,13 @@ async fn explain_analyze_level() {
 
     for (level, needle, should_contain) in [
         (ExplainAnalyzeLevel::Summary, "spill_count", false),
+        (ExplainAnalyzeLevel::Summary, "output_batches", false),
         (ExplainAnalyzeLevel::Summary, "output_rows", true),
+        (ExplainAnalyzeLevel::Summary, "output_bytes", true),
         (ExplainAnalyzeLevel::Dev, "spill_count", true),
         (ExplainAnalyzeLevel::Dev, "output_rows", true),
+        (ExplainAnalyzeLevel::Dev, "output_bytes", true),
+        (ExplainAnalyzeLevel::Dev, "output_batches", true),
     ] {
         let plan = collect_plan(sql, level).await;
         assert_eq!(
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 743c8750b5..426ec213b3 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -40,18 +40,24 @@ use std::io::Write;
 use std::path::PathBuf;
 use tempfile::TempDir;
 
-/// A macro to assert that some particular line contains two substrings
+/// A macro to assert that some particular line contains the given substrings
 ///
-/// Usage: `assert_metrics!(actual, operator_name, metrics)`
+/// Usage: `assert_metrics!(actual, operator_name, metrics_1, metrics_2, ...)`
 macro_rules! assert_metrics {
-    ($ACTUAL: expr, $OPERATOR_NAME: expr, $METRICS: expr) => {
+    ($ACTUAL: expr, $OPERATOR_NAME: expr, $($METRICS: expr),+) => {
         let found = $ACTUAL
             .lines()
-            .any(|line| line.contains($OPERATOR_NAME) && line.contains($METRICS));
+            .any(|line| line.contains($OPERATOR_NAME) $( && line.contains($METRICS))+);
+
+        let mut metrics = String::new();
+        $(metrics.push_str(format!(" '{}',", $METRICS).as_str());)+
+        // remove the last `,` from the string
+        metrics.pop();
+
         assert!(
             found,
-            "Can not find a line with both '{}' and '{}' in\n\n{}",
-            $OPERATOR_NAME, $METRICS, $ACTUAL
+            "Cannot find a line with operator name '{}' and metrics containing values {} in :\n\n{}",
+            $OPERATOR_NAME, metrics, $ACTUAL
         );
     };
 }
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index fc32bb6fc9..2c531786c9 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -650,7 +650,6 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
                         self.left_index += 1;
                     }
 
-                    self.join_metrics.output_batches.add(1);
                     return Ok(StatefulStreamResult::Ready(Some(batch)));
                 }
             }
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index bb3465365e..1f4aeecb29 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -494,7 +494,6 @@ impl HashJoinStream {
                 &self.column_indices,
                 self.join_type,
             )?;
-            self.join_metrics.output_batches.add(1);
             timer.done();
 
             self.state = HashJoinStreamState::FetchProbeBatch;
@@ -597,7 +596,6 @@ impl HashJoinStream {
             )?
         };
 
-        self.join_metrics.output_batches.add(1);
         timer.done();
 
         if next_offset.is_none() {
@@ -653,8 +651,6 @@ impl HashJoinStream {
         if let Ok(ref batch) = result {
             self.join_metrics.input_batches.add(1);
             self.join_metrics.input_rows.add(batch.num_rows());
-
-            self.join_metrics.output_batches.add(1);
         }
         timer.done();
 
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 1f0cdf391c..9377ace33a 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1483,10 +1483,6 @@ impl NestedLoopJoinStream {
     fn maybe_flush_ready_batch(&mut self) -> Option<Poll<Option<Result<RecordBatch>>>> {
         if self.output_buffer.has_completed_batch() {
             if let Some(batch) = self.output_buffer.next_completed_batch() {
-                // HACK: this is not part of `BaselineMetrics` yet, so update it
-                // manually
-                self.metrics.join_metrics.output_batches.add(1);
-
                 // Update output rows for selectivity metric
                 let output_rows = batch.num_rows();
                 self.metrics.selectivity.add_part(output_rows);
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs
index 5920cd663a..ac476853d5 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs
@@ -31,8 +31,6 @@ pub(super) struct SortMergeJoinMetrics {
     input_batches: Count,
     /// Number of rows consumed by this operator
     input_rows: Count,
-    /// Number of batches produced by this operator
-    output_batches: Count,
     /// Execution metrics
     baseline_metrics: BaselineMetrics,
     /// Peak memory used for buffered data.
@@ -49,8 +47,6 @@ impl SortMergeJoinMetrics {
         let input_batches =
             MetricBuilder::new(metrics).counter("input_batches", partition);
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
-        let output_batches =
-            MetricBuilder::new(metrics).counter("output_batches", partition);
         let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition);
         let spill_metrics = SpillMetrics::new(metrics, partition);
 
@@ -60,7 +56,6 @@ impl SortMergeJoinMetrics {
             join_time,
             input_batches,
             input_rows,
-            output_batches,
             baseline_metrics,
             peak_mem_used,
             spill_metrics,
@@ -82,9 +77,6 @@ impl SortMergeJoinMetrics {
     pub fn input_rows(&self) -> Count {
         self.input_rows.clone()
     }
-    pub fn output_batches(&self) -> Count {
-        self.output_batches.clone()
-    }
 
     pub fn peak_mem_used(&self) -> Gauge {
         self.peak_mem_used.clone()
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index 1185866b9f..28020450c4 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -35,6 +35,7 @@ use std::task::{Context, Poll};
 
 use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
 use crate::joins::utils::{compare_join_arrays, JoinFilter};
+use crate::metrics::RecordOutput;
 use crate::spill::spill_manager::SpillManager;
 use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream};
 
@@ -1462,10 +1463,7 @@ impl SortMergeJoinStream {
     fn output_record_batch_and_reset(&mut self) -> Result<RecordBatch> {
         let record_batch =
             concat_batches(&self.schema, &self.staging_output_record_batches.batches)?;
-        self.join_metrics.output_batches().add(1);
-        self.join_metrics
-            .baseline_metrics()
-            .record_output(record_batch.num_rows());
+        (&record_batch).record_output(&self.join_metrics.baseline_metrics());
         // If join filter exists, `self.output_size` is not accurate as we don't know the exact
         // number of rows in the output record batch. If streamed row joined with buffered rows,
         // once join filter is applied, the number of output rows may be more than 1.
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index 80221a7799..f4a3cd92f1 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -682,8 +682,6 @@ pub struct StreamJoinMetrics {
     pub(crate) right: StreamJoinSideMetrics,
     /// Memory used by sides in bytes
     pub(crate) stream_memory_usage: metrics::Gauge,
-    /// Number of batches produced by this operator
-    pub(crate) output_batches: metrics::Count,
     /// Number of rows produced by this operator
     pub(crate) baseline_metrics: BaselineMetrics,
 }
@@ -709,13 +707,9 @@ impl StreamJoinMetrics {
         let stream_memory_usage =
             MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
 
-        let output_batches =
-            MetricBuilder::new(metrics).counter("output_batches", partition);
-
         Self {
             left,
             right,
-            output_batches,
             stream_memory_usage,
             baseline_metrics: BaselineMetrics::new(metrics, partition),
         }
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index be4646e88b..a9a2bbff42 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -1376,7 +1376,6 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
                     }
                 }
                 Some((batch, _)) => {
-                    self.metrics.output_batches.add(1);
                     return self
                         .metrics
                         .baseline_metrics
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 9b589b674c..6ff8298154 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1327,8 +1327,6 @@ pub(crate) struct BuildProbeJoinMetrics {
     pub(crate) input_batches: metrics::Count,
     /// Number of rows consumed by probe-side this operator
     pub(crate) input_rows: metrics::Count,
-    /// Number of batches produced by this operator
-    pub(crate) output_batches: metrics::Count,
 }
 
 // This Drop implementation updates the elapsed compute part of the metrics.
@@ -1372,9 +1370,6 @@ impl BuildProbeJoinMetrics {
 
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
 
-        let output_batches =
-            MetricBuilder::new(metrics).counter("output_batches", partition);
-
         Self {
             build_time,
             build_input_batches,
@@ -1383,7 +1378,6 @@ impl BuildProbeJoinMetrics {
             join_time,
             input_batches,
             input_rows,
-            output_batches,
             baseline,
         }
     }
diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs
index 858773b946..8dc2f30d9f 100644
--- a/datafusion/physical-plan/src/metrics/baseline.rs
+++ b/datafusion/physical-plan/src/metrics/baseline.rs
@@ -63,6 +63,9 @@ pub struct BaselineMetrics {
     /// multiple times.
     /// Issue: <https://github.com/apache/datafusion/issues/16841>
     output_bytes: Count,
+
+    /// output batches: the total output batch count
+    output_batches: Count,
     // Remember to update `docs/source/user-guide/metrics.md` when updating comments
     // or adding new metrics
 }
@@ -86,6 +89,9 @@ impl BaselineMetrics {
             output_bytes: MetricBuilder::new(metrics)
                 .with_type(super::MetricType::SUMMARY)
                 .output_bytes(partition),
+            output_batches: MetricBuilder::new(metrics)
+                .with_type(super::MetricType::DEV)
+                .output_batches(partition),
         }
     }
 
@@ -100,6 +106,7 @@ impl BaselineMetrics {
             elapsed_compute: self.elapsed_compute.clone(),
             output_rows: Default::default(),
             output_bytes: Default::default(),
+            output_batches: Default::default(),
         }
     }
 
@@ -113,6 +120,11 @@ impl BaselineMetrics {
         &self.output_rows
     }
 
+    /// return the metric for the total number of output batches produced
+    pub fn output_batches(&self) -> &Count {
+        &self.output_batches
+    }
+
     /// Records the fact that this operator's execution is complete
     /// (recording the `end_time` metric).
     ///
@@ -229,6 +241,7 @@ impl RecordOutput for RecordBatch {
         bm.record_output(self.num_rows());
         let n_bytes = get_record_batch_memory_size(&self);
         bm.output_bytes.add(n_bytes);
+        bm.output_batches.add(1);
         self
     }
 }
@@ -238,6 +251,7 @@ impl RecordOutput for &RecordBatch {
         bm.record_output(self.num_rows());
         let n_bytes = get_record_batch_memory_size(self);
         bm.output_bytes.add(n_bytes);
+        bm.output_batches.add(1);
         self
     }
 }
diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs
index 6ea947b6d2..91b2440122 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -161,6 +161,14 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consume self and create a new counter for recording total output batches
+    pub fn output_batches(self, partition: usize) -> Count {
+        let count = Count::new();
+        self.with_partition(partition)
+            .build(MetricValue::OutputBatches(count.clone()));
+        count
+    }
+
     /// Consume self and create a new gauge for reporting current memory usage
     pub fn mem_used(self, partition: usize) -> Gauge {
         let gauge = Gauge::new();
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index 4e98af722d..613c031808 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -299,6 +299,7 @@ impl MetricsSet {
             MetricValue::SpillCount(_) => false,
             MetricValue::SpilledBytes(_) => false,
             MetricValue::OutputBytes(_) => false,
+            MetricValue::OutputBatches(_) => false,
             MetricValue::SpilledRows(_) => false,
             MetricValue::CurrentMemoryUsage(_) => false,
             MetricValue::Gauge { name, .. } => name == metric_name,
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index 298d63e5e2..7f31f75794 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -551,6 +551,8 @@ pub enum MetricValue {
     SpilledBytes(Count),
     /// Total size of output bytes produced: "output_bytes" metric
     OutputBytes(Count),
+    /// Total number of output batches produced: "output_batches" metric
+    OutputBatches(Count),
     /// Total size of spilled rows produced: "spilled_rows" metric
     SpilledRows(Count),
     /// Current memory used
@@ -618,6 +620,9 @@ impl PartialEq for MetricValue {
             (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
                 count == other
             }
+            (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => {
+                count == other
+            }
             (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
                 count == other
             }
@@ -699,6 +704,7 @@ impl MetricValue {
             Self::SpillCount(_) => "spill_count",
             Self::SpilledBytes(_) => "spilled_bytes",
             Self::OutputBytes(_) => "output_bytes",
+            Self::OutputBatches(_) => "output_batches",
             Self::SpilledRows(_) => "spilled_rows",
             Self::CurrentMemoryUsage(_) => "mem_used",
             Self::ElapsedCompute(_) => "elapsed_compute",
@@ -721,6 +727,7 @@ impl MetricValue {
             Self::SpillCount(count) => count.value(),
             Self::SpilledBytes(bytes) => bytes.value(),
             Self::OutputBytes(bytes) => bytes.value(),
+            Self::OutputBatches(count) => count.value(),
             Self::SpilledRows(count) => count.value(),
             Self::CurrentMemoryUsage(used) => used.value(),
             Self::ElapsedCompute(time) => time.value(),
@@ -755,6 +762,7 @@ impl MetricValue {
             Self::SpillCount(_) => Self::SpillCount(Count::new()),
             Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
             Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
+            Self::OutputBatches(_) => Self::OutputBatches(Count::new()),
             Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
             Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
             Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
@@ -802,6 +810,7 @@ impl MetricValue {
             | (Self::SpillCount(count), Self::SpillCount(other_count))
             | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
             | (Self::OutputBytes(count), Self::OutputBytes(other_count))
+            | (Self::OutputBatches(count), Self::OutputBatches(other_count))
             | (Self::SpilledRows(count), Self::SpilledRows(other_count))
             | (
                 Self::Count { count, .. },
@@ -879,6 +888,7 @@ impl MetricValue {
             Self::OutputRows(_) => 0,
             Self::ElapsedCompute(_) => 1,
             Self::OutputBytes(_) => 2,
+            Self::OutputBatches(_) => 3,
             // Other metrics
             Self::PruningMetrics { name, .. } => match name.as_ref() {
                // The following metrics belong to `DataSourceExec` with a Parquet data source.
@@ -888,23 +898,23 @@ impl MetricValue {
                // You may update these metrics as long as their relative order remains unchanged.
                 //
                // Reference PR: <https://github.com/apache/datafusion/pull/18379>
-                "files_ranges_pruned_statistics" => 3,
-                "row_groups_pruned_statistics" => 4,
-                "row_groups_pruned_bloom_filter" => 5,
-                "page_index_rows_pruned" => 6,
-                _ => 7,
+                "files_ranges_pruned_statistics" => 4,
+                "row_groups_pruned_statistics" => 5,
+                "row_groups_pruned_bloom_filter" => 6,
+                "page_index_rows_pruned" => 7,
+                _ => 8,
             },
-            Self::SpillCount(_) => 8,
-            Self::SpilledBytes(_) => 9,
-            Self::SpilledRows(_) => 10,
-            Self::CurrentMemoryUsage(_) => 11,
-            Self::Count { .. } => 12,
-            Self::Gauge { .. } => 13,
-            Self::Time { .. } => 14,
-            Self::Ratio { .. } => 15,
-            Self::StartTimestamp(_) => 16, // show timestamps last
-            Self::EndTimestamp(_) => 17,
-            Self::Custom { .. } => 18,
+            Self::SpillCount(_) => 9,
+            Self::SpilledBytes(_) => 10,
+            Self::SpilledRows(_) => 11,
+            Self::CurrentMemoryUsage(_) => 12,
+            Self::Count { .. } => 13,
+            Self::Gauge { .. } => 14,
+            Self::Time { .. } => 15,
+            Self::Ratio { .. } => 16,
+            Self::StartTimestamp(_) => 17, // show timestamps last
+            Self::EndTimestamp(_) => 18,
+            Self::Custom { .. } => 19,
         }
     }
 
@@ -919,6 +929,7 @@ impl Display for MetricValue {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
             Self::OutputRows(count)
+            | Self::OutputBatches(count)
             | Self::SpillCount(count)
             | Self::SpilledRows(count)
             | Self::Count { count, .. } => {
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 74cf798895..8f73fe86cf 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -915,6 +915,7 @@ impl ExecutionPlan for RepartitionExec {
                             Arc::clone(&reservation),
                             spill_stream,
                             1, // Each receiver handles one input partition
+                            BaselineMetrics::new(&metrics, partition),
                         )) as SendableRecordBatchStream
                     })
                     .collect::<Vec<_>>();
@@ -952,6 +953,7 @@ impl ExecutionPlan for RepartitionExec {
                     reservation,
                     spill_stream,
                     num_input_partitions,
+                    BaselineMetrics::new(&metrics, partition),
                 )) as SendableRecordBatchStream)
             }
         })
@@ -1402,6 +1404,9 @@ struct PerPartitionStream {
     /// In non-preserve-order mode, multiple input partitions send to the same channel,
     /// each sending None when complete. We must wait for all of them.
     remaining_partitions: usize,
+
+    /// Execution metrics
+    baseline_metrics: BaselineMetrics,
 }
 
 impl PerPartitionStream {
@@ -1412,6 +1417,7 @@ impl PerPartitionStream {
         reservation: SharedMemoryReservation,
         spill_stream: SendableRecordBatchStream,
         num_input_partitions: usize,
+        baseline_metrics: BaselineMetrics,
     ) -> Self {
         Self {
             schema,
@@ -1421,18 +1427,17 @@ impl PerPartitionStream {
             spill_stream,
             state: StreamState::ReadingMemory,
             remaining_partitions: num_input_partitions,
+            baseline_metrics,
         }
     }
-}
-
-impl Stream for PerPartitionStream {
-    type Item = Result<RecordBatch>;
 
-    fn poll_next(
-        mut self: Pin<&mut Self>,
+    fn poll_next_inner(
+        self: &mut Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
+    ) -> Poll<Option<Result<RecordBatch>>> {
         use futures::StreamExt;
+        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+        let _timer = cloned_time.timer();
 
         loop {
             match self.state {
@@ -1508,6 +1513,18 @@ impl Stream for PerPartitionStream {
     }
 }
 
+impl Stream for PerPartitionStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let poll = self.poll_next_inner(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+}
+
 impl RecordBatchStream for PerPartitionStream {
     /// Get the schema
     fn schema(&self) -> SchemaRef {
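For context, the refactor above moves the polling body into `poll_next_inner` so that the outer `poll_next` can funnel every result through `BaselineMetrics::record_poll`, which updates `output_rows` (and, with this PR, `output_batches`) in one place. A minimal standalone sketch of that pattern — using hypothetical stand-in types, not DataFusion's real APIs:

```rust
use std::task::Poll;

/// Stand-in for a record batch: just a row count (hypothetical, for illustration).
struct Batch {
    num_rows: usize,
}

/// Stand-in for BaselineMetrics' counters. The real counters use
/// interior-mutable atomics; plain fields keep the sketch short.
#[derive(Default)]
struct Baseline {
    output_rows: usize,
    output_batches: usize,
}

impl Baseline {
    /// Mirrors the shape of `BaselineMetrics::record_poll`: observe a poll
    /// result, bump the counters when a batch is ready, and pass it through
    /// unchanged so the caller can return it directly.
    fn record_poll(
        &mut self,
        poll: Poll<Option<Result<Batch, String>>>,
    ) -> Poll<Option<Result<Batch, String>>> {
        if let Poll::Ready(Some(Ok(batch))) = &poll {
            self.output_rows += batch.num_rows;
            self.output_batches += 1;
        }
        poll
    }
}

fn main() {
    let mut metrics = Baseline::default();
    // A ready batch is counted; Pending and errors are not.
    let _ = metrics.record_poll(Poll::Ready(Some(Ok(Batch { num_rows: 8 }))));
    let _ = metrics.record_poll(Poll::Pending);
    println!("{} rows in {} batches", metrics.output_rows, metrics.output_batches);
}
```

Because every `poll_next` result flows through one wrapper, an operator cannot forget to count a batch on some code path — which is the rationale for splitting `poll_next_inner` out in the diff above.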
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index a95fad19f6..2b31ff3da9 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -34,7 +34,8 @@ use crate::filter_pushdown::{
 };
 use crate::limit::LimitStream;
 use crate::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, SplitMetrics,
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, SpillMetrics,
+    SplitMetrics,
 };
 use crate::projection::{make_with_child, update_ordering, ProjectionExec};
 use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
@@ -738,7 +739,7 @@ impl ExternalSorter {
 
             let sorted = sort_batch(&batch, &expressions, None)?;
 
-            metrics.record_output(sorted.num_rows());
+            (&sorted).record_output(&metrics);
             drop(batch);
             drop(reservation);
             Ok(sorted)
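The change from `metrics.record_output(sorted.num_rows())` to `(&sorted).record_output(&metrics)` uses the `RecordOutput` trait, so that row and batch counts are recorded together rather than the call site updating only rows. A rough sketch of the idea with stand-in types (DataFusion's real counters use interior mutability, simplified here to `&mut`):

```rust
/// Stand-in for a record batch (hypothetical, for illustration).
struct Batch {
    num_rows: usize,
}

/// Stand-in for the baseline metric counters.
#[derive(Default)]
struct Metrics {
    output_rows: usize,
    output_batches: usize,
}

/// Sketch of the `RecordOutput` pattern: implemented on `&Batch` so call
/// sites can write `(&batch).record_output(&metrics)` and both counters
/// are updated in one place.
trait RecordOutput {
    fn record_output(self, metrics: &mut Metrics);
}

impl RecordOutput for &Batch {
    fn record_output(self, metrics: &mut Metrics) {
        metrics.output_rows += self.num_rows;
        metrics.output_batches += 1;
    }
}

fn main() {
    let mut metrics = Metrics::default();
    let sorted = Batch { num_rows: 100 };
    (&sorted).record_output(&mut metrics);
    println!("{} rows, {} batches", metrics.output_rows, metrics.output_batches);
}
```

Centralizing the update in the trait is what lets this PR add `output_batches` everywhere without touching each operator's accounting logic individually.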
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 9435de1cc4..0b5ab784df 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -26,7 +26,9 @@ use datafusion_expr::{ColumnarValue, Operator};
 use std::mem::size_of;
 use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
 
-use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+use super::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, RecordOutput,
+};
 use crate::spill::get_record_batch_memory_size;
 use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
 
@@ -596,7 +598,7 @@ impl TopK {
         // break into record batches as needed
         let mut batches = vec![];
         if let Some(mut batch) = heap.emit()? {
-            metrics.baseline.output_rows().add(batch.num_rows());
+            (&batch).record_output(&metrics.baseline);
 
             loop {
                 if batch.num_rows() <= batch_size {
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 7212c76413..22132f2f86 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -277,8 +277,6 @@ struct UnnestMetrics {
     input_batches: metrics::Count,
     /// Number of rows consumed
     input_rows: metrics::Count,
-    /// Number of batches produced
-    output_batches: metrics::Count,
 }
 
 impl UnnestMetrics {
@@ -288,14 +286,10 @@ impl UnnestMetrics {
 
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
 
-        let output_batches =
-            MetricBuilder::new(metrics).counter("output_batches", partition);
-
         Self {
             baseline_metrics: BaselineMetrics::new(metrics, partition),
             input_batches,
             input_rows,
-            output_batches,
         }
     }
 }
@@ -361,7 +355,6 @@ impl UnnestStream {
                     let Some(result_batch) = result else {
                         continue;
                     };
-                    self.metrics.output_batches.add(1);
                     (&result_batch).record_output(&self.metrics.baseline_metrics);
 
                     // Empty record batches should not be emitted.
@@ -375,7 +368,7 @@ impl UnnestStream {
                         produced {} output batches containing {} rows in {}",
                         self.metrics.input_batches,
                         self.metrics.input_rows,
-                        self.metrics.output_batches,
+                        self.metrics.baseline_metrics.output_batches(),
                         self.metrics.baseline_metrics.output_rows(),
                         self.metrics.baseline_metrics.elapsed_compute(),
                     );
diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md
index 1fb2f4a5c7..43bfcd2afe 100644
--- a/docs/source/user-guide/metrics.md
+++ b/docs/source/user-guide/metrics.md
@@ -32,6 +32,7 @@ DataFusion operators expose runtime metrics so you can understand where time is
 | elapsed_compute | CPU time the operator actively spends processing work. |
 | output_rows     | Total number of rows the operator produces. |
 | output_bytes    | Memory usage of all output batches. Note: This value may be overestimated. If multiple output `RecordBatch` instances share underlying memory buffers, their sizes will be counted multiple times. |
+| output_batches  | Total number of output batches the operator produces. |
 
 ## Operator-specific Metrics
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

