[jira] [Updated] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures
[ https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nicholas Chammas updated SPARK-37222:
-------------------------------------
    Attachment: plan-log.log

> Max iterations reached in Operator Optimization w/left_anti or left_semi join
> and nested structures
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-37222
>                 URL: https://issues.apache.org/jira/browse/SPARK-37222
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.1.2, 3.2.0, 3.2.1
>         Environment: I've reproduced the error on Spark 3.1.2, 3.2.0, and
> with the current branch-3.2 HEAD (git commit 966c90c0b5) as of November 5,
> 2021.
> The problem does not occur with Spark 3.0.1.
>            Reporter: Shawn Smith
>            Priority: Major
>         Attachments: plan-log.log
>
>
> The query optimizer never reaches a fixed point when optimizing the query
> below. This manifests as a warning:
>
> > WARN: Max iterations (100) reached for batch Operator Optimization before
> > Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a
> > larger value.
>
> But the suggested fix won't help. The actual problem is that the optimizer
> fails to make progress on each iteration and gets stuck in a loop.
>
> In practice, Spark logs the warning but carries on and appears to execute
> the query successfully, albeit perhaps sub-optimally.
>
> To reproduce, paste the following into the Spark shell. With Spark 3.1.2 and
> 3.2.0, but not 3.0.1, it throws an exception:
>
> {noformat}
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
>
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> {noformat}
>
> Looking at the query plan as the optimizer iterates in
> {{RuleExecutor.execute()}}, here's an example of the plan after 49
> iterations:
>
> {noformat}
> Project [id#2, _gen_alias_108#108L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_108#108L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :     +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>       +- LocalTableScan <empty>, [id#18]
> {noformat}
>
> And here's the plan after one more iteration. All that has changed is the
> generated alias for the extracted nested field: {{_gen_alias_108#108L}}
> became {{_gen_alias_109#109L}}.
>
> {noformat}
> Project [id#2, _gen_alias_109#109L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_109#109L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :     +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>       +- LocalTableScan <empty>, [id#18]
> {noformat}
>
> The optimizer keeps looping, renaming the alias on every pass, until it hits
> the max iteration count and bails out.
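>
> (For completeness, the mitigation the warning suggests amounts to a one-line
> session config change, sketched below; the local master and the value 500
> are arbitrary choices for illustration, not from this report. Given the
> behavior above, raising the cap merely delays the bailout rather than fixing
> the loop.)
>
> {noformat}
> import org.apache.spark.sql.SparkSession
>
> // Raise the optimizer's rule-batch iteration cap from its default of 100.
> // With this bug, the plan is re-aliased on every pass, so any finite cap
> // is eventually exhausted.
> val spark = SparkSession.builder()
>   .master("local[*]")
>   .config("spark.sql.optimizer.maxIterations", "500")
>   .getOrCreate()
> {noformat}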
> Here's an example that includes a stack trace:
>
> {noformat}
> $ bin/spark-shell
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/ '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
>       /_/
>
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
>
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
>
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
>
> // Exiting paste mode, now interpreting.
>
> java.lang.RuntimeException: Max iterations (100) reached for batch Operator
> Optimization before Inferring Filters, please set
> 'spark.sql.optimizer.maxIterations' to a larger value.
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:246)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
>   at scala.collection.immutable.List.foreach(List.scala:431)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
> {noformat}
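>
> To see why no finite iteration cap can help here: the optimizer applies each
> batch of rules repeatedly until the plan stops changing or the cap is hit.
> The sketch below is illustrative only, not Spark's actual RuleExecutor
> source; all names in it are made up for the example. A plan that is
> rewritten with a fresh alias on every pass never compares equal to the
> previous one, so the only exit is the cap branch.
>
> {noformat}
> // Illustrative sketch of a fixed-point rule executor, mirroring the
> // behavior described above (hypothetical code, not Spark internals).
> @annotation.tailrec
> def executeBatch[Plan](plan: Plan,
>                        rules: Seq[Plan => Plan],
>                        maxIterations: Int,
>                        iteration: Int = 1): Plan = {
>   // Apply every rule in the batch once, in order.
>   val next = rules.foldLeft(plan)((p, rule) => rule(p))
>   if (next == plan) {
>     plan // fixed point reached: no rule changed the plan
>   } else if (iteration >= maxIterations) {
>     val msg = s"Max iterations ($maxIterations) reached"
>     // Under -Dspark.testing=true the limit is fatal; otherwise warn and
>     // give up with the partially optimized plan.
>     if (sys.props.contains("spark.testing")) throw new RuntimeException(msg)
>     Console.err.println(s"WARN: $msg")
>     next
>   } else {
>     executeBatch(next, rules, maxIterations, iteration + 1)
>   }
> }
> {noformat}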
[jira] [Updated] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures
[ https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nicholas Chammas updated SPARK-37222:
-------------------------------------
    Affects Version/s: 3.2.1