This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0820beb [SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed 0820beb is described below commit 0820beb60018210ee718b96acb37feb8d8445251 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Dec 22 10:30:34 2020 +0900 [SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed Backport of https://github.com/apache/spark/pull/30777 to 3.0 ---------- ### What changes were proposed in this pull request? It's a known problem that re-analyzing an optimized plan can lead to various issues. We made several attempts to prevent it from happening, but the current solution, `AlreadyOptimized`, is still not 100% safe, as people can inject Catalyst rules that call the analyzer directly. This PR proposes a simpler and safer approach: we set the `analyzed` flag to true after optimization, and the analyzer will skip processing plans whose `analyzed` flag is true. ### Why are the changes needed? To make the code simpler and safer. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #30872 from cloud-fan/ds. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 1 + .../catalyst/plans/logical/AnalysisHelper.scala | 7 +- .../spark/sql/execution/AlreadyOptimized.scala | 37 ---------- .../spark/sql/execution/QueryExecution.scala | 7 +- .../datasources/v2/V1FallbackWriters.scala | 6 +- .../sql/execution/AlreadyOptimizedSuite.scala | 85 ---------------------- 6 files changed, 16 insertions(+), 127 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1307fc5..fbe6041 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -144,6 +144,7 @@ class Analyzer( } def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { + if (plan.analyzed) return plan AnalysisHelper.markInAnalyzer { val analyzed = executeAndTrack(plan, tracker) try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala index 30447db..9b6fee6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala @@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => * Recursively marks all nodes in this plan tree as analyzed. * This should only be called by [[CheckAnalysis]]. 
*/ - private[catalyst] def setAnalyzed(): Unit = { + private[sql] def setAnalyzed(): Unit = { if (!_analyzed) { _analyzed = true children.foreach(_.setAnalyzed()) @@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => super.transformAllExpressions(rule) } + override def clone(): LogicalPlan = { + val cloned = super.clone() + if (analyzed) cloned.setAnalyzed() + cloned + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala deleted file mode 100644 index e40b114..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan - -/** Query execution that skips re-analysis and optimize. 
*/ -class AlreadyOptimizedExecution( - session: SparkSession, - plan: LogicalPlan) extends QueryExecution(session, plan) { - override lazy val analyzed: LogicalPlan = plan - override lazy val optimizedPlan: LogicalPlan = plan -} - -object AlreadyOptimized { - def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = { - val qe = new AlreadyOptimizedExecution(sparkSession, optimized) - new Dataset[Row](qe, RowEncoder(qe.analyzed.schema)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 7f5a5e3..ed36c78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -79,7 +79,12 @@ class QueryExecution( lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) { // clone the plan to avoid sharing the plan instance between different stages like analyzing, // optimizing and planning. - sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker) + val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker) + // We do not want optimized plans to be re-analyzed as literals that have been constant folded + // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state + // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia. 
+ plan.setAnalyzed() + plan } private def assertOptimized(): Unit = optimizedPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index 560da39..95082bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2 import java.util.UUID import org.apache.spark.SparkException +import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.SupportsWrite import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} -import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -113,8 +114,7 @@ trait SupportsV1Write extends SparkPlan { def plan: LogicalPlan protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = { - // The `plan` is already optimized, we should not analyze and optimize it again. 
- relation.insert(AlreadyOptimized.dataFrame(sqlContext.sparkSession, plan), overwrite = false) + relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false) Nil } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala deleted file mode 100644 index c266aa9..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite -import org.apache.spark.sql.test.SharedSparkSession - -class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession { - - import testImplicits._ - - test("simple execution") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer(planned, df.toDF().collect()) - } - - test("planning on top works - projection") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer( - planned.withColumn("data", 'id + 1), - df.withColumn("data", 'id + 1).collect()) - } - - test("planning on top works - filter") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect()) - } - - test("planning on top works - aggregate") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect()) - } - - test("planning on top works - joins") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - val plannedLeft = planned.alias("l") - val dfLeft = df.alias("l") - val plannedRight = planned.alias("r") - val dfRight = df.alias("r") - - checkAnswer( - plannedLeft.where('id < 3).join(plannedRight, Seq("id")), - dfLeft.where('id < 3).join(dfRight, Seq("id")).collect()) - - checkAnswer( - plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")), - dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect()) - - checkAnswer( - plannedLeft.join(plannedRight, Seq("id")).where('id < 3), - dfLeft.join(dfRight, Seq("id")).where('id < 
3).collect()) - - checkAnswer( - plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3), - dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect()) - } -} - -class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with EnableAdaptiveExecutionSuite --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org