[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553756#comment-16553756 ]

Hyukjin Kwon commented on SPARK-24760:
--------------------------------------

+1 for the "Not a Problem" resolution for now.

Pandas UDF does not handle NaN correctly
----------------------------------------

                Key: SPARK-24760
                URL: https://issues.apache.org/jira/browse/SPARK-24760
            Project: Spark
         Issue Type: Bug
         Components: PySpark
   Affects Versions: 2.3.0, 2.3.1
        Environment: Spark 2.3.1
                     Pandas 0.23.1
           Reporter: Mortada Mehyar
           Priority: Minor

I noticed that having `NaN` values when using the new Pandas UDF feature triggers a JVM exception. Not sure if this is an issue with PySpark or PyArrow. Here is a somewhat contrived example to showcase the problem.

{code}
In [1]: import pandas as pd
   ...: from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

In [2]: d = [{'key': 'a', 'value': 1},
             {'key': 'a', 'value': 2},
             {'key': 'b', 'value': 3},
             {'key': 'b', 'value': -2}]
        df = spark.createDataFrame(d, "key: string, value: int")
        df.show()
+---+-----+
|key|value|
+---+-----+
|  a|    1|
|  a|    2|
|  b|    3|
|  b|   -2|
+---+-----+

In [3]: df_tmp = df.withColumn('new', lit(1.0))  # add a DoubleType column
        df_tmp.printSchema()
root
 |-- key: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- new: double (nullable = false)
{code}

The Pandas UDF simply creates a new column where negative values are set to a particular float, in this case INF, and it works fine:

{code}
In [4]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
   ...: def func(pdf):
   ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('inf'))
   ...:     return pdf

In [5]: df.groupby('key').apply(func).show()
+---+-----+--------+
|key|value|     new|
+---+-----+--------+
|  b|    3|     3.0|
|  b|   -2|Infinity|
|  a|    1|     1.0|
|  a|    2|     2.0|
+---+-----+--------+
{code}

However, if we set this value to NaN, it triggers an exception:

{code}
In [6]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
   ...: def func(pdf):
   ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))
   ...:     return pdf
   ...:
   ...: df.groupby('key').apply(func).show()
[Stage 23:==>   (73 + 2) / 75]2018-07-07 16:26:27 ERROR Executor:91 - Exception in task 36.0 in stage 23.0 (TID 414)
java.lang.IllegalStateException: Value at index is null
    at org.apache.arrow.vector.Float8Vector.get(Float8Vector.java:98)
    at org.apache.spark.sql.vectorized.ArrowColumnVector$DoubleAccessor.getDouble(ArrowColumnVector.java:344)
    at org.apache.spark.sql.vectorized.ArrowColumnVector.getDouble(ArrowColumnVector.java:99)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getDouble(MutableColumnarRow.java:126)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
{code}
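The body of the reporter's grouped-map UDF is plain pandas; run standalone (a minimal sketch, no Spark involved), it shows that pandas already regards the `NaN` produced by `.where()` as missing, which is where the null on the JVM side originates:

```python
import pandas as pd

# The reporter's UDF body on plain pandas: .where() keeps values > 0
# and fills the rest with NaN (the column is upcast to float64).
pdf = pd.DataFrame({'key': ['a', 'a', 'b', 'b'], 'value': [1, 2, 3, -2]})
pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))

print(pdf['new'].tolist())
print(pdf['new'].isna().tolist())  # pandas reports the NaN as missing
```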
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553315#comment-16553315 ]

Wes McKinney commented on SPARK-24760:
--------------------------------------

If data comes to Spark from pandas, any "NaN" values should be treated as "null". Any other behavior is going to cause users significant problems.
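Wes's position rests on pandas' own convention: in a float column there is no separate null, so `NaN` is the missing-value marker. That can be checked in pandas alone (a minimal sketch, no Spark needed):

```python
import math

import pandas as pd

# In a float64 Series, pandas stores both None and NaN as NaN, and
# isna() reports both as missing -- there is no way to tell them apart.
s = pd.Series([1.0, None, float('nan')])

print(s.isna().tolist())
print(math.isnan(s.iloc[1]))  # the None was coerced to NaN on construction
```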
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541967#comment-16541967 ]

Bryan Cutler commented on SPARK-24760:
--------------------------------------

Yeah, createDataFrame is inconsistent with pandas_udf here, but you could argue the other way: that your example with Arrow disabled is the incorrect one. If NaN values are introduced into the pandas DataFrame not manually but, say, as a result of reindexing, then they represent NULL values and the user would expect to see them in Spark that way. For example:

{code:java}
In [19]: pdf = pd.DataFrame({"x": [1.0, 2.0, 4.0]}, index=[0, 1, 3]).reindex(range(4))

In [20]: pdf
Out[20]:
     x
0  1.0
1  2.0
2  NaN
3  4.0

In [21]: pdf.isnull()
Out[21]:
       x
0  False
1  False
2   True
3  False

In [23]: spark.conf.set("spark.sql.execution.arrow.enabled", True)

In [24]: spark.createDataFrame(pdf).show()
+----+
|   x|
+----+
| 1.0|
| 2.0|
|null|
| 4.0|
+----+
{code}

[~ueshin] and [~hyukjin.kwon] what do you think the correct behavior is for createDataFrame from pandas data that has NaN values?
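The reindex behavior Bryan describes can be reproduced in pandas alone (a minimal sketch): the `NaN` is never typed by the user, it is pandas' marker for a row that did not exist before reindexing.

```python
import pandas as pd

# Reindexing over a label that was absent (index 2) introduces NaN --
# pandas' way of saying "missing", which Arrow then maps to null.
pdf = pd.DataFrame({"x": [1.0, 2.0, 4.0]}, index=[0, 1, 3]).reindex(range(4))

print(pdf["x"].isna().tolist())  # only row 2 is missing
```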
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541209#comment-16541209 ]

Mortada Mehyar commented on SPARK-24760:
----------------------------------------

[~bryanc] thanks for the example. That looks like reasonable behavior of pandas/pyarrow to me, though, because the pd.Series you have is of dtype 'O' and is being converted to a float dtype, and None becomes NaN whether the conversion happens in pandas or pyarrow.

I actually found another example in PySpark that highlights the NaN handling issue without creating a pandas_udf:

{code}
mortada_mehyar ~ $ pyspark --conf "spark.sql.execution.arrow.enabled=true"

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2], 'b': [float('nan'), 1.2]})

In [3]: spark.createDataFrame(df).show()
+---+----+
|  a|   b|
+---+----+
|  1|null|
|  2| 1.2|
+---+----+

mortada_mehyar ~ $ pyspark --conf "spark.sql.execution.arrow.enabled=false"

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2], 'b': [float('nan'), 1.2]})

In [3]: spark.createDataFrame(df).show()
+---+---+
|  a|  b|
+---+---+
|  1|NaN|
|  2|1.2|
+---+---+
{code}

Note the code is identical; the only difference is the value of `spark.sql.execution.arrow.enabled`, which defaults to false in Spark 2.3.1. I think this would be quite a surprising behavior for users. cc [~wesmckinn] would appreciate your input, thanks!
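Since the two conversion paths disagree on `NaN`, one defensive option (a sketch, not anything PySpark provides; `nan_columns` is a hypothetical helper invented here for illustration) is to detect the ambiguous columns before handing a frame to `spark.createDataFrame`:

```python
import pandas as pd

def nan_columns(pdf: pd.DataFrame):
    # Hypothetical helper: names of float columns containing NaN --
    # values whose meaning (missing vs. literal NaN) would differ
    # between the Arrow and non-Arrow conversion paths.
    return [c for c in pdf.columns
            if pdf[c].dtype.kind == 'f' and pdf[c].isna().any()]

df = pd.DataFrame({'a': [1, 2], 'b': [float('nan'), 1.2]})
print(nan_columns(df))
```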
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538924#comment-16538924 ]

Bryan Cutler commented on SPARK-24760:
--------------------------------------

Pandas uses NaN as a special value that it interprets as a NULL or a missing value:

{code:python}
In [35]: s = pd.Series([1, None, float('nan')], dtype='O')

In [36]: s
Out[36]:
0       1
1    None
2     NaN
dtype: object

In [37]: s.isnull()
Out[37]:
0    False
1     True
2     True
dtype: bool
{code}

I'm not sure if there is a way to change the above behavior in pandas. The only other possibility I can think of is to specify your own NULL mask when PySpark converts pandas data to Arrow. This doesn't seem to be supported in Arrow currently:

{code:python}
In [39]: pa.Array.from_pandas(s, mask=[False, True, False], type=pa.float64())
Out[39]:
[
  1.0,
  NA,
  NA
]
{code}
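The kind of explicit mask Bryan mentions can be computed without any library, since plain Python does distinguish `None` from a float `NaN`. A minimal stdlib-only sketch (`null_mask` and `nan_mask` are illustrative helpers, not an existing API):

```python
import math

def null_mask(values):
    # True where the value is genuinely absent (None) -- the mask a
    # conversion layer would need to mark real nulls only.
    return [v is None for v in values]

def nan_mask(values):
    # True where the value is a float NaN (not None).
    return [isinstance(v, float) and math.isnan(v) for v in values]

data = [1.0, None, float('nan')]
print(null_mask(data))  # only the None
print(nan_mask(data))   # only the NaN
```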
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538234#comment-16538234 ]

Mortada Mehyar commented on SPARK-24760:
----------------------------------------

I still think something is not right here. It is true that pandas does not have a perfect equivalent to the NULL value in Spark, but I don't see why that means a pandas NaN needs to be changed to NULL in pyarrow or Spark. Spark clearly treats NaN and NULL as different values. Doesn't it make more sense to map a pandas NaN to a Spark NaN? Perhaps this is a better example to drive the point home:

{code:java}
In [1]: d = [{'value': 1.0},
   ...:      {'value': -2.0},
   ...:      {'value': float('nan')},
   ...:      {'value': 3.0},
   ...:      {'value': float('inf')}]
   ...: df = spark.createDataFrame(d, "value: double")
   ...: df.show()
+--------+
|   value|
+--------+
|     1.0|
|    -2.0|
|     NaN|
|     3.0|
|Infinity|
+--------+

In [2]: from pyspark.sql.functions import pandas_udf, udf

In [3]: from pyspark.sql.types import DoubleType

In [4]: @udf(DoubleType())
   ...: def identity_python(x):
   ...:     return x

In [5]: @pandas_udf(DoubleType())
   ...: def identity_pandas(x):
   ...:     return x

In [6]: df.select(df['value'], identity_python(df['value'])).show()
+--------+----------------------+
|   value|identity_python(value)|
+--------+----------------------+
|     1.0|                   1.0|
|    -2.0|                  -2.0|
|     NaN|                   NaN|
|     3.0|                   3.0|
|Infinity|              Infinity|
+--------+----------------------+

In [7]: df.select(df['value'], identity_pandas(df['value'])).show()
+--------+----------------------+
|   value|identity_pandas(value)|
+--------+----------------------+
|     1.0|                   1.0|
|    -2.0|                  -2.0|
|     NaN|                  null|
|     3.0|                   3.0|
|Infinity|              Infinity|
+--------+----------------------+
{code}

Here we are simply defining identity functions as a Python UDF and a pandas UDF. Why should the Spark pandas_udf single out the NaN and turn it into NULL?
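One thing the identity comparison above makes clear: the pandas half of the round trip does not touch `NaN` at all. A minimal sketch of the UDF body on plain pandas (no Spark) shows `NaN` surviving unchanged, so any NaN-to-null change must happen at the pandas/Arrow/JVM boundary, not inside the function:

```python
import math

import pandas as pd

# The pandas_udf body in isolation: a Series-in, Series-out identity.
def identity_pandas(s: pd.Series) -> pd.Series:
    return s

s = pd.Series([1.0, -2.0, float('nan'), 3.0, float('inf')])
out = identity_pandas(s)

print(math.isnan(out.iloc[2]))  # NaN is still NaN after the Series op
print(math.isinf(out.iloc[4]))  # Infinity is likewise untouched
```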
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537893#comment-16537893 ]

Bryan Cutler commented on SPARK-24760:
--------------------------------------

Pandas interprets NaN to be missing data for numeric values (see https://pandas.pydata.org/pandas-docs/stable/missing_data.html#inserting-missing-data), which then becomes a NULL value in pyarrow and Spark. I think you might be able to configure pandas differently to use another sentinel value for this. I'll close this since it's not a Spark issue.
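A sentinel-based workaround in the spirit of Bryan's suggestion could look like the following sketch (not an official recipe; `SENTINEL` is an arbitrary value chosen here for illustration): replace `NaN` before the frame crosses into Arrow, so nothing is marked missing.

```python
import pandas as pd

# Swap NaN for an agreed-on sentinel before conversion; afterwards the
# column contains no values pandas considers missing.
SENTINEL = float('inf')

pdf = pd.DataFrame({'value': [1.0, -2.0, float('nan'), 3.0]})
safe = pdf.fillna(SENTINEL)

print(safe['value'].tolist())
print(safe['value'].isna().any())  # nothing left to become null
```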
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537864#comment-16537864 ]

Mortada Mehyar commented on SPARK-24760:

Setting the "new" column to be nullable does indeed avoid the exception; however, the NaN value then gets turned into NULL, which I'd argue is still not the correct behavior:

{code:java}
In [6]: @pandas_udf("key: string, value: int, new: double", PandasUDFType.GROUPED_MAP)
   ...: def func(pdf):
   ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))
   ...:     return pdf

In [7]: df.groupby('key').apply(func).show()
+---+-----+----+
|key|value| new|
+---+-----+----+
|  b|    3| 3.0|
|  b|   -2|null|
|  a|    1| 1.0|
|  a|    2| 2.0|
+---+-----+----+
{code}

> Pandas UDF does not handle NaN correctly
> ----------------------------------------
>
>                 Key: SPARK-24760
>                 URL: https://issues.apache.org/jira/browse/SPARK-24760
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: Spark 2.3.1
>                      Pandas 0.23.1
>            Reporter: Mortada Mehyar
>            Priority: Minor
>
> I noticed that having `NaN` values when using the new Pandas UDF feature
> triggers a JVM exception. Not sure if this is an issue with PySpark or
> PyArrow. Here is a somewhat contrived example to showcase the problem.
> {code}
> In [1]: import pandas as pd
>    ...: from pyspark.sql.functions import lit, pandas_udf, PandasUDFType
>
> In [2]: d = [{'key': 'a', 'value': 1},
>              {'key': 'a', 'value': 2},
>              {'key': 'b', 'value': 3},
>              {'key': 'b', 'value': -2}]
>         df = spark.createDataFrame(d, "key: string, value: int")
>         df.show()
> +---+-----+
> |key|value|
> +---+-----+
> |  a|    1|
> |  a|    2|
> |  b|    3|
> |  b|   -2|
> +---+-----+
>
> In [3]: df_tmp = df.withColumn('new', lit(1.0))  # add a DoubleType column
>         df_tmp.printSchema()
> root
>  |-- key: string (nullable = true)
>  |-- value: integer (nullable = true)
>  |-- new: double (nullable = false)
> {code}
> The Pandas UDF simply creates a new column where negative values are
> replaced with a particular float. With INF it works fine:
> {code}
> In [4]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
>    ...: def func(pdf):
>    ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('inf'))
>    ...:     return pdf
>
> In [5]: df.groupby('key').apply(func).show()
> +---+-----+--------+
> |key|value|     new|
> +---+-----+--------+
> |  b|    3|     3.0|
> |  b|   -2|Infinity|
> |  a|    1|     1.0|
> |  a|    2|     2.0|
> +---+-----+--------+
> {code}
> However, if we set this value to NaN, it triggers an exception:
> {code}
> In [6]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
>    ...: def func(pdf):
>    ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))
>    ...:     return pdf
>    ...:
>    ...: df.groupby('key').apply(func).show()
> [Stage 23:==>  (73 + 2) / 75]
> 2018-07-07 16:26:27 ERROR Executor:91 - Exception in task 36.0 in stage 23.0 (TID 414)
> java.lang.IllegalStateException: Value at index is null
>     at org.apache.arrow.vector.Float8Vector.get(Float8Vector.java:98)
>     at org.apache.spark.sql.vectorized.ArrowColumnVector$DoubleAccessor.getDouble(ArrowColumnVector.java:344)
>     at org.apache.spark.sql.vectorized.ArrowColumnVector.getDouble(ArrowColumnVector.java:99)
>     at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getDouble(MutableColumnarRow.java:126)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at
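The NaN-to-NULL conversion the comment above observes can be illustrated in plain pandas, without Spark. This is a sketch of the underlying ambiguity, not Spark's actual conversion code: pandas stores NaN as an ordinary IEEE-754 float in a float64 column, but it also uses NaN as its missing-value sentinel, so `isna()` flags it. Any converter that relies on `isna()`-style semantics (as Arrow's pandas integration does by default, to my understanding) will mark the slot as null rather than preserve the value.

```python
import math

import pandas as pd

# NaN is a genuine float value in a float64 Series...
s = pd.Series([1.0, float('nan'), float('inf')])
print(math.isnan(s[1]))    # True: a real IEEE-754 NaN, not an absent value

# ...yet pandas also reports that same slot as missing, which is why a
# NaN-as-sentinel conversion can turn it into a null downstream.
print(bool(s.isna()[1]))   # True
print(bool(s.isna()[2]))   # False: inf is never treated as missing
```

This is the crux of the disagreement in the thread: pandas conflates NaN with "missing", while Spark SQL keeps NaN and null distinct.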
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537817#comment-16537817 ]

Mortada Mehyar commented on SPARK-24760:

[~icexelloss] but NaN is not really a "null" value; at least that seems inconsistent with what Spark considers to be null. For instance:

{code:java}
In [39]: foo = spark.createDataFrame([(1, float('nan')), (2, None), (3, float('inf'))], ("a", "b"))

In [40]: foo.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)

In [41]: foo.show()
+---+--------+
|  a|       b|
+---+--------+
|  1|     NaN|
|  2|    null|
|  3|Infinity|
+---+--------+

In [42]: foo[F.isnull(foo['b'])].show()
+---+----+
|  a|   b|
+---+----+
|  2|null|
+---+----+

In [43]: foo[F.isnan(foo['b'])].show()
+---+---+
|  a|  b|
+---+---+
|  1|NaN|
+---+---+
{code}
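The distinction this comment draws between `isnan` and `isnull` also exists in plain Python, which may make the argument easier to see outside Spark: `float('nan')` is a genuine float value, while `None` marks the absence of a value, and a check for one never matches the other.

```python
import math

# Mirror Spark's isnull/isnan split on a plain Python list.
values = [1.0, float('nan'), None, float('inf')]

is_null = [v is None for v in values]
is_nan = [isinstance(v, float) and math.isnan(v) for v in values]

print(is_null)  # [False, False, True, False]
print(is_nan)   # [False, True, False, False]
```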
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537662#comment-16537662 ]

Li Jin commented on SPARK-24760:

I think the issue here is that the output schema for the UDF is not correctly specified. You specified the output schema of the UDF to be the same as the input, which means the column "new" is not nullable, but you are returning a null value from your UDF. That doesn't match the declared schema, hence the exception. Does that make sense?
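The schema mismatch described above can be sketched without a running Spark cluster. This is an analogy, not Spark's actual code path: if the declared output schema marks a column non-nullable, but the Arrow conversion turns the returned NaN into a null, the reader finds a null where none is allowed, much like the "Value at index is null" IllegalStateException in the report. The `check_against_schema` helper below is hypothetical, introduced only for illustration.

```python
import pandas as pd

def check_against_schema(pdf: pd.DataFrame, nullable: dict) -> None:
    """Reject a frame that holds nulls in a column declared non-nullable."""
    for col, may_be_null in nullable.items():
        # pandas treats NaN as missing, mirroring Arrow's NaN-to-null behavior
        if not may_be_null and pdf[col].isna().any():
            raise ValueError(f"column '{col}' is non-nullable but holds a null")

pdf = pd.DataFrame({'new': [1.0, float('nan')]})

check_against_schema(pdf, {'new': True})      # nullable schema: accepted
try:
    check_against_schema(pdf, {'new': False})  # df_tmp.schema-style non-nullable: rejected
except ValueError as err:
    print(err)
```

This matches the reporter's follow-up observation: declaring the column nullable (e.g. `"key: string, value: int, new: double"`) avoids the exception, though NaN then surfaces as NULL.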
[jira] [Commented] (SPARK-24760) Pandas UDF does not handle NaN correctly
[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536540#comment-16536540 ]

Mortada Mehyar commented on SPARK-24760:

cc [~icexelloss] [~hyukjin.kwon]