[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them

2017-04-27 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986190#comment-15986190
 ] 

Takeshi Yamamuro commented on SPARK-20312:
--

I'll close this because the issue has already been fixed. If you still hit the 
issue, feel free to reopen it. Thanks!

> query optimizer calls udf with null values when it doesn't expect them
> --
>
> Key: SPARK-20312
> URL: https://issues.apache.org/jira/browse/SPARK-20312
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Albert Meltzer
>
> When optimizing an outer join, Spark passes an empty row to both sides to see 
> if nulls would be ignored (side comment: for half-outer joins it subsequently 
> ignores the assessment on the dominant side).
> For some reason, a condition such as {{xx IS NOT NULL && udf(xx) IS NOT 
> NULL}} might result in the right side being checked first, and in an exception 
> if the udf doesn't expect a null input (which the left side would have guarded 
> against).
> An example is SIMILAR to the following (see the actual query plans separately):
> {noformat}
> def func(value: Any): Int = ... // returns an AnyVal, which probably causes an 
> IS NOT NULL filter to be added on the result
> val df1 = sparkSession
>   .table(...)
>   .select("col1", "col2") // both LongType
> val df11 = df1
>   .filter(df1("col1").isNotNull)
>   .withColumn("col3", functions.udf(func)(df1("col1")))
>   .repartition(df1("col2"))
>   .sortWithinPartitions(df1("col2"))
> val df2 = ... // load other data containing col2, similarly repartitioned and 
> sorted
> val df3 =
>   df1.join(df2, Seq("col2"), "left_outer")
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them

2017-04-27 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986189#comment-15986189
 ] 

Takeshi Yamamuro commented on SPARK-20312:
--

I checked that this issue no longer exists in the current master. IIUC it was 
fixed by SPARK-20359, and that fix has already been applied to branch-2.1.




[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them

2017-04-24 Thread Albert Meltzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981694#comment-15981694
 ] 

Albert Meltzer commented on SPARK-20312:


[~maropu]: making the query a bit simpler might make the logic work 
differently. Try this:

{noformat}
case class C12(c1: Long, c2: Long)
case class C23(c2: Long, c3: Long)
case class C34(c3: Long, c4: Long)

import sparkSession.implicits._

val udf = functions.udf { (x: java.lang.Long) => x.toString }

val df12n: DataFrame = sparkSession.emptyDataset[C12].toDF()
val df23n: DataFrame = sparkSession.emptyDataset[C23].toDF()
val df34n: DataFrame = sparkSession.emptyDataset[C34].toDF()

val df12 = df12n
  .filter(df12n("c1").isNotNull)
  .filter(df12n("c2").isNotNull)
  .repartition(df12n("c2"))
  .sortWithinPartitions(df12n("c2"))

val df23 = df23n
  .filter(df23n("c2").isNotNull)
  .filter(df23n("c3").isNotNull)
  .repartition(df23n("c2"))
  .sortWithinPartitions(df23n("c2"))

val df34 = df34n
  .filter(df34n("c3").isNotNull)
  .filter(df34n("c4").isNotNull)
  .repartition(df34n("c3"))
  .sortWithinPartitions(df34n("c3"))

val df13: DataFrame = df23
  .join(df12, Seq("c2"), "left_outer")
  .select(df12("c1"), df23("c2"), df23("c3"))

val df14: DataFrame = df34
  .join(df13, Seq("c3"), "left_outer")
  .select(df13("c1"), df13("c2"), df34("c3"), df34("c4"))

val df: DataFrame = df14
  .withColumn("c0", udf(df14("c4")))
  .repartition(sparkSession.sparkContext.defaultParallelism)

val result = df
  .filter(functions.col("c3").isNotNull)
  .filter(functions.col("c4").isNotNull)
  .filter(functions.col("c0").isNotNull)
  .collect()
{noformat}
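As a side note, the {{udf}} above is null-intolerant: its body calls a method on a boxed {{java.lang.Long}}, so a null input throws. A minimal plain-Scala sketch of that failure mode (no Spark needed; the object and value names are illustrative):

```scala
object NullIntolerantUdf {
  // Same body as the UDF in the snippet above: calls toString on a boxed Long.
  val toStr: java.lang.Long => String = (x: java.lang.Long) => x.toString

  def main(args: Array[String]): Unit = {
    assert(toStr(2L) == "2") // a non-null input works fine
    val threw =
      try { toStr(null); false }
      catch { case _: NullPointerException => true }
    assert(threw) // a null input throws before any IS NOT NULL filter can help
    println("null input throws NullPointerException")
  }
}
```

This is why the surrounding {{isNotNull}} filters matter: if anything evaluates the UDF before those filters run, the NPE surfaces.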





[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them

2017-04-17 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971997#comment-15971997
 ] 

Takeshi Yamamuro commented on SPARK-20312:
--

I made the query a bit simpler and tried it, but I couldn't reproduce this on 
the current master:
{code}
val testUdf = udf { (s: String) => s }

val df1 = Seq((1L, null), (2L, "")).toDF("a", "b")
  .filter($"b".isNotNull)
  .withColumn("c", testUdf($"b"))
  .repartition($"a")
  .sortWithinPartitions($"a")

val df2 = Seq((1L, "")).toDF("a", "b")
  .filter($"b".isNotNull)
  .repartition($"a")
  .sortWithinPartitions($"a")

df1.join(df2, "a" :: Nil, "left_outer").explain(true)

== Analyzed Logical Plan ==
a: bigint, b: string, c: string, b: string
Project [a#509L, b#510, c#515, b#528]
+- Join LeftOuter, (a#509L = a#527L)
   :- Sort [a#509L ASC NULLS FIRST], false
   :  +- RepartitionByExpression [a#509L], 200
   :     +- Project [a#509L, b#510, UDF(b#510) AS c#515]
   :        +- Filter isnotnull(b#510)
   :           +- Project [_1#506L AS a#509L, _2#507 AS b#510]
   :              +- LocalRelation [_1#506L, _2#507]
   +- Sort [a#527L ASC NULLS FIRST], false
      +- RepartitionByExpression [a#527L], 200
         +- Filter isnotnull(b#528)
            +- Project [_1#524L AS a#527L, _2#525 AS b#528]
               +- LocalRelation [_1#524L, _2#525]

== Optimized Logical Plan ==
Project [a#509L, b#510, c#515, b#528]
+- Join LeftOuter, (a#509L = a#527L)
   :- Sort [a#509L ASC NULLS FIRST], false
   :  +- RepartitionByExpression [a#509L], 200
   :     +- Project [_1#506L AS a#509L, _2#507 AS b#510, UDF(_2#507) AS c#515]
   :        +- Filter isnotnull(_2#507)
   :           +- LocalRelation [_1#506L, _2#507]
   +- Sort [a#527L ASC NULLS FIRST], false
      +- RepartitionByExpression [a#527L], 200
         +- Project [_1#524L AS a#527L, _2#525 AS b#528]
            +- Filter isnotnull(_2#525)
               +- LocalRelation [_1#524L, _2#525]
{code}

Am I missing anything? If so, could you give me more details?




[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them

2017-04-12 Thread Albert Meltzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966094#comment-15966094
 ] 

Albert Meltzer commented on SPARK-20312:


During query optimization, one of the subtrees becomes:

{noformat}
Project [col4#0L, col3#5L, col2#10L, col1#11L, 2016-11-11 AS version#59, col5#45]
+- Repartition 8, true
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
      +- Filter isnotnull(UDF(col1#11L))
         +- Join LeftOuter, (col2#10L = col2#6L)
            :- Sort [col2#10L ASC NULLS FIRST], false
            :  +- RepartitionByExpression [col2#10L]
            :     +- Filter (isnotnull(col2#10L) && isnotnull(col1#11L))
            :        +- LocalRelation [col2#10L, col1#11L]
            +- Project [col4#0L, col3#5L, col2#6L]
               +- Join LeftOuter, (col3#5L = col3#1L)
                  :- Sort [col3#5L ASC NULLS FIRST], false
                  :  +- RepartitionByExpression [col3#5L]
                  :     +- Filter (isnotnull(col3#5L) && isnotnull(col2#6L))
                  :        +- LocalRelation [col3#5L, col2#6L]
                  +- Sort [col3#1L ASC NULLS FIRST], false
                     +- RepartitionByExpression [col3#1L]
                        +- Filter (isnotnull(col4#0L) && isnotnull(col3#1L))
                           +- LocalRelation [col4#0L, col3#1L]
{noformat}

That causes the UDF to be evaluated inside the 
{{org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin}} rule, before the 
null filter on its parameter value has been applied.
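For context, here is a heavily simplified sketch (not Spark's actual code; all names are hypothetical) of what the {{EliminateOuterJoin}} rule does: it evaluates the filter predicate against an all-null row to decide whether a null-producing join side can be pruned, and that probe is exactly where a null-intolerant UDF gets invoked with null.

```scala
object EliminateOuterJoinSketch {
  // A tiny expression tree; an empty row means every attribute is null.
  sealed trait Expr { def eval(row: Map[String, Any]): Any }
  case class Attr(name: String) extends Expr {
    def eval(row: Map[String, Any]): Any = row.getOrElse(name, null)
  }
  case class IsNotNull(child: Expr) extends Expr {
    def eval(row: Map[String, Any]): Any = child.eval(row) != null
  }
  case class Udf(fn: Any => Any, child: Expr) extends Expr {
    // Like a Spark UDF with no null guard: the function sees the raw value.
    def eval(row: Map[String, Any]): Any = fn(child.eval(row))
  }
  case class And(left: Expr, right: Expr) extends Expr {
    def eval(row: Map[String, Any]): Any =
      left.eval(row).asInstanceOf[Boolean] && right.eval(row).asInstanceOf[Boolean]
  }

  // Rough analogue of the optimizer's check: a predicate that is false (or
  // null) on an all-null row lets it drop the null-producing outer side.
  def canFilterOutNull(pred: Expr): Boolean = {
    val v = pred.eval(Map.empty) // evaluate against the empty (all-null) row
    v == null || v == false
  }

  def main(args: Array[String]): Unit = {
    // Probing a plain null check this way is safe:
    assert(canFilterOutNull(IsNotNull(Attr("col1"))))

    // But if the predicate contains a UDF conjunct, the probe calls the UDF
    // with null regardless of any guarding IS NOT NULL conjunct:
    val toStr: Any => Any = x => x.asInstanceOf[java.lang.Long].toString
    val pred = And(IsNotNull(Udf(toStr, Attr("col1"))), IsNotNull(Attr("col1")))
    val threw =
      try { canFilterOutNull(pred); false }
      catch { case _: NullPointerException => true }
    assert(threw)
    println("UDF evaluated with null during optimization")
  }
}
```

The sketch only illustrates the mechanism; the real rule works on Catalyst expressions bound to an {{emptyRow}}, not on this toy tree.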




[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them

2017-04-12 Thread Albert Meltzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966091#comment-15966091
 ] 

Albert Meltzer commented on SPARK-20312:


Query plans are as follows:

{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `database`.`table`, OverwriteOptions(true,Map()), false
+- Filter (((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(version#59)) && isnotnull(col5#45))
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, version#59, col5#45]
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, col5#45, 2016-11-11 AS version#59]
         +- Repartition 8, true
            +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
               +- Project [col4#0L, col3#5L, col2#10L, col1#11L]
                  +- Project [col2#10L, col1#11L, col4#0L, col3#5L]
                     +- Join LeftOuter, (col2#10L = col2#6L)
                        :- Sort [col2#10L ASC NULLS FIRST], false
                        :  +- RepartitionByExpression [col2#10L]
                        :     +- Filter isnotnull(col1#11L)
                        :        +- Filter isnotnull(col2#10L)
                        :           +- LocalRelation [col2#10L, col1#11L]
                        +- Project [col4#0L, col3#5L, col2#6L]
                           +- Project [col3#5L, col2#6L, col4#0L]
                              +- Join LeftOuter, (col3#5L = col3#1L)
                                 :- Sort [col3#5L ASC NULLS FIRST], false
                                 :  +- RepartitionByExpression [col3#5L]
                                 :     +- Filter isnotnull(col2#6L)
                                 :        +- Filter isnotnull(col3#5L)
                                 :           +- LocalRelation [col3#5L, col2#6L]
                                 +- Sort [col3#1L ASC NULLS FIRST], false
                                    +- RepartitionByExpression [col3#1L]
                                       +- Filter isnotnull(col3#1L)
                                          +- Filter isnotnull(col4#0L)
                                             +- LocalRelation [col4#0L, col3#1L]

== Analyzed Logical Plan ==
InsertIntoTable MetastoreRelation database, table, Map(version -> None, col5 -> None), OverwriteOptions(true,Map()), false
+- Filter (((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(version#59)) && isnotnull(col5#45))
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, version#59, col5#45]
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, col5#45, 2016-11-11 AS version#59]
         +- Repartition 8, true
            +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
               +- Project [col4#0L, col3#5L, col2#10L, col1#11L]
                  +- Project [col2#10L, col1#11L, col4#0L, col3#5L]
                     +- Join LeftOuter, (col2#10L = col2#6L)
                        :- Sort [col2#10L ASC NULLS FIRST], false
                        :  +- RepartitionByExpression [col2#10L]
                        :     +- Filter isnotnull(col1#11L)
                        :        +- Filter isnotnull(col2#10L)
                        :           +- LocalRelation [col2#10L, col1#11L]
                        +- Project [col4#0L, col3#5L, col2#6L]
                           +- Project [col3#5L, col2#6L, col4#0L]
                              +- Join LeftOuter, (col3#5L = col3#1L)
                                 :- Sort [col3#5L ASC NULLS FIRST], false
                                 :  +- RepartitionByExpression [col3#5L]
                                 :     +- Filter isnotnull(col2#6L)
                                 :        +- Filter isnotnull(col3#5L)
                                 :           +- LocalRelation [col3#5L, col2#6L]
                                 +- Sort [col3#1L ASC NULLS FIRST], false
                                    +- RepartitionByExpression [col3#1L]
                                       +- Filter isnotnull(col3#1L)
                                          +- Filter isnotnull(col4#0L)
                                             +- LocalRelation [col4#0L, col3#1L]

== Optimized Logical Plan ==
InsertIntoTable MetastoreRelation database, table, Map(version -> None, col5 -> None), OverwriteOptions(true,Map()), false
+- Project [col4#0L, col3#5L, col2#10L, col1#11L, 2016-11-11 AS version#59, col5#45]
   +- Repartition 8, true
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
         +- Join LeftOuter, (col2#10L = col2#6L)
            :- Sort [col2#10L ASC NULLS FIRST], false
            :  +- RepartitionByExpression [col2#10L]
            :     +- Filter ((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(UDF(col1#11L)))
            :        +- LocalRelation [col2#10L, col1#11L]
            +- Project [col4#0L, col3#5L, col2#6L]
               +- Join