[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-02 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/20137

[SPARK-22939] [PySpark] Support Spark UDF in registerFunction [WIP]

## What changes were proposed in this pull request?
```Python
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
spark.catalog.registerFunction("random_udf", random_udf, StringType())
spark.sql("SELECT random_udf()").collect()
```

We will get the following error.
```
Py4JError: An error occurred while calling o29.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
```

This PR is to support it. 
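At a high level, the fix special-cases inputs that are already UDFs. A condensed sketch of the idea (simplified from the `catalog.py` diff under review; the imports and the propagation of the `deterministic` flag are assumptions, since the diff excerpt in the review is truncated):

```Python
from pyspark.rdd import PythonEvalType
from pyspark.sql.udf import UserDefinedFunction

# Sketch only: if f is already a UserDefinedFunction (or its wrapped form),
# rebuild it with the requested return type instead of treating it as a
# plain Python function.
if hasattr(f, 'asNondeterministic'):
    udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
                              evalType=PythonEvalType.SQL_BATCHED_UDF,
                              deterministic=f.deterministic)  # assumed: keep the flag
else:
    udf = UserDefinedFunction(f, returnType=returnType, name=name,
                              evalType=PythonEvalType.SQL_BATCHED_UDF)
```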

## How was this patch tested?
WIP

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark registerFunction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20137


commit 8216b6bb52082883fc9b212cd9ab21227f2b8491
Author: gatorsmile 
Date:   2018-01-02T15:28:19Z

wip




---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159443510
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction(
+... "random_udf", random_udf, StringType())  # doctest: +SKIP
--- End diff --

Why skip the test? We can use a fixed seed.
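For illustration, one way a fixed seed could stabilize the doctest output; this is a sketch of the suggestion, and note that the seed has to live inside the function itself, because the UDF body runs in a Python worker process rather than on the driver:

```python
import random

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# A private, freshly seeded Random instance makes every call return the
# same value, even when the lambda executes in separate worker processes.
seeded_udf = udf(lambda: random.Random(42).randint(0, 100), IntegerType())
```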


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159445253
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -378,6 +378,23 @@ def test_udf2(self):
 [res] = self.spark.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect()
 self.assertEqual(4, res[0])
 
+def test_non_deterministic_udf(self):
+import random
+from pyspark.sql.functions import udf
+random_udf = udf(lambda: random.randint(6, 6), IntegerType()).asNondeterministic()
+self.assertEqual(random_udf.deterministic, False)
+random_udf1 = self.spark.catalog.registerFunction("randInt", random_udf, StringType())
+self.assertEqual(random_udf1.deterministic, False)
+[row] = self.spark.sql("SELECT randInt()").collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf1()).collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf()).collect()
+self.assertEqual(row[0], 6)
+pydoc.render_doc(udf(lambda: random.randint(6, 6), IntegerType()))
--- End diff --

What does it do?


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159448650
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction(
+... "random_udf", random_udf, StringType())  # doctest: +SKIP
--- End diff --

The output contains a hex value. 
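For illustration (the exact repr is an assumption): `registerFunction` returns the wrapped function, whose repr embeds the function object's memory address, so the expected doctest output would change on every run:

```python
>>> spark.catalog.registerFunction("random_udf", random_udf, StringType())
<function <lambda> at 0x7f4e2cf61c80>  # address varies per run, hence the skip
```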


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159448813
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -378,6 +378,23 @@ def test_udf2(self):
 [res] = self.spark.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect()
 self.assertEqual(4, res[0])
 
+def test_non_deterministic_udf(self):
+import random
+from pyspark.sql.functions import udf
+random_udf = udf(lambda: random.randint(6, 6), IntegerType()).asNondeterministic()
+self.assertEqual(random_udf.deterministic, False)
+random_udf1 = self.spark.catalog.registerFunction("randInt", random_udf, StringType())
+self.assertEqual(random_udf1.deterministic, False)
+[row] = self.spark.sql("SELECT randInt()").collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf1()).collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf()).collect()
+self.assertEqual(row[0], 6)
+pydoc.render_doc(udf(lambda: random.randint(6, 6), IntegerType()))
--- End diff --

This is to test the help function. See
https://github.com/gatorsmile/spark/blob/85f11bfbfb564acb670097ff4ce520bfbc79b855/python/pyspark/sql/tests.py#L1681-L1688
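As a standalone sketch of what that line exercises: `pydoc.render_doc` builds the same text that `help()` would print, so the call is effectively a regression test that generating help for a wrapped UDF does not raise (SPARK-19161):

```python
import pydoc

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# render_doc returns the help text as a string; calling it on the wrapped
# UDF verifies the wrapper exposes a usable docstring and signature.
doc = pydoc.render_doc(udf(lambda: 1, IntegerType()))
print(doc.splitlines()[0])
```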


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159506888
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
--- End diff --

Let's fix the doc for this too. It says `:param f: python function`, but we could describe that it also takes a Python native function, a wrapped function, and a `UserDefinedFunction`.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159507510
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -162,7 +168,8 @@ def wrapper(*args):
 wrapper.func = self.func
 wrapper.returnType = self.returnType
 wrapper.evalType = self.evalType
-wrapper.asNondeterministic = self.asNondeterministic
+wrapper.deterministic = self.deterministic
+wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
--- End diff --

Can we do:

```python
wrapper.asNondeterministic = functools.wraps(
    self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped())
```

So that it can produce a proper pydoc when we do `help(udf(lambda: 1, "integer").asNondeterministic)` (not `help(udf(lambda: 1, "integer").asNondeterministic())`).



---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159506133
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -130,14 +133,17 @@ def _create_judf(self):
 wrapped_func = _wrap_function(sc, self.func, self.returnType)
 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
-self._name, wrapped_func, jdt, self.evalType, self._deterministic)
+self._name, wrapped_func, jdt, self.evalType, self.deterministic)
 return judf
 
 def __call__(self, *cols):
 judf = self._judf
 sc = SparkContext._active_spark_context
 return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
 
+# This function is for improving the online help system in the interactive interpreter.
+# For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
+# argument annotation. (See: SPARK-19161)
--- End diff --

I think we can put this in the docstring of `_wrapped`, between L148 and L150.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159505249
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction(
+... "random_udf", random_udf, StringType())  # doctest: +SKIP
--- End diff --

BTW, I think we can remove `# doctest: +SKIP` for this line, because this line simply assigns a value to `newRandom_udf`?


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159506607
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, 
returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), 
IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction(
+... "random_udf", random_udf, StringType())  # doctest: +SKIP
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: 
+SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+if hasattr(f, 'asNondeterministic'):
--- End diff --

Actually, this one made me suggest the `wrapper._unwrapped = lambda: self` way.

So, here this can be a wrapped function or a `UserDefinedFunction`, and I thought it's not quite clear what we expect here by `hasattr(f, 'asNondeterministic')`.

Could we at least leave some comments saying that this can be both the wrapped function of a `UserDefinedFunction` and a `UserDefinedFunction` itself?


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159509498
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -172,5 +179,5 @@ def asNondeterministic(self):
 
 .. versionadded:: 2.3
 """
-self._deterministic = False
+self.deterministic = False
--- End diff --

Can we call it `udfDeterministic` to be consistent with the Scala side?


https://github.com/apache/spark/blob/ff48b1b338241039a7189e7a3c04333b1256fdb3/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala#L33

The opposite works fine for me too.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159511213
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -378,6 +378,23 @@ def test_udf2(self):
 [res] = self.spark.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect()
 self.assertEqual(4, res[0])
 
+def test_non_deterministic_udf(self):
+import random
+from pyspark.sql.functions import udf
+random_udf = udf(lambda: random.randint(6, 6), IntegerType()).asNondeterministic()
+self.assertEqual(random_udf.deterministic, False)
+random_udf1 = self.spark.catalog.registerFunction("randInt", random_udf, StringType())
+self.assertEqual(random_udf1.deterministic, False)
+[row] = self.spark.sql("SELECT randInt()").collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf1()).collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf()).collect()
+self.assertEqual(row[0], 6)
+pydoc.render_doc(udf(lambda: random.randint(6, 6), IntegerType()))
--- End diff --

Can we put this test there, or make it separate from `test_non_deterministic_udf`? Adding comments is also fine with me.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159549810
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction(
+... "random_udf", random_udf, StringType())  # doctest: +SKIP
--- End diff --

`newRandom_udf` is also used.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159549932
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction(
+... "random_udf", random_udf, StringType())  # doctest: +SKIP
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+if hasattr(f, 'asNondeterministic'):
--- End diff --

will add a comment.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159550091
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -130,14 +133,17 @@ def _create_judf(self):
 wrapped_func = _wrap_function(sc, self.func, self.returnType)
 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
-self._name, wrapped_func, jdt, self.evalType, self._deterministic)
+self._name, wrapped_func, jdt, self.evalType, self.deterministic)
 return judf
 
 def __call__(self, *cols):
 judf = self._judf
 sc = SparkContext._active_spark_context
 return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
 
+# This function is for improving the online help system in the interactive interpreter.
+# For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
+# argument annotation. (See: SPARK-19161)
--- End diff --

I do not want these comments to show up in the doc.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159550505
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -172,5 +179,5 @@ def asNondeterministic(self):
 
 .. versionadded:: 2.3
 """
-self._deterministic = False
+self.deterministic = False
--- End diff --

`deterministic` is used in `UserDefinedFunction.scala`. Users can use it to 
check whether this UDF is deterministic or not.
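A minimal sketch of that intended usage, assuming the public attribute lands as proposed in this PR:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

plus_one = udf(lambda x: x + 1, IntegerType())
print(plus_one.deterministic)                       # True by default
print(plus_one.asNondeterministic().deterministic)  # False
```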


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159551517
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -162,7 +168,8 @@ def wrapper(*args):
 wrapper.func = self.func
 wrapper.returnType = self.returnType
 wrapper.evalType = self.evalType
-wrapper.asNondeterministic = self.asNondeterministic
+wrapper.deterministic = self.deterministic
+wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
--- End diff --

good to know the difference


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159551571
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -378,6 +378,23 @@ def test_udf2(self):
 [res] = self.spark.sql("SELECT strlen(a) FROM test WHERE strlen(a) 
> 1").collect()
 self.assertEqual(4, res[0])
 
+def test_non_deterministic_udf(self):
+import random
+from pyspark.sql.functions import udf
+random_udf = udf(lambda: random.randint(6, 6), 
IntegerType()).asNondeterministic()
+self.assertEqual(random_udf.deterministic, False)
+random_udf1 = self.spark.catalog.registerFunction("randInt", 
random_udf, StringType())
+self.assertEqual(random_udf1.deterministic, False)
+[row] = self.spark.sql("SELECT randInt()").collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf1()).collect()
+self.assertEqual(row[0], "6")
+[row] = self.spark.range(1).select(random_udf()).collect()
+self.assertEqual(row[0], 6)
+pydoc.render_doc(udf(lambda: random.randint(6, 6), IntegerType()))
--- End diff --

will add a comment.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159551657
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
--- End diff --

ok


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159576756
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -162,7 +168,8 @@ def wrapper(*args):
 wrapper.func = self.func
 wrapper.returnType = self.returnType
 wrapper.evalType = self.evalType
-wrapper.asNondeterministic = self.asNondeterministic
+wrapper.deterministic = self.deterministic
+wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
--- End diff --

I will leave this unchanged. Maybe you can submit a follow-up PR to address 
it?


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159577391
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -227,15 +227,15 @@ def dropGlobalTempView(self, viewName):
 @ignore_unicode_prefix
 @since(2.0)
 def registerFunction(self, name, f, returnType=StringType()):
-"""Registers a python function (including lambda function) as a UDF
+"""Registers a Python function (including lambda function) or a 
wrapped/native UDF
--- End diff --

I'm really confused when reading this document; it would be much clearer to me if we could just say
```
Registers a Python function (including lambda function) or a :class:`UserDefinedFunction`
```
This wrapping logic was added in https://github.com/apache/spark/pull/16534. Is it really worth it?


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159577480
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -162,7 +168,8 @@ def wrapper(*args):
 wrapper.func = self.func
 wrapper.returnType = self.returnType
 wrapper.evalType = self.evalType
-wrapper.asNondeterministic = self.asNondeterministic
+wrapper.deterministic = self.deterministic
+wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
--- End diff --

Definitely. Will give it a try within the following week, though...


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159578328
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -227,15 +227,15 @@ def dropGlobalTempView(self, viewName):
 @ignore_unicode_prefix
 @since(2.0)
 def registerFunction(self, name, f, returnType=StringType()):
-"""Registers a python function (including lambda function) as a UDF
+"""Registers a Python function (including lambda function) or a 
wrapped/native UDF
--- End diff --

It indeed added some complexity. However, I believe nothing is blocked by #16534 now, if I understand correctly.

The change in #16534 is quite nice because IMHO Python users probably use `help()` and `dir()` more frequently than reading the API doc on the website. For a set of UDFs provided as a library, I think that's quite worth keeping.

How about leaving this wrapper logic as is for now, and bringing this discussion back when something is actually blocked (or made too complicated) by it?



---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159579617
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -227,15 +227,15 @@ def dropGlobalTempView(self, viewName):
 @ignore_unicode_prefix
 @since(2.0)
 def registerFunction(self, name, f, returnType=StringType()):
-"""Registers a python function (including lambda function) as a UDF
+"""Registers a Python function (including lambda function) or a 
wrapped/native UDF
--- End diff --

Another idea just in case it helps:

```
Registers a Python function as a UDF or a user defined function.
```



---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159579886
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -227,15 +227,15 @@ def dropGlobalTempView(self, viewName):
 @ignore_unicode_prefix
 @since(2.0)
 def registerFunction(self, name, f, returnType=StringType()):
-"""Registers a python function (including lambda function) as a UDF
+"""Registers a Python function (including lambda function) or a 
wrapped/native UDF
--- End diff --

BTW, to be honest, I remember giving several quick tries at the time to get rid of the wrapper while keeping the docstring correct, but I failed to come up with a good alternative.

Might be good to try again if there is a clever way to get rid of the wrapper but keep the doc.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159580038
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -227,15 +227,15 @@ def dropGlobalTempView(self, viewName):
 @ignore_unicode_prefix
 @since(2.0)
 def registerFunction(self, name, f, returnType=StringType()):
-"""Registers a python function (including lambda function) as a UDF
+"""Registers a Python function (including lambda function) or a 
wrapped/native UDF
--- End diff --

SGTM


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159582570
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+# This is to check whether the input function is a wrapped/native UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF,
--- End diff --

cc @ueshin @icexelloss, shall we support registering pandas UDFs here too?


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159582683
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+# This is to check whether the input function is a wrapped/native UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF,
--- End diff --

Seems we can support it by just changing `evalType=PythonEvalType.SQL_BATCHED_UDF` to `evalType=f.evalType`.
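A sketch of the suggested tweak (assuming `f` carries `evalType` and `deterministic` in both its wrapped and unwrapped forms):

```python
if hasattr(f, 'asNondeterministic'):
    # Preserving the incoming eval type would let pandas UDFs register with
    # their own execution mode instead of being forced to SQL_BATCHED_UDF.
    udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
                              evalType=f.evalType,
                              deterministic=f.deterministic)
```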


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159599247
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+# This is to check whether the input function is a wrapped/native UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF,
--- End diff --

+1, but I think there's no way to use a group map UDF in SQL syntax, if I understood correctly. I think we can safely fail fast for now as well.


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159600332
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+# This is to check whether the input function is a wrapped/native UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF,
--- End diff --

SGTM


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159647399
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+# This is to check whether the input function is a wrapped/native UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF,
--- End diff --

Will support the pandas UDF as a separate PR. 


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20137


---




[GitHub] spark pull request #20137: [SPARK-22939] [PySpark] Support Spark UDF in regi...

2018-01-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20137#discussion_r159693340
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+
+>>> import random
+>>> from pyspark.sql.functions import udf
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
+>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+>>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
+[Row(random_udf()=u'82')]
+>>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
+[Row(random_udf()=u'62')]
 """
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  evalType=PythonEvalType.SQL_BATCHED_UDF)
+
+# This is to check whether the input function is a wrapped/native UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF,
--- End diff --

+1 too


---
