This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d083da76b4d6 [SPARK-46663][PYTHON][3.5] Disable memory profiler for 
pandas UDFs with iterators
d083da76b4d6 is described below

commit d083da76b4d6b4f1351f2b4597840e2cc1a8683a
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Thu Jan 18 09:01:05 2024 +0900

    [SPARK-46663][PYTHON][3.5] Disable memory profiler for pandas UDFs with 
iterators
    
    ### What changes were proposed in this pull request?
    When using pandas UDFs with iterators, if users enable the profiling spark 
conf, a warning indicating non-support should be raised, and profiling should 
be disabled.
    
    However, currently, after raising the not-supported warning, the memory 
profiler is still being enabled.
    
    This PR proposes to fix that.
    
    ### Why are the changes needed?
    A bug fix to eliminate misleading behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    The noticeable changes will affect only those using the PySpark shell. This 
is because, in the PySpark shell, the memory profiler will raise an error, 
which in turn blocks the execution of the UDF.
    
    ### How was this patch tested?
    Manual test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Setup:
    ```py
    $ ./bin/pyspark --conf spark.python.profile=true
    
    >>> from typing import Iterator
    >>> from pyspark.sql.functions import *
    >>> import pandas as pd
    >>> pandas_udf("long")
    ... def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    ...     for s in iterator:
    ...         yield s + 1
    ...
    >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
    ```
    
    Before:
    ```
    >>> df.select(plus_one(df.v)).show()
    UserWarning: Profiling UDFs with iterators input/output is not supported.
    Traceback (most recent call last):
    ...
    OSError: could not get source code
    ```
    
    After:
    ```
    >>> df.select(plus_one(df.v)).show()
    /Users/xinrong.meng/spark/python/pyspark/sql/udf.py:417: UserWarning: 
Profiling UDFs with iterators input/output is not supported.
    +-----------+
    |plus_one(v)|
    +-----------+
    |          2|
    |          3|
    |          4|
    +-----------+
    ```
    
    Closes #44760 from xinrong-meng/PR_TOOL_PICK_PR_44668_BRANCH-3.5.
    
    Authored-by: Xinrong Meng <xinr...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/test_udf_profiler.py | 53 ++++++++++++++++++++++++++-
 python/pyspark/sql/udf.py                     | 32 ++++++++--------
 2 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py 
b/python/pyspark/sql/tests/test_udf_profiler.py
index 136f423d0a35..019e502ec67c 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -19,11 +19,19 @@ import tempfile
 import unittest
 import os
 import sys
+import warnings
 from io import StringIO
+from typing import Iterator
 
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, pandas_udf
+from pyspark.testing.sqlutils import (
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
 from pyspark.profiler import UDFBasicProfiler
 
 
@@ -101,6 +109,49 @@ class UDFProfilerTests(unittest.TestCase):
         df = self.spark.range(10)
         df.select(add1("id"), add2("id"), add1("id")).collect()
 
+    # Unsupported
+    def exec_pandas_udf_iter_to_iter(self):
+        import pandas as pd
+
+        @pandas_udf("int")
+        def iter_to_iter(batch_ser: Iterator[pd.Series]) -> 
Iterator[pd.Series]:
+            for ser in batch_ser:
+                yield ser + 1
+
+        self.spark.range(10).select(iter_to_iter("id")).collect()
+
+    # Unsupported
+    def exec_map(self):
+        import pandas as pd
+
+        def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
+            for pdf in pdfs:
+                yield pdf[pdf.id == 1]
+
+        df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 
5.0)], ("id", "v"))
+        df.mapInPandas(map, schema=df.schema).collect()
+
+    @unittest.skipIf(not have_pandas, pandas_requirement_message)  # type: 
ignore
+    @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)  # type: 
ignore
+    def test_unsupported(self):
+        with warnings.catch_warnings(record=True) as warns:
+            warnings.simplefilter("always")
+            self.exec_pandas_udf_iter_to_iter()
+            user_warns = [warn.message for warn in warns if 
isinstance(warn.message, UserWarning)]
+            self.assertTrue(len(user_warns) > 0)
+            self.assertTrue(
+                "Profiling UDFs with iterators input/output is not supported" 
in str(user_warns[0])
+            )
+
+        with warnings.catch_warnings(record=True) as warns:
+            warnings.simplefilter("always")
+            self.exec_map()
+            user_warns = [warn.message for warn in warns if 
isinstance(warn.message, UserWarning)]
+            self.assertTrue(len(user_warns) > 0)
+            self.assertTrue(
+                "Profiling UDFs with iterators input/output is not supported" 
in str(user_warns[0])
+            )
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_udf_profiler import *  # noqa: F401
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 7d7784dd5226..bdd3aba502b8 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -28,7 +28,6 @@ from typing import Callable, Any, TYPE_CHECKING, Optional, 
cast, Union
 from py4j.java_gateway import JavaObject
 
 from pyspark import SparkContext
-from pyspark.profiler import Profiler
 from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
 from pyspark.sql.column import Column, _to_java_column, _to_java_expr, _to_seq
 from pyspark.sql.types import (
@@ -338,24 +337,23 @@ class UserDefinedFunction:
 
     def __call__(self, *cols: "ColumnOrName") -> Column:
         sc = get_active_spark_context()
-        profiler: Optional[Profiler] = None
-        memory_profiler: Optional[Profiler] = None
-        if sc.profiler_collector:
-            profiler_enabled = sc._conf.get("spark.python.profile", "false") 
== "true"
-            memory_profiler_enabled = 
sc._conf.get("spark.python.profile.memory", "false") == "true"
+        profiler_enabled = sc._conf.get("spark.python.profile", "false") == 
"true"
+        memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", 
"false") == "true"
 
+        if profiler_enabled or memory_profiler_enabled:
             # Disable profiling Pandas UDFs with iterators as input/output.
-            if profiler_enabled or memory_profiler_enabled:
-                if self.evalType in [
-                    PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-                    PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
-                    PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
-                ]:
-                    profiler_enabled = memory_profiler_enabled = False
-                    warnings.warn(
-                        "Profiling UDFs with iterators input/output is not 
supported.",
-                        UserWarning,
-                    )
+            if self.evalType in [
+                PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
+                PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+                PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
+            ]:
+                warnings.warn(
+                    "Profiling UDFs with iterators input/output is not 
supported.",
+                    UserWarning,
+                )
+                judf = self._judf
+                jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
+                return Column(jPythonUDF)
 
             # Disallow enabling two profilers at the same time.
             if profiler_enabled and memory_profiler_enabled:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to