This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new d762deb9b [GLUTEN-5041][CH] Fix primary not used when query with filter (#5045) d762deb9b is described below commit d762deb9bee73b21df48e5a3f51f3fe6cad4396b Author: Shuai li <loney...@live.cn> AuthorDate: Wed Mar 20 19:00:59 2024 +0800 [GLUTEN-5041][CH] Fix primary not used when query with filter (#5045) Fix primary not used when query with filter --- .../java/io/glutenproject/metrics/MetricsStep.java | 24 ++++++ .../backendsapi/clickhouse/CHMetricsApi.scala | 4 +- .../metrics/FileSourceScanMetricsUpdater.scala | 8 ++ .../GlutenClickHouseMergeTreeWriteSuite.scala | 89 ++++++++++++++++++++++ cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 8 ++ cpp-ch/local-engine/Parser/RelMetric.cpp | 12 +++ 6 files changed, 144 insertions(+), 1 deletion(-) diff --git a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java index de39f7679..c569cd2ee 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java @@ -16,6 +16,8 @@ */ package io.glutenproject.metrics; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.util.List; public class MetricsStep { @@ -24,6 +26,12 @@ public class MetricsStep { protected String description; protected List<MetricsProcessor> processors; + @JsonProperty("total_marks_pk") + protected long totalMarksPk; + + @JsonProperty("selected_marks_pk") + protected long selectedMarksPk; + public String getName() { return name; } @@ -47,4 +55,20 @@ public class MetricsStep { public void setProcessors(List<MetricsProcessor> processors) { this.processors = processors; } + + public void setTotalMarksPk(long totalMarksPk) { + this.totalMarksPk = totalMarksPk; + } + + public void setSelectedMarksPk(long selectedMarksPk) { + this.selectedMarksPk = selectedMarksPk; + } + + 
public long getTotalMarksPk() { + return totalMarksPk; + } + + public long getSelectedMarksPk() { + return selectedMarksPk; + } } diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala index cda406872..3012d5371 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala @@ -125,7 +125,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { "pruningTime" -> SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"), "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time") + "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), + "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"), + "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary") ) override def genFileSourceScanTransformerMetricsUpdater( diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala index 1c6da8dad..8c79536bd 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala @@ -35,6 +35,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val extraTime: SQLMetric = metrics("extraTime") val inputWaitTime: SQLMetric = metrics("inputWaitTime") val outputWaitTime: SQLMetric = metrics("outputWaitTime") + val selected_marks_pk: SQLMetric = 
metrics("selectedMarksPk") + val total_marks_pk: SQLMetric = metrics("totalMarksPk") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value) @@ -51,6 +53,12 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric outputWaitTime += (metricsData.outputWaitTime / 1000L).toLong outputVectors += metricsData.outputVectors + metricsData.getSteps.forEach( + step => { + selected_marks_pk += step.selectedMarksPk + total_marks_pk += step.totalMarksPk + }) + MetricsUtil.updateExtraTimeMetric( metricsData, extraTime, diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala index 2960b502b..f8aa2cfa4 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -1290,5 +1290,94 @@ class GlutenClickHouseMergeTreeWriteSuite ) } + test("test mergetree with primary keys filter") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey2; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey2 + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |TBLPROPERTIES (orderByKey='l_shipdate,l_orderkey', + | primaryKey='l_shipdate') + |LOCATION '$basePath/lineitem_mergetree_orderbykey2' + |""".stripMargin) + + spark.sql(s""" + | insert into table 
lineitem_mergetree_orderbykey2 + | select * from lineitem + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | sum(l_extendedprice * l_discount) AS revenue + |FROM + | lineitem_mergetree_orderbykey2 + |WHERE + | l_shipdate >= date'1994-01-01' + | AND l_shipdate < date'1994-01-01' + interval 1 year + | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01 + | AND l_quantity < 24 + |""".stripMargin + runTPCHQueryBySQL(6, sqlStr) { + df => + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec(0) + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",") + .equals("l_shipdate,l_orderkey")) + assert( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",") + .equals("l_shipdate")) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + + assert(addFiles.size == 6) + assert(addFiles.map(_.rows).sum == 600572) + + val plans = collect(df.queryExecution.executedPlan) { + case scanExec: BasicScanExecTransformer => scanExec + } + assert(plans.size == 1) + assert(plans(0).metrics("selectedMarksPk").value === 17) + assert(plans(0).metrics("totalMarksPk").value === 74) + } + } } // scalastyle:off line.size.limit diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 8737ecb7d..34746217b 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ 
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -171,6 +171,14 @@ MergeTreeRelParser::parseReadRel( context, context->getSettingsRef().max_block_size, 1); + + auto * source_step_with_filter = static_cast<SourceStepWithFilter *>(read_step.get()); + const auto & storage_prewhere_info = query_info->prewhere_info; + if (storage_prewhere_info) + { + source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions, storage_prewhere_info->prewhere_column_name); + source_step_with_filter->applyFilters(); + } query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges); steps.emplace_back(read_step.get()); query_plan->addStep(std::move(read_step)); diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp index 10a49639b..2449fa4e5 100644 --- a/cpp-ch/local-engine/Parser/RelMetric.cpp +++ b/cpp-ch/local-engine/Parser/RelMetric.cpp @@ -17,6 +17,7 @@ #include "RelMetric.h" #include <Processors/IProcessor.h> #include <Processors/QueryPlan/AggregatingStep.h> +#include <Processors/QueryPlan/ReadFromMergeTree.h> using namespace rapidjson; @@ -115,6 +116,17 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const writer.EndObject(); } writer.EndArray(); + + if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree*>(step)) + { + auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk; + auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk; + writer.Key("selected_marks_pk"); + writer.Uint64(selected_marks_pk); + writer.Key("total_marks_pk"); + writer.Uint64(total_marks_pk); + } + writer.EndObject(); } writer.EndArray(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org