[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r223177785
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None):
         can fail on special rows, the workaround is to incorporate the
         condition into the functions.

     .. note:: The user-defined functions do not take keyword arguments on the calling side.
+
+    .. note:: The data type of returned `pandas.Series` from the user-defined functions should be
+        matched with defined returnType. When there is mismatch between them, it is not guaranteed
+        that the conversion by SparkSQL during serialization is correct at all and users might get
--- End diff --

Yeah, since we don't actually cast the returned data intentionally.

How about:
```
When there is a mismatch between them, Spark might do conversion on the returned data.
The conversion is not guaranteed to be correct and results should be checked for
accuracy by users.
```
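For a concrete sense of what that unguaranteed conversion can look like, here is a minimal pyarrow sketch (pyarrow's `Array.from_pandas` is roughly the path the Arrow serializer takes; the values are illustrative):

```python
# A minimal sketch of the kind of conversion being discussed: pyarrow turning
# a float64 pandas.Series into an int32 Arrow array. With safe=False the data
# is silently truncated; with safe=True (pyarrow's default) it raises instead.
import pandas as pd
import pyarrow as pa

s = pd.Series([1.5, 2.5])

print(pa.Array.from_pandas(s, type=pa.int32(), safe=False))  # [1, 2] -- silently lossy

try:
    pa.Array.from_pandas(s, type=pa.int32(), safe=True)
except pa.ArrowInvalid as e:
    print("rejected:", e)
```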


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r223173637
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None):
         can fail on special rows, the workaround is to incorporate the
         condition into the functions.

     .. note:: The user-defined functions do not take keyword arguments on the calling side.
+
+    .. note:: The data type of returned `pandas.Series` from the user-defined functions should be
+        matched with defined returnType. When there is mismatch between them, it is not guaranteed
+        that the conversion by SparkSQL during serialization is correct at all and users might get
--- End diff --

>  an attempt will be made to cast the data and results should be checked for accuracy.

It sounds like the casting is intentional. I think the casting logic is not that clear, as far as I can tell, compared with SQL's casting logic. Can we leave this as "not guaranteed" for now and document the casting logic here later instead? Does Arrow have some kind of documentation for type conversion, BTW?
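For reference, Arrow does expose its casting rules in the API itself, even if the prose documentation is thin; a small sketch of `Array.cast`:

```python
# Arrow's cast semantics in miniature: casts are value-checked by default and
# only become lossy when explicitly allowed via safe=False.
import pyarrow as pa

arr = pa.array([1, 257])

try:
    arr.cast(pa.int8())                  # safe cast: 257 doesn't fit int8, so it raises
except pa.ArrowInvalid as e:
    print("rejected:", e)

print(arr.cast(pa.int8(), safe=False))  # unsafe cast: 257 wraps around to 1
```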


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r223070065
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None):
         can fail on special rows, the workaround is to incorporate the
         condition into the functions.

     .. note:: The user-defined functions do not take keyword arguments on the calling side.
+
+    .. note:: The data type of returned `pandas.Series` from the user-defined functions should be
+        matched with defined returnType. When there is mismatch between them, it is not guaranteed
+        that the conversion by SparkSQL during serialization is correct at all and users might get
--- End diff --

Instead of saying "conversion is not guaranteed", which sounds like results might be arbitrary, could we say "...mismatch between them, an attempt will be made to cast the data and results should be checked for accuracy."?


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222885910
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None):
         can fail on special rows, the workaround is to incorporate the
         condition into the functions.

     .. note:: The user-defined functions do not take keyword arguments on the calling side.
+
+    .. note:: The data type of returned `pandas.Series` from the user-defined functions should be
+        matched with defined returnType. When there is mismatch between them, it is not guaranteed
+        that the conversion by SparkSQL during serialization is correct at all and users might get
--- End diff --

maybe I am worrying too much .. but how about we just say ..

```
... defined returnType (see :meth:`types.to_arrow_type` and :meth:`types.from_arrow_type`).
When there is a mismatch between them, the conversion is not guaranteed.
```
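If it helps reviewers, the mapping those helpers implement is easy to poke at; a quick sketch, assuming the Spark 2.x layout where they live in `pyspark.sql.types` (later releases moved them to `pyspark.sql.pandas.types`):

```python
# Inspecting the Catalyst <-> Arrow mapping that the suggested note points at.
# Assumes the Spark 2.x module layout (pyspark.sql.types).
from pyspark.sql.types import ShortType, to_arrow_type, from_arrow_type

arrow_type = to_arrow_type(ShortType())
print(arrow_type)                   # int16
print(from_arrow_type(arrow_type))  # ShortType
```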


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222885267
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_length(*a):
+        import pyarrow as pa
         result = f(*a)
         if not hasattr(result, "__len__"):
             raise TypeError("Return type of the user-defined function should be "
                             "Pandas.Series, but is {}".format(type(result)))
         if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
                                "expected %d, got %d" % (len(a[0]), len(result)))
+
+        # Ensure return type of Pandas.Series matches the arrow return type of the user-defined
+        # function. Otherwise, we may produce incorrect serialized data.
+        # Note: for timestamp type, we only need to ensure both types are timestamp because the
+        # serializer will do conversion.
+        try:
+            arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+            both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \
+                pa.types.is_timestamp(arrow_return_type)
+            if not both_are_timestamp and arrow_return_type != arrow_type_of_result:
+                print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's "
--- End diff --

Yes .. I support just fixing the doc here first, and making a separate PR later if needed.


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222617651
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_length(*a):
+        import pyarrow as pa
         result = f(*a)
         if not hasattr(result, "__len__"):
             raise TypeError("Return type of the user-defined function should be "
                             "Pandas.Series, but is {}".format(type(result)))
         if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
                                "expected %d, got %d" % (len(a[0]), len(result)))
+
+        # Ensure return type of Pandas.Series matches the arrow return type of the user-defined
+        # function. Otherwise, we may produce incorrect serialized data.
+        # Note: for timestamp type, we only need to ensure both types are timestamp because the
+        # serializer will do conversion.
+        try:
+            arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+            both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \
+                pa.types.is_timestamp(arrow_return_type)
+            if not both_are_timestamp and arrow_return_type != arrow_type_of_result:
+                print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's "
--- End diff --

hmm, I'm neutral on whether we should display this warning message before we have an option to check for unsafe conversion by PyArrow. @HyukjinKwon, if you are also supportive, I will remove it and leave this PR as documentation only.
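For the record, that "option to check the unsafe conversion by PyArrow" could build on pyarrow's own `safe` flag; a hedged sketch of the idea (the `verify_safely` knob below is hypothetical, not an existing Spark option):

```python
# Sketch of the unsafe-conversion check idea: let pyarrow's safe=True
# validation raise instead of letting lossy data through. `verify_safely`
# is a hypothetical flag used only for illustration.
import pandas as pd
import pyarrow as pa

def to_arrow_array(result, arrow_return_type, verify_safely=True):
    try:
        return pa.Array.from_pandas(result, type=arrow_return_type, safe=verify_safely)
    except pa.ArrowInvalid as e:
        raise RuntimeError("Pandas UDF result cannot be safely converted to %s: %s"
                           % (arrow_return_type, e))

print(to_arrow_array(pd.Series([1.0, 2.0]), pa.float64()))  # fine: types line up
```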



---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222616380
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
 arrow_return_type = to_arrow_type(return_type)
 
 def verify_result_length(*a):
+import pyarrow as pa
 result = f(*a)
 if not hasattr(result, "__len__"):
 raise TypeError("Return type of the user-defined function 
should be "
 "Pandas.Series, but is 
{}".format(type(result)))
 if len(result) != len(a[0]):
 raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
"expected %d, got %d" % (len(a[0]), 
len(result)))
+
+# Ensure return type of Pandas.Series matches the arrow return 
type of the user-defined
+# function. Otherwise, we may produce incorrect serialized data.
+# Note: for timestamp type, we only need to ensure both types are 
timestamp because the
+# serializer will do conversion.
+try:
+arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+both_are_timestamp = 
pa.types.is_timestamp(arrow_type_of_result) and \
+pa.types.is_timestamp(arrow_return_type)
+if not both_are_timestamp and arrow_return_type != 
arrow_type_of_result:
+print("WARN: Arrow type %s of return Pandas.Series of the 
user-defined function's "
+  "dtype %s doesn't match the arrow type %s "
+  "of defined return type %s" % (arrow_type_of_result, 
result.dtype,
+ arrow_return_type, 
return_type),
+  file=sys.stderr)
+except:
+print("WARN: Can't infer arrow type of Pandas.Series's dtype: 
%s, which might not "
+  "match the arrow type %s of defined return type %s" % 
(result.dtype,
+ 
arrow_return_type,
+ 
return_type),
--- End diff --

Sorry, I may have misunderstood: do you mean L113 and L114 should be aligned with L112? But after that, lint-python will complain.
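One way to satisfy both the alignment request and lint-python is to avoid deep continuation lines altogether, e.g. by building the message first; a runnable sketch with dummy stand-ins for the diff's locals:

```python
# A lint-friendly alternative to aligning continuation lines under the opening
# parenthesis: assemble the message, then print it. The three values below are
# dummies standing in for result.dtype, arrow_return_type and return_type.
import sys

result_dtype, arrow_return_type, return_type = "float64", "bool", "BooleanType"

msg = ("WARN: Can't infer arrow type of Pandas.Series's dtype: %s, which might not "
       "match the arrow type %s of defined return type %s"
       % (result_dtype, arrow_return_type, return_type))
print(msg, file=sys.stderr)
```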


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222501309
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_length(*a):
+        import pyarrow as pa
         result = f(*a)
         if not hasattr(result, "__len__"):
             raise TypeError("Return type of the user-defined function should be "
                             "Pandas.Series, but is {}".format(type(result)))
         if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
                                "expected %d, got %d" % (len(a[0]), len(result)))
+
+        # Ensure return type of Pandas.Series matches the arrow return type of the user-defined
+        # function. Otherwise, we may produce incorrect serialized data.
+        # Note: for timestamp type, we only need to ensure both types are timestamp because the
+        # serializer will do conversion.
+        try:
+            arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+            both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \
+                pa.types.is_timestamp(arrow_return_type)
+            if not both_are_timestamp and arrow_return_type != arrow_type_of_result:
+                print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's "
--- End diff --

Yeah, it might be useful to see the warning when doing some local tests etc. My only concern is that users might be confused about why they see a warning locally but it doesn't appear in the logs.. Man, it would be nice to have some proper python logging for this!
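For what it's worth, a minimal sketch of what such logging could look like in the worker (hand-rolled here, since PySpark workers had no logging setup at the time; this is an assumption about shape, not existing code):

```python
# A minimal sketch of "proper python logging" for the worker: route warnings
# through the logging module to stderr, which the executor captures in its log.
import logging
import sys

logger = logging.getLogger("pyspark.worker")
if not logger.handlers:
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logger.addHandler(handler)
logger.setLevel(logging.WARNING)

logger.warning("Arrow type %s of returned Pandas.Series doesn't match defined return type %s",
               "double", "bool")
```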


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222173904
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_length(*a):
+        import pyarrow as pa
         result = f(*a)
         if not hasattr(result, "__len__"):
             raise TypeError("Return type of the user-defined function should be "
                             "Pandas.Series, but is {}".format(type(result)))
         if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
                                "expected %d, got %d" % (len(a[0]), len(result)))
+
+        # Ensure return type of Pandas.Series matches the arrow return type of the user-defined
+        # function. Otherwise, we may produce incorrect serialized data.
+        # Note: for timestamp type, we only need to ensure both types are timestamp because the
+        # serializer will do conversion.
+        try:
+            arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+            both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \
+                pa.types.is_timestamp(arrow_return_type)
+            if not both_are_timestamp and arrow_return_type != arrow_type_of_result:
+                print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's "
+                      "dtype %s doesn't match the arrow type %s "
+                      "of defined return type %s" % (arrow_type_of_result, result.dtype,
+                                                     arrow_return_type, return_type),
+                      file=sys.stderr)
+        except:
+            print("WARN: Can't infer arrow type of Pandas.Series's dtype: %s, which might not "
+                  "match the arrow type %s of defined return type %s" % (result.dtype,
+                                                                         arrow_return_type,
+                                                                         return_type),
--- End diff --

ok. thanks. :-)


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222173421
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_length(*a):
+        import pyarrow as pa
         result = f(*a)
         if not hasattr(result, "__len__"):
             raise TypeError("Return type of the user-defined function should be "
                             "Pandas.Series, but is {}".format(type(result)))
         if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
                                "expected %d, got %d" % (len(a[0]), len(result)))
+
+        # Ensure return type of Pandas.Series matches the arrow return type of the user-defined
+        # function. Otherwise, we may produce incorrect serialized data.
+        # Note: for timestamp type, we only need to ensure both types are timestamp because the
+        # serializer will do conversion.
+        try:
+            arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+            both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \
+                pa.types.is_timestamp(arrow_return_type)
+            if not both_are_timestamp and arrow_return_type != arrow_type_of_result:
+                print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's "
+                      "dtype %s doesn't match the arrow type %s "
+                      "of defined return type %s" % (arrow_type_of_result, result.dtype,
+                                                     arrow_return_type, return_type),
+                      file=sys.stderr)
+        except:
+            print("WARN: Can't infer arrow type of Pandas.Series's dtype: %s, which might not "
+                  "match the arrow type %s of defined return type %s" % (result.dtype,
+                                                                         arrow_return_type,
+                                                                         return_type),
--- End diff --

I would fix the indentation here tho :-)


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222015287
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_length(*a):
+        import pyarrow as pa
         result = f(*a)
         if not hasattr(result, "__len__"):
             raise TypeError("Return type of the user-defined function should be "
                             "Pandas.Series, but is {}".format(type(result)))
         if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
                                "expected %d, got %d" % (len(a[0]), len(result)))
+
+        # Ensure return type of Pandas.Series matches the arrow return type of the user-defined
+        # function. Otherwise, we may produce incorrect serialized data.
+        # Note: for timestamp type, we only need to ensure both types are timestamp because the
+        # serializer will do conversion.
+        try:
+            arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+            both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \
+                pa.types.is_timestamp(arrow_return_type)
+            if not both_are_timestamp and arrow_return_type != arrow_type_of_result:
+                print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's "
--- End diff --

No, but like the other print usages in `worker.py`, I think this can be seen in the worker log?

This is also useful when testing in the pyspark shell.


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-02 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222007837
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_length(*a):
+        import pyarrow as pa
         result = f(*a)
         if not hasattr(result, "__len__"):
             raise TypeError("Return type of the user-defined function should be "
                             "Pandas.Series, but is {}".format(type(result)))
         if len(result) != len(a[0]):
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
                                "expected %d, got %d" % (len(a[0]), len(result)))
+
+        # Ensure return type of Pandas.Series matches the arrow return type of the user-defined
+        # function. Otherwise, we may produce incorrect serialized data.
+        # Note: for timestamp type, we only need to ensure both types are timestamp because the
+        # serializer will do conversion.
+        try:
+            arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+            both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \
+                pa.types.is_timestamp(arrow_return_type)
+            if not both_are_timestamp and arrow_return_type != arrow_type_of_result:
+                print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's "
--- End diff --

Will this appear when being run in an executor?


---




[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-02 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/22610

[WIP][SPARK-25461][PySpark][SQL] Print warning when return type of 
Pandas.Series mismatches the arrow return type of pandas udf

## What changes were proposed in this pull request?

For Pandas UDFs, we derive the Arrow type from the UDF's defined Catalyst return data type and use that Arrow type to serialize the data. If the defined return data type doesn't match the actual type of the Pandas.Series returned by the UDF, there is a risk of returning incorrect data from the Python side.

This WIP work proposes to check whether the returned Pandas.Series's dtype matches the defined return type of the Pandas UDF.

We could disallow it by throwing an exception to let users know they might need to set the correct return type, but it looks like we rely on such behavior in the current codebase. For example, there is a test `test_vectorized_udf_null_short`:

```python
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("short", ShortType())
df = self.spark.createDataFrame(data, schema)
short_f = pandas_udf(lambda x: x, ShortType())
res = df.select(short_f(col('short')))
self.assertEquals(df.collect(), res.collect())
```
So instead, this work just prints a warning message when such a mismatch is detected, so users can see it when debugging why their Pandas UDFs don't produce the expected results.

## How was this patch tested?

Manually test by running:

```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import BooleanType
import pandas as pd

values = [1.0] * 5 + [2.0] * 5
pdf = pd.DataFrame({'A': values})
df = spark.createDataFrame(pdf)
@pandas_udf(returnType=BooleanType())
def to_boolean(column):
return column
df.select(['A']).withColumn('to_boolean', to_boolean('A')).show()
```

Output:

```
WARN: Arrow type double of return Pandas.Series of the user-defined function's dtype float64 doesn't match the arrow type bool of defined return type BooleanType

+---+----------+
|  A|to_boolean|
+---+----------+
|1.0|     false|
|1.0|     false|
|1.0|     false|
|1.0|     false|
|1.0|     false|
|2.0|     false|
|2.0|     false|
|2.0|     false|
|2.0|     false|
|2.0|     false|
+---+----------+
```   

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-25461

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22610


commit 2fa15bda48ba64a102f114dc9119cb3c310200c4
Author: Liang-Chi Hsieh 
Date:   2018-09-26T09:01:40Z

Ensure return type of Pandas.Series matches the arrow return type of pandas 
udf.

commit d206b7cf78f898e622f539a15e45515fcbd9e54a
Author: Liang-Chi Hsieh 
Date:   2018-10-02T05:29:44Z

Print warning message instead of throwing exception.




---
