This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 14762b372dd6 [SPARK-47177][SQL] Cached SQL plan do not display final 
AQE plan in explain string
14762b372dd6 is described below

commit 14762b372dd623179aa2c985c44cd49048660dda
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Tue Mar 5 10:13:00 2024 +0800

    [SPARK-47177][SQL] Cached SQL plan do not display final AQE plan in explain 
string
    
    ### What changes were proposed in this pull request?
    
    This pr adds lock for ExplainUtils.processPlan to avoid tag race condition.
    
    ### Why are the changes needed?
    
    To fix the issue 
[SPARK-47177](https://issues.apache.org/jira/browse/SPARK-47177)
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, affect plan explain
    
    ### How was this patch tested?
    
    add test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #45282 from ulysses-you/SPARK-47177.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: youxiduo <youxi...@corp.netease.com>
    (cherry picked from commit 6e62a5690b810edb99e4fc6ad39afbd4d49ef85e)
    Signed-off-by: youxiduo <youxi...@corp.netease.com>
---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  7 ++--
 .../apache/spark/sql/execution/ExplainUtils.scala  |  6 +++-
 .../sql/execution/columnar/InMemoryRelation.scala  | 12 +------
 .../execution/columnar/InMemoryRelationSuite.scala | 41 +++++++++++++++-------
 4 files changed, 38 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 9e605a45414b..82228a5b2aaf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -1030,10 +1030,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
     append(str)
     append("\n")
 
-    if (innerChildren.nonEmpty) {
+    val innerChildrenLocal = innerChildren
+    if (innerChildrenLocal.nonEmpty) {
       lastChildren.add(children.isEmpty)
       lastChildren.add(false)
-      innerChildren.init.foreach(_.generateTreeString(
+      innerChildrenLocal.init.foreach(_.generateTreeString(
         depth + 2, lastChildren, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields, printNodeId = 
printNodeId, indent = indent))
       lastChildren.remove(lastChildren.size() - 1)
@@ -1041,7 +1042,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
 
       lastChildren.add(children.isEmpty)
       lastChildren.add(true)
-      innerChildren.last.generateTreeString(
+      innerChildrenLocal.last.generateTreeString(
         depth + 2, lastChildren, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields, printNodeId = 
printNodeId, indent = indent)
       lastChildren.remove(lastChildren.size() - 1)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index 3da3e646f36b..11f6ae0e47ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -75,8 +75,12 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    * Given a input physical plan, performs the following tasks.
    *   1. Generates the explain output for the input plan excluding the 
subquery plans.
    *   2. Generates the explain output for each subquery referenced in the 
plan.
+   *
+   * Note that, ideally this is a no-op as different explain actions operate 
on different plan,
+   * instances but cached plan is an exception. The 
`InMemoryRelation#innerChildren` use a shared
+   * plan instance across multi-queries. Add lock for this method to avoid tag 
race condition.
    */
-  def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
+  def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = 
synchronized {
     try {
       // Initialize a reference-unique set of Operators to avoid accdiental 
overwrites and to allow
       // intentional overwriting of IDs generated in previous AQE iteration
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 5bab8e53eb16..f750a4503be1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -392,17 +392,7 @@ case class InMemoryRelation(
 
   @volatile var statsOfPlanToCache: Statistics = null
 
-
-  override lazy val innerChildren: Seq[SparkPlan] = {
-    // The cachedPlan needs to be cloned here because it does not get cloned 
when SparkPlan.clone is
-    // called. This is a problem because when the explain output is generated 
for
-    // a plan it traverses the innerChildren and modifies their TreeNode.tags. 
If the plan is not
-    // cloned, there is a thread safety issue in the case that two plans with 
a shared cache
-    // operator have explain called at the same time. The cachedPlan cannot be 
cloned because
-    // it contains stateful information so we only clone it for the purpose of 
generating the
-    // explain output.
-    Seq(cachedPlan.clone())
-  }
+  override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
 
   override def doCanonicalize(): logical.LogicalPlan =
     copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
index a5c5ec40af6f..2c73622739a5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
@@ -18,22 +18,14 @@
 package org.apache.spark.sql.execution.columnar
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.test.SharedSparkSessionBase
 import org.apache.spark.storage.StorageLevel
 
-class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase {
-  test("SPARK-43157: Clone innerChildren cached plan") {
-    val d = spark.range(1)
-    val relation = InMemoryRelation(StorageLevel.MEMORY_ONLY, 
d.queryExecution, None)
-    val cloned = relation.clone().asInstanceOf[InMemoryRelation]
-
-    val relationCachedPlan = relation.innerChildren.head
-    val clonedCachedPlan = cloned.innerChildren.head
-
-    // verify the plans are not the same object but are logically equivalent
-    assert(!relationCachedPlan.eq(clonedCachedPlan))
-    assert(relationCachedPlan === clonedCachedPlan)
-  }
+class InMemoryRelationSuite extends SparkFunSuite
+  with SharedSparkSessionBase with AdaptiveSparkPlanHelper {
 
   test("SPARK-46779: InMemoryRelations with the same cached plan are 
semantically equivalent") {
     val d = spark.range(1)
@@ -41,4 +33,27 @@ class InMemoryRelationSuite extends SparkFunSuite with 
SharedSparkSessionBase {
     val r2 = r1.withOutput(r1.output.map(_.newInstance()))
     assert(r1.sameResult(r2))
   }
+
+  test("SPARK-47177: Cached SQL plan do not display final AQE plan in explain 
string") {
+    def findIMRInnerChild(p: SparkPlan): SparkPlan = {
+      val tableCache = find(p) {
+        case _: InMemoryTableScanExec => true
+        case _ => false
+      }
+      assert(tableCache.isDefined)
+      
tableCache.get.asInstanceOf[InMemoryTableScanExec].relation.innerChildren.head
+    }
+
+    val d1 = spark.range(1).withColumn("key", expr("id % 100"))
+      .groupBy("key").agg(Map("key" -> "count"))
+    val cached_d2 = d1.cache()
+    val df = cached_d2.withColumn("key2", expr("key % 10"))
+      .groupBy("key2").agg(Map("key2" -> "count"))
+
+    assert(findIMRInnerChild(df.queryExecution.executedPlan).treeString
+      .contains("AdaptiveSparkPlan isFinalPlan=false"))
+    df.collect()
+    assert(findIMRInnerChild(df.queryExecution.executedPlan).treeString
+      .contains("AdaptiveSparkPlan isFinalPlan=true"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to