timsaucer commented on code in PR #1381:
URL: 
https://github.com/apache/datafusion-python/pull/1381#discussion_r2855537956


##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
+        return self._raw.spill_count()
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""
+        return self._raw.sum_by_name(name)
+
+    def __repr__(self) -> str:
+        """Return a string representation of the metrics set."""
+        return repr(self._raw)
+
+
+class Metric:
+    """A single execution metric with name, value, partition, and labels."""
+
+    def __init__(self, raw: df_internal.Metric) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    @property
+    def name(self) -> str:
+        """The name of this metric (e.g. ``output_rows``)."""
+        return self._raw.name
+
+    @property
+    def value(self) -> int | None:
+        """The numeric value of this metric, or None for non-numeric types."""
+        return self._raw.value
+
+    @property
+    def partition(self) -> int | None:
+        """The partition this metric applies to, or None if global."""

Review Comment:
   Do you know if this partition is a hash or an index or something else? I am 
trying to figure out how a user makes use of this information.



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
+        return self._raw.spill_count()
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""
+        return self._raw.sum_by_name(name)
+
+    def __repr__(self) -> str:
+        """Return a string representation of the metrics set."""
+        return repr(self._raw)
+
+
+class Metric:
+    """A single execution metric with name, value, partition, and labels."""
+
+    def __init__(self, raw: df_internal.Metric) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    @property
+    def name(self) -> str:
+        """The name of this metric (e.g. ``output_rows``)."""
+        return self._raw.name
+
+    @property
+    def value(self) -> int | None:
+        """The numeric value of this metric, or None for non-numeric types."""
+        return self._raw.value
+
+    @property
+    def partition(self) -> int | None:
+        """The partition this metric applies to, or None if global."""
+        return self._raw.partition
+
+    def labels(self) -> dict[str, str]:
+        """Return the labels associated with this metric."""

Review Comment:
   Something to include in user documentation is an example of these labels.



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
+        return self._raw.spill_count()
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""
+        return self._raw.sum_by_name(name)
+
+    def __repr__(self) -> str:
+        """Return a string representation of the metrics set."""
+        return repr(self._raw)
+
+
+class Metric:
+    """A single execution metric with name, value, partition, and labels."""
+
+    def __init__(self, raw: df_internal.Metric) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    @property
+    def name(self) -> str:
+        """The name of this metric (e.g. ``output_rows``)."""
+        return self._raw.name
+
+    @property
+    def value(self) -> int | None:
+        """The numeric value of this metric, or None for non-numeric types."""
+        return self._raw.value

Review Comment:
   Is it not possible to give values for non-numeric metrics?



##########
src/dataframe.rs:
##########
@@ -802,7 +823,13 @@ impl PyDataFrame {
     }
 
     /// Get the execution plan for this `DataFrame`
+    ///
+    /// If the DataFrame has already been executed (e.g. via `collect()`),
+    /// returns the cached plan which includes populated metrics.
     fn execution_plan(&self, py: Python) -> 
PyDataFusionResult<PyExecutionPlan> {
+        if let Some(plan) = self.last_plan.lock().as_ref() {
+            return Ok(PyExecutionPlan::new(Arc::clone(plan)));
+        }
         let plan = wait_for_future(py, 
self.df.as_ref().clone().create_physical_plan())??;
         Ok(plan.into())

Review Comment:
   If you go the route of using the existing `last_plan` for `collect()` like 
in my other comment then I think you could set it here just like you do in 
collect().



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""

Review Comment:
   Same with spill count. Do you know what units it has?



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)

Review Comment:
   This is leading me to think we should have some high-level documentation, 
probably in the DataFrame page (or a subpage under it). Among other things, it 
would be good to explain to a user what kinds of information they 
could find under these metrics and why that data is not available until after 
the DataFrame has been executed.



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""

Review Comment:
   We probably want to describe what `elapsed_compute` is rather than assume 
user knowledge.



##########
src/metrics.rs:
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Metric};
+use pyo3::prelude::*;
+
+#[pyclass(frozen, name = "MetricsSet", module = "datafusion")]
+#[derive(Debug, Clone)]
+pub struct PyMetricsSet {
+    metrics: MetricsSet,
+}
+
+impl PyMetricsSet {
+    pub fn new(metrics: MetricsSet) -> Self {
+        Self { metrics }
+    }
+}
+
+#[pymethods]
+impl PyMetricsSet {
+    /// Returns all individual metrics in this set.
+    fn metrics(&self) -> Vec<PyMetric> {
+        self.metrics
+            .iter()
+            .map(|m| PyMetric::new(Arc::clone(m)))
+            .collect()
+    }
+
+    /// Returns the sum of all `output_rows` metrics, or None if not present.
+    fn output_rows(&self) -> Option<usize> {
+        self.metrics.output_rows()
+    }
+
+    /// Returns the sum of all `elapsed_compute` metrics in nanoseconds, or 
None if not present.
+    fn elapsed_compute(&self) -> Option<usize> {
+        self.metrics.elapsed_compute()
+    }
+
+    /// Returns the sum of all `spill_count` metrics, or None if not present.
+    fn spill_count(&self) -> Option<usize> {
+        self.metrics.spill_count()
+    }
+
+    /// Returns the sum of all `spilled_bytes` metrics, or None if not present.
+    fn spilled_bytes(&self) -> Option<usize> {
+        self.metrics.spilled_bytes()
+    }
+
+    /// Returns the sum of all `spilled_rows` metrics, or None if not present.
+    fn spilled_rows(&self) -> Option<usize> {
+        self.metrics.spilled_rows()
+    }
+
+    /// Returns the sum of metrics matching the given name.
+    fn sum_by_name(&self, name: &str) -> Option<usize> {
+        self.metrics.sum_by_name(name).map(|v| v.as_usize())
+    }
+
+    fn __repr__(&self) -> String {
+        format!("{}", self.metrics)
+    }
+}
+
+#[pyclass(frozen, name = "Metric", module = "datafusion")]
+#[derive(Debug, Clone)]
+pub struct PyMetric {
+    metric: Arc<Metric>,
+}
+
+impl PyMetric {
+    pub fn new(metric: Arc<Metric>) -> Self {
+        Self { metric }
+    }
+}
+
+#[pymethods]
+impl PyMetric {
+    /// Returns the name of this metric.
+    #[getter]
+    fn name(&self) -> String {
+        self.metric.value().name().to_string()
+    }
+
+    /// Returns the numeric value of this metric, or None for non-numeric 
types.
+    #[getter]
+    fn value(&self) -> Option<usize> {

Review Comment:
   It feels like we could return `Option<Py<PyAny>>` and try casting the value 
appropriately.



##########
src/dataframe.rs:
##########
@@ -1127,13 +1154,22 @@ impl PyDataFrame {
 
     fn execute_stream(&self, py: Python) -> 
PyDataFusionResult<PyRecordBatchStream> {
         let df = self.df.as_ref().clone();
-        let stream = spawn_future(py, async move { df.execute_stream().await 
})?;
+        let plan = wait_for_future(py, df.create_physical_plan())??;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let stream = spawn_future(py, async move { df_execute_stream(plan, 
task_ctx) })?;
         Ok(PyRecordBatchStream::new(stream))
     }
 
     fn execute_stream_partitioned(&self, py: Python) -> 
PyResult<Vec<PyRecordBatchStream>> {
         let df = self.df.as_ref().clone();
-        let streams = spawn_future(py, async move { 
df.execute_stream_partitioned().await })?;
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());

Review Comment:
   It *feels* like we're doing this in a bunch of places, so maybe make a 
private helper function.



##########
src/dataframe.rs:
##########
@@ -626,7 +637,12 @@ impl PyDataFrame {
     /// Unless some order is specified in the plan, there is no
     /// guarantee of the order of the result.
     fn collect<'py>(&self, py: Python<'py>) -> PyResult<Vec<Bound<'py, 
PyAny>>> {
-        let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
+        let df = self.df.as_ref().clone();
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let batches = wait_for_future(py, df_collect(plan, task_ctx))?

Review Comment:
   If I run `collect()` twice on a DF, should we instead just take the lock on 
the last plan and clone it? Compared to how you have it now, I suspect there's 
not a huge performance difference the vast majority of the time.



##########
src/metrics.rs:
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Metric};
+use pyo3::prelude::*;
+
+#[pyclass(frozen, name = "MetricsSet", module = "datafusion")]
+#[derive(Debug, Clone)]
+pub struct PyMetricsSet {
+    metrics: MetricsSet,
+}
+
+impl PyMetricsSet {
+    pub fn new(metrics: MetricsSet) -> Self {
+        Self { metrics }
+    }
+}
+
+#[pymethods]
+impl PyMetricsSet {
+    /// Returns all individual metrics in this set.
+    fn metrics(&self) -> Vec<PyMetric> {
+        self.metrics
+            .iter()
+            .map(|m| PyMetric::new(Arc::clone(m)))
+            .collect()
+    }
+
+    /// Returns the sum of all `output_rows` metrics, or None if not present.
+    fn output_rows(&self) -> Option<usize> {
+        self.metrics.output_rows()
+    }
+
+    /// Returns the sum of all `elapsed_compute` metrics in nanoseconds, or 
None if not present.

Review Comment:
   There are a lot of boilerplate comments like this where the function is self-
explanatory and not exposed to the end user.



##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
 
     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, 
int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")
+
+
+def test_no_metrics_before_execution() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+    df = ctx.sql("SELECT * FROM t")
+    plan = df.execution_plan()
+    ms = plan.metrics()
+    assert ms is None or ms.output_rows is None or ms.output_rows == 0

Review Comment:
   We should know exactly which condition should be hit. Do you know when the 
metrics is None vs returns no useful output?



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.

Review Comment:
   On second read I now see this is aggregating across partitions. So does that 
mean the `metrics()` fn is returning per partition metrics for one 
ExecutionPlan? Asking for my understanding mostly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to