This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9bd0d7c3ee13 [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs
9bd0d7c3ee13 is described below

commit 9bd0d7c3ee135036f5b370ff37517ae9d4d9f155
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Wed Feb 7 13:50:09 2024 -0800

    [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs

    ### What changes were proposed in this pull request?
    Support v2 (perf, memory) profiling in Aggregate (Series to Scalar) Pandas UDFs, which rely on the physical plan nodes AggregateInPandasExec and WindowInPandasExec.

    ### Why are the changes needed?
    Complete v2 profiling support.

    ### Does this PR introduce _any_ user-facing change?
    Yes. V2 profiling in Aggregate Pandas UDFs is supported.

    ### How was this patch tested?
    Unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #45035 from xinrong-meng/other_p.

    Lead-authored-by: Xinrong Meng <xinr...@apache.org>
    Co-authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
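[Editor's note] For context, a minimal sketch of the user-facing path this commit enables, not part of the patch itself. It assumes a running SparkSession bound to `spark` with pandas and PyArrow available; the names `df` and `mean_udf` are illustrative:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:  # Series-to-Scalar (aggregate) Pandas UDF
        return v.mean()

    df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

    # Enable the v2 perf profiler; with this change AggregateInPandasExec
    # (and WindowInPandasExec) pass it through to the Python runner.
    spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
    df.groupBy("id").agg(mean_udf("v")).show()

    # Print the collected profiles, one section per profiled UDF id
    # (the tests below pass an explicit id; calling without one is assumed
    # to show all collected results).
    spark.showPerfProfiles()
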
"Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"] + ) + df.groupBy(df.name).agg(min_udf(df.age)).show() + + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + + for id in self.profile_results: + with self.trap_stdout() as io: + self.spark.showPerfProfiles(id) + + self.assertIn(f"Profile of UDF<id={id}>", io.getvalue()) + self.assertRegex( + io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}" + ) + class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index ae9aa24d1c4f..685a5890200e 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -30,6 +30,7 @@ from pyspark import SparkConf, SparkContext from pyspark.profiler import has_memory_profiler from pyspark.sql import SparkSession from pyspark.sql.functions import col, pandas_udf, udf +from pyspark.sql.window import Window from pyspark.testing.sqlutils import ( have_pandas, have_pyarrow, @@ -380,6 +381,66 @@ class MemoryProfiler2TestsMixin: self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys())) + @unittest.skipIf( + not have_pandas or not have_pyarrow, + cast(str, pandas_requirement_message or pyarrow_requirement_message), + ) + def test_memory_profiler_pandas_udf_window(self): + # WindowInPandasExec + import pandas as pd + + @pandas_udf("double") + def mean_udf(v: pd.Series) -> float: + return v.mean() + + df = self.spark.createDataFrame( + [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v") + ) + w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + df.withColumn("mean_v", mean_udf("v").over(w)).show() + + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + + for id in self.profile_results: + with self.trap_stdout() as io: + self.spark.showMemoryProfiles(id) + + self.assertIn(f"Profile of UDF<id={id}>", io.getvalue()) + self.assertRegex( + io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}" + ) + + @unittest.skipIf( + not have_pandas or not have_pyarrow, + cast(str, pandas_requirement_message or pyarrow_requirement_message), + ) + def test_memory_profiler_aggregate_in_pandas(self): + # AggregateInPandasExec + import pandas as pd + + @pandas_udf("double") + def min_udf(v: pd.Series) -> float: + return v.min() + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + df = self.spark.createDataFrame( + [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"] + ) + df.groupBy(df.name).agg(min_udf(df.age)).show() + + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + + for id in self.profile_results: + with self.trap_stdout() as io: + self.spark.showMemoryProfiles(id) + + self.assertIn(f"Profile of UDF<id={id}>", io.getvalue()) + self.assertRegex( + io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}" + ) + class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 876373177447..26871b68dde8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 876373177447..26871b68dde8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -181,8 +181,7 @@ case class AggregateInPandasExec(
         pythonRunnerConf,
         pythonMetrics,
         jobArtifactUUID,
-        None) // TODO(SPARK-46688): Support profiling on AggregateInPandasExec
-        .compute(projectedRowIter, context.partitionId(), context)
+        conf.pythonUDFProfiler).compute(projectedRowIter, context.partitionId(), context)
 
       val joinedAttributes =
         groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index c0a38eadbe64..294bcdadc2b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -88,7 +88,7 @@ case class WindowInPandasExec(
       child.output,
       longMetric("spillSize"),
       pythonMetrics,
-      None) // TODO(SPARK-46691): Support profiling on WindowInPandasExec
+      conf.pythonUDFProfiler)
 
     // Start processing.
     if (conf.usePartitionEvaluator) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org