This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c9748d4  [SPARK-31495][SQL] Support formatted explain for AQE
c9748d4 is described below

commit c9748d4f00c505053c81c8aeb69f7166e92f82a6
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Apr 22 12:44:06 2020 +0000

[SPARK-31495][SQL] Support formatted explain for AQE

### What changes were proposed in this pull request?

Support formatted explain for AQE.

### Why are the changes needed?

AQE does not support formatted explain yet. Supporting it improves the user experience and makes debugging easier.

Before:
```
== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)

(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession104ab57b), [PlanAdaptiveSubqueries(Map())], false
```

After:
```
== Physical Plan ==
AdaptiveSparkPlan (14)
+- * HashAggregate (13)
   +- CustomShuffleReader (12)
      +- ShuffleQueryStage (11)
         +- Exchange (10)
            +- * HashAggregate (9)
               +- * Project (8)
                  +- * BroadcastHashJoin Inner BuildRight (7)
                     :- * Project (2)
                     :  +- * LocalTableScan (1)
                     +- BroadcastQueryStage (6)
                        +- BroadcastExchange (5)
                           +- * Project (4)
                              +- * LocalTableScan (3)

(1) LocalTableScan [codegen id : 2]
Output [2]: [_1#x, _2#x]
Arguments: [_1#x, _2#x]

(2) Project [codegen id : 2]
Output [2]: [_1#x AS k#x, _2#x AS v1#x]
Input [2]: [_1#x, _2#x]

(3) LocalTableScan [codegen id : 1]
Output [2]: [_1#x, _2#x]
Arguments: [_1#x, _2#x]

(4) Project [codegen id : 1]
Output [2]: [_1#x AS k#x, _2#x AS v2#x]
Input [2]: [_1#x, _2#x]

(5) BroadcastExchange
Input [2]: [k#x, v2#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#x]

(6) BroadcastQueryStage
Output [2]: [k#x, v2#x]
Arguments: 0

(7) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [k#x]
Join condition: None

(8) Project [codegen id : 2]
Output [3]: [k#x, v1#x, v2#x]
Input [4]: [k#x, v1#x, k#x, v2#x]

(9) HashAggregate [codegen id : 2]
Input [3]: [k#x, v1#x, v2#x]
Keys [1]: [k#x]
Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), partial_avg(cast(v2#x as bigint))]
Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]

(10) Exchange
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Arguments: hashpartitioning(k#x, 5), true, [id=#x]

(11) ShuffleQueryStage
Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
Arguments: 1

(12) CustomShuffleReader
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Arguments: coalesced

(13) HashAggregate [codegen id : 3]
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Keys [1]: [k#x]
Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

(14) AdaptiveSparkPlan
Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
Arguments: isFinalPlan=true
```
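
For reference, a minimal sketch of how the formatted output above can be reproduced, adapted from the `ExplainSuiteAE` test added by this patch (the local `SparkSession` setup is an assumption for illustration and is not part of the patch):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, sum}

// Assumed setup: a local session with AQE turned on, mirroring the
// EnableAdaptiveExecutionSuite configuration used by the new test.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("FormattedExplainForAQE")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()
import spark.implicits._

// Same query shape as the "Explain formatted" test in ExplainSuiteAE.
val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
val testDf = df1.join(df2, "k").groupBy("k")
  .agg(count("v1"), sum("v1"), avg("v2"))

// Materialize the query first so AQE finalizes the plan; the formatted
// explain then prints the final stages with isFinalPlan=true.
testDf.collect()
testDf.explain("formatted")
```

Explaining before the query runs also works; the plan is then printed with `isFinalPlan=false`, as the updated `explain-aqe.sql.out` golden file shows.
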
### Does this PR introduce any user-facing change?

No, this is a new feature shipped along with AQE in Spark 3.0.

### How was this patch tested?

Added a query file: `explain-aqe.sql` and a unit test.

Closes #28271 from Ngone51/support_formatted_explain_for_aqe.

Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit 8fbfdb38c0baff7bc5d1ce1e3d6f1df70c25da70)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/ExplainUtils.scala  |  59 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   5 +-
 .../adaptive/InsertAdaptiveSparkPlan.scala         |   2 +-
 .../resources/sql-tests/inputs/explain-aqe.sql     |   3 +
 .../test/resources/sql-tests/inputs/explain.sql    |   1 +
 .../{explain.sql.out => explain-aqe.sql.out}       | 669 ++++++++-------------
 .../resources/sql-tests/results/explain.sql.out    |  10 +-
 .../scala/org/apache/spark/sql/ExplainSuite.scala  |  72 ++-
 .../adaptive/AdaptiveQueryExecSuite.scala          |   6 +-
 9 files changed, 358 insertions(+), 469 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index 5d43093..aec1c93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -23,8 +23,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec} -object ExplainUtils { +object ExplainUtils extends AdaptiveSparkPlanHelper { /** * Given an input physical plan, performs the following tasks. * 1. Computes the operator id for current operator and records it in the operator @@ -144,15 +145,26 @@ object ExplainUtils { case p: WholeStageCodegenExec => case p: InputAdapter => case other: QueryPlan[_] => - if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) { + + def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) { currentOperationID += 1 other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID) operatorIDs += ((currentOperationID, other)) } - other.innerChildren.foreach { plan => - currentOperationID = generateOperatorIDs(plan, - currentOperationID, - operatorIDs) + + other match { + case p: AdaptiveSparkPlanExec => + currentOperationID = + generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs) + setOpId() + case p: QueryStageExec => + currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs) + setOpId() + case _ => + setOpId() + other.innerChildren.foldLeft(currentOperationID) { + (curId, plan) => generateOperatorIDs(plan, curId, operatorIDs) + } } } currentOperationID @@ -163,21 +175,25 @@ object ExplainUtils { * whole stage code gen id in the plan via setting a tag. */ private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = { + var currentCodegenId = -1 + + def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + if (currentCodegenId != -1) { + p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId) + } + children.foreach(generateWholeStageCodegenIds) + } + + // Skip the subqueries as they are not printed as part of main query block.
if (plan.isInstanceOf[BaseSubqueryExec]) { return } - var currentCodegenId = -1 plan.foreach { case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId case _: InputAdapter => currentCodegenId = -1 - case other: QueryPlan[_] => - if (currentCodegenId != -1) { - other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId) - } - other.innerChildren.foreach { plan => - generateWholeStageCodegenIds(plan) - } + case p: AdaptiveSparkPlanExec => setCodegenId(p, Seq(p.executedPlan)) + case p: QueryStageExec => setCodegenId(p, Seq(p.plan)) + case other: QueryPlan[_] => setCodegenId(other, other.innerChildren) } } @@ -232,13 +248,16 @@ object ExplainUtils { } def removeTags(plan: QueryPlan[_]): Unit = { + def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + p.unsetTagValue(QueryPlan.OP_ID_TAG) + p.unsetTagValue(QueryPlan.CODEGEN_ID_TAG) + children.foreach(removeTags) + } + plan foreach { - case plan: QueryPlan[_] => - plan.unsetTagValue(QueryPlan.OP_ID_TAG) - plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG) - plan.innerChildren.foreach { p => - removeTags(p) - } + case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan)) + case p: QueryStageExec => remove(p, Seq(p.plan)) + case plan: QueryPlan[_] => remove(plan, plan.innerChildren) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index d73540c..f00dce2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -251,10 +251,7 @@ case class AdaptiveSparkPlanExec( getFinalPhysicalPlan().execute() } - override def verboseString(maxFields: Int): String = simpleString(maxFields) - - override def simpleString(maxFields: Int): String = - s"AdaptiveSparkPlan(isFinalPlan=$isFinalPlan)" + protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan") override def generateTreeString( depth: Int, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index ea586f0..754225d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -122,7 +122,7 @@ case class InsertAdaptiveSparkPlan( if !subqueryMap.contains(exprId.id) => val executedPlan = compileSubquery(p) verifyAdaptivePlan(executedPlan, p) - val subquery = SubqueryExec(s"subquery${exprId.id}", executedPlan) + val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan) subqueryMap.put(exprId.id, subquery) case expressions.InSubquery(_, ListQuery(query, _, exprId, _)) if !subqueryMap.contains(exprId.id) => diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql new file mode 100644 index 0000000..f4afa2b --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql @@ -0,0 +1,3 @@ +--IMPORT explain.sql + +--SET spark.sql.adaptive.enabled=true diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql index 497b61c..80bf258 100644 --- 
a/sql/core/src/test/resources/sql-tests/inputs/explain.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql @@ -117,3 +117,4 @@ EXPLAIN FORMATTED DROP TABLE explain_temp1; DROP TABLE explain_temp2; DROP TABLE explain_temp3; +DROP TABLE explain_temp4; diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out similarity index 57% copy from sql/core/src/test/resources/sql-tests/results/explain.sql.out copy to sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 06226f1..e1d4fa8 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 22 +-- Number of queries: 23 -- !query @@ -53,14 +53,14 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* Sort (9) -+- Exchange (8) - +- * HashAggregate (7) - +- Exchange (6) - +- * HashAggregate (5) - +- * Project (4) - +- * Filter (3) - +- * ColumnarToRow (2) +AdaptiveSparkPlan (9) ++- Sort (8) + +- Exchange (7) + +- HashAggregate (6) + +- Exchange (5) + +- HashAggregate (4) + +- Project (3) + +- Filter (2) +- Scan parquet default.explain_temp1 (1) @@ -71,42 +71,43 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 1] +(2) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(4) Project [codegen id : 1] +(3) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(5) HashAggregate [codegen id : 1] +(4) HashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(6) Exchange +(5) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(7) HashAggregate [codegen id : 2] +(6) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(8) Exchange +(7) Exchange Input [2]: [key#x, max(val)#x] Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] -(9) Sort [codegen id : 3] +(8) Sort Input [2]: [key#x, max(val)#x] Arguments: [key#x ASC NULLS FIRST], true, 0 + +(9) AdaptiveSparkPlan +Output [2]: [key#x, max(val)#x] +Arguments: isFinalPlan=false -- !query @@ -120,14 +121,14 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* Project (9) -+- * Filter (8) - +- * HashAggregate (7) - +- Exchange (6) - +- * HashAggregate (5) - +- * Project (4) - +- * Filter (3) - +- * ColumnarToRow (2) +AdaptiveSparkPlan (9) ++- Project (8) + +- Filter (7) + +- HashAggregate (6) + +- Exchange (5) + +- HashAggregate (4) + +- Project (3) + +- Filter (2) +- Scan parquet default.explain_temp1 (1) @@ -138,42 +139,43 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 1] +(2) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(4) Project [codegen id : 1] +(3) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(5) 
HashAggregate [codegen id : 1] +(4) HashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(6) Exchange +(5) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(7) HashAggregate [codegen id : 2] +(6) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x] -(8) Filter [codegen id : 2] +(7) Filter Input [3]: [key#x, max(val)#x, max(val#x)#x] Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0)) -(9) Project [codegen id : 2] +(8) Project Output [2]: [key#x, max(val)#x] Input [3]: [key#x, max(val)#x, max(val#x)#x] + +(9) AdaptiveSparkPlan +Output [2]: [key#x, max(val)#x] +Arguments: isFinalPlan=false -- !query @@ -185,18 +187,17 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* HashAggregate (12) -+- Exchange (11) - +- * HashAggregate (10) - +- Union (9) - :- * Project (4) - : +- * Filter (3) - : +- * ColumnarToRow (2) - : +- Scan parquet default.explain_temp1 (1) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet default.explain_temp1 (5) +AdaptiveSparkPlan (11) ++- HashAggregate (10) + +- Exchange (9) + +- HashAggregate (8) + +- Union (7) + :- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp1 (4) (1) Scan parquet default.explain_temp1 @@ -206,54 +207,52 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 1] +(2) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(4) Project [codegen id : 1] +(3) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(5) Scan parquet default.explain_temp1 +(4) Scan parquet default.explain_temp1 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct<key:int,val:int> -(6) ColumnarToRow [codegen id : 2] -Input [2]: [key#x, val#x] - -(7) Filter [codegen id : 2] +(5) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(8) Project [codegen id : 2] +(6) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(9) Union +(7) Union -(10) HashAggregate [codegen id : 3] +(8) HashAggregate Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] Results [2]: [key#x, val#x] -(11) Exchange +(9) Exchange Input [2]: [key#x, val#x] Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] -(12) HashAggregate [codegen id : 4] +(10) HashAggregate Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] Results [2]: [key#x, val#x] + +(11) AdaptiveSparkPlan +Output [2]: [key#x, val#x] +Arguments: isFinalPlan=false -- !query @@ -266,16 +265,15 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* BroadcastHashJoin Inner BuildRight (10) -:- * Project (4) -: +- * Filter (3) -: +- * ColumnarToRow (2) -: +- Scan parquet default.explain_temp1 (1) -+- BroadcastExchange (9) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet 
default.explain_temp2 (5) +AdaptiveSparkPlan (9) ++- BroadcastHashJoin Inner BuildRight (8) + :- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (7) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp2 (4) (1) Scan parquet default.explain_temp1 @@ -285,43 +283,41 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key)] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 2] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 2] +(2) Filter Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(4) Project [codegen id : 2] +(3) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(5) Scan parquet default.explain_temp2 +(4) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct<key:int,val:int> -(6) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(7) Filter [codegen id : 1] +(5) Filter Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(8) Project [codegen id : 1] +(6) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(9) BroadcastExchange +(7) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] -(10) BroadcastHashJoin [codegen id : 2] +(8) BroadcastHashJoin Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None + +(9) AdaptiveSparkPlan +Output [4]: [key#x, val#x, key#x, val#x] +Arguments: isFinalPlan=false -- !query @@ -334,14 +330,13 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* BroadcastHashJoin LeftOuter BuildRight (8) -:- * ColumnarToRow (2) -: +- Scan parquet default.explain_temp1 (1) -+- BroadcastExchange (7) - +- * Project (6) - +- * Filter (5) - +- * ColumnarToRow (4) - +- Scan parquet default.explain_temp2 (3) +AdaptiveSparkPlan (7) ++- BroadcastHashJoin LeftOuter BuildRight (6) + :- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (5) + +- Project (4) + +- Filter (3) + +- Scan parquet default.explain_temp2 (2) (1) Scan parquet default.explain_temp1 @@ -350,35 +345,33 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 2] -Input [2]: [key#x, val#x] - -(3) Scan parquet default.explain_temp2 +(2) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct<key:int,val:int> -(4) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(5) Filter [codegen id : 1] +(3) Filter Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(6) Project [codegen id : 1] +(4) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(7) BroadcastExchange +(5) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] -(8) BroadcastHashJoin [codegen id : 2] +(6) BroadcastHashJoin Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None + +(7) AdaptiveSparkPlan +Output [4]: [key#x, val#x, key#x, val#x] +Arguments: isFinalPlan=false -- !query @@ -396,9 +389,9 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* Project (4) -+- * Filter (3) - +- * ColumnarToRow (2) +AdaptiveSparkPlan (4) ++- 
Project (3) + +- Filter (2) +- Scan parquet default.explain_temp1 (1) @@ -409,110 +402,17 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 1] -Input [2]: [key#x, val#x] -Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) - -(4) Project [codegen id : 1] -Output [2]: [key#x, val#x] +(2) Filter Input [2]: [key#x, val#x] +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x > 3)) -===== Subqueries ===== - -Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] -* HashAggregate (11) -+- Exchange (10) - +- * HashAggregate (9) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet default.explain_temp2 (5) - - -(5) Scan parquet default.explain_temp2 +(3) Project Output [2]: [key#x, val#x] -Batched: true -Location [not included in comparison]/{warehouse_dir}/explain_temp2] -PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)] -ReadSchema: struct<key:int,val:int> - -(6) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(7) Filter [codegen id : 1] -Input [2]: [key#x, val#x] -Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) - -(8) Project [codegen id : 1] -Output [1]: [key#x] Input [2]: [key#x, val#x] -(9) HashAggregate [codegen id : 1] -Input [1]: [key#x] -Keys: [] -Functions [1]: [partial_max(key#x)] -Aggregate Attributes [1]: [max#x] -Results [1]: [max#x] - -(10) Exchange -Input [1]: [max#x] -Arguments: SinglePartition, true, [id=#x] - -(11) HashAggregate [codegen id : 2] -Input [1]: [max#x] -Keys: [] -Functions [1]: [max(key#x)] -Aggregate Attributes [1]: [max(key#x)#x] -Results [1]: [max(key#x)#x AS max(key)#x] - -Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x] -* HashAggregate (18) -+- Exchange (17) - +- * HashAggregate (16) - +- * Project (15) - +- * Filter (14) - +- * ColumnarToRow (13) - +- Scan parquet default.explain_temp3 (12) - - -(12) Scan parquet default.explain_temp3 +(4) AdaptiveSparkPlan Output [2]: [key#x, val#x] -Batched: true -Location [not included in comparison]/{warehouse_dir}/explain_temp3] -PushedFilters: [IsNotNull(val), GreaterThan(val,0)] -ReadSchema: struct<key:int,val:int> - -(13) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(14) Filter [codegen id : 1] -Input [2]: [key#x, val#x] -Condition : (isnotnull(val#x) AND (val#x > 0)) - -(15) Project [codegen id : 1] -Output [1]: [key#x] -Input [2]: [key#x, val#x] - -(16) HashAggregate [codegen id : 1] -Input [1]: [key#x] -Keys: [] -Functions [1]: [partial_max(key#x)] -Aggregate Attributes [1]: [max#x] -Results [1]: [max#x] - -(17) Exchange -Input [1]: [max#x] -Arguments: SinglePartition, true, [id=#x] - -(18) HashAggregate [codegen id : 2] -Input [1]: [max#x] -Keys: [] -Functions [1]: [max(key#x)] -Aggregate Attributes [1]: [max(key#x)#x] -Results [1]: [max(key#x)#x AS max(key)#x] +Arguments: isFinalPlan=false -- !query @@ -530,8 +430,8 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* Filter (3) -+- * ColumnarToRow (2) +AdaptiveSparkPlan (3) ++- Filter (2) +- Scan parquet default.explain_temp1 (1) @@ -541,106 +441,13 @@ Batched: true Location 
[not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 1] -Input [2]: [key#x, val#x] -Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x])) - -===== Subqueries ===== - -Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] -* HashAggregate (10) -+- Exchange (9) - +- * HashAggregate (8) - +- * Project (7) - +- * Filter (6) - +- * ColumnarToRow (5) - +- Scan parquet default.explain_temp2 (4) - - -(4) Scan parquet default.explain_temp2 -Output [2]: [key#x, val#x] -Batched: true -Location [not included in comparison]/{warehouse_dir}/explain_temp2] -PushedFilters: [IsNotNull(val), GreaterThan(val,0)] -ReadSchema: struct<key:int,val:int> - -(5) ColumnarToRow [codegen id : 1] +(2) Filter Input [2]: [key#x, val#x] - -(6) Filter [codegen id : 1] -Input [2]: [key#x, val#x] -Condition : (isnotnull(val#x) AND (val#x > 0)) - -(7) Project [codegen id : 1] -Output [1]: [key#x] -Input [2]: [key#x, val#x] - -(8) HashAggregate [codegen id : 1] -Input [1]: [key#x] -Keys: [] -Functions [1]: [partial_max(key#x)] -Aggregate Attributes [1]: [max#x] -Results [1]: [max#x] - -(9) Exchange -Input [1]: [max#x] -Arguments: SinglePartition, true, [id=#x] - -(10) HashAggregate [codegen id : 2] -Input [1]: [max#x] -Keys: [] -Functions [1]: [max(key#x)] -Aggregate Attributes [1]: [max(key#x)#x] -Results [1]: [max(key#x)#x AS max(key)#x] +Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery subquery#x, [id=#x])) -Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] -* HashAggregate (17) -+- Exchange (16) - +- * HashAggregate (15) - +- * Project (14) - +- * Filter (13) - +- * ColumnarToRow (12) - +- Scan parquet default.explain_temp3 (11) - - -(11) Scan parquet default.explain_temp3 +(3) AdaptiveSparkPlan Output [2]: [key#x, val#x] -Batched: true -Location [not included in comparison]/{warehouse_dir}/explain_temp3] -PushedFilters: [IsNotNull(val), GreaterThan(val,0)] -ReadSchema: struct<key:int,val:int> - -(12) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(13) Filter [codegen id : 1] -Input [2]: [key#x, val#x] -Condition : (isnotnull(val#x) AND (val#x > 0)) - -(14) Project [codegen id : 1] -Output [1]: [key#x] -Input [2]: [key#x, val#x] - -(15) HashAggregate [codegen id : 1] -Input [1]: [key#x] -Keys: [] -Functions [1]: [partial_avg(cast(key#x as bigint))] -Aggregate Attributes [2]: [sum#x, count#xL] -Results [2]: [sum#x, count#xL] - -(16) Exchange -Input [2]: [sum#x, count#xL] -Arguments: SinglePartition, true, [id=#x] - -(17) HashAggregate [codegen id : 2] -Input [2]: [sum#x, count#xL] -Keys: [] -Functions [1]: [avg(cast(key#x as bigint))] -Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] -Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] +Arguments: isFinalPlan=false -- !query @@ -651,8 +458,8 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* Project (3) -+- * ColumnarToRow (2) +AdaptiveSparkPlan (3) ++- Project (2) +- Scan parquet default.explain_temp1 (1) @@ -662,51 +469,13 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct<> -(2) ColumnarToRow [codegen id : 1] +(2) Project +Output [1]: [(Subquery subquery#x, [id=#x] + Subquery subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] 
Input: [] - -(3) Project [codegen id : 1] -Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] -Input: [] - -===== Subqueries ===== - -Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] -* HashAggregate (8) -+- Exchange (7) - +- * HashAggregate (6) - +- * ColumnarToRow (5) - +- Scan parquet default.explain_temp1 (4) - - -(4) Scan parquet default.explain_temp1 -Output [1]: [key#x] -Batched: true -Location [not included in comparison]/{warehouse_dir}/explain_temp1] -ReadSchema: struct<key:int> - -(5) ColumnarToRow [codegen id : 1] -Input [1]: [key#x] - -(6) HashAggregate [codegen id : 1] -Input [1]: [key#x] -Keys: [] -Functions [1]: [partial_avg(cast(key#x as bigint))] -Aggregate Attributes [2]: [sum#x, count#xL] -Results [2]: [sum#x, count#xL] - -(7) Exchange -Input [2]: [sum#x, count#xL] -Arguments: SinglePartition, true, [id=#x] - -(8) HashAggregate [codegen id : 2] -Input [2]: [sum#x, count#xL] -Keys: [] -Functions [1]: [avg(cast(key#x as bigint))] -Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] -Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] -Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] +(3) AdaptiveSparkPlan +Output [1]: [(scalarsubquery() + scalarsubquery())#x] +Arguments: isFinalPlan=false -- !query @@ -721,16 +490,15 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* BroadcastHashJoin Inner BuildRight (10) -:- * Project (4) -: +- * Filter (3) -: +- * ColumnarToRow (2) -: +- Scan parquet default.explain_temp1 (1) -+- BroadcastExchange (9) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet default.explain_temp1 (5) +AdaptiveSparkPlan (9) ++- BroadcastHashJoin Inner BuildRight (8) + :- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (7) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp1 (4) (1) Scan parquet default.explain_temp1 @@ -740,43 +508,41 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 2] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 2] +(2) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(4) Project [codegen id : 2] +(3) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(5) Scan parquet default.explain_temp1 +(4) Scan parquet default.explain_temp1 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct<key:int,val:int> -(6) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(7) Filter [codegen id : 1] +(5) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(8) Project [codegen id : 1] +(6) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(9) BroadcastExchange +(7) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] -(10) BroadcastHashJoin [codegen id : 2] +(8) BroadcastHashJoin Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None + +(9) AdaptiveSparkPlan +Output [4]: [key#x, val#x, key#x, val#x] +Arguments: isFinalPlan=false -- !query @@ -792,17 +558,21 @@ 
EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* BroadcastHashJoin Inner BuildRight (11) -:- * HashAggregate (7) -: +- Exchange (6) -: +- * HashAggregate (5) -: +- * Project (4) -: +- * Filter (3) -: +- * ColumnarToRow (2) -: +- Scan parquet default.explain_temp1 (1) -+- BroadcastExchange (10) - +- * HashAggregate (9) - +- ReusedExchange (8) +AdaptiveSparkPlan (15) ++- BroadcastHashJoin Inner BuildRight (14) + :- HashAggregate (6) + : +- Exchange (5) + : +- HashAggregate (4) + : +- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (13) + +- HashAggregate (12) + +- Exchange (11) + +- HashAggregate (10) + +- Project (9) + +- Filter (8) + +- Scan parquet default.explain_temp1 (7) (1) Scan parquet default.explain_temp1 @@ -812,53 +582,77 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) Filter [codegen id : 1] +(2) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(4) Project [codegen id : 1] +(3) Project Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] -(5) HashAggregate [codegen id : 1] +(4) HashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(6) Exchange +(5) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(7) HashAggregate [codegen id : 4] +(6) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(8) ReusedExchange [Reuses operator id: 6] -Output [2]: [key#x, max#x] +(7) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,10)] +ReadSchema: struct<key:int,val:int> + +(8) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) -(9) HashAggregate [codegen id : 3] +(9) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(10) HashAggregate +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] + +(11) Exchange +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + +(12) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(10) BroadcastExchange +(13) BroadcastExchange Input [2]: [key#x, max(val)#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] -(11) BroadcastHashJoin [codegen id : 4] +(14) BroadcastHashJoin Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None + +(15) AdaptiveSparkPlan +Output [4]: [key#x, max(val)#x, key#x, max(val)#x] +Arguments: isFinalPlan=false -- !query @@ -898,10 +692,10 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -* HashAggregate (5) -+- Exchange (4) - +- HashAggregate (3) - +- * ColumnarToRow (2) +AdaptiveSparkPlan (5) ++- HashAggregate (4) + +- Exchange (3) + +- HashAggregate (2) +- Scan parquet default.explain_temp1 (1) @@ -911,26 +705,27 @@ Batched: true Location [not included in 
comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct<key:int,val:int> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) HashAggregate +(2) HashAggregate Input [2]: [key#x, val#x] Keys: [] Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))] Aggregate Attributes [3]: [count#xL, sum#xL, count#xL] Results [3]: [count#xL, sum#xL, count#xL] -(4) Exchange +(3) Exchange Input [3]: [count#xL, sum#xL, count#xL] Arguments: SinglePartition, true, [id=#x] -(5) HashAggregate [codegen id : 2] +(4) HashAggregate Input [3]: [count#xL, sum#xL, count#xL] Keys: [] Functions [3]: [count(val#x), sum(cast(key#x as bigint)), count(key#x)] Aggregate Attributes [3]: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL] Results [2]: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL] + +(5) AdaptiveSparkPlan +Output [2]: [TOTAL#xL, count(key) FILTER (WHERE (val > 1))#xL] +Arguments: isFinalPlan=false -- !query @@ -942,10 +737,10 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -ObjectHashAggregate (5) -+- Exchange (4) - +- ObjectHashAggregate (3) - +- * ColumnarToRow (2) +AdaptiveSparkPlan (5) ++- ObjectHashAggregate (4) + +- Exchange (3) + +- ObjectHashAggregate (2) +- Scan parquet default.explain_temp4 (1) @@ -955,26 +750,27 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct<key:int,val:string> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) ObjectHashAggregate +(2) ObjectHashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_collect_set(val#x, 0, 0)] Aggregate Attributes [1]: [buf#x] Results [2]: [key#x, buf#x] -(4) Exchange +(3) Exchange Input [2]: [key#x, buf#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(5) ObjectHashAggregate +(4) ObjectHashAggregate Input [2]: [key#x, buf#x] Keys [1]: [key#x] Functions [1]: [collect_set(val#x, 0, 0)] Aggregate Attributes [1]: [collect_set(val#x, 0, 0)#x] Results [2]: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x] + +(5) AdaptiveSparkPlan +Output [2]: [key#x, sort_array(collect_set(val), true)[0]#x] +Arguments: isFinalPlan=false -- !query @@ -986,12 +782,12 @@ EXPLAIN FORMATTED struct<plan:string> -- !query output == Physical Plan == -SortAggregate (7) -+- * Sort (6) - +- Exchange (5) - +- SortAggregate (4) - +- * Sort (3) - +- * ColumnarToRow (2) +AdaptiveSparkPlan (7) ++- SortAggregate (6) + +- Sort (5) + +- Exchange (4) + +- SortAggregate (3) + +- Sort (2) +- Scan parquet default.explain_temp4 (1) @@ -1001,34 +797,35 @@ Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct<key:int,val:string> -(2) ColumnarToRow [codegen id : 1] -Input [2]: [key#x, val#x] - -(3) Sort [codegen id : 1] +(2) Sort Input [2]: [key#x, val#x] Arguments: [key#x ASC NULLS FIRST], false, 0 -(4) SortAggregate +(3) SortAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_min(val#x)] Aggregate Attributes [1]: [min#x] Results [2]: [key#x, min#x] -(5) Exchange +(4) Exchange Input [2]: [key#x, min#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(6) Sort [codegen id : 2] +(5) Sort Input [2]: [key#x, min#x] Arguments: [key#x ASC NULLS FIRST], false, 0 -(7) SortAggregate +(6) SortAggregate Input [2]: [key#x, min#x] Keys [1]: [key#x] Functions [1]: 
[min(val#x)] Aggregate Attributes [1]: [min(val#x)#x] Results [2]: [key#x, min(val#x)#x AS min(val)#x] + +(7) AdaptiveSparkPlan +Output [2]: [key#x, min(val)#x] +Arguments: isFinalPlan=false -- !query @@ -1053,3 +850,11 @@ DROP TABLE explain_temp3 struct<> -- !query output + + +-- !query +DROP TABLE explain_temp4 +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 06226f1..1a18d56 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 22 +-- Number of queries: 23 -- !query @@ -1053,3 +1053,11 @@ DROP TABLE explain_temp3 struct<> -- !query output + + +-- !query +DROP TABLE explain_temp4 +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index b204709..a1b6d71 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -18,16 +18,15 @@ package org.apache.spark.sql import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite +import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType -class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiveExecutionSuite { - import testImplicits._ +trait ExplainSuiteHelper extends QueryTest with SharedSparkSession { - private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = { + protected def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = { val output = new java.io.ByteArrayOutputStream() Console.withOut(output) { df.explain(mode.name) @@ -38,7 +37,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv /** * Get the explain from a DataFrame and run the specified action on it. */ - private def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = { + protected def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = { f(getNormalizedExplain(df, mode)) } @@ -46,7 +45,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv * Get the explain by running the sql. The explain mode should be part of the * sql text itself. */ - private def withNormalizedExplain(queryText: String)(f: String => Unit) = { + protected def withNormalizedExplain(queryText: String)(f: String => Unit) = { val output = new java.io.ByteArrayOutputStream() Console.withOut(output) { sql(queryText).show(false) @@ -58,7 +57,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv /** * Runs the plan and makes sure the plans contains all of the keywords. 
*/ - private def checkKeywordsExistsInExplain( + protected def checkKeywordsExistsInExplain( df: DataFrame, mode: ExplainMode, keywords: String*): Unit = { withNormalizedExplain(df, mode) { normalizedOutput => for (key <- keywords) { @@ -67,9 +66,13 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv } } - private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { + protected def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { checkKeywordsExistsInExplain(df, ExtendedMode, keywords: _*) } +} + +class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite { + import testImplicits._ test("SPARK-23034 show rdd names in RDD scan nodes (Dataset)") { val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd") @@ -342,4 +345,57 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv } } +class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite { + import testImplicits._ + + test("Explain formatted") { + val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1") + val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2") + val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2")) + // trigger the final plan for AQE + testDf.collect() + // whitespace + val ws = " " + // == Physical Plan == + // AdaptiveSparkPlan (14) + // +- * HashAggregate (13) + // +- CustomShuffleReader (12) + // +- ShuffleQueryStage (11) + // +- Exchange (10) + // +- * HashAggregate (9) + // +- * Project (8) + // +- * BroadcastHashJoin Inner BuildRight (7) + // :- * Project (2) + // : +- * LocalTableScan (1) + // +- BroadcastQueryStage (6) + // +- BroadcastExchange (5) + // +- * Project (4) + // +- * LocalTableScan (3) + checkKeywordsExistsInExplain( + testDf, + FormattedMode, + s""" + |(6) BroadcastQueryStage$ws + |Output [2]: [k#x, v2#x] + |Arguments: 0 + |""".stripMargin, + s""" + |(11) ShuffleQueryStage$ws + |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL] + |Arguments: 1 + |""".stripMargin, + s""" + |(12) CustomShuffleReader$ws + |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL] + |Arguments: coalesced + |""".stripMargin, + s""" + |(14) AdaptiveSparkPlan$ws + |Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x] + |Arguments: isFinalPlan=true + |""".stripMargin + ) + } +} + case class ExplainSingleData(id: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 8225863..c6caffa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -53,7 +53,7 @@ class AdaptiveQueryExecSuite event match { case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) => if (sparkPlanInfo.simpleString.startsWith( - "AdaptiveSparkPlan(isFinalPlan=true)")) { + "AdaptiveSparkPlan isFinalPlan=true")) { finalPlanCnt += 1 } case _ => // ignore other events @@ -64,14 +64,14 @@ class AdaptiveQueryExecSuite val dfAdaptive = sql(query) val planBefore = dfAdaptive.queryExecution.executedPlan - assert(planBefore.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=false)")) + assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false")) val result = dfAdaptive.collect() withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val 
df = sql(query) QueryTest.sameRows(result.toSeq, df.collect().toSeq) } val planAfter = dfAdaptive.queryExecution.executedPlan - assert(planAfter.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=true)")) + assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) spark.sparkContext.listenerBus.waitUntilEmpty() assert(finalPlanCnt == 1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org