This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bc5c6a  [SPARK-28520][SQL] WholeStageCodegen does not work properly for LocalTableScanExec
6bc5c6a is described below

commit 6bc5c6a4e7c36361db437313cd950509a1ab6db2
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Mon Jul 29 08:35:25 2019 +0900

    [SPARK-28520][SQL] WholeStageCodegen does not work properly for LocalTableScanExec
    
    Code is not generated for LocalTableScanExec even in situations where it should be.
    
    If a LocalTableScanExec plan has a direct parent plan that supports WholeStageCodegen,
    the LocalTableScanExec plan should also be within a WholeStageCodegen domain.
    But currently, code is not generated for LocalTableScanExec and an InputAdapter is inserted instead.
    
    ```
    val df1 = spark.createDataset(1 to 10).toDF
    val df2 = spark.createDataset(1 to 10).toDF
    val df3 = df1.join(df2, df1("value") === df2("value"))
    df3.explain(true)
    
    ...
    
    == Physical Plan ==
    *(1) BroadcastHashJoin [value#1], [value#6], Inner, BuildRight
    :- LocalTableScan [value#1]                                             // LocalTableScanExec is not within a WholeStageCodegen domain
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
false] as bigint)))
       +- LocalTableScan [value#6]
    ```
    
    ```
    scala> df3.queryExecution.executedPlan.children.head.children.head.getClass
    res4: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class 
org.apache.spark.sql.execution.InputAdapter
    ```
    
    In the current implementation of LocalTableScanExec, codegen is enabled only when `parent` is not null,
    but `parent` is set in `consume`, which is called after `insertInputAdapter`, so it doesn't work as intended.
    
    After applying this change, we get the following plan, which shows that LocalTableScanExec is within a WholeStageCodegen domain.
    
    ```
    == Physical Plan ==
    *(1) BroadcastHashJoin [value#63], [value#68], Inner, BuildRight
    :- *(1) LocalTableScan [value#63]
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
false] as bigint)))
       +- LocalTableScan [value#68]
    ```
    
    ## How was this patch tested?
    
    New test cases are added into WholeStageCodegenSuite.
    
    Closes #25260 from sarutak/localtablescan-improvement.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../spark/sql/execution/LocalTableScanExec.scala   |  3 ---
 .../sql/execution/WholeStageCodegenExec.scala      |  4 +++
 .../sql/execution/WholeStageCodegenSuite.scala     | 30 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 31640db..9e32ecf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -80,8 +80,5 @@ case class LocalTableScanExec(
   // Input is already UnsafeRows.
   override protected val createUnsafeProjection: Boolean = false
 
-  // Do not codegen when there is no parent - to support the fast driver-local 
collect/take paths.
-  override def supportCodegen: Boolean = (parent != null)
-
   override def inputRDD: RDD[InternalRow] = rdd
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index a0afa9a..d9d9b1f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -895,6 +895,10 @@ case class CollapseCodegenStages(
       // domain object can not be written into unsafe row.
       case plan if plan.output.length == 1 && 
plan.output.head.dataType.isInstanceOf[ObjectType] =>
         plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, 
isColumnar)))
+      case plan: LocalTableScanExec =>
+        // Do not make LogicalTableScanExec the root of WholeStageCodegen
+        // to support the fast driver-local collect/take paths.
+        plan
       case plan: CodegenSupport if supportCodegen(plan) =>
         WholeStageCodegenExec(
           insertInputAdapter(plan, 
isColumnar))(codegenStageCounter.incrementAndGet())
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 483a046..59b9e15 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, 
CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.expressions.scalalang.typed
@@ -325,4 +326,33 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSQLContext {
       checkAnswer(groupByWithId, Seq(Row(1, 2, 0), Row(1, 2, 0)))
     }
   }
+
+  test("SPARK-28520: WholeStageCodegen does not work properly for 
LocalTableScanExec") {
+    // Case1: LocalTableScanExec is the root of a query plan tree.
+    // In this case, WholeStageCodegenExec should not be inserted
+    // as the direct parent of LocalTableScanExec.
+    val df = Seq(1, 2, 3).toDF
+    val rootOfExecutedPlan = df.queryExecution.executedPlan
+
+    // Ensure WholeStageCodegenExec is not inserted and
+    // LocalTableScanExec is still the root.
+    assert(rootOfExecutedPlan.isInstanceOf[LocalTableScanExec],
+      "LocalTableScanExec should be still the root.")
+
+    // Case2: The parent of a LocalTableScanExec supports WholeStageCodegen.
+    // In this case, the LocalTableScanExec should be within a 
WholeStageCodegen domain
+    // and no more InputAdapter is inserted as the direct parent of the 
LocalTableScanExec.
+    val aggedDF = Seq(1, 2, 3).toDF.groupBy("value").sum()
+    val executedPlan = aggedDF.queryExecution.executedPlan
+
+    // HashAggregateExec supports WholeStageCodegen and it's the parent of
+    // LocalTableScanExec so LocalTableScanExec should be within a 
WholeStageCodegen domain.
+    assert(
+      executedPlan.find {
+        case WholeStageCodegenExec(
+          HashAggregateExec(_, _, _, _, _, _, _: LocalTableScanExec)) => true
+        case _ => false
+      }.isDefined,
+      "LocalTableScanExec should be within a WholeStageCodegen domain.")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to