This is an automated email from the ASF dual-hosted git repository. zhangzc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new 1dfbdb43f [GLUTEN-5405][CH] Add rewrite todate function (#5406) 1dfbdb43f is described below commit 1dfbdb43f8c2d853db8357051e942df2184859fc Author: Shuai li <loney...@live.cn> AuthorDate: Fri Apr 19 11:03:33 2024 +0800 [GLUTEN-5405][CH] Add rewrite todate function (#5406) [CH] Add rewrite todate function --- .../clickhouse/CHSparkPlanExecApi.scala | 6 +- .../GlutenClickHouseTPCHNullableSuite.scala | 30 +++++++ .../benchmarks/CHOptimizeRuleBenchmark.scala | 78 +++++++++++++++++++ .../gluten/expression/ExpressionConverter.scala | 2 +- ...ansform.scala => TimestampAddTransformer.scala} | 2 +- .../extension/RewriteToDateExpresstionRule.scala | 91 ++++++++++++++++++++++ .../scala/org/apache/gluten/GlutenConfig.scala | 13 ++++ 7 files changed, 218 insertions(+), 4 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index a2f7ae984..fb564c9e2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.expression._ import org.apache.gluten.expression.ConverterUtils.FunctionConfig -import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage} +import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteToDateExpresstionRule} import org.apache.gluten.extension.columnar.AddTransformHintRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import 
org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} @@ -573,7 +573,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { * @return */ override def genExtendedAnalyzers(): List[SparkSession => Rule[LogicalPlan]] = { - List(spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf)) + List( + spark => new RewriteToDateExpresstionRule(spark, spark.sessionState.conf), + spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf)) } /** diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala index 1241f0bcd..fe6afedf4 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution +import org.apache.gluten.GlutenConfig + import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.optimizer.BuildLeft @@ -210,4 +212,32 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit runSql(sql, noFallBack = true) { _ => } } } + + test("test rewrite date conversion") { + val sqlStr = + """ + |SELECT + |to_date( + | from_unixtime( + | unix_timestamp(date_format(l_shipdate, 'yyyyMMdd'), 'yyyyMMdd') + | ) + |) + |FROM lineitem + |limit 10 + |""".stripMargin + + Seq(("true", false), ("false", true)).foreach( + conf => { + withSQLConf((GlutenConfig.ENABLE_CH_REWRITE_DATE_CONVERSION.key, conf._1)) { + runSql(sqlStr)( + df => { + val project = df.queryExecution.executedPlan.collect { + case project: ProjectExecTransformer => project + } + assert(project.size == 1) + assert(project.apply(0).projectList.toString().contains("from_unixtime") == conf._2) + }) + } + }) + } } diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala new file mode 100644 index 000000000..8d6d749fd --- /dev/null +++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.benchmarks + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark + +object CHOptimizeRuleBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark { + + protected lazy val appName = "CHOptimizeRuleBenchmark" + protected lazy val thrdNum = "1" + protected lazy val memorySize = "4G" + protected lazy val offheapSize = "4G" + + def beforeAll(): Unit = {} + + override def getSparkSession: SparkSession = { + beforeAll() + val conf = getSparkConf + .setIfMissing("spark.sql.columnVector.offheap.enabled", "true") + + SparkSession.builder.config(conf).getOrCreate() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val (parquetDir, readFileCnt, scanSchema, executedCnt, executedVanilla) = + if (mainArgs.isEmpty) { + ("/data/tpch-data-sf1/parquet/lineitem", 3, "l_orderkey,l_receiptdate", 5, true) + } else { + (mainArgs(0), mainArgs(1).toInt, mainArgs(2), mainArgs(3).toInt, mainArgs(4).toBoolean) + } + + val parquetReadBenchmark = + new Benchmark(s"OptimizeRuleBenchmark", 10, output = output) + + parquetReadBenchmark.addCase(s"ClickHouse rewrite dateConversion: false", executedCnt) { + _ => testToDateOptimize(parquetDir, "false") + } + + parquetReadBenchmark.addCase(s"ClickHouse rewrite dateConversion: true", executedCnt) { + _ => testToDateOptimize(parquetDir, "true") + } + + parquetReadBenchmark.run() + } + + def testToDateOptimize(parquetDir: String, enable: String): Unit = { + withSQLConf(("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion", enable)) { + spark + .sql(s""" + |select + |to_date( + | from_unixtime( + | unix_timestamp(date_format(l_shipdate, 'yyyyMMdd'), 'yyyyMMdd') + | ) + |) + |from parquet.`$parquetDir` + | + |""".stripMargin) + .collect() + } + } +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index d70bb8fed..8c2427509 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -566,7 +566,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { throw new UnsupportedOperationException(s"Not support expression TimestampAdd.") } val add = timestampAdd.asInstanceOf[BinaryExpression] - TimestampAddTransform( + TimestampAddTransformer( substraitExprName, extract.get.head, replaceWithExpressionTransformerInternal(add.left, attributeSeq, expressionsMap), diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala similarity index 98% rename from gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala rename to gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala index b3b3730b7..acede4523 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.types.DataType import com.google.common.collect.Lists -case class TimestampAddTransform( +case class TimestampAddTransformer( substraitExprName: String, unit: String, left: ExpressionTransformer, diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala new file mode 100644 index 000000000..f809bb70f --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension + +import org.apache.gluten.GlutenConfig + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +// If users query data through BI tools, +// the BI tools may generate SQL similar to +// `to_date( +// from_unixtime( +// unix_timestamp(stringType, 'yyyyMMdd') +// ) +// )` +// to convert strings to dates. +// Under ch backend, the StringType can be directly converted into DateType, +// and the functions `from_unixtime` and `unix_timestamp` can be optimized here.
+// Optimized result is `to_date(stringType)` +class RewriteToDateExpresstionRule(session: SparkSession, conf: SQLConf) + extends Rule[LogicalPlan] + with Logging { + + override def apply(plan: LogicalPlan): LogicalPlan = { + if ( + plan.resolved && + GlutenConfig.getConf.enableGluten && + GlutenConfig.getConf.enableCHRewriteDateConversion + ) { + visitPlan(plan) + } else { + plan + } + } + + private def visitPlan(plan: LogicalPlan): LogicalPlan = plan match { + case project: Project if canRewrite(project) => + val newProjectList = project.projectList.map(expr => visitExpression(expr)) + val newProject = Project(newProjectList, project.child) + newProject + case other => + val children = other.children.map(visitPlan) + other.withNewChildren(children) + } + + private def visitExpression(expression: NamedExpression): NamedExpression = expression match { + case Alias(c, _) if c.isInstanceOf[ParseToDate] => + val newToDate = rewriteParseToDate(c.asInstanceOf[ParseToDate]) + if (!newToDate.fastEquals(c)) { + Alias(newToDate, newToDate.toString())() + } else { + expression + } + case _ => expression + } + + private def rewriteParseToDate(toDate: ParseToDate): Expression = toDate.left match { + case fromUnixTime: FromUnixTime + if fromUnixTime.left.isInstanceOf[UnixTimestamp] + && fromUnixTime.left.asInstanceOf[UnixTimestamp].left.dataType.isInstanceOf[StringType] => + val unixTimestamp = fromUnixTime.left.asInstanceOf[UnixTimestamp] + val newLeft = unixTimestamp.left + new ParseToDate(newLeft) + case _ => toDate + } + + private def canRewrite(project: Project): Boolean = { + project.projectList.exists( + expr => expr.isInstanceOf[Alias] && expr.asInstanceOf[Alias].child.isInstanceOf[ParseToDate]) + } +} diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 60ff95a7e..437cea3cf 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -94,6 +94,9 @@ class GlutenConfig(conf: SQLConf) extends Logging { def enableRewriteDateTimestampComparison: Boolean = conf.getConf(ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON) + def enableCHRewriteDateConversion: Boolean = + conf.getConf(ENABLE_CH_REWRITE_DATE_CONVERSION) + def enableCommonSubexpressionEliminate: Boolean = conf.getConf(ENABLE_COMMON_SUBEXPRESSION_ELIMINATE) @@ -1588,6 +1591,16 @@ object GlutenConfig { .booleanConf .createWithDefault(true) + val ENABLE_CH_REWRITE_DATE_CONVERSION = + buildConf("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion") + .internal() + .doc( + "Rewrite the conversion between date and string." + + "For example `to_date(from_unixtime(unix_timestamp(stringType, 'yyyyMMdd')))`" + + " will be rewritten to `to_date(stringType)`") + .booleanConf + .createWithDefault(true) + val ENABLE_COLUMNAR_PROJECT_COLLAPSE = buildConf("spark.gluten.sql.columnar.project.collapse") .internal() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org