This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 13307f1ecea [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
13307f1ecea is described below

commit 13307f1ecea1e81ff82a9eb348a7b43e4fd1a332
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Tue Mar 14 21:03:35 2023 +0800

    [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased

    ### What changes were proposed in this pull request?

    Make `QueryStageExec` respect `plan.supportsRowBased`.

    ### Why are the changes needed?

    It is a long-standing issue that if the wrapped plan supports both columnar and row-based execution, an unnecessary `ColumnarToRow` transition is added on top of the query stage.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added a test.

    Closes #40407 from ulysses-you/SPARK-42778.

    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
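
For illustration only (not part of the commit), a minimal spark-shell sketch that mirrors the added test: it forces AQE via spark.sql.adaptive.forceApply (the key behind SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY), scans a cached table that supports both columnar and row-based output, and checks that no ColumnarToRowExec transition remains in the finalized plan. The one-level unwrapping of AdaptiveSparkPlanExec is an assumption that is adequate for this simple plan.

    import org.apache.spark.sql.execution.ColumnarToRowExec
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

    // Force AQE even for plans without exchanges, as the added test does.
    spark.conf.set("spark.sql.adaptive.forceApply", "true")

    import spark.implicits._
    Seq(1).toDF("c1").createOrReplaceTempView("t")
    spark.catalog.cacheTable("t")

    val df = spark.table("t")
    df.collect()  // materialize the query so the adaptive plan is finalized

    // Collect ColumnarToRowExec nodes, looking through the adaptive wrapper.
    val transitions = df.queryExecution.executedPlan.collect {
      case a: AdaptiveSparkPlanExec =>
        a.executedPlan.collect { case c: ColumnarToRowExec => c }
      case c: ColumnarToRowExec => Seq(c)
    }.flatten
    assert(transitions.isEmpty)

With this change, the cached scan's query stage reports row-based support, so no ColumnarToRow node should be found.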
---
 .../spark/sql/execution/adaptive/QueryStageExec.scala    |  1 +
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 72e7fc937f2..8a2abadd19e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -97,6 +97,7 @@ abstract class QueryStageExec extends LeafExecNode {
   override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator()
 
   protected override def doExecute(): RDD[InternalRow] = plan.execute()
+  override def supportsRowBased: Boolean = plan.supportsRowBased
   override def supportsColumnar: Boolean = plan.supportsColumnar
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = plan.executeColumnar()
   override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8ed31e1968c..faf1f911b1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -2716,6 +2716,20 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-42778: QueryStageExec should respect supportsRowBased") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      withTempView("t") {
+        Seq(1).toDF("c1").createOrReplaceTempView("t")
+        spark.catalog.cacheTable("t")
+        val df = spark.table("t")
+        df.collect()
+        assert(collect(df.queryExecution.executedPlan) {
+          case c: ColumnarToRowExec => c
+        }.isEmpty)
+      }
+    }
+  }
+
   test("SPARK-42101: Apply AQE if contains nested AdaptiveSparkPlanExec") {
     withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
       val df = spark.range(3).repartition().cache()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org