timsaucer commented on code in PR #1381:
URL: 
https://github.com/apache/datafusion-python/pull/1381#discussion_r2855554863


##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
 
     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")
+
+
+def test_no_metrics_before_execution() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+    df = ctx.sql("SELECT * FROM t")
+    plan = df.execution_plan()
+    ms = plan.metrics()
+    assert ms is None or ms.output_rows is None or ms.output_rows == 0
+
+
+def test_collect_partitioned_metrics() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+    df.collect_partitioned()
+    plan = df.execution_plan()
+
+    found_metrics = False
+    for _, ms in plan.collect_metrics():
+        if ms.output_rows is not None and ms.output_rows > 0:

Review Comment:
   We should know exactly how many output rows to expect



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
+        return self._raw.spill_count()
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""

Review Comment:
   Sum across partitions, I presume. This is different because it works for an 
arbitrary metric, right?



##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
 
     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")

Review Comment:
   I don't think we want a skip here. I think we want to ensure in our unit 
test that we do generate metrics. I think we should know a priori what metrics 
we're getting in this test. While some values we can't use, some parts of it we 
should be able to test directly, like the metric name and labels.



##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
 
     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")
+
+
+def test_no_metrics_before_execution() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+    df = ctx.sql("SELECT * FROM t")
+    plan = df.execution_plan()
+    ms = plan.metrics()
+    assert ms is None or ms.output_rows is None or ms.output_rows == 0
+
+
+def test_collect_partitioned_metrics() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+    df.collect_partitioned()
+    plan = df.execution_plan()
+
+    found_metrics = False
+    for _, ms in plan.collect_metrics():
+        if ms.output_rows is not None and ms.output_rows > 0:

Review Comment:
   Also for the unit tests to follow



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.

Review Comment:
   I haven't dug in, but is `operator_name` the name of the execution plan?



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.

Review Comment:
   "Walk the plan tree and collect metrics" probably does not make a lot of 
sense to someone other than a developer. I think we can make this more user 
focused.



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.

Review Comment:
   A bit of an explanation is probably useful here. Again, I don't think we can 
assume the user understands that there are both individual execution plan 
metrics as well as aggregate. I *think* that some operators have metrics that 
cannot be aggregated. In general I suspect we really do need some high level 
documentation with examples we can point to that makes all of this more 
concrete.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to