This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 13307f1ecea [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
13307f1ecea is described below

commit 13307f1ecea1e81ff82a9eb348a7b43e4fd1a332
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Tue Mar 14 21:03:35 2023 +0800

    [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
    
    ### What changes were proposed in this pull request?
    
    Make `QueryStageExec` respect plan.supportsRowBased
    
    ### Why are the changes needed?
    
    It is a long-standing issue that if the plan supports both columnar and
    row-based execution, an unnecessary `ColumnarToRow` is added.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Added a test to `AdaptiveQueryExecSuite`.
    
    Closes #40407 from ulysses-you/SPARK-42778.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/adaptive/QueryStageExec.scala    |  1 +
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 72e7fc937f2..8a2abadd19e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -97,6 +97,7 @@ abstract class QueryStageExec extends LeafExecNode {
   override def executeToIterator(): Iterator[InternalRow] = 
plan.executeToIterator()
 
   protected override def doExecute(): RDD[InternalRow] = plan.execute()
+  override def supportsRowBased: Boolean = plan.supportsRowBased
   override def supportsColumnar: Boolean = plan.supportsColumnar
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = 
plan.executeColumnar()
   override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8ed31e1968c..faf1f911b1f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, 
SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, 
PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, 
ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, 
LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, 
ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, 
UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -2716,6 +2716,20 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-42778: QueryStageExec should respect supportsRowBased") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      withTempView("t") {
+        Seq(1).toDF("c1").createOrReplaceTempView("t")
+        spark.catalog.cacheTable("t")
+        val df = spark.table("t")
+        df.collect()
+        assert(collect(df.queryExecution.executedPlan) {
+          case c: ColumnarToRowExec => c
+        }.isEmpty)
+      }
+    }
+  }
+
   test("SPARK-42101: Apply AQE if contains nested AdaptiveSparkPlanExec") {
     withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> 
"true") {
       val df = spark.range(3).repartition().cache()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to