Repository: spark
Updated Branches:
  refs/heads/branch-2.3 3e3e9386e -> 7236914e5


[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs for 
non-deterministic cases

## What changes were proposed in this pull request?

Add tests for using non deterministic UDFs in aggregate.

Update pandas_udf docstring w.r.t to determinism.

## How was this patch tested?
test_nondeterministic_udf_in_aggregate

Author: Li Jin <ice.xell...@gmail.com>

Closes #20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.

(cherry picked from commit f2dd8b923759e8771b0e5f59bfa7ae4ad7e6a339)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7236914e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7236914e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7236914e

Branch: refs/heads/branch-2.3
Commit: 7236914e5e7aeb4eb919530b6edbad70256cca52
Parents: 3e3e938
Author: Li Jin <ice.xell...@gmail.com>
Authored: Sat Jan 6 16:11:20 2018 +0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sat Jan 6 16:11:58 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/functions.py | 12 ++++++++-
 python/pyspark/sql/tests.py     | 52 ++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7236914e/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a4ed562..733e32b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2214,7 +2214,17 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 
        .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
 
-    .. note:: The user-defined function must be deterministic.
+    .. note:: The user-defined functions are considered deterministic by 
default. Due to
+        optimization, duplicate invocations may be eliminated or the function 
may even be invoked
+        more times than it is present in the query. If your function is not 
deterministic, call
+        `asNondeterministic` on the user defined function. E.g.:
+
+    >>> @pandas_udf('double', PandasUDFType.SCALAR)  # doctest: +SKIP
+    ... def random(v):
+    ...     import numpy as np
+    ...     import pandas as pd
+    ...     return pd.Series(np.random.randn(len(v))
+    >>> random = random.asNondeterministic()  # doctest: +SKIP
 
     .. note:: The user-defined functions do not support conditional 
expressions or short curcuiting
         in boolean expressions and it ends up with being executed all 
internally. If the functions

http://git-wip-us.apache.org/repos/asf/spark/blob/7236914e/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6dc767f..689736d 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -386,6 +386,7 @@ class SQLTests(ReusedSQLTestCase):
         self.assertEqual(row[0], 5)
 
     def test_nondeterministic_udf(self):
+        # Test that nondeterministic UDFs are evaluated only once in chained 
UDF evaluations
         from pyspark.sql.functions import udf
         import random
         udf_random_col = udf(lambda: int(100 * random.random()), 
IntegerType()).asNondeterministic()
@@ -413,6 +414,18 @@ class SQLTests(ReusedSQLTestCase):
         pydoc.render_doc(random_udf)
         pydoc.render_doc(random_udf1)
 
+    def test_nondeterministic_udf_in_aggregate(self):
+        from pyspark.sql.functions import udf, sum
+        import random
+        udf_random_col = udf(lambda: int(100 * random.random()), 
'int').asNondeterministic()
+        df = self.spark.range(10)
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(AnalysisException, 
"nondeterministic"):
+                df.groupby('id').agg(sum(udf_random_col())).collect()
+            with self.assertRaisesRegexp(AnalysisException, 
"nondeterministic"):
+                df.agg(sum(udf_random_col())).collect()
+
     def test_chained_udf(self):
         self.spark.catalog.registerFunction("double", lambda x: x + x, 
IntegerType())
         [row] = self.spark.sql("SELECT double(1)").collect()
@@ -3567,6 +3580,18 @@ class VectorizedUDFTests(ReusedSQLTestCase):
         time.tzset()
         ReusedSQLTestCase.tearDownClass()
 
+    @property
+    def random_udf(self):
+        from pyspark.sql.functions import pandas_udf
+
+        @pandas_udf('double')
+        def random_udf(v):
+            import pandas as pd
+            import numpy as np
+            return pd.Series(np.random.random(len(v)))
+        random_udf = random_udf.asNondeterministic()
+        return random_udf
+
     def test_vectorized_udf_basic(self):
         from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10).select(
@@ -3950,6 +3975,33 @@ class VectorizedUDFTests(ReusedSQLTestCase):
         finally:
             self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
 
+    def test_nondeterministic_udf(self):
+        # Test that nondeterministic UDFs are evaluated only once in chained 
UDF evaluations
+        from pyspark.sql.functions import udf, pandas_udf, col
+
+        @pandas_udf('double')
+        def plus_ten(v):
+            return v + 10
+        random_udf = self.random_udf
+
+        df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
+        result1 = df.withColumn('plus_ten(rand)', 
plus_ten(df['rand'])).toPandas()
+
+        self.assertEqual(random_udf.deterministic, False)
+        self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
+
+    def test_nondeterministic_udf_in_aggregate(self):
+        from pyspark.sql.functions import pandas_udf, sum
+
+        df = self.spark.range(10)
+        random_udf = self.random_udf
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(AnalysisException, 
'nondeterministic'):
+                df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
+            with self.assertRaisesRegexp(AnalysisException, 
'nondeterministic'):
+                df.agg(sum(random_udf(df.id))).collect()
+
 
 @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
 class GroupbyApplyTests(ReusedSQLTestCase):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to