timsaucer commented on code in PR #1381:
URL: https://github.com/apache/datafusion-python/pull/1381#discussion_r2855554863
##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+ df.collect()
+ plan = df.execution_plan()
+
+ results = plan.collect_metrics()
+ assert len(results) >= 1
+ found_metrics = False
+ for name, ms in results:
+ assert isinstance(name, str)
+ assert isinstance(ms, MetricsSet)
+ if ms.output_rows is not None and ms.output_rows > 0:
+ found_metrics = True
+ assert found_metrics
+
+
+def test_metric_properties() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+ df.collect()
+ plan = df.execution_plan()
+
+ for _, ms in plan.collect_metrics():
+ r = repr(ms)
+ assert isinstance(r, str)
+ for metric in ms.metrics():
+ assert isinstance(metric, Metric)
+ assert isinstance(metric.name, str)
+ assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, int)
+ assert isinstance(metric.labels(), dict)
+ mr = repr(metric)
+ assert isinstance(mr, str)
+ assert len(mr) > 0
+ return
+ pytest.skip("No metrics found")
+
+
+def test_no_metrics_before_execution() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+ df = ctx.sql("SELECT * FROM t")
+ plan = df.execution_plan()
+ ms = plan.metrics()
+ assert ms is None or ms.output_rows is None or ms.output_rows == 0
+
+
+def test_collect_partitioned_metrics() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+ df.collect_partitioned()
+ plan = df.execution_plan()
+
+ found_metrics = False
+ for _, ms in plan.collect_metrics():
+ if ms.output_rows is not None and ms.output_rows > 0:
Review Comment:
   We should know exactly how many output rows to expect here — assert the
exact count rather than only checking that the value is positive.
##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
Tables created in memory from record batches are currently not
supported.
"""
return self._raw_plan.to_proto()
+
+ def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+ raw = self._raw_plan.metrics()
+ if raw is None:
+ return None
+ return MetricsSet(raw)
+
+ def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+ """Walk the plan tree and collect metrics from all operators.
+
+ Returns a list of (operator_name, MetricsSet) tuples.
+ """
+ result: list[tuple[str, MetricsSet]] = []
+
+ def _walk(node: ExecutionPlan) -> None:
+ ms = node.metrics()
+ if ms is not None:
+ result.append((node.display(), ms))
+ for child in node.children():
+ _walk(child)
+
+ _walk(self)
+ return result
+
+
+class MetricsSet:
+ """A set of metrics for a single execution plan operator.
+
+ Provides both individual metric access and convenience aggregations
+ across partitions.
+ """
+
+ def __init__(self, raw: df_internal.MetricsSet) -> None:
+ """This constructor should not be called by the end user."""
+ self._raw = raw
+
+ def metrics(self) -> list[Metric]:
+ """Return all individual metrics in this set."""
+ return [Metric(m) for m in self._raw.metrics()]
+
+ @property
+ def output_rows(self) -> int | None:
+ """Sum of output_rows across all partitions."""
+ return self._raw.output_rows()
+
+ @property
+ def elapsed_compute(self) -> int | None:
+ """Sum of elapsed_compute across all partitions, in nanoseconds."""
+ return self._raw.elapsed_compute()
+
+ @property
+ def spill_count(self) -> int | None:
+ """Sum of spill_count across all partitions."""
+ return self._raw.spill_count()
+
+ @property
+ def spilled_bytes(self) -> int | None:
+ """Sum of spilled_bytes across all partitions."""
+ return self._raw.spilled_bytes()
+
+ @property
+ def spilled_rows(self) -> int | None:
+ """Sum of spilled_rows across all partitions."""
+ return self._raw.spilled_rows()
+
+ def sum_by_name(self, name: str) -> int | None:
+ """Return the sum of metrics matching the given name."""
Review Comment:
Sum across partitions, I presume. This is different because it works for an
arbitrary metric, right?
##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+ df.collect()
+ plan = df.execution_plan()
+
+ results = plan.collect_metrics()
+ assert len(results) >= 1
+ found_metrics = False
+ for name, ms in results:
+ assert isinstance(name, str)
+ assert isinstance(ms, MetricsSet)
+ if ms.output_rows is not None and ms.output_rows > 0:
+ found_metrics = True
+ assert found_metrics
+
+
+def test_metric_properties() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+ df.collect()
+ plan = df.execution_plan()
+
+ for _, ms in plan.collect_metrics():
+ r = repr(ms)
+ assert isinstance(r, str)
+ for metric in ms.metrics():
+ assert isinstance(metric, Metric)
+ assert isinstance(metric.name, str)
+ assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, int)
+ assert isinstance(metric.labels(), dict)
+ mr = repr(metric)
+ assert isinstance(mr, str)
+ assert len(mr) > 0
+ return
+ pytest.skip("No metrics found")
Review Comment:
I don't think we want a skip here. I think we want to ensure in our unit
test that we do generate metrics. I think we should know a priori what metrics
we're getting in this test. While some values we can't use, some parts of it we
should be able to test directly, like the metric name and labels.
##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+ df.collect()
+ plan = df.execution_plan()
+
+ results = plan.collect_metrics()
+ assert len(results) >= 1
+ found_metrics = False
+ for name, ms in results:
+ assert isinstance(name, str)
+ assert isinstance(ms, MetricsSet)
+ if ms.output_rows is not None and ms.output_rows > 0:
+ found_metrics = True
+ assert found_metrics
+
+
+def test_metric_properties() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+ df.collect()
+ plan = df.execution_plan()
+
+ for _, ms in plan.collect_metrics():
+ r = repr(ms)
+ assert isinstance(r, str)
+ for metric in ms.metrics():
+ assert isinstance(metric, Metric)
+ assert isinstance(metric.name, str)
+ assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, int)
+ assert isinstance(metric.labels(), dict)
+ mr = repr(metric)
+ assert isinstance(mr, str)
+ assert len(mr) > 0
+ return
+ pytest.skip("No metrics found")
+
+
+def test_no_metrics_before_execution() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+ df = ctx.sql("SELECT * FROM t")
+ plan = df.execution_plan()
+ ms = plan.metrics()
+ assert ms is None or ms.output_rows is None or ms.output_rows == 0
+
+
+def test_collect_partitioned_metrics() -> None:
+ ctx = SessionContext()
+ ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+ df.collect_partitioned()
+ plan = df.execution_plan()
+
+ found_metrics = False
+ for _, ms in plan.collect_metrics():
+ if ms.output_rows is not None and ms.output_rows > 0:
Review Comment:
   The same applies to the unit tests that follow — they should also assert
exact expected values.
##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
Tables created in memory from record batches are currently not
supported.
"""
return self._raw_plan.to_proto()
+
+ def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+ raw = self._raw_plan.metrics()
+ if raw is None:
+ return None
+ return MetricsSet(raw)
+
+ def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+ """Walk the plan tree and collect metrics from all operators.
+
+ Returns a list of (operator_name, MetricsSet) tuples.
Review Comment:
I haven't dug in, but is `operator_name` the name of the execution plan?
##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
Tables created in memory from record batches are currently not
supported.
"""
return self._raw_plan.to_proto()
+
+ def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+ raw = self._raw_plan.metrics()
+ if raw is None:
+ return None
+ return MetricsSet(raw)
+
+ def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+ """Walk the plan tree and collect metrics from all operators.
+
+ Returns a list of (operator_name, MetricsSet) tuples.
Review Comment:
"Walk the plan tree and collect metrics" probably does not make a lot of
sense to someone other than a developer. I think we can make this more user
focused.
##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
Tables created in memory from record batches are currently not
supported.
"""
return self._raw_plan.to_proto()
+
+ def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+ raw = self._raw_plan.metrics()
+ if raw is None:
+ return None
+ return MetricsSet(raw)
+
+ def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+ """Walk the plan tree and collect metrics from all operators.
+
+ Returns a list of (operator_name, MetricsSet) tuples.
+ """
+ result: list[tuple[str, MetricsSet]] = []
+
+ def _walk(node: ExecutionPlan) -> None:
+ ms = node.metrics()
+ if ms is not None:
+ result.append((node.display(), ms))
+ for child in node.children():
+ _walk(child)
+
+ _walk(self)
+ return result
+
+
+class MetricsSet:
+ """A set of metrics for a single execution plan operator.
+
+ Provides both individual metric access and convenience aggregations
+ across partitions.
Review Comment:
A bit of an explanation is probably useful here. Again, I don't think we can
assume the user understands that there are both individual execution plan
metrics as well as aggregate. I *think* that some operators have metrics that
cannot be aggregated. In general I suspect we really do need some high level
documentation with examples we can point to that makes all of this more
concrete.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]