This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 1171904 [SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed 1171904 is described below commit 117190474957b7a5e44cf8b9e04c0d928bf06e1d Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Mon Dec 21 20:59:33 2020 +0900 [SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed ### What changes were proposed in this pull request? It's a known issue that re-analyzing an optimized plan can lead to various issues. We made several attempts to prevent it from happening, but the current solution `AlreadyOptimized` is still not 100% safe, as people can inject catalyst rules that call the analyzer directly. This PR proposes a simpler and safer idea: we set the `analyzed` flag to true after optimization, and the analyzer will skip processing plans whose `analyzed` flag is true. ### Why are the changes needed? To make the code simpler and safer. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #30777 from cloud-fan/ds. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit b4bea1aa8972cdfd8901757a0ed990a20fca620f) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 1 + .../catalyst/plans/logical/AnalysisHelper.scala | 7 +- .../spark/sql/execution/AlreadyOptimized.scala | 37 ---------- .../spark/sql/execution/QueryExecution.scala | 7 +- .../datasources/v2/V1FallbackWriters.scala | 7 +- .../sql/execution/AlreadyOptimizedSuite.scala | 85 ---------------------- 6 files changed, 16 insertions(+), 128 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c5c0c68..a5a28a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -168,6 +168,7 @@ class Analyzer(override val catalogManager: CatalogManager) } def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { + if (plan.analyzed) return plan AnalysisHelper.markInAnalyzer { val analyzed = executeAndTrack(plan, tracker) try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala index 2c6a716a..ffd1f78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala @@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => * This should only be called by * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. 
*/ - private[catalyst] def setAnalyzed(): Unit = { + private[sql] def setAnalyzed(): Unit = { if (!_analyzed) { _analyzed = true children.foreach(_.setAnalyzed()) @@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => super.transformAllExpressions(rule) } + override def clone(): LogicalPlan = { + val cloned = super.clone() + if (analyzed) cloned.setAnalyzed() + cloned + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala deleted file mode 100644 index e40b114..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan - -/** Query execution that skips re-analysis and optimize. 
*/ -class AlreadyOptimizedExecution( - session: SparkSession, - plan: LogicalPlan) extends QueryExecution(session, plan) { - override lazy val analyzed: LogicalPlan = plan - override lazy val optimizedPlan: LogicalPlan = plan -} - -object AlreadyOptimized { - def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = { - val qe = new AlreadyOptimizedExecution(sparkSession, optimized) - new Dataset[Row](qe, RowEncoder(qe.analyzed.schema)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 0531dd2..1d5a884 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -84,7 +84,12 @@ class QueryExecution( lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) { // clone the plan to avoid sharing the plan instance between different stages like analyzing, // optimizing and planning. - sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker) + val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker) + // We do not want optimized plans to be re-analyzed as literals that have been constant folded + // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state + // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia. 
+ plan.setAnalyzed() + plan } private def assertOptimized(): Unit = optimizedPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index 9d2cea9..080e977 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2 import java.util.UUID import org.apache.spark.SparkException +import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.SupportsWrite import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} -import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -118,9 +119,7 @@ trait SupportsV1Write extends SparkPlan { protected def writeWithV1( relation: InsertableRelation, refreshCache: () => Unit = () => ()): Seq[InternalRow] = { - val session = sqlContext.sparkSession - // The `plan` is already optimized, we should not analyze and optimize it again. 
- relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false) + relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false) refreshCache() Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala deleted file mode 100644 index c266aa9..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite -import org.apache.spark.sql.test.SharedSparkSession - -class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession { - - import testImplicits._ - - test("simple execution") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer(planned, df.toDF().collect()) - } - - test("planning on top works - projection") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer( - planned.withColumn("data", 'id + 1), - df.withColumn("data", 'id + 1).collect()) - } - - test("planning on top works - filter") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect()) - } - - test("planning on top works - aggregate") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect()) - } - - test("planning on top works - joins") { - val df = spark.range(10) - val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan) - - val plannedLeft = planned.alias("l") - val dfLeft = df.alias("l") - val plannedRight = planned.alias("r") - val dfRight = df.alias("r") - - checkAnswer( - plannedLeft.where('id < 3).join(plannedRight, Seq("id")), - dfLeft.where('id < 3).join(dfRight, Seq("id")).collect()) - - checkAnswer( - plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")), - dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect()) - - checkAnswer( - plannedLeft.join(plannedRight, Seq("id")).where('id < 3), - dfLeft.join(dfRight, Seq("id")).where('id < 
3).collect()) - - checkAnswer( - plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3), - dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect()) - } -} - -class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with EnableAdaptiveExecutionSuite --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org