This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new fcec11ad932 [SPARK-40121][PYTHON][SQL] Initialize projection used for 
Python UDF
fcec11ad932 is described below

commit fcec11ad9329553f4bea024227bdc6468da85278
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

    [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
    
    This PR proposes to initialize the projection so non-deterministic 
expressions can be evaluated with Python UDFs.
    
    To make the Python UDF working with non-deterministic expressions.
    
    Yes.
    
    ```python
    from pyspark.sql.functions import udf, rand
    spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
    ```
    
    **Before**
    
    ```
    java.lang.NullPointerException
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
 Source)
            at 
org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at 
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
            at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
            at 
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
    ```
    
    **After**
    
    ```
    +----------------------------------+
    |<lambda>rand(-2507211707257730645)|
    +----------------------------------+
    |                0.7691724424045242|
    |               0.09602244075319044|
    |                0.3006471278112862|
    |                0.4182649571961977|
    |               0.29349096650900974|
    |                0.7987097908937618|
    |                0.5324802583101007|
    |                  0.72460930912789|
    |                0.1367749768412846|
    |               0.17277322931919348|
    +----------------------------------+
    ```
    
    Manually tested, and unittest was added.
    
    Closes #37552 from HyukjinKwon/SPARK-40121.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/test_udf.py                              | 8 +++++++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala    | 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index fc475f1121d..5e6738a2f8e 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -23,7 +23,7 @@ import unittest
 
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import StringType, IntegerType, BooleanType, 
DoubleType, LongType, \
     ArrayType, StructType, StructField
@@ -705,6 +705,12 @@ class UDFTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)
 
+    def test_udf_with_rand(self):
+        # SPARK-40121: rand() with Python UDF.
+        self.assertEqual(
+            len(self.spark.range(10).select(udf(lambda x: x, 
DoubleType())(rand())).collect()), 10
+        )
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e454bf..1447788a609 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
         }.toArray
       }.toArray
       val projection = MutableProjection.create(allInputs.toSeq, child.output)
+      projection.initialize(context.partitionId())
       val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
         StructField(s"_$i", dt)
       }.toSeq)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to