[ https://issues.apache.org/jira/browse/SPARK-24760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541967#comment-16541967 ]
Bryan Cutler commented on SPARK-24760:
--------------------------------------

Yeah, createDataFrame is inconsistent with pandas_udf here, but you could argue the other way: that your example with Arrow disabled is the incorrect one. If NaN values are introduced into the pandas DataFrame not manually but, say, as a result of reindexing, then they represent NULL values, and the user would expect to see them in Spark that way. For example:

{code:java}
In [19]: pdf = pd.DataFrame({"x": [1.0, 2.0, 4.0]}, index=[0, 1, 3]).reindex(range(4))

In [20]: pdf
Out[20]:
     x
0  1.0
1  2.0
2  NaN
3  4.0

In [21]: pdf.isnull()
Out[21]:
       x
0  False
1  False
2   True
3  False

In [23]: spark.conf.set("spark.sql.execution.arrow.enabled", True)

In [24]: spark.createDataFrame(pdf).show()
+----+
|   x|
+----+
| 1.0|
| 2.0|
|null|
| 4.0|
+----+
{code}

[~ueshin] and [~hyukjin.kwon], what do you think the correct behavior is for createDataFrame from a pandas DataFrame that has NaN values?

> Pandas UDF does not handle NaN correctly
> ----------------------------------------
>
>                 Key: SPARK-24760
>                 URL: https://issues.apache.org/jira/browse/SPARK-24760
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: Spark 2.3.1
>                      Pandas 0.23.1
>            Reporter: Mortada Mehyar
>            Priority: Minor
>
> I noticed that having `NaN` values when using the new Pandas UDF feature triggers a JVM exception. Not sure if this is an issue with PySpark or PyArrow. Here is a somewhat contrived example to showcase the problem.
> {code}
> In [1]: import pandas as pd
>    ...: from pyspark.sql.functions import lit, pandas_udf, PandasUDFType
>
> In [2]: d = [{'key': 'a', 'value': 1},
>              {'key': 'a', 'value': 2},
>              {'key': 'b', 'value': 3},
>              {'key': 'b', 'value': -2}]
>    ...: df = spark.createDataFrame(d, "key: string, value: int")
>    ...: df.show()
> +---+-----+
> |key|value|
> +---+-----+
> |  a|    1|
> |  a|    2|
> |  b|    3|
> |  b|   -2|
> +---+-----+
>
> In [3]: df_tmp = df.withColumn('new', lit(1.0))  # add a DoubleType column
>    ...: df_tmp.printSchema()
> root
>  |-- key: string (nullable = true)
>  |-- value: integer (nullable = true)
>  |-- new: double (nullable = false)
> {code}
> The Pandas UDF simply creates a new column in which negative values are replaced by a particular float, in this case INF, and it works fine:
> {code}
> In [4]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
>    ...: def func(pdf):
>    ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('inf'))
>    ...:     return pdf
>
> In [5]: df.groupby('key').apply(func).show()
> +---+-----+--------+
> |key|value|     new|
> +---+-----+--------+
> |  b|    3|     3.0|
> |  b|   -2|Infinity|
> |  a|    1|     1.0|
> |  a|    2|     2.0|
> +---+-----+--------+
> {code}
> However, if we set this value to NaN instead, it triggers an exception:
> {code}
> In [6]: @pandas_udf(df_tmp.schema, PandasUDFType.GROUPED_MAP)
>    ...: def func(pdf):
>    ...:     pdf['new'] = pdf['value'].where(pdf['value'] > 0, float('nan'))
>    ...:     return pdf
>    ...:
>    ...: df.groupby('key').apply(func).show()
> [Stage 23:======================================================> (73 + 2) / 75]
> 2018-07-07 16:26:27 ERROR Executor:91 - Exception in task 36.0 in stage 23.0 (TID 414)
> java.lang.IllegalStateException: Value at index is null
>     at org.apache.arrow.vector.Float8Vector.get(Float8Vector.java:98)
>     at org.apache.spark.sql.vectorized.ArrowColumnVector$DoubleAccessor.getDouble(ArrowColumnVector.java:344)
>     at org.apache.spark.sql.vectorized.ArrowColumnVector.getDouble(ArrowColumnVector.java:99)
>     at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getDouble(MutableColumnarRow.java:126)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 2018-07-07 16:26:27 WARN TaskSetManager:66 - Lost task 36.0 in stage 23.0 (TID 414, localhost, executor driver): java.lang.IllegalStateException: Value at index is null
>     at org.apache.arrow.vector.Float8Vector.get(Float8Vector.java:98)
>     at org.apache.spark.sql.vectorized.ArrowColumnVector$DoubleAccessor.getDouble(ArrowColumnVector.java:344)
>     at org.apache.spark.sql.vectorized.ArrowColumnVector.getDouble(ArrowColumnVector.java:99)
>     at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getDouble(MutableColumnarRow.java:126)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
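The ambiguity Bryan's comment raises — pandas can only represent a missing float entry as NaN, so after the Arrow conversion it surfaces as NULL — can be reproduced with pandas alone. The sketch below is a minimal illustration, not part of the ticket; the final `cleaned` step is a hypothetical workaround for a caller who wants missing entries to be explicit `None`s (unambiguous NULLs) before handing the frame to `spark.createDataFrame`:

```python
import pandas as pd

# Reindexing is one way NaN sneaks in without being written by hand: label 2
# has no row, and pandas marks the missing float as NaN. Arrow-based
# conversion then reads that NaN as a NULL, matching the show() output above.
pdf = pd.DataFrame({"x": [1.0, 2.0, 4.0]}, index=[0, 1, 3]).reindex(range(4))
print(pdf["x"].isnull().tolist())  # [False, False, True, False]

# Hypothetical pre-conversion step (assumed, not from the ticket): widen to
# object dtype and replace missing entries with None, so the NULLs are
# explicit rather than inferred from NaN.
cleaned = pdf.astype(object).where(pdf.notnull(), None)
print(cleaned["x"].tolist())  # [1.0, 2.0, None, 4.0]
```

This keeps the distinction visible in pure pandas: a float column cannot hold `None`, but an object column can, so the caller rather than the Arrow conversion decides which entries are NULL.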