[ https://issues.apache.org/jira/browse/SPARK-25511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-25511:
----------------------------------
    Component/s:     (was: Optimizer)
                 SQL

> Map with "null" key not working in Spark 2.3
> --------------------------------------------
>
>                 Key: SPARK-25511
>                 URL: https://issues.apache.org/jira/browse/SPARK-25511
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Ravi Shankar
>            Priority: Major
>
> I have a use case where I build a histogram of a column's values through a 
> UDAF that returns a Map data type. It is basically a group-by count on the 
> column's values, returned as a Map<ColumnDataType, LongType>. I needed to 
> record all invalid values for the column as a "null -> count" entry in the 
> returned map. In 2.1.x this worked fine, and I could create a Map with 
> "null" as a key. It no longer works in 2.3, and I am wondering whether this 
> is expected and whether I have to change my application code: 
>  
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> 
> // Build a map whose keys include null, then wrap it in a DataFrame row.
> val myList = List(("a", "1"), ("b", "2"), ("a", "3"), (null, "4"))
> val map = myList.toMap
> val data = List(List("sublime", map))
> val rdd = sc.parallelize(data).map(l => Row.fromSeq(l.toSeq))
> // Both the map values and the map column itself are declared nullable.
> val datasetSchema = StructType(List(
>   StructField("name", StringType, true),
>   StructField("songs", MapType(StringType, StringType, true), true)))
> val df = spark.createDataFrame(rdd, datasetSchema)
> df.take(5).foreach(println)
> {code}
> Output in Spark 2.1.x:
> {code:java}
> scala> df.take(5).foreach(println)
> [sublime,Map(a -> 3, b -> 2, null -> 4)]
> {code}
> Output in Spark 2.3.x:
> {code:java}
> 2018-09-21 15:35:25 ERROR Executor:91 - Exception in task 2.0 in stage 14.0 (TID 39)
> java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
>       at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>       at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:109)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
>       ... 23 more
> 2018-09-21 15:35:25 WARN  TaskSetManager:66 - Lost task 2.0 in stage 14.0 (TID 39, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
>       at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>       at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:109)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
>       ... 23 more
> 2018-09-21 15:35:25 ERROR TaskSetManager:70 - Task 2 in stage 14.0 failed 1 times; aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 14.0 failed 1 times, most recent failure: Lost task 2.0 in stage 14.0 (TID 39, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
>       at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>       at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:109)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
>       ... 23 more
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
>   ... 55 elided
> Caused by: java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
>   at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>   at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
>   ... 23 more
> {code}
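> As a stopgap I can strip the null key out before handing the data to Spark, 
> since 2.3 appears to treat map keys as non-nullable. A minimal sketch of 
> that workaround, reusing the variables above (the "(null)" placeholder key 
> is an arbitrary choice of mine, not anything Spark mandates):
> {code:java}
> // Hypothetical workaround: replace the null key with a sentinel string
> // before building the DataFrame, so the encoder never sees a null map key.
> val safeMap = map.map { case (k, v) => (Option(k).getOrElse("(null)"), v) }
> val safeRdd = sc.parallelize(List(List("sublime", safeMap))).map(l => Row.fromSeq(l.toSeq))
> val safeDf = spark.createDataFrame(safeRdd, datasetSchema)
> safeDf.take(5).foreach(println)
> // expected to print something like: [sublime,Map(a -> 3, b -> 2, (null) -> 4)]
> {code}
> That loses the distinction between an actual null and the literal string 
> "(null)", though, so I would still like to know whether the 2.1.x behavior 
> was intentional and whether it is coming back.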


