This is an automated email from the ASF dual-hosted git repository. mahongbin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new fa0079dca [GLUTEN-5060][CH] Remove unnecessary FilterExec execution when querying from MergeTree with the prewhere (#5067) fa0079dca is described below commit fa0079dca26e755ad4760839d4230885baf3ce5a Author: Zhichao Zhang <zhan...@apache.org> AuthorDate: Thu Mar 21 16:32:41 2024 +0800 [GLUTEN-5060][CH] Remove unnecessary FilterExec execution when querying from MergeTree with the prewhere (#5067) When querying from MergeTree with the prewhere, all the filters will be pushed down to the ScanExec, so it does not need to execute the FilterExec again. Close #5060. --- .../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 19 +++++++++++++------ .../extension/FallbackBroadcaseHashJoinRules.scala | 3 ++- .../GlutenClickHouseMergeTreeWriteSuite.scala | 16 +++++++++++++--- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala index cea88266e..29af5a0e5 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -22,8 +22,7 @@ import io.glutenproject.exception.GlutenNotSupportException import io.glutenproject.execution._ import io.glutenproject.expression._ import io.glutenproject.expression.ConverterUtils.FunctionConfig -import io.glutenproject.extension.{FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage} -import io.glutenproject.extension.CountDistinctWithoutExpand +import io.glutenproject.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage} import io.glutenproject.extension.columnar.AddTransformHintRule import 
io.glutenproject.extension.columnar.MiscColumnarRules.TransformPreOverrides import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} @@ -48,7 +47,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec -import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteFilesExec} import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat @@ -121,10 +120,18 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { override def genFilterExecTransformer( condition: Expression, child: SparkPlan): FilterExecTransformerBase = { + + def checkMergeTreeFileFormat(relation: HadoopFsRelation): Boolean = { + relation.location.isInstanceOf[TahoeFileIndex] && + relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat] + } + child match { - case scan: FileSourceScanExec - if (scan.relation.location.isInstanceOf[TahoeFileIndex] && - scan.relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat]) => + case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) => + // For the validation phase of the AddTransformHintRule + CHFilterExecTransformer(condition, child) + case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) => + // For the transform phase, the FileSourceScanExec is already transformed CHFilterExecTransformer(condition, child) case _ => FilterExecTransformer(condition, child) diff --git a/backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala 
b/backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala similarity index 98% rename from backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala rename to backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala index ccfa16501..52ec8ce47 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/extension/FallbackBroadcaseHashJoinRules.scala @@ -100,7 +100,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPlan] { private val enableColumnarBroadcastJoin: Boolean = - GlutenConfig.getConf.enableColumnarBroadcastJoin && GlutenConfig.getConf.enableColumnarBroadcastExchange + GlutenConfig.getConf.enableColumnarBroadcastJoin && + GlutenConfig.getConf.enableColumnarBroadcastExchange override def apply(plan: SparkPlan): SparkPlan = { plan.foreachUp { diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala index ca192cb89..457c88e34 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -129,12 +129,13 @@ class GlutenClickHouseMergeTreeWriteSuite |""".stripMargin runTPCHQueryBySQL(1, sqlStr) { df => - val scanExec = collect(df.queryExecution.executedPlan) { + val plans = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f + case w: WholeStageTransformer => w } - assert(scanExec.size == 1) + assert(plans.size == 4) - val mergetreeScan = scanExec(0) + val 
mergetreeScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] @@ -148,6 +149,15 @@ class GlutenClickHouseMergeTreeWriteSuite assert( addFiles.map(_.rows).sum == 600572) + + // GLUTEN-5060: check the unnecessary FilterExec + val wholeStageTransformer = plans(2).asInstanceOf[WholeStageTransformer] + val planNodeJson = wholeStageTransformer.substraitPlanJson + assert( + !planNodeJson + .replaceAll("\\\n", "") + .replaceAll(" ", "") + .contains("\"input\":{\"filter\":{")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org