[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r141112455

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2187,28 @@ def pandas_udf(f=None, returnType=StringType()):
     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
--- End diff --

Thanks @HyukjinKwon! Sorry, I didn't notice this :( I'll make a note to fix that spacing on a related change.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19325
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140941580

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2187,28 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
--- End diff --

(D'oh, not a big deal, but two spaces before inline comments..)
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140941134

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2187,28 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
--- End diff --

I just double checked it passes:

```
./run-tests --python-executables=pypy --modules pyspark-sql
...
Will test against the following Python executables: ['pypy']
Will test the following Python modules: ['pyspark-sql']
Starting test(pypy): pyspark.sql.functions
...
Finished test(pypy): pyspark.sql.functions (74s)
...
```

Also, checked without ` # doctest: +SKIP`:

```diff
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 63e9a830bbc..3265ecc974b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2199,7 +2199,7 @@ def pandas_udf(f=None, returnType=StringType()):
     ...
     >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
     >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
-    ...     .show() # doctest: +SKIP
+    ...     .show()
     +----------+--------------+------------+
     |slen(name)|to_upper(name)|add_one(age)|
     +----------+--------------+------------+
```

```
./run-tests --python-executables=pypy --modules pyspark-sql
...
Will test against the following Python executables: ['pypy']
Will test the following Python modules: ['pyspark-sql']
...
Starting test(pypy): pyspark.sql.functions
...
Failed example:
    df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \
        .show()
Exception raised:
    Traceback (most recent call last):
      File "/usr/local/Cellar/pypy/5.8.0/libexec/lib-python/2.7/doctest.py", line 1315, in __run
        compileflags, 1) in test.globs
      File "", line 1, in
        df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \
      File "/.../spark/python/pyspark/sql/dataframe.py", line 347, in show
        print(self._jdf.showString(n, 20, vertical))
      File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling o1373.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 93.0 failed 1 times, most recent failure: Lost task 0.0 in stage 93.0 (TID 1093, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 190, in main
        func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in read_udfs
        arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 102, in read_single_udf
        return arg_offsets, wrap_pandas_udf(row_func, return_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 77, in wrap_pandas_udf
        arrow_return_type = toArrowType(return_type)
      File "/.../spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1603, in toArrowType
        import pyarrow as pa
    ImportError: No module named pyarrow
```
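For reference, the effect of the `# doctest: +SKIP` directive discussed above can be reproduced entirely outside Spark. In this hypothetical standalone sketch (not Spark code; the `add` function is illustrative), the skipped example is never executed, so even a deliberately wrong expected value does not fail the run:

```python
import doctest

def add(a, b):
    """
    >>> add(1, 1)  # doctest: +SKIP
    3
    >>> add(2, 2)
    4
    """
    return a + b

# Run only this function's docstring examples. The +SKIP example never
# executes, so its deliberately wrong expected value (3) cannot fail;
# only the second example is actually checked.
runner = doctest.DocTestRunner()
for test in doctest.DocTestFinder().find(add, name="add", module=False, globs={"add": add}):
    runner.run(test)
assert runner.failures == 0
```

This is why marking the `pandas_udf` doctest with `+SKIP` lets the docstring keep a realistic example while environments without `pyarrow` still pass.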
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140940569

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2187,28 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
--- End diff --

Yeah. It is. :)
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140938172

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2187,28 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
--- End diff --

Looks like we actually do :). Let me test this one locally for sure before merging it (I have `pypy` installed locally, which does not have `pyarrow` or `pandas`).
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140934042

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2187,28 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
--- End diff --

Seems we don't skip it actually?
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140915736

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

That's true, I see that `toPandas()` also skips doctests. I'll skip this now and can always enable it later if we decide differently.

@shaneknapp, looks like we will hold off on this, so no need to do anything to Jenkins I believe; sorry to bug you.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140908666

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

Hm.. but shouldn't we skip those doctests, because they are not hard dependencies anyway?
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140891646

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

Cool, thanks @shaneknapp!
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140868188

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+    +----------+--------------+------------+
+    |slen(name)|to_upper(name)|add_one(age)|
+    +----------+--------------+------------+
+    |         8|      JOHN DOE|          22|
+    +----------+--------------+------------+
     """
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
     import inspect
-    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
-    if inspect.getargspec(f).keywords is None:
-        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
-    else:
-        return _create_udf(f, returnType=returnType, vectorized=True)
+    if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

I see now, decorators with arguments are handled a little differently.. How about we just inspect in `_udf` whether the `vectorized` flag is set? Similar to what you have above, but I don't think we need the checker arg.
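To make the shape of the problem concrete: `pandas_udf(returnType=...)` used as a decorator is a decorator *factory* (`f` is `None` on the first call), so any argument inspection has to happen on the function delivered later. A hypothetical, Spark-free sketch of that flow; the name `vectorized_udf` is illustrative, and `inspect.getfullargspec` stands in for the Python 2-era `getargspec` used in the PR:

```python
import functools
import inspect

def vectorized_udf(f=None, return_type="string"):
    # Used as @vectorized_udf(return_type=...): f is None on the first call,
    # so return a decorator that will receive the real function later.
    if f is None:
        return functools.partial(vectorized_udf, return_type=return_type)
    # Disallow 0-parameter functions: an empty .args list means no positional
    # parameters, which a vectorized (batch-in, batch-out) UDF cannot support.
    if not inspect.getfullargspec(f).args:
        raise ValueError("0-parameter functions are not supported")
    return f

@vectorized_udf(return_type="integer")
def add_one(x):
    return x + 1
```

Applying the decorator to a zero-argument function would raise at definition time, which is the behaviour the PR title describes.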
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140852903

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
       outputIterator.map(new ArrowPayload(_)), context)

     // Verify that the output schema is correct
-    val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-      .map { case (attr, i) => attr.withName(s"_$i") })
-    assert(schemaOut.equals(outputRowIterator.schema),
-      s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
+    if (outputIterator.nonEmpty) {
+      val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
+        .map { case (attr, i) => attr.withName(s"_$i") })
+      assert(schemaOut.equals(outputRowIterator.schema),
+        s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
--- End diff --

Yeah, I tried to make one, but since we are now casting the returned Series in `ArrowPandasSerializer.dumps` with `astype`, I have not found a case that triggers it. I think it would still be good to keep this, just in case there is some way it could happen; if we upgrade to Arrow 0.7, we won't need the `astype` logic and this will be used instead.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140849264

--- Diff: python/pyspark/sql/tests.py ---
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
         from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
-        raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+        raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     Exception,
--- End diff --

This one is actually a `Py4JJavaError`, so I don't think we can recognize the `RuntimeError`.
Github user shaneknapp commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140848562

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

Adding @JoshRosen too. The doc building node (amp-jenkins-worker-01) doesn't have arrow installed for the default conda python 2.7 environment; for the python 3 environment, we're running arrow 0.4.0. I looked at the script and it seems to be agnostic to python 2 vs 3... once I know which version of python we'll be running, I can make sure that the version of arrow installed is correct.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140837024

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
[...]
+    if (outputIterator.nonEmpty) {
--- End diff --

ooops! that was a typo, and you're right, it should have been `outputRowIterator`
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140836046

--- Diff: python/pyspark/sql/tests.py ---
@@ -3344,6 +3342,22 @@ def test_vectorized_udf_wrong_return_type(self):
                     'Invalid.*type.*string'):
                 df.select(f(col('x'))).collect()

+    def test_vectorized_udf_decorator(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10)
+
+        @pandas_udf(returnType=LongType())
+        def identity(x):
+            return x
+        res = df.select(identity(col('id')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_empty_partition(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
--- End diff --

Yeah, an empty partition leads to an empty iterator, so this is to make sure it can handle that.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140835837

--- Diff: python/pyspark/worker.py ---
@@ -80,14 +77,12 @@ def wrap_pandas_udf(f, return_type):
     arrow_return_type = toArrowType(return_type)

     def verify_result_length(*a):
-        kwargs = a[-1]
-        result = f(*a[:-1], **kwargs)
-        if len(result) != kwargs["length"]:
+        result = f(*a)
+        if len(result) != len(a[0]):
--- End diff --

Good point. We should probably have a test that returns a scalar value too. I'm not sure we should limit the return type so much: as long as pyarrow can consume it, it should be ok, and it can also take a numpy array, which might be useful. Otherwise it should raise a clear exception. Maybe checking that it has `__len__` is good enough?
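The check being discussed can be sketched in isolation. This hypothetical wrapper (an illustration, not the actual `worker.py` code) accepts anything sized, such as a `pandas.Series`, list, or NumPy array, and raises a clear error for scalar returns or length mismatches:

```python
def wrap_verify_length(f):
    """Hypothetical stand-in for worker-side result validation."""
    def verify_result_length(*batches):
        result = f(*batches)
        # A scalar return (e.g. `lambda x: 1`) has no __len__; fail with a
        # clear message rather than an opaque TypeError downstream.
        if not hasattr(result, "__len__"):
            raise TypeError(
                "pandas_udf must return a sized sequence, got %s"
                % type(result).__name__)
        # Batch-in, batch-out: the result must line up with the input batch.
        if len(result) != len(batches[0]):
            raise RuntimeError(
                "Result length %d does not match input length %d"
                % (len(result), len(batches[0])))
        return result
    return verify_result_length
```

With this shape, `wrap_verify_length(lambda s: [v + 1 for v in s])([1, 2, 3])` passes, while a UDF returning a bare scalar fails the `__len__` check before serialization is attempted.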
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140834562

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

I'll look into this some more.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140834239

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

Yeah, it probably would be a good idea to be explicit here, since it's not obvious what type `getargspec` returns.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140833446

--- Diff: python/pyspark/serializers.py ---
@@ -246,15 +243,9 @@ def cast_series(s, t):
     def loads(self, obj):
         """
         Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
--- End diff --

Ha, no problem.. It should have a period; I just deleted it on accident.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140833242

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
[...]
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

Hmm, I thought that the Jenkins environment for unit tests would be the same for doctests and have pyarrow installed. @holdenk or @shaneknapp, do you know if that is the case?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626955

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
[...]
+      assert(schemaOut.equals(outputRowIterator.schema),
+        s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
--- End diff --

Looks like we don't have a test against this case. We should add a test for invalid schema.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626292

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
[...]
+    if (outputIterator.nonEmpty) {
--- End diff --

I guess we should use `outputRowIterator`. Btw, how about using `hasNext` instead of `nonEmpty`?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626184

--- Diff: python/pyspark/sql/tests.py ---
@@ -3344,6 +3342,22 @@ def test_vectorized_udf_wrong_return_type(self):
[...]
+        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
--- End diff --

Oh, I see. One partition is empty, and it is related to the added stuff in `ArrowEvalPythonExec`.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626154

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
[...]
+    if (outputIterator.nonEmpty) {
--- End diff --

After `outputIterator` is consumed by `ArrowConverters`, can `nonEmpty` return a meaningful value?
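The concern raised here, probing an iterator that a downstream consumer may already have drained, can be illustrated with a quick sketch. This is a plain Python analogue rather than the actual Spark code; Scala's single-pass `Iterator` behaves comparably, which is why the placement of the emptiness check matters:

```python
# A one-pass iterator, standing in for the exec node's output iterator.
it = iter([1, 2, 3])

# A downstream consumer (here, list()) drains it, as a converter would.
consumed = list(it)

# After consumption, an emptiness probe on the same iterator is meaningless:
# it reports "empty" even though data did flow through it earlier.
assert consumed == [1, 2, 3]
assert next(it, None) is None
```

So any non-emptiness check has to happen either before the consumer runs or on a different, not-yet-consumed iterator.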
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626051

--- Diff: python/pyspark/worker.py ---
@@ -80,14 +77,12 @@ def wrap_pandas_udf(f, return_type):
[...]
+        if len(result) != len(a[0]):
--- End diff --

Should we verify that the returned value is a `pandas.Series`?
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140625992

--- Diff: python/pyspark/sql/tests.py ---
@@ -3344,6 +3342,22 @@ def test_vectorized_udf_wrong_return_type(self):
                     'Invalid.*type.*string'):
                 df.select(f(col('x'))).collect()

+    def test_vectorized_udf_decorator(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10)
+
+        @pandas_udf(returnType=LongType())
+        def identity(x):
+            return x
+        res = df.select(identity(col('id')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_empty_partition(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
--- End diff --

Maybe I am missing something, but what is this test intended to test?
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140622826

--- Diff: python/pyspark/serializers.py ---
@@ -246,15 +243,9 @@ def cast_series(s, t):

     def loads(self, obj):
         """
         Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
--- End diff --

Ugh, this bugged me. Missing `.` at the end ..
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140623893

--- Diff: python/pyspark/worker.py ---
@@ -80,14 +77,12 @@ def wrap_pandas_udf(f, return_type):
     arrow_return_type = toArrowType(return_type)

     def verify_result_length(*a):
-        kwargs = a[-1]
-        result = f(*a[:-1], **kwargs)
-        if len(result) != kwargs["length"]:
+        result = f(*a)
+        if len(result) != len(a[0]):
--- End diff --

I guess we are not guaranteed to have `__len__` in `result`, e.g., `pandas_udf(lambda x: 1, LongType())`. Probably that attribute should be checked ahead of time, while we are here.
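A hedged sketch of the check being discussed (illustrative code, not the actual `worker.py` implementation): a scalar return value has no `__len__`, so the wrapper should probe for it before comparing lengths. Plain lists stand in for `pandas.Series` to keep the sketch dependency-free.

```python
def verify_result_length(f):
    def wrapped(*a):
        result = f(*a)
        # A scalar (e.g. from `lambda x: 1`) has no __len__; fail clearly
        # instead of raising a confusing TypeError on len() below.
        if not hasattr(result, "__len__"):
            raise TypeError("pandas_udf should return a Series-like object, "
                            "got %s" % type(result).__name__)
        if len(result) != len(a[0]):
            raise RuntimeError("Result vector from pandas_udf was not the "
                               "required length: expected %d, got %d"
                               % (len(a[0]), len(result)))
        return result
    return wrapped

good = verify_result_length(lambda s: [x + 1 for x in s])
bad = verify_result_length(lambda s: 1)   # scalar, mimics `lambda x: 1`

print(good([1, 2, 3]))  # [2, 3, 4]
try:
    bad([1, 2, 3])
except TypeError as e:
    print("caught:", e)
```

This is the shape the discussion converges on: one guard for the type, one for the length, each with a message naming the expectation.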
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140623560

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+    +----------+--------------+------------+
+    |slen(name)|to_upper(name)|add_one(age)|
+    +----------+--------------+------------+
+    |         8|      JOHN DOE|          22|
+    +----------+--------------+------------+
     """
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
     import inspect
-    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
-    if inspect.getargspec(f).keywords is None:
-        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
-    else:
-        return _create_udf(f, returnType=returnType, vectorized=True)
+    if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

This is totally a personal preference based on my limited experience. I usually avoid the `if not something` expression, because it obscures the expected type: the value could be `None`, `0`, or a 0-length list or tuple, since it is coerced to a bool. I usually write `is not None` or `len(..) > 0` instead. I am fine as is too (because I think it's a personal preference), but I just wanted to leave a side note (and change it if this persuades you too).
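The ambiguity described above is easy to demonstrate: `not x` treats every falsy value the same, so a reader cannot tell which case the guard is meant to catch.

```python
# `not value` is True for all of these, even though they mean very
# different things (absent, zero, empty string, empty containers):
for value in (None, 0, "", [], ()):
    assert not value

# Explicit checks make the intent unambiguous:
args = []
assert args is not None   # "was a value provided at all?"
assert len(args) == 0     # "is the argument list empty?"
print("all checks passed")
```

In the `pandas_udf` case the value is always the `.args` list from `getargspec`, so `if not ...` is safe, which is why the comment frames this as a style preference rather than a bug.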
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140624294

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+    +----------+--------------+------------+
+    |slen(name)|to_upper(name)|add_one(age)|
+    +----------+--------------+------------+
+    |         8|      JOHN DOE|          22|
+    +----------+--------------+------------+
     """
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
     import inspect
-    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
-    if inspect.getargspec(f).keywords is None:
-        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
-    else:
-        return _create_udf(f, returnType=returnType, vectorized=True)
+    if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

It looks like `wrapped_udf.func` could be `_udf` within `_create_udf`, which takes a single argument, for example:

```python
@pandas_udf(returnType=LongType())
def add_one():
    return 1
```

I tried a rough idea to solve this:

```diff
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2124,11 +2124,14 @@ class UserDefinedFunction(object):
         return wrapper


-def _create_udf(f, returnType, vectorized):
+def _create_udf(f, returnType, vectorized, checker=None):

     def _udf(f, returnType=StringType(), vectorized=vectorized):
         udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
-        return udf_obj._wrapped()
+        wrapped = udf_obj._wrapped()
+        if checker is not None:
+            checker(wrapped)
+        return wrapped

     # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
     if f is None or isinstance(f, (str, DataType)):
@@ -2201,10 +2204,14 @@ def pandas_udf(f=None, returnType=StringType()):
     |         8|      JOHN DOE|          22|
     +----------+--------------+------------+
     """
-    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
-    import inspect
-    if not inspect.getargspec(wrapped_udf.func).args:
-        raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
+
+    def checker(wrapped):
+        import inspect
+        if not inspect.getargspec(wrapped.func).args:
+            raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
+
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True, checker=checker)
+
     return wrapped_udf
```
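The "checker" hook proposed above can be illustrated in isolation. This is a hedged standalone sketch with illustrative names (`create_udf`, `require_parameters`), not PySpark's actual `_create_udf`: the factory accepts an optional validation callback and runs it on the wrapped function before handing it back.

```python
import inspect

def create_udf(f, checker=None):
    def wrapped(*args):
        return f(*args)
    # Keep a handle on the original function, as UserDefinedFunction does
    # via its `func` attribute.
    wrapped.func = f
    if checker is not None:
        checker(wrapped)   # reject invalid UDFs at creation time
    return wrapped

def require_parameters(wrapped):
    # getfullargspec is the Python 3 spelling of the getargspec call
    # used in the PR.
    if not inspect.getfullargspec(wrapped.func).args:
        raise NotImplementedError(
            "0-parameter pandas_udfs are not currently supported")

ok = create_udf(lambda x: x + 1, checker=require_parameters)
print(ok(41))  # 42

try:
    create_udf(lambda: 1, checker=require_parameters)
except NotImplementedError as e:
    print("rejected:", e)
```

The design point is that validation happens once, when the UDF object is created, so both the plain-call and decorator forms of `pandas_udf` hit the same check.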
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140623030

--- Diff: python/pyspark/sql/tests.py ---
@@ -3256,11 +3256,9 @@ def test_vectorized_udf_null_string(self):

     def test_vectorized_udf_zero_parameter(self):
         from pyspark.sql.functions import pandas_udf
-        import pandas as pd
-        df = self.spark.range(10)
-        f0 = pandas_udf(lambda **kwargs: pd.Series(1).repeat(kwargs['length']), LongType())
-        res = df.select(f0())
-        self.assertEquals(df.select(lit(1)).collect(), res.collect())
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(Exception, '0-parameter pandas_udfs.*not.*supported'):
--- End diff --

I believe we could catch a narrower one, `NotImplementedError`.
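A sketch of the narrower assertion being suggested: catching `NotImplementedError` instead of the broad `Exception` means an unrelated failure (say, a `TypeError`) fails the test rather than silently satisfying it. The helper here is a stand-in for `pandas_udf`'s rejection of 0-parameter functions; the PR-era code uses the Python 2 spelling `assertRaisesRegexp`, shown here as the Python 3 `assertRaisesRegex`.

```python
import unittest

def make_zero_param_udf():
    # Stand-in for pandas_udf rejecting a 0-parameter function.
    raise NotImplementedError(
        "0-parameter pandas_udfs are not currently supported")

class ZeroParamTest(unittest.TestCase):
    def test_rejects_zero_parameter(self):
        # Narrow type + message regex: only the intended failure passes.
        with self.assertRaisesRegex(NotImplementedError,
                                    '0-parameter pandas_udfs.*not.*supported'):
            make_zero_param_udf()

result = unittest.TextTestRunner(verbosity=0).run(
    unittest.TestLoader().loadTestsFromTestCase(ZeroParamTest))
print(result.wasSuccessful())  # True
```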
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140623282

--- Diff: python/pyspark/sql/tests.py ---
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
         from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
-        raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+        raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                 Exception,
--- End diff --

Here too, while we are here, let's catch a narrower exception type. It looks like `RuntimeError`.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140623236

--- Diff: python/pyspark/sql/tests.py ---
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
         from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
-        raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+        raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
--- End diff --

Maybe `lambda _: ...`.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140622457

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

We could just do ` # doctest: +SKIP` maybe.
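A minimal illustration of the `# doctest: +SKIP` directive being suggested: the flagged example still appears in the rendered docs but is never executed by doctest, which is how environment-dependent examples (here, one assuming a `spark` session that this sketch does not define) can live in a docstring. The `needs_spark` function is a hypothetical stand-in, not PySpark code.

```python
import doctest

def needs_spark():
    """
    >>> spark.range(1).count()  # doctest: +SKIP
    1
    >>> 1 + 1
    2
    """

# Run the docstring with empty globals: the first example would raise
# NameError on `spark` if executed, but +SKIP prevents that; only the
# second example actually runs.
parser = doctest.DocTestParser()
test = parser.get_doctest(needs_spark.__doc__, {}, "needs_spark", None, 0)
runner = doctest.DocTestRunner(verbose=False)
runner.run(test)
print(runner.failures)  # 0
```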
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19325#discussion_r140622410

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

Have we installed pyarrow on Jenkins? The failed test complains `ImportError: No module named pyarrow`.
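One common way to keep a suite green when an optional dependency such as pyarrow is absent from a build machine is to probe the import and skip the dependent tests rather than fail them. This is a hedged sketch of that general pattern (not necessarily what Jenkins or Spark's tests.py does):

```python
import unittest

# Probe for the optional dependency once, at import time.
try:
    import pyarrow  # noqa: F401
    _have_arrow = True
except ImportError:
    _have_arrow = False

@unittest.skipIf(not _have_arrow, "pyarrow is not installed")
class VectorizedUDFTests(unittest.TestCase):
    # Placeholder body; real tests would exercise Arrow-backed UDFs here.
    def test_placeholder(self):
        self.assertTrue(True)

result = unittest.TextTestRunner(verbosity=0).run(
    unittest.TestLoader().loadTestsFromTestCase(VectorizedUDFTests))
print(result.wasSuccessful())  # True whether the class ran or was skipped
```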