[jira] [Commented] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

2022-04-26 Thread Nicholas Chammas (Jira)


[ https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528233#comment-17528233 ]

Nicholas Chammas commented on SPARK-37222:
--

I've found a helpful log setting that causes Spark to print detailed 
information about exactly how a plan is transformed during optimization:
{code:java}
spark.conf.set("spark.sql.planChangeLog.level", "warn")
{code}
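(If I'm reading the config definitions right, this key exists under this name 
as of Spark 3.1; on 3.0.x I believe the equivalent setting was 
{{spark.sql.optimizer.planChangeLog.level}}.)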
Here's the log generated by enabling this setting and running Shawn's example: 
[^plan-log.log]

To confirm what Shawn noted in his comment above, it looks like the chain of 
events that results in a loop is as follows:
 # PushDownLeftSemiAntiJoin
 # ColumnPruning
 # CollapseProject
 # FoldablePropagation
 # RemoveNoopOperators

What seems to be the problem is that:
 * ColumnPruning inserts a couple of Project operators which are then removed 
by CollapseProject.
 * CollapseProject in turn pushes up the left anti-join which is then pushed 
down again by PushDownLeftSemiAntiJoin.

These three rules go back and forth, undoing each other's work, until 
{{spark.sql.optimizer.maxIterations}} is exhausted.
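For anyone who wants to watch this happen, here's a minimal sketch that simply 
wires the log setting above to Shawn's repro from the description (nothing new 
beyond combining the two), so the loop shows up directly in the driver log:
{code:java}
// Minimal sketch, for spark-shell (:paste mode) on an affected version
// (3.1.2 / 3.2.x): enable the plan change log, then run Shawn's repro.
// The log then shows PushDownLeftSemiAntiJoin, ColumnPruning, and
// CollapseProject rewriting the plan back and forth on every iteration
// of the Operator Optimization batch.
spark.conf.set("spark.sql.planChangeLog.level", "warn")

import spark.implicits._

case class Nested(b: Boolean, n: Long)
case class Table(id: String, nested: Nested)
case class Identifier(id: String)

val df  = List.empty[Table].toDS.cache()
val ids = List.empty[Identifier].toDS.cache()

df.join(ids, Seq("id"), "left_anti") // also loops with "left_semi"
  .select('id, 'nested("n"))
  .explain() // optimization runs here and hits the max-iterations warning
{code}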

> Max iterations reached in Operator Optimization w/left_anti or left_semi join 
> and nested structures
> ---
>
> Key: SPARK-37222
> URL: https://issues.apache.org/jira/browse/SPARK-37222
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 3.1.2, 3.2.0, 3.2.1
> Environment: I've reproduced the error on Spark 3.1.2, 3.2.0, and 
> with the current branch-3.2 HEAD (git commit 966c90c0b5) as of November 5, 
> 2021.
> The problem does not occur with Spark 3.0.1.
>  
>Reporter: Shawn Smith
>Priority: Major
> Attachments: plan-log.log
>
>
> The query optimizer never reaches a fixed point when optimizing the query 
> below. This manifests as a warning:
> > WARN: Max iterations (100) reached for batch Operator Optimization before 
> > Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a 
> > larger value.
> But the suggested fix won't help. The actual problem is that the optimizer 
> fails to make progress on each iteration and gets stuck in a loop.
> In practice, Spark logs a warning but continues on and appears to execute the 
> query successfully, albeit perhaps sub-optimally.
> To reproduce, paste the following into the Spark shell. With Spark 3.1.2 and 
> 3.2.0 but not 3.0.1 it will throw an exception:
> {noformat}
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> {noformat}
> Looking at the query plan as the optimizer iterates in 
> {{RuleExecutor.execute()}}, here's an example of the plan after 49 iterations:
> {noformat}
> Project [id#2, _gen_alias_108#108L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_108#108L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> And here's the plan after one more iteration. All that has changed is the 
> generated alias for the nested column: "{{_gen_alias_108#108L}}" became 
> "{{_gen_alias_109#109L}}".
> {noformat}
> Project [id#2, _gen_alias_109#109L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_109#109L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> The optimizer continues looping and tweaking the alias until it hits the max 
> iteration count and bails out.
> Here's an example that includes a stack trace:
> {noformat}
> $ bin/spark-shell
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
>       /_/
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> {noformat}

[jira] [Commented] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

2022-04-25 Thread Nicholas Chammas (Jira)


[ https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17527527#comment-17527527 ]

Nicholas Chammas commented on SPARK-37222:
--

Thanks for the detailed report, [~ssmith]. I am hitting this issue as well on 
Spark 3.2.1, and your minimal test case also reproduces the issue for me.

How did you break down the optimization into its individual steps like that? 
That was very helpful.

I was able to use your breakdown to work around the issue by excluding 
{{PushDownLeftSemiAntiJoin}}:
{code:java}
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin"
)
{code}
If I run that before the problematic query (including your test case), it 
seems to avoid the loop.
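For what it's worth, the same exclusion can also be applied at session startup 
with the standard {{--conf}} flag, so it takes effect before any query runs:
{noformat}
$ bin/spark-shell --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin
{noformat}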

[jira] [Commented] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

2021-11-05 Thread Shawn Smith (Jira)


[ https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439575#comment-17439575 ]

Shawn Smith commented on SPARK-37222:
-

Plan at the start of one iteration:
{noformat}
Project [id#2, _gen_alias_110#110L AS nested.n#28L]
+- Join LeftAnti, (id#2 = id#18)
   :- Project [id#2, nested#3.n AS _gen_alias_110#110L]
   :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- LocalTableScan <empty>, [id#2, nested#3]
   +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan <empty>, [id#18]
{noformat}
After {{org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin}} 
(moved the {{Join}} down):
{noformat}
Project [id#2, _gen_alias_110#110L AS nested.n#28L]
+- Project [id#2, nested#3.n AS _gen_alias_110#110L]
   +- Join LeftAnti, (id#2 = id#18)
      :- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
      :     +- LocalTableScan <empty>, [id#2, nested#3]
      +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan <empty>, [id#18]
{noformat}
After {{org.apache.spark.sql.catalyst.optimizer.ColumnPruning}} the plan looks 
the same, but the plan objects are different.

After {{org.apache.spark.sql.catalyst.optimizer.CollapseProject}} (moved the 
{{Join}} up, added {{Project [id#18]}}):
{noformat}
Project [id#2, _gen_alias_111#111L AS nested.n#28L]
+- Join LeftAnti, (id#2 = id#18)
   :- Project [id#2, nested#3.n AS _gen_alias_111#111L]
   :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- LocalTableScan <empty>, [id#2, nested#3]
   +- Project [id#18]
      +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan <empty>, [id#18]
{noformat}
After {{org.apache.spark.sql.catalyst.optimizer.FoldablePropagation}} the plan 
looks the same, but the plan objects are different.

After {{org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators}} (removed 
{{Project [id#18]}}):
{noformat}
Project [id#2, _gen_alias_111#111L AS nested.n#28L]
+- Join LeftAnti, (id#2 = id#18)
   :- Project [id#2, nested#3.n AS _gen_alias_111#111L]
   :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- LocalTableScan <empty>, [id#2, nested#3]
   +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan <empty>, [id#18]
{noformat}
Plan at the end of one iteration:
{noformat}
Project [id#2, _gen_alias_112#112L AS nested.n#28L]
+- Join LeftAnti, (id#2 = id#18)
   :- Project [id#2, nested#3.n AS _gen_alias_112#112L]
   :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- LocalTableScan <empty>, [id#2, nested#3]
   +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan <empty>, [id#18]
{noformat}
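A related note for anyone trying to reproduce this kind of per-rule breakdown: 
on Spark 3.1+ the plan change log can be narrowed to just the rules involved 
here, if I have the config names right ({{spark.sql.planChangeLog.rules}} takes 
a comma-separated list of rule names). A sketch:
{code:java}
// Sketch, assuming the Spark 3.1+ planChangeLog configs: log plan changes
// only for the five rules involved in this loop, at WARN level.
spark.conf.set("spark.sql.planChangeLog.level", "warn")
spark.conf.set(
  "spark.sql.planChangeLog.rules",
  Seq(
    "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin",
    "org.apache.spark.sql.catalyst.optimizer.ColumnPruning",
    "org.apache.spark.sql.catalyst.optimizer.CollapseProject",
    "org.apache.spark.sql.catalyst.optimizer.FoldablePropagation",
    "org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators"
  ).mkString(",")
)
{code}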
