[ https://issues.apache.org/jira/browse/SPARK-25511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-25511: ---------------------------------- Component/s: (was: Optimizer) SQL > Map with "null" key not working in spark 2.3 > -------------------------------------------- > > Key: SPARK-25511 > URL: https://issues.apache.org/jira/browse/SPARK-25511 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.3.1 > Reporter: Ravi Shankar > Priority: Major > > I had a use case where I was creating a histogram of column values through a > UDAF in a Map data type. It is basically just a group-by count on a column's > value that is returned as a Map<ColumnDataType, LongType>. I needed to plug in > all invalid values for the column as a "null -> count" entry in the map that was > returned. In 2.1.x, this was working fine and I could create a Map with > "null" being a key. This is not working in 2.3, and I am wondering whether this is > expected and whether I have to change my application code: > > {code:java} > val myList = List(("a", "1"), ("b", "2"), ("a", "3"), (null, "4")) > val map = myList.toMap > val data = List(List("sublime", map)) > val rdd = sc.parallelize(data).map(l ⇒ Row.fromSeq(l.toSeq)) > val datasetSchema = StructType(List(StructField("name", StringType, true), > StructField("songs", MapType(StringType, StringType, true), true))) > val df = spark.createDataFrame(rdd, datasetSchema) > df.take(5).foreach(println) > {code} > Output in spark 2.1.x: > {code:java} > scala> df.take(5).foreach(println) > [sublime,Map(a -> 3, b -> 2, null -> 4)] > {code} > Output in spark 2.3.x: > {code:java} > 2018-09-21 15:35:25 ERROR Executor:91 - Exception in task 2.0 in stage 14.0 > (TID 39) > java.lang.RuntimeException: Error while encoding: > java.lang.NullPointerException: Null value appeared in non-nullable field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). 
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, > fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, > org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS > name#38 > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) > AS songs#39 > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException: Null value appeared in > non-nullable field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown > Source) > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288) > ... 23 more > 2018-09-21 15:35:25 WARN TaskSetManager:66 - Lost task 2.0 in stage 14.0 > (TID 39, localhost, executor driver): java.lang.RuntimeException: Error while > encoding: java.lang.NullPointerException: Null value appeared in non-nullable > field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). 
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, > fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, > org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS > name#38 > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) > AS songs#39 > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException: Null value appeared in > non-nullable field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown > Source) > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288) > ... 23 more > 2018-09-21 15:35:25 ERROR TaskSetManager:70 - Task 2 in stage 14.0 failed 1 > times; aborting job > org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in > stage 14.0 failed 1 times, most recent failure: Lost task 2.0 in stage 14.0 > (TID 39, localhost, executor driver): java.lang.RuntimeException: Error while > encoding: java.lang.NullPointerException: Null value appeared in non-nullable > field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). 
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, > fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, > org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS > name#38 > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) > AS songs#39 > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException: Null value appeared in > non-nullable field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown > Source) > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288) > ... 
23 more > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067) > at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363) > at > org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) > at 
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) > at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) > at org.apache.spark.sql.Dataset.head(Dataset.scala:2484) > at org.apache.spark.sql.Dataset.take(Dataset.scala:2698) > at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) > at org.apache.spark.sql.Dataset.show(Dataset.scala:723) > at org.apache.spark.sql.Dataset.show(Dataset.scala:682) > at org.apache.spark.sql.Dataset.show(Dataset.scala:691) > ... 55 elided > Caused by: java.lang.RuntimeException: Error while encoding: > java.lang.NullPointerException: Null value appeared in non-nullable field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). 
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, > fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, > org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS > name#38 > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null > else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) > AS songs#39 > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException: Null value appeared in > non-nullable field: > If the schema is inferred from a Scala tuple/case class, or a Java bean, > please try to use scala.Option[_] or other nullable types (e.g. > java.lang.Integer instead of int/scala.Int). > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown > Source) > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288) > ... 23 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org