[ 
https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308882#comment-16308882
 ] 

Matthew Fishkin edited comment on SPARK-22942 at 1/2/18 11:25 PM:
------------------------------------------------------------------

I would expect that to work too. I'm more curious why the null pointer is 
occurring when none of the data is null.

Interestingly, I found the following. When you change from 
{code:java}
val rightOnly = joined.filter("l.infos is null").select($"name", 
$"r.infos".as("r_infos"))
{code}
to
{code:java}
val rightOnly = joined.filter("l.infos is null and r.infos is not 
null").select($"name", $"r.infos".as("r_infos"))
{code}

the rest of the code above works. 

But I am pretty sure 
"l.infos is null and r.infos is not null" is equal to "l.infos is null". If one 
column of an outer join is null, the other must be defined.


was (Author: mjfish93):
I would expect that to work too. I'm more curious why the null pointer is 
occurring when none of the data is null.

Interestingly, I found the following. When you change from 
{code:java}
val rightOnly = joined.filter("l.infos is null").select($"name", 
$"r.infos".as("r_infos"))
{code}

{code:java}
val rightOnly = joined.filter("l.infos is null and r.infos is not 
null").select($"name", $"r.infos".as("r_infos"))
{code}

the rest of the code above works. 

But I am pretty sure 
"l.infos is null and r.infos is not null" is equal to "l.infos is null". If one 
column of an outer join is null, the other must be defined.

> Spark Sql UDF throwing NullPointer when adding a filter on a columns that 
> uses that UDF
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-22942
>                 URL: https://issues.apache.org/jira/browse/SPARK-22942
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.0
>            Reporter: Matthew Fishkin
>
> I ran into an interesting issue when trying to do a `filter` on a dataframe 
> that has columns that were added using a UDF. I am able to replicate the 
> problem with a smaller set of data.
> Given the dummy case classes:
> {code:java}
> case class Info(number: Int, color: String)
> case class Record(name: String, infos: Seq[Info])
> {code}
> And the following data:
> {code:java}
> val blue = Info(1, "blue")
> val black = Info(2, "black")
> val yellow = Info(3, "yellow")
> val orange = Info(4, "orange")
> val white = Info(5, "white")
> val a  = Record("a", Seq(blue, black, white))
> val a2 = Record("a", Seq(yellow, white, orange))
> val b = Record("b", Seq(blue, black))
> val c = Record("c", Seq(white, orange))
>  val d = Record("d", Seq(orange, black))
> {code}
> Create two dataframes (we will call them left and right)
> {code:java}
> val left = Seq(a, b).toDF
> val right = Seq(a2, c, d).toDF
> {code}
> Join those two dataframes with an outer join (So two of our columns are 
> nullable now.
> {code:java}
> val joined = left.alias("l").join(right.alias("r"), Seq("name"), "full_outer")
> joined.show(false)
> res0:
> +----+--------------------------------+-----------------------------------+
> |name|infos                           |infos                              |
> +----+--------------------------------+-----------------------------------+
> |b   |[[1,blue], [2,black]]           |null                               |
> |a   |[[1,blue], [2,black], [5,white]]|[[3,yellow], [5,white], [4,orange]]|
> |c   |null                            |[[5,white], [4,orange]]            |
> |d   |null                            |[[4,orange], [2,black]]            |
> +----+--------------------------------+-----------------------------------+
> {code}
> Then, take only the `name`s that exist in the right Dataframe
> {code:java}
> val rightOnly = joined.filter("l.infos is null").select($"name", 
> $"r.infos".as("r_infos"))
> rightOnly.show(false)
> res1:
> +----+-----------------------+
> |name|r_infos                |
> +----+-----------------------+
> |c   |[[5,white], [4,orange]]|
> |d   |[[4,orange], [2,black]]|
> +----+-----------------------+
> {code}
> Now, add a new column called `has_black` which will be true if the `r_infos` 
> contains _black_ as a color
> {code:java}
> def hasBlack = (s: Seq[Row]) => {
>   s.exists{ case Row(num: Int, color: String) =>
>     color == "black"
>   }
> }
> val rightBreakdown = rightOnly.withColumn("has_black", 
> udf(hasBlack).apply($"r_infos"))
> rightBreakdown.show(false)
> res2:
> +----+-----------------------+---------+
> |name|r_infos                |has_black|
> +----+-----------------------+---------+
> |c   |[[5,white], [4,orange]]|false    |
> |d   |[[4,orange], [2,black]]|true     |
> +----+-----------------------+---------+
> {code}
> So far, _exactly_ what we expected. 
> *However*, when I try to filter `rightBreakdown`, it fails.
> {code:java}
> rightBreakdown.filter("has_black == true").show(false)
> org.apache.spark.SparkException: Failed to execute user defined 
> function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => 
> boolean)
>   at 
> org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
>   at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>   at scala.collection.immutable.List.exists(List.scala:84)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:145)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:152)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:150)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:150)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:116)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>   at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
>   at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:646)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:623)
>   ... 58 elided
> Caused by: java.lang.NullPointerException
>   at $anonfun$hasBlack$1.apply(<console>:41)
>   at $anonfun$hasBlack$1.apply(<console>:40)
>   at 
> org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:92)
>   at 
> org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91)
>   at 
> org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
>   ... 114 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to