This is an automated email from the ASF dual-hosted git repository.

mahongbin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
     new 972597184  [Gluten-5018][CH] support minmax/bloomfilter/set skip index (#5019)
972597184 is described below

commit 972597184e147fcf488fd6cda4b447356d61136d
Author: Hongbin Ma <mahong...@apache.org>
AuthorDate: Wed Mar 27 09:33:40 2024 +0800

    [Gluten-5018][CH] support minmax/bloomfilter/set skip index (#5019)

    * temp, by default all cols
      minmax index basically works, dealing with nullable
      nullable/not-null ok
      remove unnecessary change
      fix compile

    * add ut

    * remove dataschema

    * fix spark32 bug
---
 .../source/DeltaMergeTreeFileFormat.scala          |  17 +-
 .../source/DeltaMergeTreeFileFormat.scala          |  17 +-
 .../java/io/glutenproject/metrics/MetricsStep.java |  11 +
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   3 +
 .../backendsapi/clickhouse/CHMetricsApi.scala      |   1 +
 .../execution/GlutenMergeTreePartition.scala       |   3 +
 .../metrics/FileSourceScanMetricsUpdater.scala     |   2 +
 .../delta/ClickhouseOptimisticTransaction.scala    |   7 +-
 .../sql/delta/catalog/ClickHouseTableV2.scala      |  35 ++-
 .../utils/MergeTreePartsPartitionsUtil.scala       |  33 +++
 .../datasources/v1/CHMergeTreeWriterInjects.scala  |  29 ++-
 .../v1/clickhouse/MergeTreeFileFormatWriter.scala  |   9 +
 ...GlutenClickHouseTPCHNotNullSkipIndexSuite.scala | 271 ++++++++++++++++++++
 ...lutenClickHouseTPCHNullableSkipIndexSuite.scala | 277 +++++++++++++++++++++
 .../apache/spark/affinity/MixedAffinitySuite.scala |   3 +
 cpp-ch/local-engine/Common/MergeTreeTool.cpp       |  84 ++++++-
 cpp-ch/local-engine/Common/MergeTreeTool.h         |   3 +
 cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp  |  18 +-
 cpp-ch/local-engine/Parser/RelMetric.cpp           |   3 +
 cpp-ch/local-engine/Parser/TypeParser.cpp          |   4 +-
 cpp-ch/local-engine/Parser/TypeParser.h            |  50 ++--
 .../substrait/rel/ExtensionTableBuilder.java       |   6 +
 .../substrait/rel/ExtensionTableNode.java          |  12 +
 .../datasource/GlutenFormatWriterInjects.scala     |   4 +-
 24 files changed, 843 insertions(+), 59 deletions(-)

diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index fef109d35..d4ca321a9 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.v2.clickhouse.source import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} @@ -31,9 +30,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) protected var database = "" protected var tableName = "" - protected var dataSchemas = Seq.empty[Attribute] protected var orderByKeyOption: Option[Seq[String]] = None protected var lowCardKeyOption: Option[Seq[String]] = None + protected var minmaxIndexKeyOption: Option[Seq[String]] = None + protected var bfIndexKeyOption: Option[Seq[String]] = None + protected var setIndexKeyOption: Option[Seq[String]] = None protected var primaryKeyOption: Option[Seq[String]] = None protected var partitionColumns: Seq[String] = Seq.empty[String] protected var 
clickhouseTableConfigs: Map[String, String] = Map.empty @@ -42,18 +43,22 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) metadata: Metadata, database: String, tableName: String, - schemas: Seq[Attribute], orderByKeyOption: Option[Seq[String]], lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], clickhouseTableConfigs: Map[String, String], partitionColumns: Seq[String]) { this(metadata) this.database = database this.tableName = tableName - this.dataSchemas = schemas this.orderByKeyOption = orderByKeyOption this.lowCardKeyOption = lowCardKeyOption + this.minmaxIndexKeyOption = minmaxIndexKeyOption + this.bfIndexKeyOption = bfIndexKeyOption + this.setIndexKeyOption = setIndexKeyOption this.primaryKeyOption = primaryKeyOption this.clickhouseTableConfigs = clickhouseTableConfigs this.partitionColumns = partitionColumns @@ -102,10 +107,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) tableName, orderByKeyOption, lowCardKeyOption, + minmaxIndexKeyOption, + bfIndexKeyOption, + setIndexKeyOption, primaryKeyOption, partitionColumns, metadata.schema, - dataSchemas, clickhouseTableConfigs, context, nativeConf diff --git a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index b87420787..002e636af 100644 --- a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.v2.clickhouse.source import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} @@ -30,9 +29,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma protected var database = "" protected var tableName = "" - protected var dataSchemas = Seq.empty[Attribute] protected var orderByKeyOption: Option[Seq[String]] = None protected var lowCardKeyOption: Option[Seq[String]] = None + protected var minmaxIndexKeyOption: Option[Seq[String]] = None + protected var bfIndexKeyOption: Option[Seq[String]] = None + protected var setIndexKeyOption: Option[Seq[String]] = None protected var primaryKeyOption: Option[Seq[String]] = None protected var partitionColumns: Seq[String] = Seq.empty[String] protected var clickhouseTableConfigs: Map[String, String] = Map.empty @@ -41,18 +42,22 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma metadata: Metadata, database: String, tableName: String, - schemas: Seq[Attribute], orderByKeyOption: Option[Seq[String]], lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], clickhouseTableConfigs: Map[String, String], partitionColumns: Seq[String]) { this(metadata) this.database = database this.tableName = tableName - 
this.dataSchemas = schemas this.orderByKeyOption = orderByKeyOption this.lowCardKeyOption = lowCardKeyOption + this.minmaxIndexKeyOption = minmaxIndexKeyOption + this.bfIndexKeyOption = bfIndexKeyOption + this.setIndexKeyOption = setIndexKeyOption this.primaryKeyOption = primaryKeyOption this.clickhouseTableConfigs = clickhouseTableConfigs this.partitionColumns = partitionColumns @@ -101,10 +106,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma tableName, orderByKeyOption, lowCardKeyOption, + minmaxIndexKeyOption, + bfIndexKeyOption, + setIndexKeyOption, primaryKeyOption, partitionColumns, metadata.schema, - dataSchemas, clickhouseTableConfigs, context, nativeConf diff --git a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java index c569cd2ee..d1714d825 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java @@ -32,6 +32,9 @@ public class MetricsStep { @JsonProperty("selected_marks_pk") protected long selectedMarksPk; + @JsonProperty("selected_marks") + protected long selectedMarks; + public String getName() { return name; } @@ -64,6 +67,14 @@ public class MetricsStep { this.selectedMarksPk = selectedMarksPk; } + public long getSelectedMarks() { + return selectedMarks; + } + + public void setSelectedMarks(long selectedMarks) { + this.selectedMarks = selectedMarks; + } + public long getTotalMarksPk() { return totalMarksPk; } diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala index 6fecb2c5f..08786a00b 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala @@ -99,6 +99,9 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { p.absoluteTablePath, p.orderByKey, p.lowCardKey, + p.minmaxIndexKey, + p.bfIndexKey, + p.setIndexKey, p.primaryKey, partLists, starts, diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala index 3012d5371..0157b370f 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala @@ -127,6 +127,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"), + "selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected marks"), "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary") ) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala b/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala index df41191f2..be17c713e 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala +++ 
b/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala @@ -39,6 +39,9 @@ case class GlutenMergeTreePartition( absoluteTablePath: String, orderByKey: String, lowCardKey: String, + minmaxIndexKey: String, + bfIndexKey: String, + setIndexKey: String, primaryKey: String, partList: Array[MergeTreePartSplit], tableSchemaJson: String, diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala index 8c79536bd..497e8b780 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala @@ -36,6 +36,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val inputWaitTime: SQLMetric = metrics("inputWaitTime") val outputWaitTime: SQLMetric = metrics("outputWaitTime") val selected_marks_pk: SQLMetric = metrics("selectedMarksPk") + val selected_marks: SQLMetric = metrics("selectedMarks") val total_marks_pk: SQLMetric = metrics("totalMarksPk") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { @@ -56,6 +57,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric metricsData.getSteps.forEach( step => { selected_marks_pk += step.selectedMarksPk + selected_marks += step.selectedMarks total_marks_pk += step.totalMarksPk }) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 9111bea7f..95119f842 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -128,9 +128,11 @@ class ClickhouseOptimisticTransaction( metadata, tableV2.dataBaseName, tableV2.tableName, - output, tableV2.orderByKeyOption, tableV2.lowCardKeyOption, + tableV2.minmaxIndexKeyOption, + tableV2.bfIndexKeyOption, + tableV2.setIndexKeyOption, tableV2.primaryKeyOption, tableV2.clickhouseTableConfigs, tableV2.partitionColumns @@ -144,6 +146,9 @@ class ClickhouseOptimisticTransaction( // scalastyle:on deltahadoopconfiguration orderByKeyOption = tableV2.orderByKeyOption, lowCardKeyOption = tableV2.lowCardKeyOption, + minmaxIndexKeyOption = tableV2.minmaxIndexKeyOption, + bfIndexKeyOption = tableV2.bfIndexKeyOption, + setIndexKeyOption = tableV2.setIndexKeyOption, primaryKeyOption = tableV2.primaryKeyOption, partitionColumns = partitioningColumns, bucketSpec = tableV2.bucketOption, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index e06a01edf..92d12c05f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -114,18 +114,34 @@ class ClickHouseTableV2( } lazy val lowCardKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("lowCardKey") + } + + lazy val minmaxIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("minmaxIndexKey") + } + + lazy val bfIndexKeyOption: Option[Seq[String]] = { + 
getCommaSeparatedColumns("bloomfilterIndexKey") + } + + lazy val setIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("setIndexKey") + } + + private def getCommaSeparatedColumns(keyName: String) = { val tableProperties = properties() - if (tableProperties.containsKey("lowCardKey")) { - if (tableProperties.get("lowCardKey").nonEmpty) { - val lowCardKeys = tableProperties.get("lowCardKey").split(",").map(_.trim).toSeq - lowCardKeys.foreach( + if (tableProperties.containsKey(keyName)) { + if (tableProperties.get(keyName).nonEmpty) { + val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq + keys.foreach( s => { if (s.contains(".")) { throw new IllegalStateException( - s"lowCardKey $s can not contain '.' (not support nested column yet)") + s"$keyName $s can not contain '.' (not support nested column yet)") } }) - Some(lowCardKeys.map(s => s.toLowerCase())) + Some(keys.map(s => s.toLowerCase())) } else { None } @@ -259,12 +275,15 @@ class ClickHouseTableV2( meta, dataBaseName, tableName, - Seq.empty[Attribute], orderByKeyOption, lowCardKeyOption, + minmaxIndexKeyOption, + bfIndexKeyOption, + setIndexKeyOption, primaryKeyOption, clickhouseTableConfigs, - partitionColumns) + partitionColumns + ) } def cacheThis(): Unit = { deltaLog2Table.put(deltaLog, this) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala index a7ac2ce16..7d202b4d1 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala @@ -74,6 +74,21 @@ object MergeTreePartsPartitionsUtil extends Logging { case None => "" } + val minmaxIndexKey = table.minmaxIndexKeyOption match { + case Some(keys) => keys.mkString(",") + case None => "" + } + + val bfIndexKey = table.bfIndexKeyOption match { + case Some(keys) => keys.mkString(",") + case None => "" + } + + val setIndexKey = table.setIndexKeyOption match { + case Some(keys) => keys.mkString(",") + case None => "" + } + val tableSchemaJson = ConverterUtils.convertNamedStructJson(table.schema()) // bucket table @@ -92,6 +107,9 @@ object MergeTreePartsPartitionsUtil extends Logging { partitions, orderByKey, lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, primaryKey, table.clickhouseTableConfigs, sparkSession @@ -109,6 +127,9 @@ object MergeTreePartsPartitionsUtil extends Logging { partitions, orderByKey, lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, primaryKey, table.clickhouseTableConfigs, sparkSession @@ -129,6 +150,9 @@ object MergeTreePartsPartitionsUtil extends Logging { partitions: ArrayBuffer[InputPartition], orderByKey: String, lowCardKey: String, + minmaxIndexKey: String, + bfIndexKey: String, + setIndexKey: String, primaryKey: String, clickhouseTableConfigs: Map[String, String], sparkSession: SparkSession): Unit = { @@ -214,6 +238,9 @@ object MergeTreePartsPartitionsUtil extends Logging { absoluteTablePath, orderByKey, lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, primaryKey, currentFiles.toArray, tableSchemaJson, @@ -256,6 +283,9 @@ object MergeTreePartsPartitionsUtil extends Logging { partitions: ArrayBuffer[InputPartition], orderByKey: String, lowCardKey: String, + minmaxIndexKey: String, + bfIndexKey: String, + 
setIndexKey: String, primaryKey: String, clickhouseTableConfigs: Map[String, String], sparkSession: SparkSession): Unit = { @@ -318,6 +348,9 @@ object MergeTreePartsPartitionsUtil extends Logging { absoluteTablePath, orderByKey, lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, primaryKey, currentFiles.toArray, tableSchemaJson, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala index d05945a81..6175f76cb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala @@ -68,10 +68,12 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { tableName: String, orderByKeyOption: Option[Seq[String]], lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], partitionColumns: Seq[String], tableSchema: StructType, - dataSchema: Seq[Attribute], clickhouseTableConfigs: Map[String, String], context: TaskAttemptContext, nativeConf: JMap[String, String]): OutputWriter = { @@ -83,11 +85,14 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { tableName, orderByKeyOption, lowCardKeyOption, + minmaxIndexKeyOption, + bfIndexKeyOption, + setIndexKeyOption, primaryKeyOption, partitionColumns, ConverterUtils.convertNamedStructJson(tableSchema), clickhouseTableConfigs, - dataSchema + tableSchema.toAttributes // use table schema instead of data schema ) val datasourceJniWrapper = new CHDatasourceJniWrapper() @@ -119,17 +124,22 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { object CHMergeTreeWriterInjects { + // scalastyle:off argcount def genMergeTreeWriteRel( path: String, database: String, tableName: String, orderByKeyOption: Option[Seq[String]], lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], partitionColumns: Seq[String], tableSchemaJson: String, clickhouseTableConfigs: Map[String, String], output: Seq[Attribute]): PlanWithSplitInfo = { + // scalastyle:on argcount val typeNodes = ConverterUtils.collectAttributeTypeNodes(output) val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output) val columnTypeNodes = output.map { @@ -150,6 +160,18 @@ object CHMergeTreeWriterInjects { case Some(keys) => keys.mkString(",") case None => "" } + val minmaxIndexKey = minmaxIndexKeyOption match { + case Some(keys) => keys.mkString(",") + case None => "" + } + val bfIndexKey = bfIndexKeyOption match { + case Some(keys) => keys.mkString(",") + case None => "" + } + val setIndexKey = setIndexKeyOption match { + case Some(keys) => keys.mkString(",") + case None => "" + } val substraitContext = new SubstraitContext val extensionTableNode = ExtensionTableBuilder.makeExtensionTable( @@ -161,6 +183,9 @@ object CHMergeTreeWriterInjects { "", orderByKey, lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, primaryKey, new JList[String](), new JList[JLong](), diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala index 874a4ede3..90f01e744 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala @@ -57,6 +57,9 @@ object MergeTreeFileFormatWriter extends Logging { hadoopConf: Configuration, orderByKeyOption: Option[Seq[String]], lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], @@ -71,6 +74,9 @@ object MergeTreeFileFormatWriter extends Logging { hadoopConf = hadoopConf, orderByKeyOption = orderByKeyOption, lowCardKeyOption = lowCardKeyOption, + minmaxIndexKeyOption = minmaxIndexKeyOption, + bfIndexKeyOption = bfIndexKeyOption, + setIndexKeyOption = setIndexKeyOption, primaryKeyOption = primaryKeyOption, partitionColumns = partitionColumns, bucketSpec = bucketSpec, @@ -89,6 +95,9 @@ object MergeTreeFileFormatWriter extends Logging { hadoopConf: Configuration, orderByKeyOption: Option[Seq[String]], lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala new file mode 100644 index 000000000..73462780c --- /dev/null +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.execution + +import org.apache.spark.SparkConf + +import java.io.File + +class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbstractSuite { + + override protected val tablesPath: String = basePath + "/tpch-data-ch" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "mergetree-queries-output" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "sort") + .set("spark.io.compression.codec", "SNAPPY") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.autoBroadcastJoinThreshold", "10MB") +// .set("spark.ui.enabled", "true") +// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline", "true") +// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "debug") + } + + test("test simple minmax index") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_minmax; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax + |( + | l_orderkey bigint not null, + | l_partkey bigint not null, + | l_suppkey bigint not null, + | l_linenumber bigint not null, + | l_quantity double not null, + | l_extendedprice double not null, + | l_discount double not null, + | l_tax double not null, + | l_returnflag string not null, + | l_linestatus string not null, + | l_shipdate date not null, + | l_commitdate date not null, + | l_receiptdate date not null, + | l_shipinstruct string not null, + | l_shipmode string not null, + | l_comment string not null + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_minmax' + |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_minmax + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + |select count(*) from lineitem_mergetree_minmax where l_receiptdate = '1998-12-27' + |""".stripMargin) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_minmax") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert(!partDir.listFiles().exists(p => p.getName.contains("null"))) + assert( + partDir.listFiles().exists(p => p.getName.contains("skp_idx__minmax_l_receiptdate.idx2"))) + } + + test("test simple bloom filter index") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_bf; + |""".stripMargin) + + spark.sql(s""" + CREATE TABLE IF NOT EXISTS lineitem_mergetree_bf + |( + | l_orderkey bigint not null, + | l_partkey bigint not null, + | l_suppkey bigint not null, + | l_linenumber bigint not null, + | l_quantity double not null, + | l_extendedprice double not null, + | l_discount double not null, + | l_tax double not null, + | l_returnflag string not null, + | l_linestatus string not null, + | l_shipdate date not null, + | l_commitdate date not null, + | l_receiptdate date not null, + | l_shipinstruct string not null, + | l_shipmode string not null, + | l_comment string not null + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_bf' + 
|TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_bf + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + select count(*) from lineitem_mergetree_bf where l_orderkey = '600000' + |""".stripMargin) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_bf") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert(!partDir.listFiles().exists(p => p.getName.contains("null"))) + assert( + partDir.listFiles().exists(p => p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx"))) + } + + test("test simple set index") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_set; + |""".stripMargin) + + spark.sql(s""" + CREATE TABLE IF NOT EXISTS lineitem_mergetree_set + |( + | l_orderkey bigint not null, + | l_partkey bigint not null, + | l_suppkey bigint not null, + | l_linenumber bigint not null, + | l_quantity double not null, + | l_extendedprice double not null, + | l_discount double not null, + | l_tax double not null, + | l_returnflag string not null, + | l_linestatus string not null, + | l_shipdate date not null, + | l_commitdate date not null, + | l_receiptdate date not null, + | l_shipinstruct string not null, + | l_shipmode string not null, + | l_comment string not null + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_set' + |TBLPROPERTIES('setIndexKey'='l_orderkey') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_set + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + select count(*) from lineitem_mergetree_set where l_orderkey = '600000' + |""".stripMargin) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_set") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert(!partDir.listFiles().exists(p => p.getName.contains("null"))) + assert(partDir.listFiles().exists(p => p.getName.contains("skp_idx__set_l_orderkey.idx"))) + } + + test("test not null dataset inserted into nullable schema") { + + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_minmax2; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax2 + |( + l_orderkey bigint , + | l_partkey bigint , + | l_suppkey bigint , + | l_linenumber bigint , + | l_quantity double , + | l_extendedprice double , + | l_discount double , + | l_tax double , + | l_returnflag string , + | l_linestatus string , + | l_shipdate date , + | l_commitdate date , + | l_receiptdate date , + | l_shipinstruct string , + | l_shipmode string , + | l_comment string + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_minmax2' + 
|TBLPROPERTIES('minmaxIndexKey'='l_receiptdate') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_minmax2 + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + |select count(*) from lineitem_mergetree_minmax2 where l_receiptdate = '1998-12-27' + |""".stripMargin) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_minmax2") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert(partDir.listFiles().exists(p => p.getName.contains("null"))) + assert( + partDir.listFiles().exists(p => p.getName.contains("skp_idx__minmax_l_receiptdate.idx2"))) + } +} diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala new file mode 100644 index 000000000..27fa79018 --- /dev/null +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.execution + +import org.apache.spark.SparkConf + +import java.io.File + +class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbstractSuite { + + override protected val createNullableTables = true + + override protected val tablesPath: String = basePath + "/tpch-data-ch" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "mergetree-queries-output" + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "sort") + .set("spark.io.compression.codec", "SNAPPY") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.autoBroadcastJoinThreshold", "10MB") +// .set("spark.ui.enabled", "true") +// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline", "true") +// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "debug") + } + + test("test simple minmax index") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_minmax; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_minmax' + |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_minmax + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + |select count(*) from lineitem_mergetree_minmax where l_receiptdate = '1998-12-27' + |""".stripMargin) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_minmax") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert( + partDir.listFiles().exists(p => p.getName.contains("skp_idx__minmax_l_receiptdate.idx2"))) + } + + test("test simple bloom filter index") { + + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_bf; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bf + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_bf' + |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_bf + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + |select count(*) from lineitem_mergetree_bf where l_orderkey = '600000' + |""".stripMargin) + + 
val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_bf") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert( + partDir.listFiles().exists(p => p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx"))) + + } + + test("test simple set index") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_set; + |""".stripMargin) + + spark.sql(s""" + CREATE TABLE IF NOT EXISTS lineitem_mergetree_set + |( + | l_orderkey bigint , + | l_partkey bigint , + | l_suppkey bigint , + | l_linenumber bigint , + | l_quantity double , + | l_extendedprice double , + | l_discount double , + | l_tax double , + | l_returnflag string , + | l_linestatus string , + | l_shipdate date , + | l_commitdate date , + | l_receiptdate date , + | l_shipinstruct string , + | l_shipmode string , + | l_comment string + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_set' + |TBLPROPERTIES('setIndexKey'='l_orderkey') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_set + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + |select count(*) from lineitem_mergetree_set where l_orderkey = '600000' + |""".stripMargin) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_set") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert(partDir.listFiles().exists(p => p.getName.contains("skp_idx__set_l_orderkey.idx"))) + } + + test("test nullable dataset inserted into not null schema") { + + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_minmax2; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax2 + |( + | l_orderkey bigint not null, + | l_partkey bigint not null, + | l_suppkey bigint not null, + | l_linenumber bigint not null, + | l_quantity double not null, + | l_extendedprice double not null, + | l_discount double not null, + | l_tax double not null, + | l_returnflag string not null, + | l_linestatus string not null, + | l_shipdate date not null, + | l_commitdate date not null, + | l_receiptdate date not null, + | l_shipinstruct string not null, + | l_shipmode string not null, + | l_comment string not null + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_minmax2' + |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_minmax2 + | select * from lineitem + |""".stripMargin) + + val df = spark + .sql(s""" + |select count(*) from lineitem_mergetree_minmax2 where l_receiptdate = '1998-12-27' + |""".stripMargin) + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = 
scanExec(0) + val ret = df.collect() + assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) + + val directory = new File(s"$basePath/lineitem_mergetree_minmax2") + // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 + val partDir = directory.listFiles().filter(f => f.getName.length > 20).head + assert(!partDir.listFiles().exists(p => p.getName.contains("null"))) + assert( + partDir.listFiles().exists(p => p.getName.contains("skp_idx__minmax_l_receiptdate.idx2"))) + } + + // TODO: + // 1. auto check visited granule (effectiveness of index) + // 2. set index is implemented, but not encouraged because we by default does not cap set size + // 3. need to test minmax/bf/set index on every type (bloom filter on date32, e.g. is not working) + // 4. complex case where a column has many types of indexes / a type of index on many columns +} diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala index e6910a430..53fcf3cbc 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala @@ -59,6 +59,9 @@ class MixedAffinitySuite extends QueryTest with SharedSparkSession { "", "", "", + "", + "", + "", Array(file), "", Map.empty) diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp b/cpp-ch/local-engine/Common/MergeTreeTool.cpp index c5122905e..39198b87e 100644 --- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp +++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp @@ -24,12 +24,77 @@ #include <google/protobuf/util/json_util.h> #include <rapidjson/rapidjson.h> #include <rapidjson/document.h> +#include <Poco/StringTokenizer.h> using namespace DB; namespace local_engine { -std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &table) + +// set skip index for each column if specified +void setSecondaryIndex( + const DB::NamesAndTypesList & columns, + ContextPtr context, + const MergeTreeTable & table, + std::shared_ptr<DB::StorageInMemoryMetadata> metadata) +{ + std::unordered_set<std::string> minmax_index_cols; + std::unordered_set<std::string> bf_index_cols; + std::unordered_set<std::string> set_index_cols; + { + Poco::StringTokenizer tokenizer(table.minmax_index_key, ","); + for (const auto & token : tokenizer) + minmax_index_cols.insert(token); + } + { + Poco::StringTokenizer tokenizer(table.bf_index_key, ","); + for (const auto & token : tokenizer) + bf_index_cols.insert(token); + } + { + Poco::StringTokenizer tokenizer(table.set_index_key, ","); + for (const auto & token : tokenizer) + set_index_cols.insert(token); + } + + std::stringstream ss; + bool first = true; + for (const auto & column : columns) + { + if (minmax_index_cols.contains(column.name)) + { + if (!first) + ss << ", "; + else + first = false; + ss << "_minmax_" << column.name << " " << column.name << " TYPE minmax GRANULARITY 1"; + } + + if (bf_index_cols.contains(column.name)) + { + if (!first) + ss << ", "; + else + first = false; + ss << "_bloomfilter_" << column.name << " " << column.name << " TYPE bloom_filter GRANULARITY 1"; + } + + if (set_index_cols.contains(column.name)) + { + if (!first) + ss << ", "; + else + first = false; + ss << "_set_" << column.name << " " << column.name << " TYPE set(0) 
GRANULARITY 1"; + } + } + metadata->setSecondaryIndices(IndicesDescription::parse(ss.str(), metadata->getColumns(), context)); +} + +std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData( + const DB::NamesAndTypesList & columns, + ContextPtr context, + const MergeTreeTable & table) { std::shared_ptr<DB::StorageInMemoryMetadata> metadata = std::make_shared<DB::StorageInMemoryMetadata>(); ColumnsDescription columns_description; @@ -38,6 +103,9 @@ std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::NamesAndTyp columns_description.add(ColumnDescription(item.name, item.type)); } metadata->setColumns(std::move(columns_description)); + + setSecondaryIndex(columns, context, table, metadata); + metadata->partition_key.expression_list_ast = std::make_shared<ASTExpressionList>(); metadata->sorting_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context); if (table.primary_key.empty()) @@ -85,7 +153,6 @@ void parseTableConfig(MergeTreeTableSettings & settings, String config_json) MergeTreeTable parseMergeTreeTableString(const std::string & info) { - ReadBufferFromString in(info); assertString("MergeTree;", in); MergeTreeTable table; @@ -106,6 +173,12 @@ MergeTreeTable parseMergeTreeTableString(const std::string & info) } readString(table.low_card_key, in); assertChar('\n', in); + readString(table.minmax_index_key, in); + assertChar('\n', in); + readString(table.bf_index_key, in); + assertChar('\n', in); + readString(table.set_index_key, in); + assertChar('\n', in); readString(table.relative_path, in); assertChar('\n', in); readString(table.absolute_path, in); @@ -139,13 +212,13 @@ std::unordered_set<String> MergeTreeTable::getPartNames() const RangesInDataParts MergeTreeTable::extractRange(DataPartsVector parts_vector) const { std::unordered_map<String, DataPartPtr> name_index; - std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) {name_index.emplace(part->name, part);}); + std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) { name_index.emplace(part->name, part); }); RangesInDataParts ranges_in_data_parts; std::ranges::transform( parts, std::inserter(ranges_in_data_parts, ranges_in_data_parts.end()), - [&](const MergeTreePart& part) + [&](const MergeTreePart & part) { RangesInDataPart ranges_in_data_part; ranges_in_data_part.data_part = name_index.at(part.name); @@ -155,5 +228,4 @@ RangesInDataParts MergeTreeTable::extractRange(DataPartsVector parts_vector) con }); return ranges_in_data_parts; } - -} +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h b/cpp-ch/local-engine/Common/MergeTreeTool.h index bde632f0d..d8ca9d34e 100644 --- a/cpp-ch/local-engine/Common/MergeTreeTool.h +++ b/cpp-ch/local-engine/Common/MergeTreeTool.h @@ -56,6 +56,9 @@ struct MergeTreeTable substrait::NamedStruct schema; std::string order_by_key; std::string low_card_key; + std::string minmax_index_key; + std::string bf_index_key; + std::string set_index_key; std::string primary_key = ""; std::string relative_path; std::string absolute_path; diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 82a64b999..f1381a1f7 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -38,7 +38,6 @@ extern const int NO_SUCH_DATA_PART; extern const int LOGICAL_ERROR; extern const int UNKNOWN_FUNCTION; extern const int UNKNOWN_TYPE; - } } @@ -69,7 +68,10 @@ CustomStorageMergeTreePtr 
MergeTreeRelParser::parseStorage( table.ParseFromString(extension_table.detail().value()); auto merge_tree_table = local_engine::parseMergeTreeTableString(table.value()); DB::Block header; - header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key); + + header = TypeParser::buildBlockFromNamedStruct( + merge_tree_table.schema, + merge_tree_table.low_card_key); auto names_and_types_list = header.getNamesAndTypesList(); auto storage_factory = StorageMergeTreeFactory::instance(); auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table); @@ -161,7 +163,8 @@ MergeTreeRelParser::parseReadRel( throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found."); auto read_step = query_context.custom_storage_merge_tree->reader.readFromParts( selected_parts, - /* alter_conversions = */ {}, + /* alter_conversions = */ + {}, names_and_types_list.getNames(), query_context.storage_snapshot, *query_info, @@ -176,6 +179,7 @@ MergeTreeRelParser::parseReadRel( source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions, storage_prewhere_info->prewhere_column_name); source_step_with_filter->applyFilters(); } + query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges); steps.emplace_back(read_step.get()); query_plan->addStep(std::move(read_step)); @@ -266,7 +270,10 @@ void MergeTreeRelParser::parseToAction(ActionsDAGPtr & filter_action, const subs } void MergeTreeRelParser::analyzeExpressions( - Conditions & res, const substrait::Expression & rel, std::set<Int64> & pk_positions, Block & block) + Conditions & res, + const substrait::Expression & rel, + std::set<Int64> & pk_positions, + Block & block) { if (rel.has_scalar_function() && getCHFunctionName(rel.scalar_function()) == "and") { @@ -378,5 +385,4 @@ String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarF throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unsupported substrait function on mergetree prewhere parser: {}", func_name); return it->second; } - -} +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp index 2449fa4e5..eec31213a 100644 --- a/cpp-ch/local-engine/Parser/RelMetric.cpp +++ b/cpp-ch/local-engine/Parser/RelMetric.cpp @@ -120,9 +120,12 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree*>(step)) { auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk; + auto selected_marks = read_mergetree->getAnalysisResult().selected_marks; auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk; writer.Key("selected_marks_pk"); writer.Uint64(selected_marks_pk); + writer.Key("selected_marks"); + writer.Uint64(selected_marks); writer.Key("total_marks_pk"); writer.Uint64(total_marks_pk); } diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp b/cpp-ch/local-engine/Parser/TypeParser.cpp index 3ffa0b0d9..12c23e606 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.cpp +++ b/cpp-ch/local-engine/Parser/TypeParser.cpp @@ -238,7 +238,9 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st } -DB::Block TypeParser::buildBlockFromNamedStruct(const substrait::NamedStruct & struct_, const std::string & low_card_cols) +DB::Block TypeParser::buildBlockFromNamedStruct( + const substrait::NamedStruct & struct_, + const std::string & low_card_cols) { 
std::unordered_set<std::string> low_card_columns; Poco::StringTokenizer tokenizer(low_card_cols, ","); diff --git a/cpp-ch/local-engine/Parser/TypeParser.h b/cpp-ch/local-engine/Parser/TypeParser.h index 55420ee1a..c687c3024 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.h +++ b/cpp-ch/local-engine/Parser/TypeParser.h @@ -24,36 +24,38 @@ namespace local_engine { -class TypeParser -{ -public: - TypeParser() = default; - ~TypeParser() = default; + class TypeParser + { + public: + TypeParser() = default; + ~TypeParser() = default; - static String getCHTypeName(const String & spark_type_name); + static String getCHTypeName(const String& spark_type_name); - static DB::DataTypePtr getCHTypeByName(const String & spark_type_name); + static DB::DataTypePtr getCHTypeByName(const String& spark_type_name); - /// When parsing named structure, we need the field names. - static DB::DataTypePtr parseType(const substrait::Type & substrait_type, std::list<String> * field_names); + /// When parsing named structure, we need the field names. + static DB::DataTypePtr parseType(const substrait::Type& substrait_type, std::list<String>* field_names); - inline static DB::DataTypePtr parseType(const substrait::Type & substrait_type) - { - return parseType(substrait_type, nullptr); - } + inline static DB::DataTypePtr parseType(const substrait::Type& substrait_type) + { + return parseType(substrait_type, nullptr); + } + + // low_card_cols is in format of "cola,colb". Currently does not nested column to be LowCardinality. + static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct& struct_, + const std::string& low_card_cols = ""); - // low_card_cols is in format of "cola,colb". Currently does not nested column to be LowCardinality. - static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct & struct_, const std::string& low_card_cols = ""); + /// Build block from substrait NamedStruct without DFS rules, different from buildBlockFromNamedStruct + static DB::Block buildBlockFromNamedStructWithoutDFS(const substrait::NamedStruct& struct_); - /// Build block from substrait NamedStruct without DFS rules, different from buildBlockFromNamedStruct - static DB::Block buildBlockFromNamedStructWithoutDFS(const substrait::NamedStruct & struct_); + static bool isTypeMatched(const substrait::Type& substrait_type, const DB::DataTypePtr& ch_type); + static bool isTypeMatchedWithNullability(const substrait::Type& substrait_type, const DB::DataTypePtr& ch_type); - static bool isTypeMatched(const substrait::Type & substrait_type, const DB::DataTypePtr & ch_type); - static bool isTypeMatchedWithNullability(const substrait::Type & substrait_type, const DB::DataTypePtr & ch_type); -private: - /// Mapping spark type names to CH type names. - static std::unordered_map<String, String> type_names_mapping; + private: + /// Mapping spark type names to CH type names. 
+ static std::unordered_map<String, String> type_names_mapping; - static DB::DataTypePtr tryWrapNullable(substrait::Type_Nullability nullable, DB::DataTypePtr nested_type); -}; + static DB::DataTypePtr tryWrapNullable(substrait::Type_Nullability nullable, DB::DataTypePtr nested_type); + }; } diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java index 9dc26215b..086d475b0 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java @@ -31,6 +31,9 @@ public class ExtensionTableBuilder { String absoluteTablePath, String orderByKey, String lowCardKey, + String minmaxIndexKey, + String bfIndexKey, + String setIndexKey, String primaryKey, List<String> partList, List<Long> starts, @@ -47,6 +50,9 @@ public class ExtensionTableBuilder { absoluteTablePath, orderByKey, lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, primaryKey, partList, starts, diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java index f07e6fccb..583ca9549 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java @@ -45,6 +45,9 @@ public class ExtensionTableNode implements SplitInfo { private String primaryKey; private String lowCardKey; + private String minmaxIndexKey; + private String bfIndexKey; + private String setIndexKey; private List<String> partList; private List<Long> starts; @@ -61,6 +64,9 @@ public class ExtensionTableNode implements SplitInfo { String absolutePath, String orderByKey, String lowCardKey, + String minmaxIndexKey, + String bfIndexKey, + String setIndexKey, String primaryKey, List<String> partList, List<Long> starts, @@ -82,6 +88,9 @@ public class ExtensionTableNode implements SplitInfo { this.tableSchemaJson = tableSchemaJson; this.orderByKey = orderByKey; this.lowCardKey = lowCardKey; + this.minmaxIndexKey = minmaxIndexKey; + this.bfIndexKey = bfIndexKey; + this.setIndexKey = setIndexKey; this.primaryKey = primaryKey; this.partList = partList; this.starts = starts; @@ -117,6 +126,9 @@ public class ExtensionTableNode implements SplitInfo { extensionTableStr.append(this.primaryKey).append("\n"); } extensionTableStr.append(this.lowCardKey).append("\n"); + extensionTableStr.append(this.minmaxIndexKey).append("\n"); + extensionTableStr.append(this.bfIndexKey).append("\n"); + extensionTableStr.append(this.setIndexKey).append("\n"); extensionTableStr.append(this.relativePath).append("\n"); extensionTableStr.append(this.absolutePath).append("\n"); diff --git a/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala b/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala index 856974a0e..3dd2bfaba 100644 --- a/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala +++ b/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala @@ -43,10 +43,12 @@ trait GlutenFormatWriterInjects { tableName: String, orderByKeyOption: Option[Seq[String]], lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: 
Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], partitionColumns: Seq[String], tableSchema: StructType, - dataSchema: Seq[Attribute], clickhouseTableConfigs: Map[String, String], context: TaskAttemptContext, nativeConf: java.util.Map[String, String]): OutputWriter = null
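
For reference, the three skip indexes introduced by this commit are configured purely through table properties ('minmaxIndexKey', 'bloomfilterIndexKey', 'setIndexKey', resolved in ClickHouseTableV2.getCommaSeparatedColumns), each taking a comma-separated list of top-level columns; names containing '.' (nested columns) are rejected. The sketch below is distilled from the new test suites and assumes their harness (GlutenClickHouseTPCHAbstractSuite supplies spark, basePath, lineitem, and the plan collect helper). The table name is illustrative, and combining all three properties on a single table is an assumption — the suites exercise one property per table.

    // Hedged sketch, not part of the commit: create a MergeTree table with all
    // three skip-index properties, then observe granule pruning via the new
    // "selectedMarks" scan metric added by this change.
    spark.sql(s"""
                 |CREATE TABLE IF NOT EXISTS lineitem_skip_idx
                 |(
                 |  l_orderkey    bigint,
                 |  l_receiptdate date,
                 |  l_shipmode    string
                 |)
                 |USING clickhouse
                 |LOCATION '$basePath/lineitem_skip_idx'
                 |TBLPROPERTIES(
                 |  'minmaxIndexKey'='l_receiptdate',
                 |  'bloomfilterIndexKey'='l_orderkey',
                 |  'setIndexKey'='l_shipmode')
                 |""".stripMargin)

    spark.sql("""insert into table lineitem_skip_idx
                | select l_orderkey, l_receiptdate, l_shipmode from lineitem""".stripMargin)

    val df = spark.sql(
      "select count(*) from lineitem_skip_idx where l_receiptdate = '1998-12-27'")
    df.collect() // run the query so scan metrics are populated

    val scan = collect(df.queryExecution.executedPlan) {
      case f: FileSourceScanExecTransformer => f
    }.head
    // marks that survive skip-index filtering; the suites assert an exact count
    assert(scan.metrics("selectedMarks").value >= 1)

As the TODO block in the nullable suite cautions, set indexes are written with an uncapped set size by default and bloom filters do not yet work on every type (e.g. date32), so validating the index on each intended column type is still advisable.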