ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312796


##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+        self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), 
(20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+        self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
   Shall we rename to `left` and `right` according to the lint error?



##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+        self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), 
(20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+        self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
   Shall we rename `left` and `right` according to the lint error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to