This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0820beb  [SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed
0820beb is described below

commit 0820beb60018210ee718b96acb37feb8d8445251
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Dec 22 10:30:34 2020 +0900

    [SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed
    
    backport https://github.com/apache/spark/pull/30777 to 3.0
    
    ----------
    
    ### What changes were proposed in this pull request?
    
    It's a known issue that re-analyzing an optimized plan can lead to various problems. We made several attempts to prevent it, but the current solution, `AlreadyOptimized`, is still not 100% safe, as people can inject catalyst rules that call the analyzer directly.
    
    This PR proposes a simpler and safer idea: we set the `analyzed` flag to true after optimization, and the analyzer skips processing any plan whose `analyzed` flag is already true.
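    
    In rough terms, the change amounts to the sketch below (a simplified illustration of the two touched methods, not the exact code; see the diff for the real implementation):
    
    ```scala
    // QueryExecution: the optimizer's result is marked as analyzed.
    lazy val optimizedPlan: LogicalPlan = {
      val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
      plan.setAnalyzed()  // optimized plans must never be re-analyzed
      plan
    }
    
    // Analyzer: plans that already carry the flag are returned untouched.
    def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
      if (plan.analyzed) return plan
      AnalysisHelper.markInAnalyzer {
        // ... run the analysis rules and CheckAnalysis as before ...
      }
    }
    ```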
    
    ### Why are the changes needed?
    
    Make the code simpler and safer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #30872 from cloud-fan/ds.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  1 +
 .../catalyst/plans/logical/AnalysisHelper.scala    |  7 +-
 .../spark/sql/execution/AlreadyOptimized.scala     | 37 ----------
 .../spark/sql/execution/QueryExecution.scala       |  7 +-
 .../datasources/v2/V1FallbackWriters.scala         |  6 +-
 .../sql/execution/AlreadyOptimizedSuite.scala      | 85 ----------------------
 6 files changed, 16 insertions(+), 127 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1307fc5..fbe6041 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -144,6 +144,7 @@ class Analyzer(
   }
 
   def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+    if (plan.analyzed) return plan
     AnalysisHelper.markInAnalyzer {
       val analyzed = executeAndTrack(plan, tracker)
       try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 30447db..9b6fee6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
    * Recursively marks all nodes in this plan tree as analyzed.
    * This should only be called by [[CheckAnalysis]].
    */
-  private[catalyst] def setAnalyzed(): Unit = {
+  private[sql] def setAnalyzed(): Unit = {
     if (!_analyzed) {
       _analyzed = true
       children.foreach(_.setAnalyzed())
@@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
     super.transformAllExpressions(rule)
   }
 
+  override def clone(): LogicalPlan = {
+    val cloned = super.clone()
+    if (analyzed) cloned.setAnalyzed()
+    cloned
+  }
 }
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
deleted file mode 100644
index e40b114..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/** Query execution that skips re-analysis and optimize. */
-class AlreadyOptimizedExecution(
-    session: SparkSession,
-    plan: LogicalPlan) extends QueryExecution(session, plan) {
-  override lazy val analyzed: LogicalPlan = plan
-  override lazy val optimizedPlan: LogicalPlan = plan
-}
-
-object AlreadyOptimized {
-  def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = {
-    val qe = new AlreadyOptimizedExecution(sparkSession, optimized)
-    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 7f5a5e3..ed36c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -79,7 +79,12 @@ class QueryExecution(
   lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
     // clone the plan to avoid sharing the plan instance between different stages like analyzing,
     // optimizing and planning.
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    // We do not want optimized plans to be re-analyzed as literals that have been constant folded
+    // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
+    // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
+    plan.setAnalyzed()
+    plan
   }
 
   private def assertOptimized(): Unit = optimizedPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index 560da39..95082bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.UUID
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -113,8 +114,7 @@ trait SupportsV1Write extends SparkPlan {
   def plan: LogicalPlan
 
   protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
-    // The `plan` is already optimized, we should not analyze and optimize it again.
-    relation.insert(AlreadyOptimized.dataFrame(sqlContext.sparkSession, plan), overwrite = false)
+    relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
     Nil
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
deleted file mode 100644
index c266aa9..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession {
-
-  import testImplicits._
-
-  test("simple execution") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned, df.toDF().collect())
-  }
-
-  test("planning on top works - projection") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(
-      planned.withColumn("data", 'id + 1),
-      df.withColumn("data", 'id + 1).collect())
-  }
-
-  test("planning on top works - filter") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect())
-  }
-
-  test("planning on top works - aggregate") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect())
-  }
-
-  test("planning on top works - joins") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    val plannedLeft = planned.alias("l")
-    val dfLeft = df.alias("l")
-    val plannedRight = planned.alias("r")
-    val dfRight = df.alias("r")
-
-    checkAnswer(
-      plannedLeft.where('id < 3).join(plannedRight, Seq("id")),
-      dfLeft.where('id < 3).join(dfRight, Seq("id")).collect())
-
-    checkAnswer(
-      plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")),
-      dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect())
-
-    checkAnswer(
-      plannedLeft.join(plannedRight, Seq("id")).where('id < 3),
-      dfLeft.join(dfRight, Seq("id")).where('id < 3).collect())
-
-    checkAnswer(
-      plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3),
-      dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect())
-  }
-}
-
-class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with EnableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
