This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c9748d4  [SPARK-31495][SQL] Support formatted explain for AQE
c9748d4 is described below

commit c9748d4f00c505053c81c8aeb69f7166e92f82a6
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Apr 22 12:44:06 2020 +0000

    [SPARK-31495][SQL] Support formatted explain for AQE
    
    ### What changes were proposed in this pull request?
    
    To support formatted explain for AQE.
    
    ### Why are the changes needed?
    
    AQE does not support formatted explain yet. It's good to support it for 
better user experience, debugging, etc.
    
    Before:
    ```
    == Physical Plan ==
    AdaptiveSparkPlan (1)
    +- * HashAggregate (unknown)
       +- CustomShuffleReader (unknown)
          +- ShuffleQueryStage (unknown)
             +- Exchange (unknown)
                +- * HashAggregate (unknown)
                   +- * Project (unknown)
                      +- * BroadcastHashJoin Inner BuildRight (unknown)
                         :- * LocalTableScan (unknown)
                         +- BroadcastQueryStage (unknown)
                            +- BroadcastExchange (unknown)
                               +- LocalTableScan (unknown)
    
    (1) AdaptiveSparkPlan
    Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
    Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as 
bigint)), avg(cast(v2#19 as bigint))]), 
AdaptiveExecutionContext(org.apache.spark.sql.SparkSession104ab57b), 
[PlanAdaptiveSubqueries(Map())], false
    ```
    
    After:
    ```
    == Physical Plan ==
     AdaptiveSparkPlan (14)
     +- * HashAggregate (13)
        +- CustomShuffleReader (12)
           +- ShuffleQueryStage (11)
              +- Exchange (10)
                 +- * HashAggregate (9)
                    +- * Project (8)
                       +- * BroadcastHashJoin Inner BuildRight (7)
                          :- * Project (2)
                          :  +- * LocalTableScan (1)
                          +- BroadcastQueryStage (6)
                             +- BroadcastExchange (5)
                                +- * Project (4)
                                   +- * LocalTableScan (3)
    
     (1) LocalTableScan [codegen id : 2]
     Output [2]: [_1#x, _2#x]
     Arguments: [_1#x, _2#x]
    
     (2) Project [codegen id : 2]
     Output [2]: [_1#x AS k#x, _2#x AS v1#x]
     Input [2]: [_1#x, _2#x]
    
     (3) LocalTableScan [codegen id : 1]
     Output [2]: [_1#x, _2#x]
     Arguments: [_1#x, _2#x]
    
     (4) Project [codegen id : 1]
     Output [2]: [_1#x AS k#x, _2#x AS v2#x]
     Input [2]: [_1#x, _2#x]
    
     (5) BroadcastExchange
     Input [2]: [k#x, v2#x]
     Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint))), [id=#x]
    
     (6) BroadcastQueryStage
     Output [2]: [k#x, v2#x]
     Arguments: 0
    
     (7) BroadcastHashJoin [codegen id : 2]
     Left keys [1]: [k#x]
     Right keys [1]: [k#x]
     Join condition: None
    
     (8) Project [codegen id : 2]
     Output [3]: [k#x, v1#x, v2#x]
     Input [4]: [k#x, v1#x, k#x, v2#x]
    
     (9) HashAggregate [codegen id : 2]
     Input [3]: [k#x, v1#x, v2#x]
     Keys [1]: [k#x]
     Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), 
partial_avg(cast(v2#x as bigint))]
     Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
     Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
    
     (10) Exchange
     Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
     Arguments: hashpartitioning(k#x, 5), true, [id=#x]
    
     (11) ShuffleQueryStage
     Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
     Arguments: 1
    
     (12) CustomShuffleReader
     Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
     Arguments: coalesced
    
     (13) HashAggregate [codegen id : 3]
     Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
     Keys [1]: [k#x]
     Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as 
bigint))]
     Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, 
avg(cast(v2#x as bigint))#x]
     Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as 
bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]
    
     (14) AdaptiveSparkPlan
     Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
     Arguments: isFinalPlan=true
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No, this should be new feature along with AQE in Spark 3.0.
    
    ### How was this patch tested?
    
    Added a query file: `explain-aqe.sql` and a unit test.
    
    Closes #28271 from Ngone51/support_formatted_explain_for_aqe.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 8fbfdb38c0baff7bc5d1ce1e3d6f1df70c25da70)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/ExplainUtils.scala  |  59 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   5 +-
 .../adaptive/InsertAdaptiveSparkPlan.scala         |   2 +-
 .../resources/sql-tests/inputs/explain-aqe.sql     |   3 +
 .../test/resources/sql-tests/inputs/explain.sql    |   1 +
 .../{explain.sql.out => explain-aqe.sql.out}       | 669 ++++++++-------------
 .../resources/sql-tests/results/explain.sql.out    |  10 +-
 .../scala/org/apache/spark/sql/ExplainSuite.scala  |  72 ++-
 .../adaptive/AdaptiveQueryExecSuite.scala          |   6 +-
 9 files changed, 358 insertions(+), 469 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index 5d43093..aec1c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, QueryStageExec}
 
-object ExplainUtils {
+object ExplainUtils extends AdaptiveSparkPlanHelper {
   /**
    * Given a input physical plan, performs the following tasks.
    *   1. Computes the operator id for current operator and records it in the 
operaror
@@ -144,15 +145,26 @@ object ExplainUtils {
       case p: WholeStageCodegenExec =>
       case p: InputAdapter =>
       case other: QueryPlan[_] =>
-        if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
+
+        def setOpId(): Unit = if 
(other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
           currentOperationID += 1
           other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
           operatorIDs += ((currentOperationID, other))
         }
-        other.innerChildren.foreach { plan =>
-          currentOperationID = generateOperatorIDs(plan,
-            currentOperationID,
-            operatorIDs)
+
+        other match {
+          case p: AdaptiveSparkPlanExec =>
+            currentOperationID =
+              generateOperatorIDs(p.executedPlan, currentOperationID, 
operatorIDs)
+            setOpId()
+          case p: QueryStageExec =>
+            currentOperationID = generateOperatorIDs(p.plan, 
currentOperationID, operatorIDs)
+            setOpId()
+          case _ =>
+            setOpId()
+            other.innerChildren.foldLeft(currentOperationID) {
+              (curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
+            }
         }
     }
     currentOperationID
@@ -163,21 +175,25 @@ object ExplainUtils {
    * whole stage code gen id in the plan via setting a tag.
    */
   private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = {
+    var currentCodegenId = -1
+
+    def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+      if (currentCodegenId != -1) {
+        p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
+      }
+      children.foreach(generateWholeStageCodegenIds)
+    }
+
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return
     }
-    var currentCodegenId = -1
     plan.foreach {
       case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId
       case _: InputAdapter => currentCodegenId = -1
-      case other: QueryPlan[_] =>
-        if (currentCodegenId != -1) {
-          other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
-        }
-        other.innerChildren.foreach { plan =>
-          generateWholeStageCodegenIds(plan)
-        }
+      case p: AdaptiveSparkPlanExec => setCodegenId(p, Seq(p.executedPlan))
+      case p: QueryStageExec => setCodegenId(p, Seq(p.plan))
+      case other: QueryPlan[_] => setCodegenId(other, other.innerChildren)
     }
   }
 
@@ -232,13 +248,16 @@ object ExplainUtils {
   }
 
   def removeTags(plan: QueryPlan[_]): Unit = {
+    def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+      p.unsetTagValue(QueryPlan.OP_ID_TAG)
+      p.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
+      children.foreach(removeTags)
+    }
+
     plan foreach {
-      case plan: QueryPlan[_] =>
-        plan.unsetTagValue(QueryPlan.OP_ID_TAG)
-        plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
-        plan.innerChildren.foreach { p =>
-          removeTags(p)
-        }
+      case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan))
+      case p: QueryStageExec => remove(p, Seq(p.plan))
+      case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
     }
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index d73540c..f00dce2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -251,10 +251,7 @@ case class AdaptiveSparkPlanExec(
     getFinalPhysicalPlan().execute()
   }
 
-  override def verboseString(maxFields: Int): String = simpleString(maxFields)
-
-  override def simpleString(maxFields: Int): String =
-    s"AdaptiveSparkPlan(isFinalPlan=$isFinalPlan)"
+  protected override def stringArgs: Iterator[Any] = 
Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
       depth: Int,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index ea586f0..754225d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -122,7 +122,7 @@ case class InsertAdaptiveSparkPlan(
           if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(p)
         verifyAdaptivePlan(executedPlan, p)
-        val subquery = SubqueryExec(s"subquery${exprId.id}", executedPlan)
+        val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan)
         subqueryMap.put(exprId.id, subquery)
       case expressions.InSubquery(_, ListQuery(query, _, exprId, _))
           if !subqueryMap.contains(exprId.id) =>
diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
new file mode 100644
index 0000000..f4afa2b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
@@ -0,0 +1,3 @@
+--IMPORT explain.sql
+
+--SET spark.sql.adaptive.enabled=true
diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
index 497b61c..80bf258 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
@@ -117,3 +117,4 @@ EXPLAIN FORMATTED
 DROP TABLE explain_temp1;
 DROP TABLE explain_temp2;
 DROP TABLE explain_temp3;
+DROP TABLE explain_temp4;
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
similarity index 57%
copy from sql/core/src/test/resources/sql-tests/results/explain.sql.out
copy to sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index 06226f1..e1d4fa8 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 22
+-- Number of queries: 23
 
 
 -- !query
@@ -53,14 +53,14 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* Sort (9)
-+- Exchange (8)
-   +- * HashAggregate (7)
-      +- Exchange (6)
-         +- * HashAggregate (5)
-            +- * Project (4)
-               +- * Filter (3)
-                  +- * ColumnarToRow (2)
+AdaptiveSparkPlan (9)
++- Sort (8)
+   +- Exchange (7)
+      +- HashAggregate (6)
+         +- Exchange (5)
+            +- HashAggregate (4)
+               +- Project (3)
+                  +- Filter (2)
                      +- Scan parquet default.explain_temp1 (1)
 
 
@@ -71,42 +71,43 @@ Location [not included in 
comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 1]
+(2) Filter 
 Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 0))
      
-(4) Project [codegen id : 1]
+(3) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(5) HashAggregate [codegen id : 1]
+(4) HashAggregate 
 Input [2]: [key#x, val#x]
 Keys [1]: [key#x]
 Functions [1]: [partial_max(val#x)]
 Aggregate Attributes [1]: [max#x]
 Results [2]: [key#x, max#x]
      
-(6) Exchange 
+(5) Exchange 
 Input [2]: [key#x, max#x]
 Arguments: hashpartitioning(key#x, 4), true, [id=#x]
       
-(7) HashAggregate [codegen id : 2]
+(6) HashAggregate 
 Input [2]: [key#x, max#x]
 Keys [1]: [key#x]
 Functions [1]: [max(val#x)]
 Aggregate Attributes [1]: [max(val#x)#x]
 Results [2]: [key#x, max(val#x)#x AS max(val)#x]
      
-(8) Exchange 
+(7) Exchange 
 Input [2]: [key#x, max(val)#x]
 Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x]
       
-(9) Sort [codegen id : 3]
+(8) Sort 
 Input [2]: [key#x, max(val)#x]
 Arguments: [key#x ASC NULLS FIRST], true, 0
+      
+(9) AdaptiveSparkPlan 
+Output [2]: [key#x, max(val)#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -120,14 +121,14 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* Project (9)
-+- * Filter (8)
-   +- * HashAggregate (7)
-      +- Exchange (6)
-         +- * HashAggregate (5)
-            +- * Project (4)
-               +- * Filter (3)
-                  +- * ColumnarToRow (2)
+AdaptiveSparkPlan (9)
++- Project (8)
+   +- Filter (7)
+      +- HashAggregate (6)
+         +- Exchange (5)
+            +- HashAggregate (4)
+               +- Project (3)
+                  +- Filter (2)
                      +- Scan parquet default.explain_temp1 (1)
 
 
@@ -138,42 +139,43 @@ Location [not included in 
comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 1]
+(2) Filter 
 Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 0))
      
-(4) Project [codegen id : 1]
+(3) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(5) HashAggregate [codegen id : 1]
+(4) HashAggregate 
 Input [2]: [key#x, val#x]
 Keys [1]: [key#x]
 Functions [1]: [partial_max(val#x)]
 Aggregate Attributes [1]: [max#x]
 Results [2]: [key#x, max#x]
      
-(6) Exchange 
+(5) Exchange 
 Input [2]: [key#x, max#x]
 Arguments: hashpartitioning(key#x, 4), true, [id=#x]
       
-(7) HashAggregate [codegen id : 2]
+(6) HashAggregate 
 Input [2]: [key#x, max#x]
 Keys [1]: [key#x]
 Functions [1]: [max(val#x)]
 Aggregate Attributes [1]: [max(val#x)#x]
 Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x]
      
-(8) Filter [codegen id : 2]
+(7) Filter 
 Input [3]: [key#x, max(val)#x, max(val#x)#x]
 Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0))
      
-(9) Project [codegen id : 2]
+(8) Project 
 Output [2]: [key#x, max(val)#x]
 Input [3]: [key#x, max(val)#x, max(val#x)#x]
+     
+(9) AdaptiveSparkPlan 
+Output [2]: [key#x, max(val)#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -185,18 +187,17 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* HashAggregate (12)
-+- Exchange (11)
-   +- * HashAggregate (10)
-      +- Union (9)
-         :- * Project (4)
-         :  +- * Filter (3)
-         :     +- * ColumnarToRow (2)
-         :        +- Scan parquet default.explain_temp1 (1)
-         +- * Project (8)
-            +- * Filter (7)
-               +- * ColumnarToRow (6)
-                  +- Scan parquet default.explain_temp1 (5)
+AdaptiveSparkPlan (11)
++- HashAggregate (10)
+   +- Exchange (9)
+      +- HashAggregate (8)
+         +- Union (7)
+            :- Project (3)
+            :  +- Filter (2)
+            :     +- Scan parquet default.explain_temp1 (1)
+            +- Project (6)
+               +- Filter (5)
+                  +- Scan parquet default.explain_temp1 (4)
 
 
 (1) Scan parquet default.explain_temp1 
@@ -206,54 +207,52 @@ Location [not included in 
comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 1]
+(2) Filter 
 Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 0))
      
-(4) Project [codegen id : 1]
+(3) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(5) Scan parquet default.explain_temp1 
+(4) Scan parquet default.explain_temp1 
 Output [2]: [key#x, val#x]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
 ReadSchema: struct<key:int,val:int>
      
-(6) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-      
-(7) Filter [codegen id : 2]
+(5) Filter 
 Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 0))
      
-(8) Project [codegen id : 2]
+(6) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(9) Union 
+(7) Union 
       
-(10) HashAggregate [codegen id : 3]
+(8) HashAggregate 
 Input [2]: [key#x, val#x]
 Keys [2]: [key#x, val#x]
 Functions: []
 Aggregate Attributes: []
 Results [2]: [key#x, val#x]
      
-(11) Exchange 
+(9) Exchange 
 Input [2]: [key#x, val#x]
 Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x]
       
-(12) HashAggregate [codegen id : 4]
+(10) HashAggregate 
 Input [2]: [key#x, val#x]
 Keys [2]: [key#x, val#x]
 Functions: []
 Aggregate Attributes: []
 Results [2]: [key#x, val#x]
+     
+(11) AdaptiveSparkPlan 
+Output [2]: [key#x, val#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -266,16 +265,15 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* BroadcastHashJoin Inner BuildRight (10)
-:- * Project (4)
-:  +- * Filter (3)
-:     +- * ColumnarToRow (2)
-:        +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (9)
-   +- * Project (8)
-      +- * Filter (7)
-         +- * ColumnarToRow (6)
-            +- Scan parquet default.explain_temp2 (5)
+AdaptiveSparkPlan (9)
++- BroadcastHashJoin Inner BuildRight (8)
+   :- Project (3)
+   :  +- Filter (2)
+   :     +- Scan parquet default.explain_temp1 (1)
+   +- BroadcastExchange (7)
+      +- Project (6)
+         +- Filter (5)
+            +- Scan parquet default.explain_temp2 (4)
 
 
 (1) Scan parquet default.explain_temp1 
@@ -285,43 +283,41 @@ Location [not included in 
comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key)]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 2]
+(2) Filter 
 Input [2]: [key#x, val#x]
 Condition : isnotnull(key#x)
      
-(4) Project [codegen id : 2]
+(3) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(5) Scan parquet default.explain_temp2 
+(4) Scan parquet default.explain_temp2 
 Output [2]: [key#x, val#x]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp2]
 PushedFilters: [IsNotNull(key)]
 ReadSchema: struct<key:int,val:int>
      
-(6) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(7) Filter [codegen id : 1]
+(5) Filter 
 Input [2]: [key#x, val#x]
 Condition : isnotnull(key#x)
      
-(8) Project [codegen id : 1]
+(6) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(9) BroadcastExchange 
+(7) BroadcastExchange 
 Input [2]: [key#x, val#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint))), [id=#x]
       
-(10) BroadcastHashJoin [codegen id : 2]
+(8) BroadcastHashJoin 
 Left keys [1]: [key#x]
 Right keys [1]: [key#x]
 Join condition: None
+       
+(9) AdaptiveSparkPlan 
+Output [4]: [key#x, val#x, key#x, val#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -334,14 +330,13 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* BroadcastHashJoin LeftOuter BuildRight (8)
-:- * ColumnarToRow (2)
-:  +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (7)
-   +- * Project (6)
-      +- * Filter (5)
-         +- * ColumnarToRow (4)
-            +- Scan parquet default.explain_temp2 (3)
+AdaptiveSparkPlan (7)
++- BroadcastHashJoin LeftOuter BuildRight (6)
+   :- Scan parquet default.explain_temp1 (1)
+   +- BroadcastExchange (5)
+      +- Project (4)
+         +- Filter (3)
+            +- Scan parquet default.explain_temp2 (2)
 
 
 (1) Scan parquet default.explain_temp1 
@@ -350,35 +345,33 @@ Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-      
-(3) Scan parquet default.explain_temp2 
+(2) Scan parquet default.explain_temp2 
 Output [2]: [key#x, val#x]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp2]
 PushedFilters: [IsNotNull(key)]
 ReadSchema: struct<key:int,val:int>
      
-(4) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(5) Filter [codegen id : 1]
+(3) Filter 
 Input [2]: [key#x, val#x]
 Condition : isnotnull(key#x)
      
-(6) Project [codegen id : 1]
+(4) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(7) BroadcastExchange 
+(5) BroadcastExchange 
 Input [2]: [key#x, val#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint))), [id=#x]
       
-(8) BroadcastHashJoin [codegen id : 2]
+(6) BroadcastHashJoin 
 Left keys [1]: [key#x]
 Right keys [1]: [key#x]
 Join condition: None
+       
+(7) AdaptiveSparkPlan 
+Output [4]: [key#x, val#x, key#x, val#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -396,9 +389,9 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* Project (4)
-+- * Filter (3)
-   +- * ColumnarToRow (2)
+AdaptiveSparkPlan (4)
++- Project (3)
+   +- Filter (2)
       +- Scan parquet default.explain_temp1 (1)
 
 
@@ -409,110 +402,17 @@ Location [not included in 
comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery 
scalar-subquery#x, [id=#x])) AND (val#x > 3))
-     
-(4) Project [codegen id : 1]
-Output [2]: [key#x, val#x]
+(2) Filter 
 Input [2]: [key#x, val#x]
+Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery 
subquery#x, [id=#x])) AND (val#x > 3))
      
-===== Subqueries =====
-
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery 
scalar-subquery#x, [id=#x]
-* HashAggregate (11)
-+- Exchange (10)
-   +- * HashAggregate (9)
-      +- * Project (8)
-         +- * Filter (7)
-            +- * ColumnarToRow (6)
-               +- Scan parquet default.explain_temp2 (5)
-
-
-(5) Scan parquet default.explain_temp2 
+(3) Project 
 Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp2]
-PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)]
-ReadSchema: struct<key:int,val:int>
-     
-(6) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(7) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery 
scalar-subquery#x, [id=#x])) AND (val#x = 2))
-     
-(8) Project [codegen id : 1]
-Output [1]: [key#x]
 Input [2]: [key#x, val#x]
      
-(9) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_max(key#x)]
-Aggregate Attributes [1]: [max#x]
-Results [1]: [max#x]
-     
-(10) Exchange 
-Input [1]: [max#x]
-Arguments: SinglePartition, true, [id=#x]
-      
-(11) HashAggregate [codegen id : 2]
-Input [1]: [max#x]
-Keys: []
-Functions [1]: [max(key#x)]
-Aggregate Attributes [1]: [max(key#x)#x]
-Results [1]: [max(key#x)#x AS max(key)#x]
-     
-Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery 
scalar-subquery#x, [id=#x]
-* HashAggregate (18)
-+- Exchange (17)
-   +- * HashAggregate (16)
-      +- * Project (15)
-         +- * Filter (14)
-            +- * ColumnarToRow (13)
-               +- Scan parquet default.explain_temp3 (12)
-
-
-(12) Scan parquet default.explain_temp3 
+(4) AdaptiveSparkPlan 
 Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp3]
-PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
-ReadSchema: struct<key:int,val:int>
-     
-(13) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(14) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (isnotnull(val#x) AND (val#x > 0))
-     
-(15) Project [codegen id : 1]
-Output [1]: [key#x]
-Input [2]: [key#x, val#x]
-     
-(16) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_max(key#x)]
-Aggregate Attributes [1]: [max#x]
-Results [1]: [max#x]
-     
-(17) Exchange 
-Input [1]: [max#x]
-Arguments: SinglePartition, true, [id=#x]
-      
-(18) HashAggregate [codegen id : 2]
-Input [1]: [max#x]
-Keys: []
-Functions [1]: [max(key#x)]
-Aggregate Attributes [1]: [max(key#x)#x]
-Results [1]: [max(key#x)#x AS max(key)#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -530,8 +430,8 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* Filter (3)
-+- * ColumnarToRow (2)
+AdaptiveSparkPlan (3)
++- Filter (2)
    +- Scan parquet default.explain_temp1 (1)
 
 
@@ -541,106 +441,13 @@ Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as 
double) = Subquery scalar-subquery#x, [id=#x]))
-     
-===== Subqueries =====
-
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery 
scalar-subquery#x, [id=#x]
-* HashAggregate (10)
-+- Exchange (9)
-   +- * HashAggregate (8)
-      +- * Project (7)
-         +- * Filter (6)
-            +- * ColumnarToRow (5)
-               +- Scan parquet default.explain_temp2 (4)
-
-
-(4) Scan parquet default.explain_temp2 
-Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp2]
-PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
-ReadSchema: struct<key:int,val:int>
-     
-(5) ColumnarToRow [codegen id : 1]
+(2) Filter 
 Input [2]: [key#x, val#x]
-      
-(6) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (isnotnull(val#x) AND (val#x > 0))
-     
-(7) Project [codegen id : 1]
-Output [1]: [key#x]
-Input [2]: [key#x, val#x]
-     
-(8) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_max(key#x)]
-Aggregate Attributes [1]: [max#x]
-Results [1]: [max#x]
-     
-(9) Exchange 
-Input [1]: [max#x]
-Arguments: SinglePartition, true, [id=#x]
-      
-(10) HashAggregate [codegen id : 2]
-Input [1]: [max#x]
-Keys: []
-Functions [1]: [max(key#x)]
-Aggregate Attributes [1]: [max(key#x)#x]
-Results [1]: [max(key#x)#x AS max(key)#x]
+Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) 
= Subquery subquery#x, [id=#x]))
      
-Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery 
scalar-subquery#x, [id=#x]
-* HashAggregate (17)
-+- Exchange (16)
-   +- * HashAggregate (15)
-      +- * Project (14)
-         +- * Filter (13)
-            +- * ColumnarToRow (12)
-               +- Scan parquet default.explain_temp3 (11)
-
-
-(11) Scan parquet default.explain_temp3 
+(3) AdaptiveSparkPlan 
 Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp3]
-PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
-ReadSchema: struct<key:int,val:int>
-     
-(12) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(13) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (isnotnull(val#x) AND (val#x > 0))
-     
-(14) Project [codegen id : 1]
-Output [1]: [key#x]
-Input [2]: [key#x, val#x]
-     
-(15) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_avg(cast(key#x as bigint))]
-Aggregate Attributes [2]: [sum#x, count#xL]
-Results [2]: [sum#x, count#xL]
-     
-(16) Exchange 
-Input [2]: [sum#x, count#xL]
-Arguments: SinglePartition, true, [id=#x]
-      
-(17) HashAggregate [codegen id : 2]
-Input [2]: [sum#x, count#xL]
-Keys: []
-Functions [1]: [avg(cast(key#x as bigint))]
-Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
-Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -651,8 +458,8 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* Project (3)
-+- * ColumnarToRow (2)
+AdaptiveSparkPlan (3)
++- Project (2)
    +- Scan parquet default.explain_temp1 (1)
 
 
@@ -662,51 +469,13 @@ Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 ReadSchema: struct<>
      
-(2) ColumnarToRow [codegen id : 1]
+(2) Project 
+Output [1]: [(Subquery subquery#x, [id=#x] + Subquery subquery#x, [id=#x]) AS 
(scalarsubquery() + scalarsubquery())#x]
 Input: []
-      
-(3) Project [codegen id : 1]
-Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery 
scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x]
-Input: []
-     
-===== Subqueries =====
-
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery 
scalar-subquery#x, [id=#x]
-* HashAggregate (8)
-+- Exchange (7)
-   +- * HashAggregate (6)
-      +- * ColumnarToRow (5)
-         +- Scan parquet default.explain_temp1 (4)
-
-
-(4) Scan parquet default.explain_temp1 
-Output [1]: [key#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp1]
-ReadSchema: struct<key:int>
-     
-(5) ColumnarToRow [codegen id : 1]
-Input [1]: [key#x]
-      
-(6) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_avg(cast(key#x as bigint))]
-Aggregate Attributes [2]: [sum#x, count#xL]
-Results [2]: [sum#x, count#xL]
-     
-(7) Exchange 
-Input [2]: [sum#x, count#xL]
-Arguments: SinglePartition, true, [id=#x]
-      
-(8) HashAggregate [codegen id : 2]
-Input [2]: [sum#x, count#xL]
-Keys: []
-Functions [1]: [avg(cast(key#x as bigint))]
-Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
-Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
      
-Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery 
Subquery scalar-subquery#x, [id=#x]
+(3) AdaptiveSparkPlan 
+Output [1]: [(scalarsubquery() + scalarsubquery())#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -721,16 +490,15 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* BroadcastHashJoin Inner BuildRight (10)
-:- * Project (4)
-:  +- * Filter (3)
-:     +- * ColumnarToRow (2)
-:        +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (9)
-   +- * Project (8)
-      +- * Filter (7)
-         +- * ColumnarToRow (6)
-            +- Scan parquet default.explain_temp1 (5)
+AdaptiveSparkPlan (9)
++- BroadcastHashJoin Inner BuildRight (8)
+   :- Project (3)
+   :  +- Filter (2)
+   :     +- Scan parquet default.explain_temp1 (1)
+   +- BroadcastExchange (7)
+      +- Project (6)
+         +- Filter (5)
+            +- Scan parquet default.explain_temp1 (4)
 
 
 (1) Scan parquet default.explain_temp1 
@@ -740,43 +508,41 @@ Location [not included in 
comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 2]
+(2) Filter 
 Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 10))
      
-(4) Project [codegen id : 2]
+(3) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(5) Scan parquet default.explain_temp1 
+(4) Scan parquet default.explain_temp1 
 Output [2]: [key#x, val#x]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
 ReadSchema: struct<key:int,val:int>
      
-(6) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(7) Filter [codegen id : 1]
+(5) Filter 
 Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 10))
      
-(8) Project [codegen id : 1]
+(6) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(9) BroadcastExchange 
+(7) BroadcastExchange 
 Input [2]: [key#x, val#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint))), [id=#x]
       
-(10) BroadcastHashJoin [codegen id : 2]
+(8) BroadcastHashJoin 
 Left keys [1]: [key#x]
 Right keys [1]: [key#x]
 Join condition: None
+       
+(9) AdaptiveSparkPlan 
+Output [4]: [key#x, val#x, key#x, val#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -792,17 +558,21 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* BroadcastHashJoin Inner BuildRight (11)
-:- * HashAggregate (7)
-:  +- Exchange (6)
-:     +- * HashAggregate (5)
-:        +- * Project (4)
-:           +- * Filter (3)
-:              +- * ColumnarToRow (2)
-:                 +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (10)
-   +- * HashAggregate (9)
-      +- ReusedExchange (8)
+AdaptiveSparkPlan (15)
++- BroadcastHashJoin Inner BuildRight (14)
+   :- HashAggregate (6)
+   :  +- Exchange (5)
+   :     +- HashAggregate (4)
+   :        +- Project (3)
+   :           +- Filter (2)
+   :              +- Scan parquet default.explain_temp1 (1)
+   +- BroadcastExchange (13)
+      +- HashAggregate (12)
+         +- Exchange (11)
+            +- HashAggregate (10)
+               +- Project (9)
+                  +- Filter (8)
+                     +- Scan parquet default.explain_temp1 (7)
 
 
 (1) Scan parquet default.explain_temp1 
@@ -812,53 +582,77 @@ Location [not included in 
comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) Filter [codegen id : 1]
+(2) Filter 
 Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 10))
      
-(4) Project [codegen id : 1]
+(3) Project 
 Output [2]: [key#x, val#x]
 Input [2]: [key#x, val#x]
      
-(5) HashAggregate [codegen id : 1]
+(4) HashAggregate 
 Input [2]: [key#x, val#x]
 Keys [1]: [key#x]
 Functions [1]: [partial_max(val#x)]
 Aggregate Attributes [1]: [max#x]
 Results [2]: [key#x, max#x]
      
-(6) Exchange 
+(5) Exchange 
 Input [2]: [key#x, max#x]
 Arguments: hashpartitioning(key#x, 4), true, [id=#x]
       
-(7) HashAggregate [codegen id : 4]
+(6) HashAggregate 
 Input [2]: [key#x, max#x]
 Keys [1]: [key#x]
 Functions [1]: [max(val#x)]
 Aggregate Attributes [1]: [max(val#x)#x]
 Results [2]: [key#x, max(val#x)#x AS max(val)#x]
      
-(8) ReusedExchange  [Reuses operator id: 6]
-Output [2]: [key#x, max#x]
+(7) Scan parquet default.explain_temp1 
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp1]
+PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
+ReadSchema: struct<key:int,val:int>
+     
+(8) Filter 
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(key#x) AND (key#x > 10))
      
-(9) HashAggregate [codegen id : 3]
+(9) Project 
+Output [2]: [key#x, val#x]
+Input [2]: [key#x, val#x]
+     
+(10) HashAggregate 
+Input [2]: [key#x, val#x]
+Keys [1]: [key#x]
+Functions [1]: [partial_max(val#x)]
+Aggregate Attributes [1]: [max#x]
+Results [2]: [key#x, max#x]
+     
+(11) Exchange 
+Input [2]: [key#x, max#x]
+Arguments: hashpartitioning(key#x, 4), true, [id=#x]
+      
+(12) HashAggregate 
 Input [2]: [key#x, max#x]
 Keys [1]: [key#x]
 Functions [1]: [max(val#x)]
 Aggregate Attributes [1]: [max(val#x)#x]
 Results [2]: [key#x, max(val#x)#x AS max(val)#x]
      
-(10) BroadcastExchange 
+(13) BroadcastExchange 
 Input [2]: [key#x, max(val)#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint))), [id=#x]
       
-(11) BroadcastHashJoin [codegen id : 4]
+(14) BroadcastHashJoin 
 Left keys [1]: [key#x]
 Right keys [1]: [key#x]
 Join condition: None
+       
+(15) AdaptiveSparkPlan 
+Output [4]: [key#x, max(val)#x, key#x, max(val)#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -898,10 +692,10 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-* HashAggregate (5)
-+- Exchange (4)
-   +- HashAggregate (3)
-      +- * ColumnarToRow (2)
+AdaptiveSparkPlan (5)
++- HashAggregate (4)
+   +- Exchange (3)
+      +- HashAggregate (2)
          +- Scan parquet default.explain_temp1 (1)
 
 
@@ -911,26 +705,27 @@ Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 ReadSchema: struct<key:int,val:int>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) HashAggregate 
+(2) HashAggregate 
 Input [2]: [key#x, val#x]
 Keys: []
 Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), 
partial_count(key#x) FILTER (WHERE (val#x > 1))]
 Aggregate Attributes [3]: [count#xL, sum#xL, count#xL]
 Results [3]: [count#xL, sum#xL, count#xL]
      
-(4) Exchange 
+(3) Exchange 
 Input [3]: [count#xL, sum#xL, count#xL]
 Arguments: SinglePartition, true, [id=#x]
       
-(5) HashAggregate [codegen id : 2]
+(4) HashAggregate 
 Input [3]: [count#xL, sum#xL, count#xL]
 Keys: []
 Functions [3]: [count(val#x), sum(cast(key#x as bigint)), count(key#x)]
 Aggregate Attributes [3]: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, 
count(key#x)#xL]
 Results [2]: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, 
count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL]
+     
+(5) AdaptiveSparkPlan 
+Output [2]: [TOTAL#xL, count(key) FILTER (WHERE (val > 1))#xL]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -942,10 +737,10 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-ObjectHashAggregate (5)
-+- Exchange (4)
-   +- ObjectHashAggregate (3)
-      +- * ColumnarToRow (2)
+AdaptiveSparkPlan (5)
++- ObjectHashAggregate (4)
+   +- Exchange (3)
+      +- ObjectHashAggregate (2)
          +- Scan parquet default.explain_temp4 (1)
 
 
@@ -955,26 +750,27 @@ Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp4]
 ReadSchema: struct<key:int,val:string>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) ObjectHashAggregate 
+(2) ObjectHashAggregate 
 Input [2]: [key#x, val#x]
 Keys [1]: [key#x]
 Functions [1]: [partial_collect_set(val#x, 0, 0)]
 Aggregate Attributes [1]: [buf#x]
 Results [2]: [key#x, buf#x]
      
-(4) Exchange 
+(3) Exchange 
 Input [2]: [key#x, buf#x]
 Arguments: hashpartitioning(key#x, 4), true, [id=#x]
       
-(5) ObjectHashAggregate 
+(4) ObjectHashAggregate 
 Input [2]: [key#x, buf#x]
 Keys [1]: [key#x]
 Functions [1]: [collect_set(val#x, 0, 0)]
 Aggregate Attributes [1]: [collect_set(val#x, 0, 0)#x]
 Results [2]: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS 
sort_array(collect_set(val), true)[0]#x]
+     
+(5) AdaptiveSparkPlan 
+Output [2]: [key#x, sort_array(collect_set(val), true)[0]#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -986,12 +782,12 @@ EXPLAIN FORMATTED
 struct<plan:string>
 -- !query output
 == Physical Plan ==
-SortAggregate (7)
-+- * Sort (6)
-   +- Exchange (5)
-      +- SortAggregate (4)
-         +- * Sort (3)
-            +- * ColumnarToRow (2)
+AdaptiveSparkPlan (7)
++- SortAggregate (6)
+   +- Sort (5)
+      +- Exchange (4)
+         +- SortAggregate (3)
+            +- Sort (2)
                +- Scan parquet default.explain_temp4 (1)
 
 
@@ -1001,34 +797,35 @@ Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp4]
 ReadSchema: struct<key:int,val:string>
      
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-      
-(3) Sort [codegen id : 1]
+(2) Sort 
 Input [2]: [key#x, val#x]
 Arguments: [key#x ASC NULLS FIRST], false, 0
       
-(4) SortAggregate 
+(3) SortAggregate 
 Input [2]: [key#x, val#x]
 Keys [1]: [key#x]
 Functions [1]: [partial_min(val#x)]
 Aggregate Attributes [1]: [min#x]
 Results [2]: [key#x, min#x]
      
-(5) Exchange 
+(4) Exchange 
 Input [2]: [key#x, min#x]
 Arguments: hashpartitioning(key#x, 4), true, [id=#x]
       
-(6) Sort [codegen id : 2]
+(5) Sort 
 Input [2]: [key#x, min#x]
 Arguments: [key#x ASC NULLS FIRST], false, 0
       
-(7) SortAggregate 
+(6) SortAggregate 
 Input [2]: [key#x, min#x]
 Keys [1]: [key#x]
 Functions [1]: [min(val#x)]
 Aggregate Attributes [1]: [min(val#x)#x]
 Results [2]: [key#x, min(val#x)#x AS min(val)#x]
+     
+(7) AdaptiveSparkPlan 
+Output [2]: [key#x, min(val)#x]
+Arguments: isFinalPlan=false
 
 
 -- !query
@@ -1053,3 +850,11 @@ DROP TABLE explain_temp3
 struct<>
 -- !query output
 
+
+
+-- !query
+DROP TABLE explain_temp4
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 06226f1..1a18d56 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 22
+-- Number of queries: 23
 
 
 -- !query
@@ -1053,3 +1053,11 @@ DROP TABLE explain_temp3
 struct<>
 -- !query output
 
+
+
+-- !query
+DROP TABLE explain_temp4
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index b204709..a1b6d71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -18,16 +18,15 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
+import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, 
EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 
-class ExplainSuite extends QueryTest with SharedSparkSession with 
DisableAdaptiveExecutionSuite {
-  import testImplicits._
+trait ExplainSuiteHelper extends QueryTest with SharedSparkSession {
 
-  private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = 
{
+  protected def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String 
= {
     val output = new java.io.ByteArrayOutputStream()
     Console.withOut(output) {
       df.explain(mode.name)
@@ -38,7 +37,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession 
with DisableAdaptiv
   /**
    * Get the explain from a DataFrame and run the specified action on it.
    */
-  private def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: 
String => Unit) = {
+  protected def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: 
String => Unit) = {
     f(getNormalizedExplain(df, mode))
   }
 
@@ -46,7 +45,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession 
with DisableAdaptiv
    * Get the explain by running the sql. The explain mode should be part of the
    * sql text itself.
    */
-  private def withNormalizedExplain(queryText: String)(f: String => Unit) = {
+  protected def withNormalizedExplain(queryText: String)(f: String => Unit) = {
     val output = new java.io.ByteArrayOutputStream()
     Console.withOut(output) {
       sql(queryText).show(false)
@@ -58,7 +57,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession 
with DisableAdaptiv
   /**
    * Runs the plan and makes sure the plans contains all of the keywords.
    */
-  private def checkKeywordsExistsInExplain(
+  protected def checkKeywordsExistsInExplain(
       df: DataFrame, mode: ExplainMode, keywords: String*): Unit = {
     withNormalizedExplain(df, mode) { normalizedOutput =>
       for (key <- keywords) {
@@ -67,9 +66,13 @@ class ExplainSuite extends QueryTest with SharedSparkSession 
with DisableAdaptiv
     }
   }
 
-  private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): 
Unit = {
+  protected def checkKeywordsExistsInExplain(df: DataFrame, keywords: 
String*): Unit = {
     checkKeywordsExistsInExplain(df, ExtendedMode, keywords: _*)
   }
+}
+
+class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite {
+  import testImplicits._
 
   test("SPARK-23034 show rdd names in RDD scan nodes (Dataset)") {
     val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: 
Nil).setName("testRdd")
@@ -342,4 +345,57 @@ class ExplainSuite extends QueryTest with 
SharedSparkSession with DisableAdaptiv
   }
 }
 
+class ExplainSuiteAE extends ExplainSuiteHelper with 
EnableAdaptiveExecutionSuite {
+  import testImplicits._
+
+  test("Explain formatted") {
+    val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
+    val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
+    val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), 
avg("v2"))
+    // trigger the final plan for AQE
+    testDf.collect()
+    // whitespace
+    val ws = " "
+    //   == Physical Plan ==
+    //   AdaptiveSparkPlan (14)
+    //   +- * HashAggregate (13)
+    //      +- CustomShuffleReader (12)
+    //         +- ShuffleQueryStage (11)
+    //            +- Exchange (10)
+    //               +- * HashAggregate (9)
+    //                  +- * Project (8)
+    //                     +- * BroadcastHashJoin Inner BuildRight (7)
+    //                        :- * Project (2)
+    //                        :  +- * LocalTableScan (1)
+    //                        +- BroadcastQueryStage (6)
+    //                           +- BroadcastExchange (5)
+    //                              +- * Project (4)
+    //                                 +- * LocalTableScan (3)
+    checkKeywordsExistsInExplain(
+      testDf,
+      FormattedMode,
+      s"""
+         |(6) BroadcastQueryStage$ws
+         |Output [2]: [k#x, v2#x]
+         |Arguments: 0
+         |""".stripMargin,
+      s"""
+         |(11) ShuffleQueryStage$ws
+         |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
+         |Arguments: 1
+         |""".stripMargin,
+      s"""
+         |(12) CustomShuffleReader$ws
+         |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
+         |Arguments: coalesced
+         |""".stripMargin,
+      s"""
+         |(14) AdaptiveSparkPlan$ws
+         |Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
+         |Arguments: isFinalPlan=true
+         |""".stripMargin
+    )
+  }
+}
+
 case class ExplainSingleData(id: Int)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8225863..c6caffa 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -53,7 +53,7 @@ class AdaptiveQueryExecSuite
         event match {
           case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) =>
             if (sparkPlanInfo.simpleString.startsWith(
-              "AdaptiveSparkPlan(isFinalPlan=true)")) {
+              "AdaptiveSparkPlan isFinalPlan=true")) {
               finalPlanCnt += 1
             }
           case _ => // ignore other events
@@ -64,14 +64,14 @@ class AdaptiveQueryExecSuite
 
     val dfAdaptive = sql(query)
     val planBefore = dfAdaptive.queryExecution.executedPlan
-    
assert(planBefore.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=false)"))
+    assert(planBefore.toString.startsWith("AdaptiveSparkPlan 
isFinalPlan=false"))
     val result = dfAdaptive.collect()
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       val df = sql(query)
       QueryTest.sameRows(result.toSeq, df.collect().toSeq)
     }
     val planAfter = dfAdaptive.queryExecution.executedPlan
-    
assert(planAfter.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=true)"))
+    assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
 
     spark.sparkContext.listenerBus.waitUntilEmpty()
     assert(finalPlanCnt == 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to