[jira] [Updated] (SPARK-38667) Optimizer generates error when using inner join along with sequence

2022-04-25 Thread Lars (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lars updated SPARK-38667:
-
Affects Version/s: 3.1.2
   (was: 3.2.1)

> Optimizer generates error when using inner join along with sequence
> ---
>
> Key: SPARK-38667
> URL: https://issues.apache.org/jira/browse/SPARK-38667
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 3.1.2
>Reporter: Lars
>Priority: Major
>
> This issue occurred in a more complex scenario, so I've broken it down into a 
> simple case.
> {*}Steps to reproduce{*}: Execute the following example. The code should run 
> without errors, but instead a *java.lang.IllegalArgumentException: Illegal 
> sequence boundaries: 4 to 2 by 1* is thrown.
> {code:java}
> package com.example
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> object SparkIssue {
>     def main(args: Array[String]): Unit = {
>         val spark = SparkSession
>             .builder()
>             .master("local[*]")
>             .getOrCreate()
>         val dfA = spark
>             .createDataFrame(Seq((1, 1), (2, 4)))
>             .toDF("a1", "a2")
>         val dfB = spark
>             .createDataFrame(Seq((1, 5), (2, 2)))
>             .toDF("b1", "b2")
>         dfA.join(dfB, dfA("a1") === dfB("b1"), "inner")
>             .where(col("a2") < col("b2"))
>             .withColumn("x", explode(sequence(col("a2"), col("b2"), lit(1
>             .show()
>         spark.stop()
>     }
> }
> {code}
> When I look at the Optimized Logical Plan I can see that the Inner Join and 
> the Filter are brought together, with an additional check for an empty 
> Sequence. The exception is thrown because the Sequence check is executed 
> before the Filter.
> {code:java}
> == Parsed Logical Plan ==
> 'Project [a1#4, a2#5, b1#12, b2#13, explode(sequence('a2, 'b2, Some(1), 
> None)) AS x#24]
> +- Filter (a2#5 < b2#13)
>    +- Join Inner, (a1#4 = b1#12)
>       :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
>       :  +- LocalRelation [_1#0, _2#1]
>       +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
>          +- LocalRelation [_1#8, _2#9]
> == Analyzed Logical Plan ==
> a1: int, a2: int, b1: int, b2: int, x: int
> Project [a1#4, a2#5, b1#12, b2#13, x#25]
> +- Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), 
> false, [x#25]
>    +- Filter (a2#5 < b2#13)
>       +- Join Inner, (a1#4 = b1#12)
>          :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
>          :  +- LocalRelation [_1#0, _2#1]
>          +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
>             +- LocalRelation [_1#8, _2#9]
> == Optimized Logical Plan ==
> Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, 
> [x#25]
> +- Join Inner, (((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), 
> true) > 0) AND (a2#5 < b2#13)) AND (a1#4 = b1#12))
>    :- LocalRelation [a1#4, a2#5]
>    +- LocalRelation [b1#12, b2#13]
> == Physical Plan ==
> Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), [a1#4, 
> a2#5, b1#12, b2#13], false, [x#25]
> +- *(1) BroadcastHashJoin [a1#4], [b1#12], Inner, BuildRight, 
> ((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND 
> (a2#5 < b2#13)), false
>    :- *(1) LocalTableScan [a1#4, a2#5]
>    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
> false] as bigint)),false), [id=#15]
>       +- LocalTableScan [b1#12, b2#13]
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38667) Optimizer generates error when using inner join along with sequence

2022-03-27 Thread Lars (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lars updated SPARK-38667:
-
Description: 
This issue occurred in a more complex scenario, so I've broken it down into a 
simple case.

{*}Steps to reproduce{*}: Execute the following example. The code should run 
without errors, but instead a *java.lang.IllegalArgumentException: Illegal 
sequence boundaries: 4 to 2 by 1* is thrown.
{code:java}
package com.example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkIssue {
    def main(args: Array[String]): Unit = {

        val spark = SparkSession
            .builder()
            .master("local[*]")
            .getOrCreate()

        val dfA = spark
            .createDataFrame(Seq((1, 1), (2, 4)))
            .toDF("a1", "a2")

        val dfB = spark
            .createDataFrame(Seq((1, 5), (2, 2)))
            .toDF("b1", "b2")

        dfA.join(dfB, dfA("a1") === dfB("b1"), "inner")
            .where(col("a2") < col("b2"))
            .withColumn("x", explode(sequence(col("a2"), col("b2"), lit(1
            .show()

        spark.stop()

    }
}
{code}

When I look at the Optimized Logical Plan I can see that the Inner Join and the 
Filter are brought together, with an additional check for an empty Sequence. 
The exception is thrown because the Sequence check is executed before the 
Filter.
{code:java}
== Parsed Logical Plan ==
'Project [a1#4, a2#5, b1#12, b2#13, explode(sequence('a2, 'b2, Some(1), None)) 
AS x#24]
+- Filter (a2#5 < b2#13)
   +- Join Inner, (a1#4 = b1#12)
      :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
      :  +- LocalRelation [_1#0, _2#1]
      +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
         +- LocalRelation [_1#8, _2#9]

== Analyzed Logical Plan ==
a1: int, a2: int, b1: int, b2: int, x: int
Project [a1#4, a2#5, b1#12, b2#13, x#25]
+- Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), 
false, [x#25]
   +- Filter (a2#5 < b2#13)
      +- Join Inner, (a1#4 = b1#12)
         :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
         :  +- LocalRelation [_1#0, _2#1]
         +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
            +- LocalRelation [_1#8, _2#9]

== Optimized Logical Plan ==
Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, 
[x#25]
+- Join Inner, (((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), 
true) > 0) AND (a2#5 < b2#13)) AND (a1#4 = b1#12))
   :- LocalRelation [a1#4, a2#5]
   +- LocalRelation [b1#12, b2#13]

== Physical Plan ==
Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), [a1#4, 
a2#5, b1#12, b2#13], false, [x#25]
+- *(1) BroadcastHashJoin [a1#4], [b1#12], Inner, BuildRight, 
((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND 
(a2#5 < b2#13)), false
   :- *(1) LocalTableScan [a1#4, a2#5]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
false] as bigint)),false), [id=#15]
      +- LocalTableScan [b1#12, b2#13]
{code}
 

 

 

  was:
This issue occurred in a more complex scenario, so I've broken it down into a 
simple case.

{*}{*}{*}Steps to reproduce{*}: Execute the following example. The code should 
run without errors, but instead a *java.lang.IllegalArgumentException: Illegal 
sequence boundaries: 4 to 2 by 1* is thrown.

 
{code:java}
package com.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SparkIssue {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .master("local[*]")
            .getOrCreate()
        val dfA = spark
            .createDataFrame(Seq((1, 1), (2, 4)))
            .toDF("a1", "a2")
        val dfB = spark
            .createDataFrame(Seq((1, 5), (2, 2)))
            .toDF("b1", "b2")
        dfA.join(dfB, dfA("a1") === dfB("b1"), "inner")
            .where(col("a2") < col("b2"))
            .withColumn("x", explode(sequence(col("a2"), col("b2"), lit(1
            .show()
        spark.stop()
    }
}
{code}

When I look at the Optimized Logical Plan I can see that the Inner Join and the 
Filter are brought together, with an additional check for an empty Sequence. 
The exception is thrown because the Sequence check is executed before the 
Filter.

 
{code:java}
== Parsed Logical Plan ==
'Project [a1#4, a2#5, b1#12, b2#13, explode(sequence('a2, 'b2, Some(1), None)) 
AS x#24]
+- Filter (a2#5 < b2#13)
   +- Join Inner, (a1#4 = b1#12)
      :- Project [_1#0 AS a1#4, _2#1 AS a2#5]
      :  +- LocalRelation [_1#0, _2#1]
      +- Project [_1#8 AS b1#12, _2#9 AS b2#13]
         +- LocalRelation [_1#8, _2#9]
== Analyzed Logical Plan ==
a1: int, a2: int, b1: int, b2: int, x: int
Project [a1#4, a2#5, b1#12, b2#13, x#25]
+- Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), 
false, [x#25]
   +- Filter (a2#5 < b2#13)
      +- Join Inner, (a1#4 =