[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them
[ https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986190#comment-15986190 ] Takeshi Yamamuro commented on SPARK-20312:
--
I'll close this because the issue has already been fixed. If any issue remains, feel free to reopen this. Thanks!

> query optimizer calls udf with null values when it doesn't expect them
> --
>
>                 Key: SPARK-20312
>                 URL: https://issues.apache.org/jira/browse/SPARK-20312
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Albert Meltzer
>
> When optimizing an outer join, Spark passes an empty row to both sides to see
> if nulls would be ignored (side comment: for half-outer joins it subsequently
> ignores the assessment on the dominant side).
> For some reason, a condition such as {{xx IS NOT NULL && udf(xx) IS NOT NULL}}
> might result in checking the right side first, and an exception if the udf
> doesn't expect a null input (given that the left side is checked first).
> An example is SIMILAR to the following (see actual query plans separately):
> {noformat}
> def func(value: Any): Int = ... // returns AnyVal, which probably causes an
>                                 // IS NOT NULL filter to be added on the result
> val df1 = sparkSession
>   .table(...)
>   .select("col1", "col2") // both LongType
> val df11 = df1
>   .filter(df1("col1").isNotNull)
>   .withColumn("col3", functions.udf(func)(df1("col1")))
>   .repartition(df1("col2"))
>   .sortWithinPartitions(df1("col2"))
> val df2 = ... // load other data containing col2, similarly repartition and sort
> val df3 =
>   df1.join(df2, Seq("col2"), "left_outer")
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
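The root cause described above is a UDF body that throws when handed a null input. On affected versions, a common workaround is to make the UDF itself null-tolerant. A minimal sketch in plain Scala (no Spark required; the function names here are illustrative, not from the ticket):

```scala
// A null-intolerant function body, analogous to the reporter's
// udf { (x: java.lang.Long) => x.toString }: calling toString on a
// null reference throws NullPointerException.
val unsafe: java.lang.Long => String = x => x.toString

// A defensive variant: wrap the input in Option, so a null probe
// (e.g. the optimizer evaluating the UDF against an empty row)
// yields null instead of throwing.
val safe: java.lang.Long => String = x => Option(x).map(_.toString).orNull

println(safe(java.lang.Long.valueOf(7L))) // "7"
println(safe(null))                       // null
```

Registering `safe` via `functions.udf` keeps the query correct either way: nulls are filtered out later by the `IS NOT NULL` condition, but no probe can crash the plan.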
[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them
[ https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986189#comment-15986189 ] Takeshi Yamamuro commented on SPARK-20312:
--
I checked that there is no issue in the current master. IIUC this issue has been fixed by SPARK-20359. That fix has already been applied to branch-2.1.
[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them
[ https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981694#comment-15981694 ] Albert Meltzer commented on SPARK-20312:
--
[~maropu]: making the query a bit simpler might make the logic work differently. Try this:
{noformat}
case class C12(c1: Long, c2: Long)
case class C23(c2: Long, c3: Long)
case class C34(c3: Long, c4: Long)

import sparkSession.implicits._

val udf = functions.udf { (x: java.lang.Long) => x.toString }

val df12n: DataFrame = sparkSession.emptyDataset[C12].toDF()
val df23n: DataFrame = sparkSession.emptyDataset[C23].toDF()
val df34n: DataFrame = sparkSession.emptyDataset[C34].toDF()

val df12 = df12n
  .filter(df12n("c1").isNotNull)
  .filter(df12n("c2").isNotNull)
  .repartition(df12n("c2"))
  .sortWithinPartitions(df12n("c2"))

val df23 = df23n
  .filter(df23n("c2").isNotNull)
  .filter(df23n("c3").isNotNull)
  .repartition(df23n("c2"))
  .sortWithinPartitions(df23n("c2"))

val df34 = df34n
  .filter(df34n("c3").isNotNull)
  .filter(df34n("c4").isNotNull)
  .repartition(df34n("c3"))
  .sortWithinPartitions(df34n("c3"))

val df13: DataFrame = df23
  .join(df12, Seq("c2"), "left_outer")
  .select(df12("c1"), df23("c2"), df23("c3"))

val df14: DataFrame = df34
  .join(df13, Seq("c3"), "left_outer")
  .select(df13("c1"), df13("c2"), df34("c3"), df34("c4"))

val df: DataFrame = df14
  .withColumn("c0", udf(df14("c4")))
  .repartition(sparkSession.sparkContext.defaultParallelism)

val result = df
  .filter(functions.col("c3").isNotNull)
  .filter(functions.col("c4").isNotNull)
  .filter(functions.col("c0").isNotNull)
  .collect()
{noformat}
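For context, the optimizer behavior the ticket describes, probing a filter with an all-null row to decide whether an outer join can be narrowed, can be modeled in a few lines of plain Scala. This is a conceptual sketch only (the types and names are invented here, not Spark's actual `EliminateOuterJoin` code); the point is that the probe itself evaluates user code with null input:

```scala
// Model a row as column name -> optional value; a predicate is a
// three-valued function where None models SQL NULL.
type Row = Map[String, Option[Long]]
type Predicate = Row => Option[Boolean]

// The probe: a predicate "filters out nulls" if it yields false or
// NULL on an all-null row. Note that running it invokes any embedded
// user function with null input -- which is where the reported
// exception surfaces.
def canFilterOutNull(pred: Predicate, cols: Seq[String]): Boolean = {
  val allNullRow: Row = cols.map(c => c -> (None: Option[Long])).toMap
  !pred(allNullRow).getOrElse(false)
}

// isnotnull(c1) is null-intolerant: it rejects the all-null row,
// so the join side guarded by it could be converted.
val isNotNullC1: Predicate = row => Some(row("c1").isDefined)
println(canFilterOutNull(isNotNullC1, Seq("c1"))) // true
```

Under this model, a conjunct like `udf(c1) IS NOT NULL` would be invoked with a null `c1` during the probe, regardless of any sibling `c1 IS NOT NULL` conjunct.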
[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them
[ https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971997#comment-15971997 ] Takeshi Yamamuro commented on SPARK-20312:
--
I made the query a bit simpler and tried it, but I couldn't reproduce this on the current master;
{code}
val testUdf = udf { (s: String) => s }
val df1 = Seq((1L, null), (2L, "")).toDF("a", "b")
  .filter($"b".isNotNull)
  .withColumn("c", testUdf($"b"))
  .repartition($"a")
  .sortWithinPartitions($"a")
val df2 = Seq((1L, "")).toDF("a", "b")
  .filter($"b".isNotNull)
  .repartition($"a")
  .sortWithinPartitions($"a")
df1.join(df2, "a" :: Nil, "left_outer").explain(true)

== Analyzed Logical Plan ==
a: bigint, b: string, c: string, b: string
Project [a#509L, b#510, c#515, b#528]
+- Join LeftOuter, (a#509L = a#527L)
   :- Sort [a#509L ASC NULLS FIRST], false
   :  +- RepartitionByExpression [a#509L], 200
   :     +- Project [a#509L, b#510, UDF(b#510) AS c#515]
   :        +- Filter isnotnull(b#510)
   :           +- Project [_1#506L AS a#509L, _2#507 AS b#510]
   :              +- LocalRelation [_1#506L, _2#507]
   +- Sort [a#527L ASC NULLS FIRST], false
      +- RepartitionByExpression [a#527L], 200
         +- Filter isnotnull(b#528)
            +- Project [_1#524L AS a#527L, _2#525 AS b#528]
               +- LocalRelation [_1#524L, _2#525]

== Optimized Logical Plan ==
Project [a#509L, b#510, c#515, b#528]
+- Join LeftOuter, (a#509L = a#527L)
   :- Sort [a#509L ASC NULLS FIRST], false
   :  +- RepartitionByExpression [a#509L], 200
   :     +- Project [_1#506L AS a#509L, _2#507 AS b#510, UDF(_2#507) AS c#515]
   :        +- Filter isnotnull(_2#507)
   :           +- LocalRelation [_1#506L, _2#507]
   +- Sort [a#527L ASC NULLS FIRST], false
      +- RepartitionByExpression [a#527L], 200
         +- Project [_1#524L AS a#527L, _2#525 AS b#528]
            +- Filter isnotnull(_2#525)
               +- LocalRelation [_1#524L, _2#525]
{code}
Am I missing anything? If so, could you give me more details?
[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them
[ https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966094#comment-15966094 ] Albert Meltzer commented on SPARK-20312:
--
During query optimization, one of the subtrees becomes:
{noformat}
Project [col4#0L, col3#5L, col2#10L, col1#11L, 2016-11-11 AS version#59, col5#45]
+- Repartition 8, true
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
      +- Filter isnotnull(UDF(col1#11L))
         +- Join LeftOuter, (col2#10L = col2#6L)
            :- Sort [col2#10L ASC NULLS FIRST], false
            :  +- RepartitionByExpression [col2#10L]
            :     +- Filter (isnotnull(col2#10L) && isnotnull(col1#11L))
            :        +- LocalRelation [col2#10L, col1#11L]
            +- Project [col4#0L, col3#5L, col2#6L]
               +- Join LeftOuter, (col3#5L = col3#1L)
                  :- Sort [col3#5L ASC NULLS FIRST], false
                  :  +- RepartitionByExpression [col3#5L]
                  :     +- Filter (isnotnull(col3#5L) && isnotnull(col2#6L))
                  :        +- LocalRelation [col3#5L, col2#6L]
                  +- Sort [col3#1L ASC NULLS FIRST], false
                     +- RepartitionByExpression [col3#1L]
                        +- Filter (isnotnull(col4#0L) && isnotnull(col3#1L))
                           +- LocalRelation [col4#0L, col3#1L]
{noformat}
And that causes evaluation of the UDF in the {{org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin}} class, before the filter on the parameter value.
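The collapsed `Filter isnotnull(UDF(col1#11L))` node above shows why the original `col1 IS NOT NULL` guard does not help: once the optimizer merges and reorders conjuncts, nothing guarantees the null check runs before the UDF. The same hazard can be shown with plain Scala short-circuit evaluation (illustrative only, no Spark involved):

```scala
import scala.util.Try

val f: java.lang.Long => String = x => x.toString // NPE on null input
val x: java.lang.Long = null

// Written order: the null check short-circuits, so f is never called.
val guarded = (x != null) && (f(x) != null)       // false, no exception

// Effectively reordered conjuncts (analogous to the optimized plan
// above evaluating the UDF condition on its own): f runs first and
// throws on the null input.
val reordered = Try((f(x) != null) && (x != null))
println(reordered.isFailure) // true
```

In SQL, unlike in Scala, `AND` carries no evaluation-order guarantee at all, so relying on conjunct order to protect a null-intolerant UDF was never safe.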
[jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them
[ https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966091#comment-15966091 ] Albert Meltzer commented on SPARK-20312:
--
Query plans are as follows:
{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `database`.`table`, OverwriteOptions(true,Map()), false
+- Filter (((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(version#59)) && isnotnull(col5#45))
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, version#59, col5#45]
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, col5#45, 2016-11-11 AS version#59]
         +- Repartition 8, true
            +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
               +- Project [col4#0L, col3#5L, col2#10L, col1#11L]
                  +- Project [col2#10L, col1#11L, col4#0L, col3#5L]
                     +- Join LeftOuter, (col2#10L = col2#6L)
                        :- Sort [col2#10L ASC NULLS FIRST], false
                        :  +- RepartitionByExpression [col2#10L]
                        :     +- Filter isnotnull(col1#11L)
                        :        +- Filter isnotnull(col2#10L)
                        :           +- LocalRelation [col2#10L, col1#11L]
                        +- Project [col4#0L, col3#5L, col2#6L]
                           +- Project [col3#5L, col2#6L, col4#0L]
                              +- Join LeftOuter, (col3#5L = col3#1L)
                                 :- Sort [col3#5L ASC NULLS FIRST], false
                                 :  +- RepartitionByExpression [col3#5L]
                                 :     +- Filter isnotnull(col2#6L)
                                 :        +- Filter isnotnull(col3#5L)
                                 :           +- LocalRelation [col3#5L, col2#6L]
                                 +- Sort [col3#1L ASC NULLS FIRST], false
                                    +- RepartitionByExpression [col3#1L]
                                       +- Filter isnotnull(col3#1L)
                                          +- Filter isnotnull(col4#0L)
                                             +- LocalRelation [col4#0L, col3#1L]

== Analyzed Logical Plan ==
InsertIntoTable MetastoreRelation database, table, Map(version -> None, col5 -> None), OverwriteOptions(true,Map()), false
+- Filter (((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(version#59)) && isnotnull(col5#45))
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, version#59, col5#45]
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, col5#45, 2016-11-11 AS version#59]
         +- Repartition 8, true
            +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
               +- Project [col4#0L, col3#5L, col2#10L, col1#11L]
                  +- Project [col2#10L, col1#11L, col4#0L, col3#5L]
                     +- Join LeftOuter, (col2#10L = col2#6L)
                        :- Sort [col2#10L ASC NULLS FIRST], false
                        :  +- RepartitionByExpression [col2#10L]
                        :     +- Filter isnotnull(col1#11L)
                        :        +- Filter isnotnull(col2#10L)
                        :           +- LocalRelation [col2#10L, col1#11L]
                        +- Project [col4#0L, col3#5L, col2#6L]
                           +- Project [col3#5L, col2#6L, col4#0L]
                              +- Join LeftOuter, (col3#5L = col3#1L)
                                 :- Sort [col3#5L ASC NULLS FIRST], false
                                 :  +- RepartitionByExpression [col3#5L]
                                 :     +- Filter isnotnull(col2#6L)
                                 :        +- Filter isnotnull(col3#5L)
                                 :           +- LocalRelation [col3#5L, col2#6L]
                                 +- Sort [col3#1L ASC NULLS FIRST], false
                                    +- RepartitionByExpression [col3#1L]
                                       +- Filter isnotnull(col3#1L)
                                          +- Filter isnotnull(col4#0L)
                                             +- LocalRelation [col4#0L, col3#1L]

== Optimized Logical Plan ==
InsertIntoTable MetastoreRelation database, table, Map(version -> None, col5 -> None), OverwriteOptions(true,Map()), false
+- Project [col4#0L, col3#5L, col2#10L, col1#11L, 2016-11-11 AS version#59, col5#45]
   +- Repartition 8, true
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
         +- Join LeftOuter, (col2#10L = col2#6L)
            :- Sort [col2#10L ASC NULLS FIRST], false
            :  +- RepartitionByExpression [col2#10L]
            :     +- Filter ((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(UDF(col1#11L)))
            :        +- LocalRelation [col2#10L, col1#11L]
            +- Project [col4#0L, col3#5L, col2#6L]
               +- Join
{noformat}