[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18659


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r140424910
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,55 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)
--- End diff --

`s for s in series` is the same as `series`, isn't it?
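
For readers following the question, here is a minimal sketch of what the quoted normalization step does to each item. It assumes plain strings in place of `pandas.Series` and a hypothetical `FakeArrowType` class in place of `pyarrow.DataType`, so that it runs without either library; the function name `normalize` is also made up for illustration.

```python
class FakeArrowType(object):
    """Hypothetical stand-in for pyarrow.DataType so the sketch runs without pyarrow."""

    def __init__(self, name):
        self.name = name


def normalize(series):
    """Return a list of (series, type) pairs, mirroring the logic in the quoted diff."""
    # A bare series, or a single (series, type) pair, becomes a one-element list.
    if not isinstance(series, (list, tuple)) or \
            (len(series) == 2 and isinstance(series[1], FakeArrowType)):
        series = [series]
    # Items that are not already pairs are paired with None; pairs pass through unchanged.
    return [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]


int_type = FakeArrowType("int64")
bare = normalize("a")                      # a single "series"
pair = normalize(("a", int_type))          # a single (series, type) pair
mixed = normalize(["a", ("b", int_type)])  # a list mixing both forms
```

In this sketch the expression is an identity only for items that are already pairs; bare items are rewritten to `(item, None)`.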


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-21 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r140396700
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,55 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
--- End diff --

Do we need this?
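
As background for the question, a small sketch of the general Python behavior: a subclass that defines no `__init__` inherits its parent's. The class names here are hypothetical and unrelated to the Spark classes.

```python
class Base(object):
    def __init__(self):
        self.ready = True


class WithInit(Base):
    # Explicit __init__ that only forwards to the parent.
    def __init__(self):
        super(WithInit, self).__init__()


class WithoutInit(Base):
    # No __init__ at all: Base.__init__ is used automatically.
    pass


a = WithInit()
b = WithoutInit()
```

Both instances end up initialized the same way, so an `__init__` that only calls `super().__init__()` with no arguments adds nothing in plain Python.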


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r140389432
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

I'm fine with disallowing 0-parameter pandas UDFs, as it's not a common case. We
can add support when people request it.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r140121928
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via a socket to an
+ * external Python process, and combine the result from the Python process with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue first. For each output row from Python,
+ * we drain the queue to find the original input row. Note that if the Python process is way too
+ * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

That's fine, but either looks fine and it's not a big deal.
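
The queue mechanism described in the quoted Scaladoc can be sketched in a few lines of Python. This is an in-process illustration only: a `deque` stands in for the row queue (with no spilling to disk), a generator stands in for the socket and the external Python process, and the function name and tuple-shaped rows are assumptions made for the example.

```python
from collections import deque


def eval_udf_over_partition(rows, udf):
    """Pair each UDF output with the original input row via a FIFO queue."""
    queue = deque()

    def send_to_python():
        for row in rows:
            # Remember the full row before "sending" its projected input.
            queue.append(row)
            yield row[0]

    # Stand-in for the Python worker: applies the UDF to each projected input.
    outputs = (udf(value) for value in send_to_python())

    for out in outputs:
        # Outputs arrive in input order, so the oldest queued row matches.
        original = queue.popleft()
        yield original + (out,)


joined = list(eval_udf_over_partition([(1, "a"), (2, "b")], lambda x: x * 10))
```

Because the stand-in worker here is never slower than the producer, the queue never holds more than one row; the unbounded growth mentioned in the Scaladoc only arises when input is sent faster than results come back.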


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-20 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r140053372
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

It would be a lot cleaner to just not allow 0-parameter UDFs. Is it an option
to disallow them for pandas_udfs, @ueshin @cloud-fan?
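
To make the "disallow" option concrete, here is one way such a guard could look. It is a hypothetical sketch, not the check the PR adopted: the function name and error message are invented, and it uses `inspect.signature` rather than the `inspect.getargspec` call in the quoted diff, since `getargspec` was removed in Python 3.11.

```python
import inspect


def require_at_least_one_arg(f):
    """Hypothetical guard: reject functions that cannot take any positional input."""
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.VAR_POSITIONAL,
    )
    params = inspect.signature(f).parameters.values()
    if not any(p.kind in positional_kinds for p in params):
        raise ValueError("0-parameter functions are not supported in this sketch")
    return f


accepted = require_at_least_one_arg(lambda x: x)

try:
    require_at_least_one_arg(lambda: 1)
    rejected = False
except ValueError:
    rejected = True
```

With a guard like this, the result length can always be taken from an input column, so no extra `kwargs` channel is needed.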


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-20 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r140049300
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

Thanks @HyukjinKwon, I suppose if there are more than a few series then it
might make some difference. In that case, every little bit helps, so sounds
good to me!


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-20 Thread LiShuMing
Github user LiShuMing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139947140
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via a socket to an
+ * external Python process, and combine the result from the Python process with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue first. For each output row from Python,
+ * we drain the queue to find the original input row. Note that if the Python process is way too
+ * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

Maybe it's just me, but I find it uncomfortable to see `Downstream` capitalized; forgive me.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139945845
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via a socket to an
+ * external Python process, and combine the result from the Python process with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue first. For each output row from Python,
+ * we drain the queue to find the original input row. Note that if the Python process is way too
+ * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

I think `upstream` is fine.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-20 Thread LiShuMing
Github user LiShuMing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139942881
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via a socket to an
+ * external Python process, and combine the result from the Python process with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue first. For each output row from Python,
+ * we drain the queue to find the original input row. Note that if the Python process is way too
+ * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \     /
+ *          upstream (from child)
--- End diff --

Is `Upstream` better?


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139856165
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

Ah, I was thinking that disallowing 0-parameter pandas_udfs could be an
option ...


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139855188
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

This might not be a big deal, but I usually use a generator expression when the
result is iterated once and then discarded. It should also consume less memory,
because a list comprehension builds the entire list before anything iterates over it.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139852189
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

Yes, it actually affects performance because we can avoid an extra loop:

```python
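# Python 2, where map() is eager and consumes its input immediately.
def im_map(x):
    print("I am map %s" % x)
    return x

def im_gen(x):
    print("I am gen %s" % x)
    return x

def im_list(x):
    print("I am list %s" % x)
    return x

items = list(range(3))
# List comprehension: every im_list call finishes before the first im_map call.
map(im_map, [im_list(item) for item in items])
# Generator expression: im_gen and im_map calls are interleaved item by item.
map(im_map, (im_gen(item) for item in items))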
```

And, to my knowledge, this actually affects performance:

```python
# Python 2 (xrange and the print statement).
import time

items = list(xrange(int(1e8)))

for _ in xrange(10):
    s = time.time()
    _ = map(lambda x: x, [item for item in items])
    print "I am list comprehension with a list: %s" % (time.time() - s)
    s = time.time()
    _ = map(lambda x: x, (item for item in items))
    print "I am generator expression with a list: %s" % (time.time() - s)
```

This gives me a ~13% improvement :).
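
The evaluation-order difference behind the first snippet can also be checked without relying on Python 2's eager `map`. The following is a small sketch that records the order of calls in a list; the helper name `trace` is made up for illustration, and it makes no claim about timing.

```python
def trace(use_generator):
    """Record the order in which items are produced and consumed."""
    events = []

    def produce(x):
        events.append("produce %d" % x)
        return x

    def consume(x):
        events.append("consume %d" % x)
        return x

    items = [0, 1]
    if use_generator:
        inner = (produce(i) for i in items)  # nothing runs yet
    else:
        inner = [produce(i) for i in items]  # all produce() calls run here
    for x in inner:
        consume(x)
    return events


eager = trace(use_generator=False)
lazy = trace(use_generator=True)
```

With the list comprehension, every `produce` call completes before the first `consume`; with the generator expression the two alternate, and no intermediate list is built.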


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139843267
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

I agree it is still a bit weird. Did you mean disallowing 0-parameter
pandas_udfs or requiring 0-parameter pandas_udfs to accept `kwargs`?


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139841884
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
+        return super(ArrowPandasSerializer, self).dumps(batch)
+
+    def loads(self, obj):
+        """
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
+        followed by a dictionary containing length of the loaded batches.
+        """
+        import pyarrow as pa
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
+        # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set
+        num_rows = sum([batch.num_rows for batch in batches])
--- End diff --

I guess this makes sense because it's a summation; there's no sense in making a list
and then adding it all up.
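
A minimal sketch of the two spellings under discussion, assuming a hypothetical `Batch` namedtuple in place of a pyarrow record batch so that it runs without pyarrow:

```python
from collections import namedtuple

# Hypothetical stand-in for a record batch; only num_rows matters here.
Batch = namedtuple("Batch", ["num_rows"])

batches = [Batch(3), Batch(0), Batch(5)]

# List comprehension: builds a temporary list, then sums it.
total_from_list = sum([batch.num_rows for batch in batches])

# Generator expression: sum() consumes the values one at a time.
total_from_gen = sum(batch.num_rows for batch in batches)
```

Both forms give the same total; the generator form simply skips the intermediate list.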


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139841646
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

That would work, but does it help much since `series` will already be a 
list or tuple?


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139840306
  
--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
 return lambda *a: f(*a)
 
 
-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+    def verify_result_length(*a):
+        kwargs = a[-1]
+        result = f(*a[:-1], **kwargs)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
+                               % (kwargs["length"], len(result)))
+        return result, toArrowType(return_type)
--- End diff --

Sure, that sounds good, thanks!
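
The length check in the quoted `wrap_pandas_udf` can be tried out in isolation. This sketch assumes plain lists in place of `pandas.Series` and drops the `toArrowType` conversion; the names `wrap_with_length_check`, `double`, and `drop_last` are invented for the example.

```python
def wrap_with_length_check(f):
    """Wrap f so that its result must match the 'length' passed in the trailing dict."""
    def verify_result_length(*a):
        # By convention in the quoted diff, the last positional argument is the kwargs dict.
        kwargs = a[-1]
        result = f(*a[:-1], **kwargs)
        if len(result) != kwargs["length"]:
            raise RuntimeError(
                "Result vector was not the required length: expected %d, got %d"
                % (kwargs["length"], len(result)))
        return result
    return verify_result_length


def double(values, **kwargs):
    return [v * 2 for v in values]


def drop_last(values, **kwargs):
    # Deliberately returns one element too few.
    return values[:-1]


ok = wrap_with_length_check(double)([1, 2, 3], {"length": 3})

try:
    wrap_with_length_check(drop_last)([1, 2, 3], {"length": 3})
    failed = False
except RuntimeError:
    failed = True
```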


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139592201
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

How about disallowing it for now? I think that could be an option if
0-parameter UDFs are the only case that cannot be supported consistently.
`return pd.Series(1).repeat(kwargs['length'])` still looks a little bit weird.
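
For context, the kwargs-placeholder wrapping from the quoted diff can be sketched as follows. This is an approximation under stated assumptions: it uses `inspect.getfullargspec(f).varkw`, the Python 3 counterpart of `getargspec(f).keywords`, uses plain lists in place of `pandas.Series`, and the names `with_kwargs_placeholder`, `plus_one`, and `constant` are invented.

```python
import inspect


def with_kwargs_placeholder(f):
    """If f does not declare **kwargs, wrap it so extra keyword arguments are dropped."""
    if inspect.getfullargspec(f).varkw is None:
        return lambda *a, **kwargs: f(*a)
    return f


def plus_one(values):
    # Ordinary UDF: output length follows the input.
    return [v + 1 for v in values]


def constant(**kwargs):
    # 0-parameter UDF: the only way to know the output size is kwargs["length"].
    return [1] * kwargs["length"]


wrapped_plus_one = with_kwargs_placeholder(plus_one)
wrapped_constant = with_kwargs_placeholder(constant)

from_input = wrapped_plus_one([1, 2], length=2)
from_length = wrapped_constant(length=3)
```

The second function shows why the 0-parameter case needs special handling: with no input column, it has to read the expected size out of `kwargs`.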


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585713
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
+        return super(ArrowPandasSerializer, self).dumps(batch)
+
+    def loads(self, obj):
+        """
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
+        followed by a dictionary containing length of the loaded batches.
+        """
+        import pyarrow as pa
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
+        # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set
+        num_rows = sum([batch.num_rows for batch in batches])
--- End diff --

I'd use a generator expression here too.
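
As a minimal sketch of this suggestion: passing a generator expression to `sum` avoids building a temporary list first. The `Batch` class below is a hypothetical stand-in for a record batch so the example runs without pyarrow; only `num_rows` is modeled.

```python
class Batch(object):
    """Hypothetical stand-in for a record batch; only num_rows is modeled."""
    def __init__(self, num_rows):
        self.num_rows = num_rows


batches = [Batch(3), Batch(0), Batch(5)]

# List comprehension: materializes a temporary list before summing.
total_from_list = sum([batch.num_rows for batch in batches])

# Generator expression: sum() consumes the values lazily, one at a time.
total_from_gen = sum(batch.num_rows for batch in batches)
```

Both forms give the same total; the generator form simply skips the intermediate list.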


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585787
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
+        return super(ArrowPandasSerializer, self).dumps(batch)
+
+    def loads(self, obj):
+        """
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
+        followed by a dictionary containing length of the loaded batches.
+        """
+        import pyarrow as pa
+        reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+        batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
--- End diff --

And `xrange` here too.
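
A small sketch of what this could look like, assuming the code has to run on both Python 2 and Python 3. The name `range_compat` and the batch count are hypothetical and used only for illustration.

```python
try:
    # Python 2: xrange is lazy, while range builds a full list.
    range_compat = xrange  # noqa: F821
except NameError:
    # Python 3: range is already lazy and xrange no longer exists.
    range_compat = range

num_record_batches = 4  # hypothetical batch count, for illustration only
indices = [i for i in range_compat(num_record_batches)]
names = ["_%d" % i for i in range_compat(3)]
```

On Python 3 the shim is a no-op, so the benefit is limited to Python 2 callers.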


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585473
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
--- End diff --

I'd use a generator expression.
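
A minimal sketch of the normalization step written with a generator expression. `FakeType` and `FakeSeries` are hypothetical stand-ins for `pa.DataType` and `pandas.Series` so the example runs without either library, and `normalize` is an illustrative helper name, not part of the patch. One caveat: a generator can be iterated only once, so this form only fits if the result is consumed a single time.

```python
class FakeType(object):
    """Hypothetical stand-in for pa.DataType."""


class FakeSeries(object):
    """Hypothetical stand-in for pandas.Series."""
    def __init__(self, values):
        self.values = values


def normalize(series):
    # Wrap a lone series, or a lone (series, type) pair, in a list.
    if not isinstance(series, (list, tuple)) or \
            (len(series) == 2 and isinstance(series[1], FakeType)):
        series = [series]
    # Lazily yield (series, type) pairs; untyped entries get type None.
    return ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)


a, b, t = FakeSeries([1]), FakeSeries([2]), FakeType()
single = list(normalize(a))          # lone series
typed = list(normalize((a, t)))      # lone (series, type) pair
mixed = list(normalize([a, (b, t)])) # list mixing both forms
```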


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139585376
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
+        batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
--- End diff --

I'd use `xrange`.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139583530
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
 self.assertTrue(pdf.empty)
 
 
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class VectorizedUDFTests(ReusedPySparkTestCase):
+
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def test_vectorized_udf_basic(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.range(10).select(
+col('id').cast('string').alias('str'),
+col('id').cast('int').alias('int'),
+col('id').alias('long'),
+col('id').cast('float').alias('float'),
+col('id').cast('double').alias('double'),
+col('id').cast('boolean').alias('bool'))
+f = lambda x: x
+str_f = pandas_udf(f, StringType())
+int_f = pandas_udf(f, IntegerType())
+long_f = pandas_udf(f, LongType())
+float_f = pandas_udf(f, FloatType())
+double_f = pandas_udf(f, DoubleType())
+bool_f = pandas_udf(f, BooleanType())
+res = df.select(str_f(col('str')), int_f(col('int')),
+long_f(col('long')), float_f(col('float')),
+double_f(col('double')), bool_f(col('bool')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_boolean(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(True,), (True,), (None,), (False,)]
+schema = StructType().add("bool", BooleanType())
+df = self.spark.createDataFrame(data, schema)
+bool_f = pandas_udf(lambda x: x, BooleanType())
+res = df.select(bool_f(col('bool')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_byte(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("byte", ByteType())
+df = self.spark.createDataFrame(data, schema)
+byte_f = pandas_udf(lambda x: x, ByteType())
+res = df.select(byte_f(col('byte')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_short(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("short", ShortType())
+df = self.spark.createDataFrame(data, schema)
+short_f = pandas_udf(lambda x: x, ShortType())
+res = df.select(short_f(col('short')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_int(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("int", IntegerType())
+df = self.spark.createDataFrame(data, schema)
+int_f = pandas_udf(lambda x: x, IntegerType())
+res = df.select(int_f(col('int')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_long(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("long", LongType())
+df = self.spark.createDataFrame(data, schema)
+long_f = pandas_udf(lambda x: x, LongType())
+res = df.select(long_f(col('long')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_float(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(3.0,), (5.0,), (-1.0,), (None,)]
+schema = StructType().add("float", FloatType())
+df = self.spark.createDataFrame(data, schema)
+float_f = pandas_udf(lambda x: x, FloatType())
+res = df.select(float_f(col('float')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_double(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(3.0,), (5.0,), (-1.0,), (None,)]
+schema = StructType().add("double", DoubleType())
+df = self.spark.createDataFrame(data, schema)
+double_f = pandas_udf(lambda x: x, DoubleType())
+res = df.select(double_f(col('double')))

[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139580569
  
--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
 return lambda *a: f(*a)
 
 
-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+    def verify_result_length(*a):
+        kwargs = a[-1]
+        result = f(*a[:-1], **kwargs)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
+                               % (kwargs["length"], len(result)))
+        return result, toArrowType(return_type)
--- End diff --

Can we move `toArrowType(return_type)` out of `verify_result_length` to 
avoid calculating it for each block?
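
A sketch of the hoisting being suggested, under stated assumptions: `to_arrow_type` is a hypothetical stand-in for `toArrowType` that counts its own calls, plain lists stand in for `pandas.Series`, and the wrapper is assumed to return the inner function (the quoted hunk stops before that point).

```python
calls = {"to_arrow_type": 0}


def to_arrow_type(return_type):
    """Hypothetical stand-in for the type conversion; counts invocations."""
    calls["to_arrow_type"] += 1
    return "arrow:" + return_type


def wrap_pandas_udf(f, return_type):
    # Convert once, when the UDF is wrapped, not once per batch.
    arrow_return_type = to_arrow_type(return_type)

    def verify_result_length(*a):
        kwargs = a[-1]
        result = f(*a[:-1], **kwargs)
        if len(result) != kwargs["length"]:
            raise RuntimeError("Result vector from pandas_udf was not the required length: "
                               "expected %d, got %d" % (kwargs["length"], len(result)))
        return result, arrow_return_type

    return verify_result_length


wrapped = wrap_pandas_udf(lambda x, **kwargs: [v + 1 for v in x], "long")
first = wrapped([1, 2, 3], {"length": 3})
second = wrapped([4, 5], {"length": 2})
```

The conversion runs once even though the wrapped function is invoked for two batches, because the closure captures the already converted type.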


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139579800
  
--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
 return lambda *a: f(*a)
 
 
-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+    def verify_result_length(*a):
+        kwargs = a[-1]
+        result = f(*a[:-1], **kwargs)
+        if len(result) != kwargs["length"]:
+            raise RuntimeError("Result vector from pandas_udf was not the required length: "
+                               "expected %d, got %d\nUse input vector length or kwarg['length']"
--- End diff --

typo: `kwarg['length']` -> `kwargs['length']`


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139562988
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+    """
+    Serializes Pandas.Series as Arrow data.
+    """
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or
+        a list of series accompanied by an optional pyarrow type to coerce the data to.
+        """
+        import pyarrow as pa
+        # Make input conform to [(series1, type1), (series2, type2), ...]
+        if not isinstance(series, (list, tuple)) or \
+                (len(series) == 2 and isinstance(series[1], pa.DataType)):
+            series = [series]
+        series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
+        arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
--- End diff --

With pyarrow 0.7, the type can now be cast internally if needed, so 
this works.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139562519
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
 self.assertTrue(pdf.empty)
 
 
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class VectorizedUDFTests(ReusedPySparkTestCase):
+
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def test_vectorized_udf_basic(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.range(10).select(
+col('id').cast('string').alias('str'),
+col('id').cast('int').alias('int'),
+col('id').alias('long'),
+col('id').cast('float').alias('float'),
+col('id').cast('double').alias('double'),
+col('id').cast('boolean').alias('bool'))
+f = lambda x: x
+str_f = pandas_udf(f, StringType())
+int_f = pandas_udf(f, IntegerType())
+long_f = pandas_udf(f, LongType())
+float_f = pandas_udf(f, FloatType())
+double_f = pandas_udf(f, DoubleType())
+bool_f = pandas_udf(f, BooleanType())
+res = df.select(str_f(col('str')), int_f(col('int')),
+long_f(col('long')), float_f(col('float')),
+double_f(col('double')), bool_f(col('bool')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_boolean(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(True,), (True,), (None,), (False,)]
+schema = StructType().add("bool", BooleanType())
+df = self.spark.createDataFrame(data, schema)
+bool_f = pandas_udf(lambda x: x, BooleanType())
+res = df.select(bool_f(col('bool')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_byte(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("byte", ByteType())
+df = self.spark.createDataFrame(data, schema)
+byte_f = pandas_udf(lambda x: x, ByteType())
+res = df.select(byte_f(col('byte')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_short(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("short", ShortType())
+df = self.spark.createDataFrame(data, schema)
+short_f = pandas_udf(lambda x: x, ShortType())
+res = df.select(short_f(col('short')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_int(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("int", IntegerType())
+df = self.spark.createDataFrame(data, schema)
+int_f = pandas_udf(lambda x: x, IntegerType())
+res = df.select(int_f(col('int')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_long(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(None,), (2,), (3,), (4,)]
+schema = StructType().add("long", LongType())
+df = self.spark.createDataFrame(data, schema)
+long_f = pandas_udf(lambda x: x, LongType())
+res = df.select(long_f(col('long')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_float(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(3.0,), (5.0,), (-1.0,), (None,)]
+schema = StructType().add("float", FloatType())
+df = self.spark.createDataFrame(data, schema)
+float_f = pandas_udf(lambda x: x, FloatType())
+res = df.select(float_f(col('float')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_null_double(self):
+from pyspark.sql.functions import pandas_udf, col
+data = [(3.0,), (5.0,), (-1.0,), (None,)]
+schema = StructType().add("double", DoubleType())
+df = self.spark.createDataFrame(data, schema)
+double_f = pandas_udf(lambda x: x, DoubleType())
+res = 

[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139496371
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

If the user function doesn't define the keyword args, then it is wrapped 
with a placeholder so that `worker.py` can expect the function to always have 
keywords.  I thought this was better than trying to do inspection on the worker 
while running the UDF.

I'm not crazy about the 0-parameter pandas_udf, but if we have to support 
it here then it does need to get the required length of output somehow, unless 
we repeat/slice the output to make the length correct.

I'm ok with making `**kwargs` mandatory for 0-parameter UDFs and optional 
for others.
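
To make the placeholder idea concrete, here is a minimal sketch rather than the patch's actual code. `ensure_kwargs` is a hypothetical helper name, plain lists stand in for `pandas.Series`, and `inspect.getfullargspec` is used because `inspect.getargspec` was removed in Python 3.11; its `varkw` field corresponds to the `keywords` field checked in the diff.

```python
import inspect


def ensure_kwargs(f):
    """Return f unchanged if it accepts **kwargs, else wrap it in a placeholder."""
    # varkw is None when the function does not declare **kwargs.
    if inspect.getfullargspec(f).varkw is None:
        return lambda *a, **kwargs: f(*a)
    return f


def plus_one(x):
    return [v + 1 for v in x]


def constant(**kwargs):
    # A 0-parameter function learns the required output length from kwargs.
    return [1] * kwargs["length"]


wrapped_plus_one = ensure_kwargs(plus_one)
wrapped_constant = ensure_kwargs(constant)

out_a = wrapped_plus_one([1, 2], length=2)
out_b = wrapped_constant(length=3)
```

With this arrangement the caller can always pass keywords, and functions that never asked for them simply do not see them.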


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139493581
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: 
Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = 
AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
--- End diff --

Yes, these functions are duplicated, as well as some code in `doExecute()`. 
 I could add a common base class like `EvalPythonExec` to clean this up, and 
maybe move to the same file?


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139366789
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

Should we make the kwargs mandatory for a 0-argument pandas udf? I think a 
0-argument pandas udf without the kwargs does not make sense, as it can't 
know the size of the Series it has to return.
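
One way such a rule could be enforced at definition time, sketched under assumptions: `check_zero_arg_udf` and its error message are hypothetical, and plain lists stand in for `pandas.Series`.

```python
import inspect


def check_zero_arg_udf(f):
    """Hypothetical definition-time check: a 0-argument function must take **kwargs."""
    spec = inspect.getfullargspec(f)
    takes_no_input = not spec.args and spec.varargs is None
    if takes_no_input and spec.varkw is None:
        raise ValueError("0-argument pandas_udf must accept **kwargs "
                         "to learn the required output length")
    return f


def good(**kwargs):
    return [0] * kwargs["length"]


def bad():
    return [0]


accepted = check_zero_arg_udf(good) is good
try:
    check_zero_arg_udf(bad)
    rejected = False
except ValueError:
    rejected = True
```

Failing early like this would surface the problem when the UDF is declared instead of when a batch comes back with the wrong length.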


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139365685
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-    def _udf(f, returnType=StringType()):
-        udf_obj = UserDefinedFunction(f, returnType)
-        return udf_obj._wrapped()
+    return _create_udf(f, returnType=returnType, vectorized=False)
 
-    # decorator @udf, @udf() or @udf(dataType())
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+    """
+    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
+    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
+
+    :param f: python function if used as a standalone function
+    :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+    # TODO: doctest
+    """
+    import inspect
+    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
+    if inspect.getargspec(f).keywords is None:
+        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
--- End diff --

We wrap the function with a kwargs placeholder, but we don't actually pass 
the kwargs into the function. So, unlike the 0-argument pandas udf in the SPIP, 
we explicitly ask it to define kwargs? In other words, we don't really have a 
0-argument pandas udf, because it has at least kwargs defined?


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139359087
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: 
Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = 
AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+udf.children match {
+  case Seq(u: PythonUDF) =>
+val (chained, children) = collectFunctions(u)
+(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+  case children =>
+// There should not be any other UDFs, or the children can't be 
evaluated directly.
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+(ChainedPythonFunctions(Seq(udf.func)), udf.children)
+}
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute().map(_.copy())
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+
+inputRDD.mapPartitions { iter =>
+
+  // The queue used to buffer input rows so we can drain it to
+  // combine input with output from Python.
+  val queue = HybridRowQueue(TaskContext.get().taskMemoryManager(),
+new File(Utils.getLocalDir(SparkEnv.get.conf)), 
child.output.length)
+  TaskContext.get().addTaskCompletionListener({ ctx =>
+queue.close()
+  })
+
+  val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+
+  // flatten all the arguments
+  val allInputs = new ArrayBuffer[Expression]
+  val dataTypes = new ArrayBuffer[DataType]
+  val argOffsets = inputs.map { input =>
+input.map { e =>
+  if (allInputs.exists(_.semanticEquals(e))) {
+allInputs.indexWhere(_.semanticEquals(e))
+  } else {
+allInputs += e
+dataTypes += e.dataType
+allInputs.length - 1
+  }
+}.toArray
+  }.toArray
+  val projection = newMutableProjection(allInputs, child.output)
+  val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+StructField(s"_$i", dt)
+  })
+
+  // Input iterator to Python: input rows are grouped so we send them 
in batches to Python.
+  // For each row, add it to the queue.
+  val projectedRowIter = iter.map { inputRow =>
+queue.add(inputRow.asInstanceOf[UnsafeRow])
+projection(inputRow)
+  }
+
+  val context = TaskContext.get()
+
+  val inputIterator = ArrowConverters.toPayloadIterator(
+  projectedRowIter, schema, conf.arrowMaxRecordsPerBatch, context).
+map(_.asPythonSerializable)
+
+  val schemaOut = 

[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139358487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: 
Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = 
AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
+udf.children match {
+  case Seq(u: PythonUDF) =>
+val (chained, children) = collectFunctions(u)
+(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+  case children =>
+// There should not be any other UDFs, or the children can't be evaluated directly.
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+(ChainedPythonFunctions(Seq(udf.func)), udf.children)
+}
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute().map(_.copy())
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+
+inputRDD.mapPartitions { iter =>
+
+  // The queue used to buffer input rows so we can drain it to
+  // combine input with output from Python.
+  val queue = HybridRowQueue(TaskContext.get().taskMemoryManager(),
+new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+  TaskContext.get().addTaskCompletionListener({ ctx =>
+queue.close()
+  })
+
+  val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+
+  // flatten all the arguments
+  val allInputs = new ArrayBuffer[Expression]
+  val dataTypes = new ArrayBuffer[DataType]
+  val argOffsets = inputs.map { input =>
+input.map { e =>
+  if (allInputs.exists(_.semanticEquals(e))) {
+allInputs.indexWhere(_.semanticEquals(e))
+  } else {
+allInputs += e
+dataTypes += e.dataType
+allInputs.length - 1
+  }
+}.toArray
+  }.toArray
+  val projection = newMutableProjection(allInputs, child.output)
+  val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+StructField(s"_$i", dt)
+  })
+
+  // Input iterator to Python: input rows are grouped so we send them in batches to Python.
+  // For each row, add it to the queue.
--- End diff --

The comment is wrong now. We don't group input rows here.
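
For readers following the quoted diff, the "flatten all the arguments" step just above the disputed comment can be sketched in plain Python. This is only an illustration under assumptions: the name `flatten_arguments` is hypothetical, expressions are modelled as strings, and `==` stands in for Catalyst's `semanticEquals`.

```python
def flatten_arguments(inputs):
    """Deduplicate arguments across UDFs and record each argument's offset.

    `inputs` holds one list of argument expressions per UDF. Plain equality
    is used here as a stand-in for semanticEquals (an assumption).
    """
    all_inputs = []   # unique arguments, in first-seen order
    arg_offsets = []  # one list of offsets per UDF
    for udf_args in inputs:
        offsets = []
        for arg in udf_args:
            if arg not in all_inputs:
                all_inputs.append(arg)
            # Offset of this argument within the deduplicated list.
            offsets.append(all_inputs.index(arg))
        arg_offsets.append(offsets)
    return all_inputs, arg_offsets


all_inputs, arg_offsets = flatten_arguments([["a", "b"], ["b", "c"]])
print(all_inputs)   # ['a', 'b', 'c']
print(arg_offsets)  # [[0, 1], [1, 2]]
```

The shared argument `b` is sent to Python once, and both UDFs refer to it by offset 1.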


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139357733
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
--- End diff --

`producedAttributes` and `collectFunctions` look duplicated between 
`ArrowEvalPythonExec` and `BatchEvalPythonExec`. We can deduplicate them, maybe 
in a later PR.
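
To make concrete what such a shared helper would do, here is a rough Python sketch of the `collectFunctions` logic from the quoted diff. It is not the Scala implementation: `Udf` is a hypothetical stand-in for `PythonUDF`, and non-UDF expressions are modelled as plain strings with no subtrees.

```python
from collections import namedtuple

# Hypothetical stand-in for a PythonUDF node: a function plus child expressions.
Udf = namedtuple("Udf", ["func", "children"])


def collect_functions(udf):
    """Return (chained functions, innermost inputs) for a possibly nested UDF."""
    if len(udf.children) == 1 and isinstance(udf.children[0], Udf):
        # The only child is itself a UDF: chain it ahead of this one.
        chained, inputs = collect_functions(udf.children[0])
        return chained + [udf.func], inputs
    # Otherwise no child may be a UDF, or the children could not be
    # evaluated directly before the call into Python.
    assert not any(isinstance(c, Udf) for c in udf.children)
    return [udf.func], list(udf.children)


inner = Udf("f", ["x", "y"])
outer = Udf("g", [inner])
print(collect_functions(outer))  # (['f', 'g'], ['x', 'y'])
```

Because the logic depends only on the UDF expression tree, it could live in one place and be used by both physical plans.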


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139289721
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

I am okay with leaving it as is here. I think we should catch the import error 
and rethrow it with a better message in all cases later, but let's talk about 
this in another place.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139289612
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

Ah, hm .. let me check the previous discussions and think about this a bit 
more.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-16 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139289505
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

Ah, I see. Thanks for explaining it. Sure, I am okay with leaving it as is 
here.
I think we should catch the import error and rethrow it with a better message 
in all cases (probably at the entry points), but let's talk about this later in 
another place.
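
A minimal sketch of what "catch and rethrow with a better message" could look like, assuming a hypothetical helper named `require_module` that entry points would call; this is not code from the PR or from PySpark.

```python
import importlib


def require_module(name, feature):
    """Import `name`, or raise an ImportError that names the feature needing it.

    Hypothetical helper for illustration only.
    """
    try:
        return importlib.import_module(name)
    except ImportError as e:
        raise ImportError(
            "%s must be installed to use %s; original error: %s" % (name, feature, e))


# At an entry point one might write, for example:
#     pa = require_module("pyarrow", "pandas_udf")
```

Doing this at the entry points would let a missing dependency fail early with one consistent message, while the inline `import pyarrow as pa` inside the serializer stays as it is.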


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-15 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139261343
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

Yeah, it would probably be best to handle it the same way as in 
`toPandas()`.

That got me thinking that it is a little weird to have a SQLConf 
"spark.sql.execution.arrow.enable" that is set for `toPandas()` but has no 
bearing on `pandas_udf`. It doesn't need to, since `pandas_udf` is an explicit 
call, but it seems a little contradictory. What do you think?


---
