[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-23 Thread Hyukjin Kwon (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553756#comment-16553756 ]

Hyukjin Kwon commented on SPARK-24760:
--

+1 for a "Not a Problem" resolution for now.

> Pandas UDF does not handle NaN correctly
> 
>
> Key: SPARK-24760
> URL: https://issues.apache.org/jira/browse/SPARK-24760
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Pandas 0.23.1
>Reporter: Mortada Mehyar
>Priority: Minor
>
> I noticed that having `NaN` values when using the new Pandas UDF feature 
> triggers a JVM exception. Not sure if this is an issue with PySpark or 
> PyArrow. Here is a somewhat contrived example to showcase the problem.
> {code}
> In [1]: import pandas as pd
>    ...: from pyspark.sql.functions import lit, pandas_udf, PandasUDFType
> In [2]: d = [{'key': 'a', 'value': 1},
>              {'key': 'a', 'value': 2},
>              {'key': 'b', 'value': 3},
>              {'key': 'b', 'value': -2}]
> df = spark.createDataFrame(d, "key: string, value: int")
> df.show()
> +---+-----+
> |key|value|
> +---+-----+
> |  a|    1|
> |  a|    2|
> |  b|    3|
> |  b|   -2|
> +---+-----+
> In [3]: df_tmp = df.withColumn('new', lit(1.0))  # add a DoubleType column
> df_tmp.printSchema()
> root
>  |-- key: string (nullable = true)
>  |-- value: integer (nullable = true)
>  |-- new: double (nullable = false)
> {code}
> The Pandas UDF simply creates a new column where negative values are set to a 
> particular float, in this case INF, and it works fine:
> {code}
> In [4]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
>    ...: def func(pdf):
>    ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('inf'))
>    ...:     return pdf
> In [5]: df.groupby('key').apply(func).show()
> +---+-----+--------+
> |key|value|     new|
> +---+-----+--------+
> |  b|    3|     3.0|
> |  b|   -2|Infinity|
> |  a|    1|     1.0|
> |  a|    2|     2.0|
> +---+-----+--------+
> {code}
> However, if we set this value to NaN, it triggers an exception:
> {code}
> In [6]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
>    ...: def func(pdf):
>    ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))
>    ...:     return pdf
>    ...:
>    ...: df.groupby('key').apply(func).show()
> [Stage 23:==> (73 + 2) / 
> 75]2018-07-07 16:26:27 ERROR Executor:91 - Exception in task 36.0 in stage 
> 23.0 (TID 414)
> java.lang.IllegalStateException: Value at index is null
>   at org.apache.arrow.vector.Float8Vector.get(Float8Vector.java:98)
>   at 
> org.apache.spark.sql.vectorized.ArrowColumnVector$DoubleAccessor.getDouble(ArrowColumnVector.java:344)
>   at 
> org.apache.spark.sql.vectorized.ArrowColumnVector.getDouble(ArrowColumnVector.java:99)
>   at 
> org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getDouble(MutableColumnarRow.java:126)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at 
> 

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-23 Thread Wes McKinney (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553315#comment-16553315 ]

Wes McKinney commented on SPARK-24760:
--

If data comes to Spark from pandas, any "NaN" values should be treated as 
"null". Any other behavior is going to cause users significant problems.

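For context, here is a minimal sketch (not from the original comment, just an illustration) of why that distinction is hard to preserve: once values land in a float64 pandas Series, both None and NaN are stored as NaN, so a converter downstream only ever sees NaN.
{code:python}
import numpy as np
import pandas as pd

# Both None and float('nan') are stored as NaN in a float64 Series,
# so downstream converters cannot tell "missing" apart from "NaN".
s = pd.Series([1.0, None, float('nan')])
print(s.dtype)    # float64
print(s.isna())   # False, True, True
print(np.isnan(s.iloc[1]), np.isnan(s.iloc[2]))  # True True
{code}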
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-12 Thread Bryan Cutler (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541967#comment-16541967 ]

Bryan Cutler commented on SPARK-24760:
--

Yeah, createDataFrame is inconsistent with pandas_udf here, but you could argue 
the other way that your example with arrow disabled is incorrect. If NaN 
values are introduced into the Pandas DataFrame not manually but, say, as a 
result of reindexing, then they represent NULL values and the user would 
expect to see them in Spark that way. For example:
{code:java}
In [19]: pdf = pd.DataFrame({"x": [1.0, 2.0, 4.0]}, index=[0, 1, 3]).reindex(range(4))

In [20]: pdf
Out[20]:
     x
0  1.0
1  2.0
2  NaN
3  4.0

In [21]: pdf.isnull()
Out[21]:
       x
0  False
1  False
2   True
3  False

In [23]: spark.conf.set("spark.sql.execution.arrow.enabled", True)

In [24]: spark.createDataFrame(pdf).show()
+----+
|   x|
+----+
| 1.0|
| 2.0|
|null|
| 4.0|
+----+
{code}
[~ueshin] and [~hyukjin.kwon] what do you think the correct behavior is for 
createDataFrame from a Pandas DataFrame that has NaN values?

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-12 Thread Mortada Mehyar (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541209#comment-16541209 ]

Mortada Mehyar commented on SPARK-24760:


[~bryanc] thanks for the example. That looks like reasonable behavior of 
pandas/pyarrow to me though, because the pd.Series you have is of dtype 'O' 
and it is getting converted into a float dtype, and None becomes NaN whether 
the conversion happens in pandas or pyarrow.
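
(A minimal sketch of that point, not from the original comment: casting an object Series to float64 maps None to NaN, so the two become indistinguishable afterwards.)
{code:python}
import pandas as pd

# Casting an object-dtype Series to float64 turns None into NaN,
# so pandas/pyarrow can no longer tell the two apart.
s = pd.Series([1, None, float('nan')], dtype='O')
print(s.astype('float64'))
# 0    1.0
# 1    NaN
# 2    NaN
# dtype: float64
{code}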

So, I actually found another example in pyspark which highlights the NaN 
handling issue, without creating a pandas_udf:
{code}
mortada_mehyar ~ $ pyspark --conf "spark.sql.execution.arrow.enabled=true"

In [1]: import pandas as pd
In [2]: df = pd.DataFrame({'a': [1, 2], 'b': [float('nan'), 1.2]})

In [3]: spark.createDataFrame(df).show()
+---+----+
|  a|   b|
+---+----+
|  1|null|
|  2| 1.2|
+---+----+

mortada_mehyar ~ $ pyspark --conf "spark.sql.execution.arrow.enabled=false"

In [1]: import pandas as pd
In [2]: df = pd.DataFrame({'a': [1, 2], 'b': [float('nan'), 1.2]})

In [3]: spark.createDataFrame(df).show()
+---+---+
|  a|  b|
+---+---+
|  1|NaN|
|  2|1.2|
+---+---+
{code}

Note the code is identical and the only difference is the config value for 
`spark.sql.execution.arrow.enabled`, which defaults to false in Spark 2.3.1. I 
think this would be quite surprising behavior for users.
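
(For reference, a small sketch, not from the original comment, of toggling the same flag inside an existing session instead of via `--conf`; it reuses the pandas `df` from the example above and matches the two shells shown.)
{code:python}
# Same setting as the --conf flag above; it is a runtime SQL conf.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.createDataFrame(df).show()   # NaN comes out as null

spark.conf.set("spark.sql.execution.arrow.enabled", "false")
spark.createDataFrame(df).show()   # NaN comes out as NaN
{code}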

cc [~wesmckinn] would appreciate your input, thanks!

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-10 Thread Bryan Cutler (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538924#comment-16538924 ]

Bryan Cutler commented on SPARK-24760:
--

Pandas uses NaN as a special value that it interprets as NULL or missing 
data:
{code:python}
In [35]: s = pd.Series([1, None, float('nan')], dtype='O')

In [36]: s
Out[36]:
0       1
1    None
2     NaN
dtype: object

In [37]: s.isnull()
Out[37]:
0    False
1     True
2     True
dtype: bool{code}

I'm not sure if there is a way to change the above behavior in Pandas.  The 
only other possibility I can think of is to specify your own NULL mask when 
pyspark converts Pandas data to Arrow.  This doesn't seem to be supported in 
Arrow currently

{code:python}
In [39]: pa.Array.from_pandas(s, mask=[False, True, False], type=pa.float64())
Out[39]: 

[
  1.0,
  NA,
  NA
]
{code}

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-10 Thread Mortada Mehyar (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538234#comment-16538234 ]

Mortada Mehyar commented on SPARK-24760:


I still think something is not right here. It is true that pandas does not have 
a perfect equivalent to the NULL value in Spark, but I don't see why that means 
a pandas NaN needs to be changed to NULL in pyarrow or Spark? Spark clearly 
treats NaN and NULL as different values. Doesn't it make more sense to map a 
pandas NaN to a Spark NaN? Perhaps this is a better example to drive this point 
home:
{code:java}
In [1]: d = [{'value': 1.0},
   ...:      {'value': -2.0},
   ...:      {'value': float('nan')},
   ...:      {'value': 3.0},
   ...:      {'value': float('inf')}]
   ...: df = spark.createDataFrame(d, "value: double")
   ...: df.show()
   ...:
+--------+
|   value|
+--------+
|     1.0|
|    -2.0|
|     NaN|
|     3.0|
|Infinity|
+--------+


In [2]: from pyspark.sql.functions import pandas_udf, udf

In [3]: from pyspark.sql.types import DoubleType

In [4]: @udf(DoubleType())
   ...: def identity_python(x):
   ...:     return x

In [5]: @pandas_udf(DoubleType())
   ...: def identity_pandas(x):
   ...:     return x


In [6]: df.select(df['value'], identity_python(df['value'])).show()
+--------+----------------------+
|   value|identity_python(value)|
+--------+----------------------+
|     1.0|                   1.0|
|    -2.0|                  -2.0|
|     NaN|                   NaN|
|     3.0|                   3.0|
|Infinity|              Infinity|
+--------+----------------------+


In [7]: df.select(df['value'], identity_pandas(df['value'])).show()
+--------+----------------------+
|   value|identity_pandas(value)|
+--------+----------------------+
|     1.0|                   1.0|
|    -2.0|                  -2.0|
|     NaN|                  null|
|     3.0|                   3.0|
|Infinity|              Infinity|
+--------+----------------------+{code}
Here we are simply defining identity functions as a python UDF and a pandas UDF. 
Why should the Spark pandas_udf single out the NaN and turn it into NULL?

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-09 Thread Bryan Cutler (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537893#comment-16537893 ]

Bryan Cutler commented on SPARK-24760:
--

Pandas interprets NaN as missing data for numeric values (see 
[https://pandas.pydata.org/pandas-docs/stable/missing_data.html#inserting-missing-data]), 
which then becomes a NULL value in pyarrow and Spark. I think you might be 
able to configure Pandas differently to use another sentinel value for this. 
I'll close this since it's not a Spark issue.

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-09 Thread Mortada Mehyar (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537864#comment-16537864 ]

Mortada Mehyar commented on SPARK-24760:


Setting the "new" column to be nullable indeed makes it not raise an exception; 
however, the NaN value gets turned into NULL, which I'd argue is still not the 
correct behavior:
{code:java}
In [6]: @pandas_udf("key: string, value: int, new: double", PandasUDFType.GROUPED_MAP)
   ...: def func(pdf):
   ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))
   ...:     return pdf


In [7]:

In [7]: df.groupby('key').apply(func).show()
+---+-----+----+
|key|value| new|
+---+-----+----+
|  b|    3| 3.0|
|  b|   -2|null|
|  a|    1| 1.0|
|  a|    2| 2.0|
+---+-----+----+{code}

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-09 Thread Mortada Mehyar (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537817#comment-16537817 ]

Mortada Mehyar commented on SPARK-24760:


[~icexelloss] but NaN is not really a "null" value though, at least that seems 
inconsistent with what Spark considers to be null.

For instance:
{code:java}
In [39]: foo = spark.createDataFrame([(1, float('nan')), (2, None), (3, float('inf'))], ("a", "b"))

In [40]: foo.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)


In [41]: foo.show()
+---+--------+
|  a|       b|
+---+--------+
|  1|     NaN|
|  2|    null|
|  3|Infinity|
+---+--------+


In [42]: foo[F.isnull(foo['b'])].show()
+---+----+
|  a|   b|
+---+----+
|  2|null|
+---+----+


In [43]: foo[F.isnan(foo['b'])].show()
+---+---+
|  a|  b|
+---+---+
|  1|NaN|
+---+---+{code}
 

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-09 Thread Li Jin (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537662#comment-16537662 ]

Li Jin commented on SPARK-24760:


I think the issue here is that the output schema for the UDF is not correctly 
specified.

You specified the output schema of the UDF to be the same as the input's, which 
means the column "new" is not nullable, but you are returning a null value in 
your UDF. It doesn't match the schema, hence the exception.

Does that make sense?
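
For illustration, here is a minimal sketch (hypothetical, not from the original comment) of declaring the output schema with "new" as nullable instead of reusing df_tmp.schema. With this schema the UDF no longer raises, although the NaN still comes back as null, as shown in the earlier comments:
{code:python}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Hypothetical schema: same columns as df_tmp, but "new" is declared nullable.
schema = StructType([
    StructField("key", StringType(), True),
    StructField("value", IntegerType(), True),
    StructField("new", DoubleType(), True),   # nullable, unlike df_tmp.schema
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def func(pdf):
    pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))
    return pdf
{code}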

 

[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly

2018-07-08 Thread Mortada Mehyar (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536540#comment-16536540 ]

Mortada Mehyar commented on SPARK-24760:


cc [~icexelloss] [~hyukjin.kwon]
