This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d762deb9b [GLUTEN-5041][CH] Fix primary key index not used when querying with a filter (#5045)
d762deb9b is described below

commit d762deb9bee73b21df48e5a3f51f3fe6cad4396b
Author: Shuai li <loney...@live.cn>
AuthorDate: Wed Mar 20 19:00:59 2024 +0800

    [GLUTEN-5041][CH] Fix primary key index not used when querying with a filter (#5045)
    
    The ClickHouse backend built the ReadFromMergeTree step without pushing the
    query's filter into it, so primary key index analysis never pruned any marks.
    Push the prewhere filter into the read step via SourceStepWithFilter, and
    expose the selected and total primary-key mark counts as new scan metrics
    (selectedMarksPk / totalMarksPk) so the pruning is observable from Spark.
---
 .../java/io/glutenproject/metrics/MetricsStep.java | 24 ++++++
 .../backendsapi/clickhouse/CHMetricsApi.scala      |  4 +-
 .../metrics/FileSourceScanMetricsUpdater.scala     |  8 ++
 .../GlutenClickHouseMergeTreeWriteSuite.scala      | 89 ++++++++++++++++++++++
 cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp  |  8 ++
 cpp-ch/local-engine/Parser/RelMetric.cpp           | 12 +++
 6 files changed, 144 insertions(+), 1 deletion(-)
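
For context, a minimal sketch of the query pattern this commit fixes, using the
table and predicate from the new test below (assumes a Spark session with the
Gluten ClickHouse backend enabled; df is introduced here only for illustration).
With the fix, the filter on the primary key column l_shipdate prunes index
marks, visible as selectedMarksPk < totalMarksPk in the scan metrics:

    // Hedged sketch; table and columns come from GlutenClickHouseMergeTreeWriteSuite below.
    val df = spark.sql("""
      SELECT sum(l_extendedprice * l_discount) AS revenue
      FROM lineitem_mergetree_orderbykey2
      WHERE l_shipdate >= date'1994-01-01'
        AND l_shipdate < date'1994-01-01' + interval 1 year
    """)
    df.collect() // with the fix, only marks matching the l_shipdate range are read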

diff --git a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
index de39f7679..c569cd2ee 100644
--- a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
+++ b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
@@ -16,6 +16,8 @@
  */
 package io.glutenproject.metrics;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.List;
 
 public class MetricsStep {
@@ -24,6 +26,12 @@ public class MetricsStep {
   protected String description;
   protected List<MetricsProcessor> processors;
 
+  @JsonProperty("total_marks_pk")
+  protected long totalMarksPk;
+
+  @JsonProperty("selected_marks_pk")
+  protected long selectedMarksPk;
+
   public String getName() {
     return name;
   }
@@ -47,4 +55,20 @@ public class MetricsStep {
   public void setProcessors(List<MetricsProcessor> processors) {
     this.processors = processors;
   }
+
+  public void setTotalMarksPk(long totalMarksPk) {
+    this.totalMarksPk = totalMarksPk;
+  }
+
+  public void setSelectedMarksPk(long selectedMarksPk) {
+    this.selectedMarksPk = selectedMarksPk;
+  }
+
+  public long getTotalMarksPk() {
+    return totalMarksPk;
+  }
+
+  public long getSelectedMarksPk() {
+    return selectedMarksPk;
+  }
 }
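
How the new fields get populated, as a hedged sketch: the native side
serializes per-step counters under the snake_case keys shown in RelMetric.cpp
below, and Jackson binds them through the @JsonProperty annotations above. Only
the key names are confirmed by the diff; the payload here is simplified:

    import com.fasterxml.jackson.databind.ObjectMapper
    import io.glutenproject.metrics.MetricsStep

    // Key names match RelMetric.cpp; the surrounding payload shape is an assumption.
    val json = """{"total_marks_pk": 74, "selected_marks_pk": 17}"""
    val step = new ObjectMapper().readValue(json, classOf[MetricsStep])
    assert(step.getTotalMarksPk == 74 && step.getSelectedMarksPk == 17)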
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index cda406872..3012d5371 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -125,7 +125,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "pruningTime" ->
         SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time")
+      "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
+      "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"),
+      "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary")
     )
 
   override def genFileSourceScanTransformerMetricsUpdater(
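
SQLMetrics is Spark's public metric API; a minimal sketch of what the two new
entries give you (the example wraps any live SparkContext):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.SQLMetrics

    // createMetric returns an accumulator-backed SQLMetric; counts added by the
    // metrics updater are summed across tasks and shown in the Spark UI SQL tab.
    def example(sparkContext: SparkContext): Unit = {
      val selectedMarksPk = SQLMetrics.createMetric(sparkContext, "selected marks primary")
      selectedMarksPk += 17L
      assert(selectedMarksPk.value == 17L)
    }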
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index 1c6da8dad..8c79536bd 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -35,6 +35,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
   val extraTime: SQLMetric = metrics("extraTime")
   val inputWaitTime: SQLMetric = metrics("inputWaitTime")
   val outputWaitTime: SQLMetric = metrics("outputWaitTime")
+  val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
+  val total_marks_pk: SQLMetric = metrics("totalMarksPk")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
     // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -51,6 +53,12 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
         outputWaitTime += (metricsData.outputWaitTime / 1000L).toLong
         outputVectors += metricsData.outputVectors
 
+        metricsData.getSteps.forEach(
+          step => {
+            selected_marks_pk += step.selectedMarksPk
+            total_marks_pk += step.totalMarksPk
+          })
+
         MetricsUtil.updateExtraTimeMetric(
           metricsData,
           extraTime,
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 2960b502b..f8aa2cfa4 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1290,5 +1290,94 @@ class GlutenClickHouseMergeTreeWriteSuite
     )
   }
 
+  test("test mergetree with primary keys filter") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey2;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey2
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |TBLPROPERTIES (orderByKey='l_shipdate,l_orderkey',
+                 |               primaryKey='l_shipdate')
+                 |LOCATION '$basePath/lineitem_mergetree_orderbykey2'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_orderbykey2
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    sum(l_extendedprice * l_discount) AS revenue
+         |FROM
+         |    lineitem_mergetree_orderbykey2
+         |WHERE
+         |    l_shipdate >= date'1994-01-01'
+         |    AND l_shipdate < date'1994-01-01' + interval 1 year
+         |    AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+         |    AND l_quantity < 24
+         |""".stripMargin
+    runTPCHQueryBySQL(6, sqlStr) {
+      df =>
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec(0)
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .orderByKeyOption
+            .get
+            .mkString(",")
+            .equals("l_shipdate,l_orderkey"))
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .primaryKeyOption
+            .get
+            .mkString(",")
+            .equals("l_shipdate"))
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
+        assert(addFiles.size == 6)
+        assert(addFiles.map(_.rows).sum == 600572)
+
+        val plans = collect(df.queryExecution.executedPlan) {
+          case scanExec: BasicScanExecTransformer => scanExec
+        }
+        assert(plans.size == 1)
+        assert(plans(0).metrics("selectedMarksPk").value === 17)
+        assert(plans(0).metrics("totalMarksPk").value === 74)
+    }
+  }
 }
 // scalastyle:off line.size.limit
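
Once a query has run, the new counters can be read off the Gluten scan node,
mirroring the assertions in the test above (df is assumed to hold the executed
DataFrame; the test itself uses the AdaptiveSparkPlanHelper collect variant):

    // BasicScanExecTransformer and the metric keys are taken from the test above.
    val scan = df.queryExecution.executedPlan.collect {
      case s: BasicScanExecTransformer => s
    }.head
    val selected = scan.metrics("selectedMarksPk").value // 17 in the test above
    val total = scan.metrics("totalMarksPk").value       // 74 in the test above
    println(s"primary key index selected $selected of $total marks")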
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 8737ecb7d..34746217b 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -171,6 +171,14 @@ MergeTreeRelParser::parseReadRel(
         context,
         context->getSettingsRef().max_block_size,
         1);
+
+    auto * source_step_with_filter = static_cast<SourceStepWithFilter *>(read_step.get());
+    const auto & storage_prewhere_info = query_info->prewhere_info;
+    if (storage_prewhere_info)
+    {
+        source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions, storage_prewhere_info->prewhere_column_name);
+        source_step_with_filter->applyFilters();
+    }
     query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
     steps.emplace_back(read_step.get());
     query_plan->addStep(std::move(read_step));
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 10a49639b..2449fa4e5 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -17,6 +17,7 @@
 #include "RelMetric.h"
 #include <Processors/IProcessor.h>
 #include <Processors/QueryPlan/AggregatingStep.h>
+#include <Processors/QueryPlan/ReadFromMergeTree.h>
 
 using namespace rapidjson;
 
@@ -115,6 +116,17 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const
                 writer.EndObject();
             }
             writer.EndArray();
+
+            if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree*>(step))
+            {
+                auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk;
+                auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk;
+                writer.Key("selected_marks_pk");
+                writer.Uint64(selected_marks_pk);
+                writer.Key("total_marks_pk");
+                writer.Uint64(total_marks_pk);
+            }
+
             writer.EndObject();
         }
         writer.EndArray();

