[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308830#comment-16308830 ]
Takeshi Yamamuro commented on SPARK-22942:
------------------------------------------

I think you just need NULL checks:

{code}
val hasBlack: Seq[Row] => Boolean = (s: Seq[Row]) => {
  if (s != null) {
    s.exists { case Row(num: Int, color: String) =>
      color == "black"
    }
  } else {
    false
  }
}
{code}
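For reference, a minimal sketch of the fix applied end to end (untested; it assumes the `rightOnly` dataframe and `r_infos` column from the report quoted below, and the null-checked `hasBlack` above):

{code}
import org.apache.spark.sql.functions.udf

// Uses the null-checked hasBlack defined above together with the rightOnly
// dataframe / r_infos column from the quoted report; with the guard in place
// the optimizer's null probe gets `false` instead of an exception.
val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))
rightBreakdown.filter("has_black == true").show(false)
{code}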
> Spark Sql UDF throwing NullPointer when adding a filter on a column that uses that UDF
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-22942
>                 URL: https://issues.apache.org/jira/browse/SPARK-22942
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.0
>            Reporter: Matthew Fishkin
>
> I ran into an interesting issue when trying to do a `filter` on a dataframe that has columns that were added using a UDF. I am able to replicate the problem with a smaller set of data.
> Given the dummy case classes:
> {code:java}
> case class Info(number: Int, color: String)
> case class Record(name: String, infos: Seq[Info])
> {code}
> And the following data:
> {code:java}
> val blue = Info(1, "blue")
> val black = Info(2, "black")
> val yellow = Info(3, "yellow")
> val orange = Info(4, "orange")
> val white = Info(5, "white")
> val a = Record("a", Seq(blue, black, white))
> val a2 = Record("a", Seq(yellow, white, orange))
> val b = Record("b", Seq(blue, black))
> val c = Record("c", Seq(white, orange))
> val d = Record("d", Seq(orange, black))
> {code}
> Create two dataframes (we will call them left and right):
> {code:java}
> val left = Seq(a, b).toDF
> val right = Seq(a2, c, d).toDF
> {code}
> Join the two dataframes with a full outer join (so two of our columns are nullable now):
> {code:java}
> val joined = left.alias("l").join(right.alias("r"), Seq("name"), "full_outer")
> joined.show(false)
> res0:
> +----+--------------------------------+-----------------------------------+
> |name|infos                           |infos                              |
> +----+--------------------------------+-----------------------------------+
> |b   |[[1,blue], [2,black]]           |null                               |
> |a   |[[1,blue], [2,black], [5,white]]|[[3,yellow], [5,white], [4,orange]]|
> |c   |null                            |[[5,white], [4,orange]]            |
> |d   |null                            |[[4,orange], [2,black]]            |
> +----+--------------------------------+-----------------------------------+
> {code}
> Then, take only the `name`s that exist in the right dataframe:
> {code:java}
> val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))
> rightOnly.show(false)
> res1:
> +----+-----------------------+
> |name|r_infos                |
> +----+-----------------------+
> |c   |[[5,white], [4,orange]]|
> |d   |[[4,orange], [2,black]]|
> +----+-----------------------+
> {code}
> Now, add a new column called `has_black`, which will be true if `r_infos` contains _black_ as a color:
> {code:java}
> def hasBlack = (s: Seq[Row]) => {
>   s.exists { case Row(num: Int, color: String) =>
>     color == "black"
>   }
> }
> val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))
> rightBreakdown.show(false)
> res2:
> +----+-----------------------+---------+
> |name|r_infos                |has_black|
> +----+-----------------------+---------+
> |c   |[[5,white], [4,orange]]|false    |
> |d   |[[4,orange], [2,black]]|true     |
> +----+-----------------------+---------+
> {code}
> So far, _exactly_ what we expected.
> *However*, when I try to filter `rightBreakdown`, it fails:
> {code:java}
> rightBreakdown.filter("has_black == true").show(false)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => boolean)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
>   at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>   at scala.collection.immutable.List.exists(List.scala:84)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:145)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:152)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:150)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:150)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:116)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:646)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:623)
>   ... 58 elided
> Caused by: java.lang.NullPointerException
>   at $anonfun$hasBlack$1.apply(<console>:41)
>   at $anonfun$hasBlack$1.apply(<console>:40)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:92)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
>   ... 114 more
> {code}
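Note that `r_infos` is never null in the rows shown above; the NullPointerException comes from query optimization. The trace shows `EliminateOuterJoin.canFilterOutNull` evaluating the filter's UDF against a null input while deciding whether the full outer join can be narrowed, so any UDF referenced by such a filter has to tolerate a null argument. A small illustrative sketch of that failure mode, together with an Option-based spelling of the null check suggested above (not code from the ticket; `hasBlackSafe` is a hypothetical name):

{code}
import org.apache.spark.sql.Row

// Applying the reporter's original, unguarded function to a null input reproduces
// the failure outside the optimizer:
//   hasBlack(null)   // throws java.lang.NullPointerException
// A null-tolerant equivalent using Option: Option(null) is None, so this returns false.
val hasBlackSafe: Seq[Row] => Boolean = (s: Seq[Row]) =>
  Option(s).exists(_.exists { case Row(_: Int, color: String) => color == "black" })
{code}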
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org