This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1dfbdb43f [GLUTEN-5405][CH] Add rewrite todate function (#5406)
1dfbdb43f is described below

commit 1dfbdb43f8c2d853db8357051e942df2184859fc
Author: Shuai li <loney...@live.cn>
AuthorDate: Fri Apr 19 11:03:33 2024 +0800

    [GLUTEN-5405][CH] Add rewrite todate function (#5406)
    
    [CH] Add rewrite todate function
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  6 +-
 .../GlutenClickHouseTPCHNullableSuite.scala        | 30 +++++++
 .../benchmarks/CHOptimizeRuleBenchmark.scala       | 78 +++++++++++++++++++
 .../gluten/expression/ExpressionConverter.scala    |  2 +-
 ...ansform.scala => TimestampAddTransformer.scala} |  2 +-
 .../extension/RewriteToDateExpresstionRule.scala   | 91 ++++++++++++++++++++++
 .../scala/org/apache/gluten/GlutenConfig.scala     | 13 ++++
 7 files changed, 218 insertions(+), 4 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index a2f7ae984..fb564c9e2 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -22,7 +22,7 @@ import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression._
 import org.apache.gluten.expression.ConverterUtils.FunctionConfig
-import org.apache.gluten.extension.{CountDistinctWithoutExpand, 
FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage}
+import org.apache.gluten.extension.{CountDistinctWithoutExpand, 
FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, 
RewriteToDateExpresstionRule}
 import org.apache.gluten.extension.columnar.AddTransformHintRule
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, WindowFunctionNode}
@@ -573,7 +573,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
    * @return
    */
   override def genExtendedAnalyzers(): List[SparkSession => Rule[LogicalPlan]] 
= {
-    List(spark => new RewriteDateTimestampComparisonRule(spark, 
spark.sessionState.conf))
+    List(
+      spark => new RewriteToDateExpresstionRule(spark, 
spark.sessionState.conf),
+      spark => new RewriteDateTimestampComparisonRule(spark, 
spark.sessionState.conf))
   }
 
   /**
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala
index 1241f0bcd..fe6afedf4 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.GlutenConfig
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
 
@@ -210,4 +212,32 @@ class GlutenClickHouseTPCHNullableSuite extends 
GlutenClickHouseTPCHAbstractSuit
       runSql(sql, noFallBack = true) { _ => }
     }
   }
+
+  test("test rewrite date conversion") {
+    val sqlStr =
+      """
+        |SELECT
+        |to_date(
+        |  from_unixtime(
+        |    unix_timestamp(date_format(l_shipdate, 'yyyyMMdd'), 'yyyyMMdd')
+        |  )
+        |)
+        |FROM lineitem
+        |limit 10
+        |""".stripMargin
+
+    Seq(("true", false), ("false", true)).foreach(
+      conf => {
+        withSQLConf((GlutenConfig.ENABLE_CH_REWRITE_DATE_CONVERSION.key, 
conf._1)) {
+          runSql(sqlStr)(
+            df => {
+              val project = df.queryExecution.executedPlan.collect {
+                case project: ProjectExecTransformer => project
+              }
+              assert(project.size == 1)
+              
assert(project.apply(0).projectList.toString().contains("from_unixtime") == 
conf._2)
+            })
+        }
+      })
+  }
 }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
new file mode 100644
index 000000000..8d6d749fd
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmarks
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+
+object CHOptimizeRuleBenchmark extends SqlBasedBenchmark with 
CHSqlBasedBenchmark {
+
+  protected lazy val appName = "CHOptimizeRuleBenchmark"
+  protected lazy val thrdNum = "1"
+  protected lazy val memorySize = "4G"
+  protected lazy val offheapSize = "4G"
+
+  def beforeAll(): Unit = {}
+
+  override def getSparkSession: SparkSession = {
+    beforeAll()
+    val conf = getSparkConf
+      .setIfMissing("spark.sql.columnVector.offheap.enabled", "true")
+
+    SparkSession.builder.config(conf).getOrCreate()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val (parquetDir, readFileCnt, scanSchema, executedCnt, executedVanilla) =
+      if (mainArgs.isEmpty) {
+        ("/data/tpch-data-sf1/parquet/lineitem", 3, 
"l_orderkey,l_receiptdate", 5, true)
+      } else {
+        (mainArgs(0), mainArgs(1).toInt, mainArgs(2), mainArgs(3).toInt, 
mainArgs(4).toBoolean)
+      }
+
+    val parquetReadBenchmark =
+      new Benchmark(s"OptimizeRuleBenchmark", 10, output = output)
+
+    parquetReadBenchmark.addCase(s"ClickHouse rewrite dateConversion: false", 
executedCnt) {
+      _ => testToDateOptimize(parquetDir, "false")
+    }
+
+    parquetReadBenchmark.addCase(s"ClickHouse rewrite dateConversion: true", 
executedCnt) {
+      _ => testToDateOptimize(parquetDir, "true")
+    }
+
+    parquetReadBenchmark.run()
+  }
+
+  def testToDateOptimize(parquetDir: String, enable: String): Unit = {
+    
withSQLConf(("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion", 
enable)) {
+      spark
+        .sql(s"""
+                |select
+                |to_date(
+                |  from_unixtime(
+                |    unix_timestamp(date_format(l_shipdate, 'yyyyMMdd'), 
'yyyyMMdd')
+                |  )
+                |)
+                |from parquet.`$parquetDir`
+                |
+                |""".stripMargin)
+        .collect()
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index d70bb8fed..8c2427509 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -566,7 +566,7 @@ object ExpressionConverter extends SQLConfHelper with 
Logging {
           throw new UnsupportedOperationException(s"Not support expression 
TimestampAdd.")
         }
         val add = timestampAdd.asInstanceOf[BinaryExpression]
-        TimestampAddTransform(
+        TimestampAddTransformer(
           substraitExprName,
           extract.get.head,
           replaceWithExpressionTransformerInternal(add.left, attributeSeq, 
expressionsMap),
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala
similarity index 98%
rename from 
gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala
index b3b3730b7..acede4523 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types.DataType
 
 import com.google.common.collect.Lists
 
-case class TimestampAddTransform(
+case class TimestampAddTransformer(
     substraitExprName: String,
     unit: String,
     left: ExpressionTransformer,
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
new file mode 100644
index 000000000..f809bb70f
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.GlutenConfig
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+// If users query data through BI tools.
+// The BI tools may generate SQL similar to
+// `to_date(
+//   from_unixtime(
+//     unix_timestamp(stringType, 'yyyyMMdd')
+//   )
+// )`
+// to convert string strings to dates.
+// Under ch backend, the StringType can be directly converted into DateType,
+//     and the functions `from_unixtime` and `unix_timestamp` can be optimized 
here.
+// Optimized result is `to_date(stringType)`
+class RewriteToDateExpresstionRule(session: SparkSession, conf: SQLConf)
+  extends Rule[LogicalPlan]
+  with Logging {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (
+      plan.resolved &&
+      GlutenConfig.getConf.enableGluten &&
+      GlutenConfig.getConf.enableCHRewriteDateConversion
+    ) {
+      visitPlan(plan)
+    } else {
+      plan
+    }
+  }
+
+  private def visitPlan(plan: LogicalPlan): LogicalPlan = plan match {
+    case project: Project if canRewrite(project) =>
+      val newProjectList = project.projectList.map(expr => 
visitExpression(expr))
+      val newProject = Project(newProjectList, project.child)
+      newProject
+    case other =>
+      val children = other.children.map(visitPlan)
+      other.withNewChildren(children)
+  }
+
+  private def visitExpression(expression: NamedExpression): NamedExpression = 
expression match {
+    case Alias(c, _) if c.isInstanceOf[ParseToDate] =>
+      val newToDate = rewriteParseToDate(c.asInstanceOf[ParseToDate])
+      if (!newToDate.fastEquals(c)) {
+        Alias(newToDate, newToDate.toString())()
+      } else {
+        expression
+      }
+    case _ => expression
+  }
+
+  private def rewriteParseToDate(toDate: ParseToDate): Expression = 
toDate.left match {
+    case fromUnixTime: FromUnixTime
+        if fromUnixTime.left.isInstanceOf[UnixTimestamp]
+          && 
fromUnixTime.left.asInstanceOf[UnixTimestamp].left.dataType.isInstanceOf[StringType]
 =>
+      val unixTimestamp = fromUnixTime.left.asInstanceOf[UnixTimestamp]
+      val newLeft = unixTimestamp.left
+      new ParseToDate(newLeft)
+    case _ => toDate
+  }
+
+  private def canRewrite(project: Project): Boolean = {
+    project.projectList.exists(
+      expr => expr.isInstanceOf[Alias] && 
expr.asInstanceOf[Alias].child.isInstanceOf[ParseToDate])
+  }
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 60ff95a7e..437cea3cf 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -94,6 +94,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def enableRewriteDateTimestampComparison: Boolean =
     conf.getConf(ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON)
 
+  def enableCHRewriteDateConversion: Boolean =
+    conf.getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
+
   def enableCommonSubexpressionEliminate: Boolean =
     conf.getConf(ENABLE_COMMON_SUBEXPRESSION_ELIMINATE)
 
@@ -1588,6 +1591,16 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val ENABLE_CH_REWRITE_DATE_CONVERSION =
+    buildConf("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion")
+      .internal()
+      .doc(
+        "Rewrite the conversion between date and string."
+          + "For example `to_date(from_unixtime(unix_timestamp(stringType, 
'yyyyMMdd')))`"
+          + " will be rewritten to `to_date(stringType)`")
+      .booleanConf
+      .createWithDefault(true)
+
   val ENABLE_COLUMNAR_PROJECT_COLLAPSE =
     buildConf("spark.gluten.sql.columnar.project.collapse")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to