[jira] [Updated] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

2022-04-26 Thread Nicholas Chammas (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-37222:
-
Attachment: plan-log.log

> Max iterations reached in Operator Optimization w/left_anti or left_semi join 
> and nested structures
> ---
>
>                 Key: SPARK-37222
>                 URL: https://issues.apache.org/jira/browse/SPARK-37222
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.1.2, 3.2.0, 3.2.1
>         Environment: I've reproduced the error on Spark 3.1.2, 3.2.0, and 
> with the current branch-3.2 HEAD (git commit 966c90c0b5) as of November 5, 
> 2021.
> The problem does not occur with Spark 3.0.1.
>  
>            Reporter: Shawn Smith
>            Priority: Major
>         Attachments: plan-log.log
>
>
> The query optimizer never reaches a fixed point when optimizing the query 
> below. This manifests as a warning:
> > WARN: Max iterations (100) reached for batch Operator Optimization before 
> > Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a 
> > larger value.
> But the suggested fix won't help. The actual problem is that the optimizer 
> fails to make progress on each iteration and gets stuck in a loop.
> In practice, Spark logs a warning but continues on and appears to execute the 
> query successfully, albeit perhaps sub-optimally.
> To reproduce, paste the following into the Spark shell. With Spark 3.1.2 and 
> 3.2.0, but not 3.0.1, it throws an exception, because setting 
> {{spark.testing}} turns the warning into a failure:
> {noformat}
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> {noformat}
> Looking at the query plan as the optimizer iterates in 
> {{RuleExecutor.execute()}}, here's an example of the plan after 49 iterations:
> {noformat}
> Project [id#2, _gen_alias_108#108L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_108#108L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> And here's the plan after one more iteration. The only change is a new alias 
> for the field extracted from the nested column: "{{_gen_alias_108#108L}}" 
> has become "{{_gen_alias_109#109L}}".
> {noformat}
> Project [id#2, _gen_alias_109#109L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_109#109L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> The optimizer continues looping and tweaking the alias until it hits the max 
> iteration count and bails out.
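
The loop described above can be illustrated with a toy model. This is only a sketch under stated assumptions: it is not Spark's RuleExecutor, the report does not identify which rule is responsible, and every name in it (FixedPointSketch, Plan, FreshAliasRule, stableRule, runToFixedPoint) is hypothetical. It shows why a rule that mints a fresh alias on every pass can never reach a fixed point, whatever the iteration limit.

```scala
// Toy model of a fixed-point rule executor. NOT Spark's RuleExecutor;
// all names here are hypothetical and exist only for illustration.
object FixedPointSketch {
  // A "plan" is reduced to the alias given to the extracted nested field.
  final case class Plan(alias: String)

  // Hypothetical rule that always mints a fresh alias, mimicking the
  // _gen_alias_108 -> _gen_alias_109 change seen between iterations.
  final class FreshAliasRule(start: Int) extends (Plan => Plan) {
    private var counter = start
    def apply(p: Plan): Plan = {
      counter += 1
      Plan(s"_gen_alias_$counter")
    }
  }

  // Hypothetical well-behaved rule: leaves an already-aliased plan alone.
  val stableRule: Plan => Plan = p => p

  // Applies `rule` until the plan stops changing or the budget runs out.
  // Returns the last plan and whether a fixed point was reached.
  @annotation.tailrec
  def runToFixedPoint(plan: Plan, rule: Plan => Plan, maxIterations: Int): (Plan, Boolean) =
    if (maxIterations <= 0) (plan, false)
    else {
      val next = rule(plan)
      if (next == plan) (plan, true)
      else runToFixedPoint(next, rule, maxIterations - 1)
    }

  def main(args: Array[String]): Unit = {
    val start = Plan("_gen_alias_0")
    // The fresh-alias rule exhausts any budget; the stable rule converges at once.
    println(runToFixedPoint(start, new FreshAliasRule(0), 100))
    println(runToFixedPoint(start, new FreshAliasRule(0), 1000))
    println(runToFixedPoint(start, stableRule, 100))
  }
}
```

In this model, raising the limit from 100 to 1000 only makes the fresh-alias rule loop longer. That mirrors why setting 'spark.sql.optimizer.maxIterations' to a larger value does not help here.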
> Here's an example that includes a stack trace:
> {noformat}
> $ bin/spark-shell
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
>       /_/
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> // Exiting paste mode, now interpreting.
> java.lang.RuntimeException: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:246)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
>   at scala.collection.immutable.List.foreach(List.scala:431)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)

[jira] [Updated] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

2022-04-25 Thread Nicholas Chammas (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-37222:
-
Affects Version/s: 3.2.1
