This is an automated email from the ASF dual-hosted git repository.

mahongbin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new fa0079dca [GLUTEN-5060][CH] Remove unnecessary FilterExec execution when querying from MergeTree with prewhere (#5067)
fa0079dca is described below

commit fa0079dca26e755ad4760839d4230885baf3ce5a
Author: Zhichao Zhang <zhan...@apache.org>
AuthorDate: Thu Mar 21 16:32:41 2024 +0800

    [GLUTEN-5060][CH] Remove unnecessary FilterExec execution when querying from MergeTree with prewhere (#5067)

    When querying from MergeTree with prewhere, all filters are pushed down to the ScanExec, so the FilterExec does not need to run again.

    Closes #5060.
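
    For context, a minimal sketch of the kind of query affected (the table
    name and session are illustrative, not part of this commit): with a
    MergeTree-backed table, the WHERE predicate is pushed down into the
    native scan as a prewhere, so no separate FilterExec is needed on top.

        // Hypothetical example query over a MergeTree-backed table; after
        // this change the executed plan contains only the native scan (with
        // the predicate applied as prewhere) and no FilterExec above it.
        spark.sql(
          """
            |SELECT l_orderkey, sum(l_quantity)
            |FROM lineitem_mergetree
            |WHERE l_shipdate <= date'1998-09-02'
            |GROUP BY l_orderkey
            |""".stripMargin)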
---
 .../backendsapi/clickhouse/CHSparkPlanExecApi.scala   | 19 +++++++++++++------
 .../extension/FallbackBroadcaseHashJoinRules.scala    |  3 ++-
 .../GlutenClickHouseMergeTreeWriteSuite.scala         | 16 +++++++++++++---
 3 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index cea88266e..29af5a0e5 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -22,8 +22,7 @@ import io.glutenproject.exception.GlutenNotSupportException
 import io.glutenproject.execution._
 import io.glutenproject.expression._
 import io.glutenproject.expression.ConverterUtils.FunctionConfig
-import io.glutenproject.extension.{FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage}
-import io.glutenproject.extension.CountDistinctWithoutExpand
+import io.glutenproject.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage}
 import io.glutenproject.extension.columnar.AddTransformHintRule
 import io.glutenproject.extension.columnar.MiscColumnarRules.TransformPreOverrides
 import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
@@ -48,7 +47,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
-import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteFilesExec}
 import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
@@ -121,10 +120,18 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
   override def genFilterExecTransformer(
       condition: Expression,
       child: SparkPlan): FilterExecTransformerBase = {
+
+    def checkMergeTreeFileFormat(relation: HadoopFsRelation): Boolean = {
+      relation.location.isInstanceOf[TahoeFileIndex] &&
+      relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat]
+    }
+
     child match {
-      case scan: FileSourceScanExec
-          if (scan.relation.location.isInstanceOf[TahoeFileIndex] &&
-            scan.relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat]) =>
+      case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) =>
+        // For the validation phase of the AddTransformHintRule
+        CHFilterExecTransformer(condition, child)
+      case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) =>
+        // For the transform phase, the FileSourceScanExec is already transformed
         CHFilterExecTransformer(condition, child)
       case _ =>
         FilterExecTransformer(condition, child)
diff --git a/backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala b/backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
similarity index 98%
rename from backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
rename to backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
index ccfa16501..52ec8ce47 100644
--- a/backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala
@@ -100,7 +100,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
 case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPlan] {
 
   private val enableColumnarBroadcastJoin: Boolean =
-    GlutenConfig.getConf.enableColumnarBroadcastJoin && GlutenConfig.getConf.enableColumnarBroadcastExchange
+    GlutenConfig.getConf.enableColumnarBroadcastJoin &&
+      GlutenConfig.getConf.enableColumnarBroadcastExchange
 
   override def apply(plan: SparkPlan): SparkPlan = {
     plan.foreachUp {
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index ca192cb89..457c88e34 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -129,12 +129,13 @@ class GlutenClickHouseMergeTreeWriteSuite
          |""".stripMargin
     runTPCHQueryBySQL(1, sqlStr) {
       df =>
-        val scanExec = collect(df.queryExecution.executedPlan) {
+        val plans = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
+          case w: WholeStageTransformer => w
         }
-        assert(scanExec.size == 1)
+        assert(plans.size == 4)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = plans(3).asInstanceOf[FileSourceScanExecTransformer]
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
@@ -148,6 +149,15 @@ class GlutenClickHouseMergeTreeWriteSuite
         assert(
           addFiles.map(_.rows).sum
             == 600572)
+
+        // GLUTEN-5060: check the unnecessary FilterExec
+        val wholeStageTransformer = plans(2).asInstanceOf[WholeStageTransformer]
+        val planNodeJson = wholeStageTransformer.substraitPlanJson
+        assert(
+          !planNodeJson
+            .replaceAll("\\\n", "")
+            .replaceAll(" ", "")
+            .contains("\"input\":{\"filter\":{"))
     }
 
   }
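
As a hedged sketch (not part of the commit), the GLUTEN-5060 check in the
suite can be reproduced against any DataFrame over a MergeTree-backed table.
It relies only on WholeStageTransformer and its substraitPlanJson accessor,
both of which the test above already uses; `df` is assumed to come from such
a query, and the whitespace handling here is a simplified stand-in for the
test's replaceAll calls:

    // Collect every WholeStageTransformer in the executed plan and assert
    // that none of their substrait plans nests the scan under a filter
    // relation, i.e. the predicate stayed inside the scan as a prewhere.
    val transformers = df.queryExecution.executedPlan.collect {
      case w: WholeStageTransformer => w
    }
    assert(transformers.forall {
      w =>
        !w.substraitPlanJson
          .replaceAll("\\s", "")
          .contains("\"input\":{\"filter\":{")
    })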

