[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/18659
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r140424910

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,55 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)
--- End diff --

`s for s in series` is the same as `series`, isn't it?
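For what it's worth, a minimal standalone sketch (with hypothetical sample data) of what the quoted normalization does, written in the generator-expression form shown above:

```python
import pandas as pd

# Mixed input of bare series and (series, type) pairs becomes uniform pairs.
series = [pd.Series([1, 2]), (pd.Series([3, 4]), None)]
normalized = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)
for s, t in normalized:
    print(len(s), t)  # every item is now a (series, type) pair
```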
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r140396700

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,55 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
--- End diff --

Do we need this?
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r140389432

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
     | 8| JOHN DOE| 22|
     +--+--++
     """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)

-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

I'm fine with disallowing 0-parameter pandas UDFs, as it's not a common case. We can add it when people request it.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r140121928

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via a socket to an
+ * external Python process, and combine the result from the Python process with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue first. For each output row from Python,
+ * we drain the queue to find the original input row. Note that if the Python process is way too
+ * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

That's fine, but either one looks fine to me; it's not a big deal.
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r140053372

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
     | 8| JOHN DOE| 22|
     +--+--++
     """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)

-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

It would be a lot cleaner to just not allow 0-parameter UDFs. Is it an option to disallow 0-parameter UDFs for pandas_udfs, @ueshin @cloud-fan?
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r140049300

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

Thanks @HyukjinKwon, I suppose if there are more than a few series then it might make some difference. In that case, every little bit helps, so sounds good to me!
Github user LiShuMing commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139947140

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via a socket to an
+ * external Python process, and combine the result from the Python process with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue first. For each output row from Python,
+ * we drain the queue to find the original input row. Note that if the Python process is way too
+ * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

Maybe it just made me uncomfortable to see `Downstream` capitalized but not `upstream`, forgive me..
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139945845

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -0,0 +1,142 @@
(same new file as above, quoting the class doc diagram that ends with:)
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

I think `upstream` is fine.
Github user LiShuMing commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139942881

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -0,0 +1,142 @@
(same new file as above, quoting the class doc diagram that ends with:)
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

Is `Upstream` better?
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139856165

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
(same hunk as above, ending with:)
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

Ah, I was thinking that disallowing 0-parameter pandas_udf could be an option ...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139855188

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, ending with:)
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

This might not be a big deal, but I usually use a generator if it iterates once and is discarded. This should consume less memory too, as a list comprehension has to be fully evaluated first.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139852189

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, ending with:)
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

Yea, it actually affects the performance because we can avoid an extra loop:

```python
def im_map(x):
    print("I am map %s" % x)
    return x

def im_gen(x):
    print("I am gen %s" % x)
    return x

def im_list(x):
    print("I am list %s" % x)
    return x

items = list(range(3))
map(im_map, [im_list(item) for item in items])
map(im_map, (im_gen(item) for item in items))
```

And this actually affects the performance, as far as I know:

```python
import time

items = list(xrange(int(1e8)))
for _ in xrange(10):
    s = time.time()
    _ = map(lambda x: x, [item for item in items])
    print "I am list comprehension with a list: %s" % (time.time() - s)

    s = time.time()
    _ = map(lambda x: x, (item for item in items))
    print "I am generator expression with a list: %s" % (time.time() - s)
```

This gives me ~13% improvement :).
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139843267

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
(same hunk as above, ending with:)
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

I agree it is still a bit weird.. Did you mean disallowing 0-parameter pandas_udfs, or requiring 0-parameter pandas_udfs to accept `kwargs`?
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139841884

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, continuing through `loads` and ending with:)
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
+        return super(ArrowPandasSerializer, self).dumps(batch)
+
+    def loads(self, obj):
+        """
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
+        followed by a dictionary containing length of the loaded batches.
+        """
+        import pyarrow as pa
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
+        # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set
+        num_rows = sum([batch.num_rows for batch in batches])
--- End diff --

I guess this makes sense because it's a summation; no sense in making a list and then adding it all up.
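A minimal sketch of the suggested form, with a hypothetical stand-in batch class since no pyarrow data is needed to show the point:

```python
# sum() consumes the generator lazily, so no intermediate list is built.
class FakeBatch(object):
    def __init__(self, num_rows):
        self.num_rows = num_rows

batches = [FakeBatch(3), FakeBatch(0), FakeBatch(5)]
num_rows = sum(batch.num_rows for batch in batches)
print(num_rows)  # 8
```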
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139841646

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, ending with:)
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

That would work, but does it help much, since `series` will already be a list or tuple?
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139840306

--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
     return lambda *a: f(*a)


-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+    def verify_result_length(*a):
+        kwargs = a[-1]
+        result = f(*a[:-1], **kwargs)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
+                               % (kwargs["length"], len(result)))
+        return result, toArrowType(return_type)
--- End diff --

Sure, that sounds good, thanks!
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139592201

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
(same hunk as above, ending with:)
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

How about disallowing it for now? I think it could be an option, if 0-parameter UDFs alone shouldn't be supported, for consistency. `return pd.Series(1).repeat(kwargs['length'])` still looks a little bit weird ..
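For reference, a hedged sketch of what a 0-parameter `pandas_udf` would look like under the kwargs protocol being discussed; this assumes the WIP behavior in this PR, where the worker passes the required output length as `kwargs['length']`:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf  # the new API added by this PR
from pyspark.sql.types import LongType

# The UDF receives no input columns, so it must size its output from kwargs['length'].
ones = pandas_udf(lambda **kwargs: pd.Series(1).repeat(kwargs['length']), LongType())
```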
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139585713

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, continuing through `loads` and ending with:)
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
+        # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set
+        num_rows = sum([batch.num_rows for batch in batches])
--- End diff --

I'd use a generator expression here too.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139585787

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, continuing through `loads` and ending with:)
+        import pyarrow as pa
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
--- End diff --

And .. `xrange` here too.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139585473

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, ending with:)
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

I'd use a generator expression.
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139585376

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, ending with:)
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
--- End diff --

I'd use `xrange`.
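A small sketch of the suggestion: `xrange` is lazy in Python 2, whereas a Python 2 `range()` call materializes a list. The Python 3 alias below is an assumption about how to keep such code portable, since `xrange` only exists in Python 2 (Python 3's `range` is already lazy):

```python
import sys

if sys.version_info[0] >= 3:
    xrange = range  # portability shim (assumption, not the PR's code)

# Index generation is lazy on Python 2; only the list comprehension allocates.
names = ["_%d" % i for i in xrange(5)]
print(names)  # ['_0', '_1', '_2', '_3', '_4']
```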
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139583530

--- Diff: python/pyspark/sql/tests.py ---
@@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
         self.assertTrue(pdf.empty)


+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
+class VectorizedUDFTests(ReusedPySparkTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        ReusedPySparkTestCase.setUpClass()
+        cls.spark = SparkSession(cls.sc)
+
+    @classmethod
+    def tearDownClass(cls):
+        ReusedPySparkTestCase.tearDownClass()
+        cls.spark.stop()
+
+    def test_vectorized_udf_basic(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10).select(
+            col('id').cast('string').alias('str'),
+            col('id').cast('int').alias('int'),
+            col('id').alias('long'),
+            col('id').cast('float').alias('float'),
+            col('id').cast('double').alias('double'),
+            col('id').cast('boolean').alias('bool'))
+        f = lambda x: x
+        str_f = pandas_udf(f, StringType())
+        int_f = pandas_udf(f, IntegerType())
+        long_f = pandas_udf(f, LongType())
+        float_f = pandas_udf(f, FloatType())
+        double_f = pandas_udf(f, DoubleType())
+        bool_f = pandas_udf(f, BooleanType())
+        res = df.select(str_f(col('str')), int_f(col('int')),
+                        long_f(col('long')), float_f(col('float')),
+                        double_f(col('double')), bool_f(col('bool')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_boolean(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(True,), (True,), (None,), (False,)]
+        schema = StructType().add("bool", BooleanType())
+        df = self.spark.createDataFrame(data, schema)
+        bool_f = pandas_udf(lambda x: x, BooleanType())
+        res = df.select(bool_f(col('bool')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_byte(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("byte", ByteType())
+        df = self.spark.createDataFrame(data, schema)
+        byte_f = pandas_udf(lambda x: x, ByteType())
+        res = df.select(byte_f(col('byte')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_short(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("short", ShortType())
+        df = self.spark.createDataFrame(data, schema)
+        short_f = pandas_udf(lambda x: x, ShortType())
+        res = df.select(short_f(col('short')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_int(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("int", IntegerType())
+        df = self.spark.createDataFrame(data, schema)
+        int_f = pandas_udf(lambda x: x, IntegerType())
+        res = df.select(int_f(col('int')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_long(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(None,), (2,), (3,), (4,)]
+        schema = StructType().add("long", LongType())
+        df = self.spark.createDataFrame(data, schema)
+        long_f = pandas_udf(lambda x: x, LongType())
+        res = df.select(long_f(col('long')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_float(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(3.0,), (5.0,), (-1.0,), (None,)]
+        schema = StructType().add("float", FloatType())
+        df = self.spark.createDataFrame(data, schema)
+        float_f = pandas_udf(lambda x: x, FloatType())
+        res = df.select(float_f(col('float')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_null_double(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(3.0,), (5.0,), (-1.0,), (None,)]
+        schema = StructType().add("double", DoubleType())
+        df = self.spark.createDataFrame(data, schema)
+        double_f = pandas_udf(lambda x: x, DoubleType())
+        res = df.select(double_f(col('double')))
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139580569

--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
     return lambda *a: f(*a)


-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+    def verify_result_length(*a):
+        kwargs = a[-1]
+        result = f(*a[:-1], **kwargs)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
+                               % (kwargs["length"], len(result)))
+        return result, toArrowType(return_type)
--- End diff --

Can we move `toArrowType(return_type)` out of `verify_result_length` to avoid calculating it for each block?
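A sketch of the suggested hoisting; the `toArrowType` stub below stands in for the PR's helper (assumed here so the snippet runs standalone), and the point is simply that the conversion happens once when the wrapper is built rather than on every block:

```python
def toArrowType(return_type):
    # Stub standing in for the PR's Spark-SQL-type-to-Arrow-type helper.
    return return_type

def wrap_pandas_udf(f, return_type):
    arrow_return_type = toArrowType(return_type)  # computed once, not per block

    def verify_result_length(*a):
        kwargs = a[-1]
        result = f(*a[:-1], **kwargs)
        if len(result) != kwargs["length"]:
            raise RuntimeError("Result vector from pandas_udf was not the required length: "
                               "expected %d, got %d" % (kwargs["length"], len(result)))
        return result, arrow_return_type
    return verify_result_length

wrapped = wrap_pandas_udf(lambda x, **kwargs: x, "double")
print(wrapped([1, 2, 3], {"length": 3}))  # ([1, 2, 3], 'double')
```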
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139579800

--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
(same hunk as above, ending with:)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
--- End diff --

typo: `kwarg['length']` -> `kwargs['length']`
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139562988

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
(same hunk as above, ending with:)
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
--- End diff --

With pyarrow 0.7, it is now able to cast the type internally if needed, so this is working.
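A minimal sketch of that cast behavior (the sample data is hypothetical; behavior as described for pyarrow 0.7): `from_pandas` coerces the float64 pandas data to the requested Arrow type while honoring the null mask.

```python
import pandas as pd
import pyarrow as pa

s = pd.Series([1.0, 2.0, None])
arr = pa.Array.from_pandas(s, type=pa.int32(), mask=s.isnull())
print(arr.type)  # int32, cast internally from the float64 input
```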
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139562519

--- Diff: python/pyspark/sql/tests.py ---
@@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
(same hunk as quoted above: the new `VectorizedUDFTests` class with `setUpClass`/`tearDownClass` and the `test_vectorized_udf_basic` and `test_vectorized_udf_null_*` tests for boolean, byte, short, int, long, and float, ending with:)
+    def test_vectorized_udf_null_double(self):
+        from pyspark.sql.functions import pandas_udf, col
+        data = [(3.0,), (5.0,), (-1.0,), (None,)]
+        schema = StructType().add("double", DoubleType())
+        df = self.spark.createDataFrame(data, schema)
+        double_f = pandas_udf(lambda x: x, DoubleType())
+        res = df.select(double_f(col('double')))
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139496371

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
(same hunk as above, ending with:)
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

If the user function doesn't define the keyword args, then it is wrapped with a placeholder so that `worker.py` can expect the function to always have keywords. I thought this was better than trying to do inspection on the worker while running the UDF. I'm not crazy about the 0-parameter pandas_udf, but if we have to support it here then it does need to get the required length of output somehow, unless we repeat/slice the output to make the length correct. I'm ok with making `**kwargs` mandatory for 0-parameter UDFs and optional for others.
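A small sketch of the wrapping logic described here; `ensure_kwargs` is a hypothetical name, and `inspect.getargspec` is the Python 2 API this PR uses (deprecated on Python 3):

```python
import inspect

def ensure_kwargs(f):
    # If f does not declare **kwargs, wrap it so callers can always pass keywords.
    if inspect.getargspec(f).keywords is None:
        return lambda *a, **kwargs: f(*a)  # placeholder silently drops the keywords
    return f

f = ensure_kwargs(lambda x: x)
print(f(42, length=10))  # 42; 'length' is swallowed by the placeholder
```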
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139493581

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
--- End diff --

Yes, these functions are duplicated, as well as some code in `doExecute()`. I could add a common base class like `EvalPythonExec` to clean this up, and maybe move them to the same file?
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139366789

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
(same hunk as above, ending with:)
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

Should we make the kwargs mandatory for 0-argument pandas udfs? A 0-argument pandas udf without the kwargs doesn't seem to make sense, as it can't guess the size of the Series it should return.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/18659#discussion_r139365685

--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
(same hunk as above, ending with:)
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

We wrap the function with a kwargs placeholder, but we don't actually pass the kwargs into the function. So, unlike the 0-argument pandas udf in the SPIP, we explicitly ask it to define kwargs? In other words, we don't really have a 0-argument pandas udf, because it at least has kwargs defined?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139359087

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
+    udf.children match {
+      case Seq(u: PythonUDF) =>
+        val (chained, children) = collectFunctions(u)
+        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+      case children =>
+        // There should not be any other UDFs, or the children can't be evaluated directly.
+        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+    }
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute().map(_.copy())
+    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+
+    inputRDD.mapPartitions { iter =>
+
+      // The queue used to buffer input rows so we can drain it to
+      // combine input with output from Python.
+      val queue = HybridRowQueue(TaskContext.get().taskMemoryManager(),
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+      TaskContext.get().addTaskCompletionListener({ ctx =>
+        queue.close()
+      })
+
+      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+
+      // flatten all the arguments
+      val allInputs = new ArrayBuffer[Expression]
+      val dataTypes = new ArrayBuffer[DataType]
+      val argOffsets = inputs.map { input =>
+        input.map { e =>
+          if (allInputs.exists(_.semanticEquals(e))) {
+            allInputs.indexWhere(_.semanticEquals(e))
+          } else {
+            allInputs += e
+            dataTypes += e.dataType
+            allInputs.length - 1
+          }
+        }.toArray
+      }.toArray
+      val projection = newMutableProjection(allInputs, child.output)
+      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+        StructField(s"_$i", dt)
+      })
+
+      // Input iterator to Python: input rows are grouped so we send them in batches to Python.
+      // For each row, add it to the queue.
+      val projectedRowIter = iter.map { inputRow =>
+        queue.add(inputRow.asInstanceOf[UnsafeRow])
+        projection(inputRow)
+      }
+
+      val context = TaskContext.get()
+
+      val inputIterator = ArrowConverters.toPayloadIterator(
+        projectedRowIter, schema, conf.arrowMaxRecordsPerBatch, context).
+        map(_.asPythonSerializable)
+
+      val schemaOut = StructT
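Since the message above is cut off mid-diff, it may help to restate what the argOffsets block is doing. A rough Python analogue of that deduplication step (hypothetical and simplified: expressions are modeled as plain hashable values, and semanticEquals becomes ==):

    def flatten_udf_inputs(inputs):
        # inputs: one list of argument expressions per UDF.
        # Returns the distinct expressions, plus, per UDF, the offsets of its
        # arguments within that distinct list -- mirroring allInputs/argOffsets.
        all_inputs = []
        arg_offsets = []
        for udf_args in inputs:
            offsets = []
            for e in udf_args:
                if e in all_inputs:
                    offsets.append(all_inputs.index(e))
                else:
                    all_inputs.append(e)
                    offsets.append(len(all_inputs) - 1)
            arg_offsets.append(offsets)
        return all_inputs, arg_offsets

    # Two UDFs sharing the expression "b": it is projected once, referenced twice.
    print(flatten_udf_inputs([["a", "b"], ["b", "c"]]))
    # (['a', 'b', 'c'], [[0, 1], [1, 2]])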
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139358487

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
+    udf.children match {
+      case Seq(u: PythonUDF) =>
+        val (chained, children) = collectFunctions(u)
+        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+      case children =>
+        // There should not be any other UDFs, or the children can't be evaluated directly.
+        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+    }
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute().map(_.copy())
+    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+
+    inputRDD.mapPartitions { iter =>
+
+      // The queue used to buffer input rows so we can drain it to
+      // combine input with output from Python.
+      val queue = HybridRowQueue(TaskContext.get().taskMemoryManager(),
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+      TaskContext.get().addTaskCompletionListener({ ctx =>
+        queue.close()
+      })
+
+      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+
+      // flatten all the arguments
+      val allInputs = new ArrayBuffer[Expression]
+      val dataTypes = new ArrayBuffer[DataType]
+      val argOffsets = inputs.map { input =>
+        input.map { e =>
+          if (allInputs.exists(_.semanticEquals(e))) {
+            allInputs.indexWhere(_.semanticEquals(e))
+          } else {
+            allInputs += e
+            dataTypes += e.dataType
+            allInputs.length - 1
+          }
+        }.toArray
+      }.toArray
+      val projection = newMutableProjection(allInputs, child.output)
+      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+        StructField(s"_$i", dt)
+      })
+
+      // Input iterator to Python: input rows are grouped so we send them in batches to Python.
+      // For each row, add it to the queue.
--- End diff --

The comment is wrong now. We don't group input rows here.
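For context on where the grouping actually happens: the batching is done inside `ArrowConverters.toPayloadIterator`, which slices the projected row iterator into Arrow record batches of at most `arrowMaxRecordsPerBatch` rows. A loose Python model of that grouping (illustrative only; the real conversion operates on InternalRows on the JVM):

    import itertools
    import pandas as pd
    import pyarrow as pa

    def to_record_batches(rows, columns, max_records_per_batch):
        # Slice an iterator of row tuples into Arrow record batches,
        # roughly what toPayloadIterator does with the projected rows.
        it = iter(rows)
        while True:
            chunk = list(itertools.islice(it, max_records_per_batch))
            if not chunk:
                break
            yield pa.RecordBatch.from_pandas(pd.DataFrame(chunk, columns=columns))

    batches = list(to_record_batches(((i, i * 2) for i in range(10)), ["_0", "_1"], 4))
    # 3 batches: 4 + 4 + 2 rows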
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139357733

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
--- End diff --

`producedAttributes` and `collectFunctions` look duplicated between `ArrowEvalPythonExec` and `BatchEvalPythonExec`. We could deduplicate them, maybe in a later PR.
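For readers new to this code path: `collectFunctions` collapses a chain like f(g(col)) into one list of functions applied to the leaf children, so the whole chain crosses to Python in a single round trip. A hypothetical plain-Python model of the recursion (a "udf" here is just a (func, children) pair, not a Spark API; the real version also asserts there are no other nested UDFs):

    def collect_functions(udf):
        # Walk down single-child UDF chains, accumulating the functions
        # innermost-first, and return them with the leaf (non-UDF) children.
        func, children = udf
        if len(children) == 1 and isinstance(children[0], tuple):
            inner_funcs, leaf_children = collect_functions(children[0])
            return inner_funcs + [func], leaf_children
        return [func], children

    inner = (lambda s: s + 1, ["col_a"])
    outer = (lambda s: s * 2, [inner])
    funcs, cols = collect_functions(outer)
    # funcs == [inner fn, outer fn], cols == ["col_a"]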
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139289721

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

I am okay with leaving it as is here. I think we should catch it and throw it with better messages in all cases later, but let's talk about this in another place.
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139289612

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

Ah, hm... let me check the previous discussions and think about this a bit more.
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139289505

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

Ah, I see. Thanks for explaining it. Sure, I am okay with leaving it as is here. I think we should catch it and throw it with better messages in all cases (probably at the entry points), but let's talk about this later in another place.
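A minimal sketch of the kind of entry-point check being suggested here (a hypothetical helper, not part of this PR):

    def require_pyarrow():
        # Fail fast with an actionable message instead of a bare ImportError
        # surfacing from deep inside the serializer.
        try:
            import pyarrow
        except ImportError as e:
            raise ImportError(
                "pandas_udf requires pyarrow to be installed; "
                "try `pip install pyarrow`. Original error: %s" % e)
        return pyarrow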
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139261343

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

Yeah, it would probably be best to handle it the same way as in `toPandas()`. That got me thinking that it is a little weird to have an SQLConf, "spark.sql.execution.arrow.enable", that is consulted by `toPandas()` but has no bearing on `pandas_udf`. It doesn't need to, since `pandas_udf` is an explicit call, but it seems a little contradictory; what do you think?
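To make the asymmetry concrete: per the comment above, the flag gates Arrow only on the `toPandas()` path, while `pandas_udf` uses Arrow regardless. A sketch, assuming a live `SparkSession` named `spark`:

    # Only toPandas() consults this flag; pandas_udf does not.
    spark.conf.set("spark.sql.execution.arrow.enable", "true")
    pdf = spark.range(100).toPandas()  # converted via Arrow when the flag is on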