This is an automated email from the ASF dual-hosted git repository.

mahongbin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 972597184 [Gluten-5018][CH] support minmax/bloomfilter/set skip index 
(#5019)
972597184 is described below

commit 972597184e147fcf488fd6cda4b447356d61136d
Author: Hongbin Ma <mahong...@apache.org>
AuthorDate: Wed Mar 27 09:33:40 2024 +0800

    [Gluten-5018][CH] support minmax/bloomfilter/set skip index (#5019)
    
    * temp, by default all cols minmax index
    
    basically works, dealing with nullable
    
    nullable/not-null ok
    
    remove unnecessary change
    
    fix compile
    
    * add ut
    
    * remove dataschema
    
    * fix spark32 bug
---
 .../source/DeltaMergeTreeFileFormat.scala          |  17 +-
 .../source/DeltaMergeTreeFileFormat.scala          |  17 +-
 .../java/io/glutenproject/metrics/MetricsStep.java |  11 +
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   3 +
 .../backendsapi/clickhouse/CHMetricsApi.scala      |   1 +
 .../execution/GlutenMergeTreePartition.scala       |   3 +
 .../metrics/FileSourceScanMetricsUpdater.scala     |   2 +
 .../delta/ClickhouseOptimisticTransaction.scala    |   7 +-
 .../sql/delta/catalog/ClickHouseTableV2.scala      |  35 ++-
 .../utils/MergeTreePartsPartitionsUtil.scala       |  33 +++
 .../datasources/v1/CHMergeTreeWriterInjects.scala  |  29 ++-
 .../v1/clickhouse/MergeTreeFileFormatWriter.scala  |   9 +
 ...GlutenClickHouseTPCHNotNullSkipIndexSuite.scala | 271 ++++++++++++++++++++
 ...lutenClickHouseTPCHNullableSkipIndexSuite.scala | 277 +++++++++++++++++++++
 .../apache/spark/affinity/MixedAffinitySuite.scala |   3 +
 cpp-ch/local-engine/Common/MergeTreeTool.cpp       |  84 ++++++-
 cpp-ch/local-engine/Common/MergeTreeTool.h         |   3 +
 cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp  |  18 +-
 cpp-ch/local-engine/Parser/RelMetric.cpp           |   3 +
 cpp-ch/local-engine/Parser/TypeParser.cpp          |   4 +-
 cpp-ch/local-engine/Parser/TypeParser.h            |  50 ++--
 .../substrait/rel/ExtensionTableBuilder.java       |   6 +
 .../substrait/rel/ExtensionTableNode.java          |  12 +
 .../datasource/GlutenFormatWriterInjects.scala     |   4 +-
 24 files changed, 843 insertions(+), 59 deletions(-)
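
For reviewers who want to try the feature without reading the whole diff: the new
TBLPROPERTIES keys introduced here are exercised by the two test suites added below.
A minimal sketch (table name, columns and property values are illustrative; the test
suites use one index property per table and also set an explicit LOCATION):

    // Sketch only: each property takes a comma-separated list of column names.
    spark.sql("""
      CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax (
        l_orderkey    bigint,
        l_receiptdate date
      )
      USING clickhouse
      TBLPROPERTIES(
        'minmaxIndexKey'      = 'l_receiptdate',
        'bloomfilterIndexKey' = 'l_orderkey',
        'setIndexKey'         = 'l_orderkey'
      )
    """)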

diff --git 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index fef109d35..d4ca321a9 100644
--- 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources.v2.clickhouse.source
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.{OutputWriter, 
OutputWriterFactory}
@@ -31,9 +30,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
 
   protected var database = ""
   protected var tableName = ""
-  protected var dataSchemas = Seq.empty[Attribute]
   protected var orderByKeyOption: Option[Seq[String]] = None
   protected var lowCardKeyOption: Option[Seq[String]] = None
+  protected var minmaxIndexKeyOption: Option[Seq[String]] = None
+  protected var bfIndexKeyOption: Option[Seq[String]] = None
+  protected var setIndexKeyOption: Option[Seq[String]] = None
   protected var primaryKeyOption: Option[Seq[String]] = None
   protected var partitionColumns: Seq[String] = Seq.empty[String]
   protected var clickhouseTableConfigs: Map[String, String] = Map.empty
@@ -42,18 +43,22 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
       metadata: Metadata,
       database: String,
       tableName: String,
-      schemas: Seq[Attribute],
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       clickhouseTableConfigs: Map[String, String],
       partitionColumns: Seq[String]) {
     this(metadata)
     this.database = database
     this.tableName = tableName
-    this.dataSchemas = schemas
     this.orderByKeyOption = orderByKeyOption
     this.lowCardKeyOption = lowCardKeyOption
+    this.minmaxIndexKeyOption = minmaxIndexKeyOption
+    this.bfIndexKeyOption = bfIndexKeyOption
+    this.setIndexKeyOption = setIndexKeyOption
     this.primaryKeyOption = primaryKeyOption
     this.clickhouseTableConfigs = clickhouseTableConfigs
     this.partitionColumns = partitionColumns
@@ -102,10 +107,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
             tableName,
             orderByKeyOption,
             lowCardKeyOption,
+            minmaxIndexKeyOption,
+            bfIndexKeyOption,
+            setIndexKeyOption,
             primaryKeyOption,
             partitionColumns,
             metadata.schema,
-            dataSchemas,
             clickhouseTableConfigs,
             context,
             nativeConf
diff --git 
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
 
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index b87420787..002e636af 100644
--- 
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ 
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources.v2.clickhouse.source
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.{OutputWriter, 
OutputWriterFactory}
@@ -30,9 +29,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends 
DeltaParquetFileForma
 
   protected var database = ""
   protected var tableName = ""
-  protected var dataSchemas = Seq.empty[Attribute]
   protected var orderByKeyOption: Option[Seq[String]] = None
   protected var lowCardKeyOption: Option[Seq[String]] = None
+  protected var minmaxIndexKeyOption: Option[Seq[String]] = None
+  protected var bfIndexKeyOption: Option[Seq[String]] = None
+  protected var setIndexKeyOption: Option[Seq[String]] = None
   protected var primaryKeyOption: Option[Seq[String]] = None
   protected var partitionColumns: Seq[String] = Seq.empty[String]
   protected var clickhouseTableConfigs: Map[String, String] = Map.empty
@@ -41,18 +42,22 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends 
DeltaParquetFileForma
       metadata: Metadata,
       database: String,
       tableName: String,
-      schemas: Seq[Attribute],
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       clickhouseTableConfigs: Map[String, String],
       partitionColumns: Seq[String]) {
     this(metadata)
     this.database = database
     this.tableName = tableName
-    this.dataSchemas = schemas
     this.orderByKeyOption = orderByKeyOption
     this.lowCardKeyOption = lowCardKeyOption
+    this.minmaxIndexKeyOption = minmaxIndexKeyOption
+    this.bfIndexKeyOption = bfIndexKeyOption
+    this.setIndexKeyOption = setIndexKeyOption
     this.primaryKeyOption = primaryKeyOption
     this.clickhouseTableConfigs = clickhouseTableConfigs
     this.partitionColumns = partitionColumns
@@ -101,10 +106,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) 
extends DeltaParquetFileForma
             tableName,
             orderByKeyOption,
             lowCardKeyOption,
+            minmaxIndexKeyOption,
+            bfIndexKeyOption,
+            setIndexKeyOption,
             primaryKeyOption,
             partitionColumns,
             metadata.schema,
-            dataSchemas,
             clickhouseTableConfigs,
             context,
             nativeConf
diff --git 
a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java 
b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
index c569cd2ee..d1714d825 100644
--- 
a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
+++ 
b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
@@ -32,6 +32,9 @@ public class MetricsStep {
   @JsonProperty("selected_marks_pk")
   protected long selectedMarksPk;
 
+  @JsonProperty("selected_marks")
+  protected long selectedMarks;
+
   public String getName() {
     return name;
   }
@@ -64,6 +67,14 @@ public class MetricsStep {
     this.selectedMarksPk = selectedMarksPk;
   }
 
+  public long getSelectedMarks() {
+    return selectedMarks;
+  }
+
+  public void setSelectedMarks(long selectedMarks) {
+    this.selectedMarks = selectedMarks;
+  }
+
   public long getTotalMarksPk() {
     return totalMarksPk;
   }
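
Taken together, the metrics changes in this commit plumb the new counter end to end:
the native reader reports "selected_marks" per step (see RelMetric.cpp further down),
Jackson maps it onto the field above, and FileSourceScanMetricsUpdater folds it into
the "selectedMarks" SQL metric registered in CHMetricsApi. With illustrative values, a
step in the metrics JSON now carries roughly

    { ..., "selected_marks_pk": 12, "selected_marks": 1, "total_marks_pk": 12 }
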
diff --git 
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index 6fecb2c5f..08786a00b 100644
--- 
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -99,6 +99,9 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
             p.absoluteTablePath,
             p.orderByKey,
             p.lowCardKey,
+            p.minmaxIndexKey,
+            p.bfIndexKey,
+            p.setIndexKey,
             p.primaryKey,
             partLists,
             starts,
diff --git 
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
 
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index 3012d5371..0157b370f 100644
--- 
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ 
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -127,6 +127,7 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected 
marks primary"),
+      "selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected 
marks"),
       "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks 
primary")
     )
 
diff --git 
a/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
 
b/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
index df41191f2..be17c713e 100644
--- 
a/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
+++ 
b/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
@@ -39,6 +39,9 @@ case class GlutenMergeTreePartition(
     absoluteTablePath: String,
     orderByKey: String,
     lowCardKey: String,
+    minmaxIndexKey: String,
+    bfIndexKey: String,
+    setIndexKey: String,
     primaryKey: String,
     partList: Array[MergeTreePartSplit],
     tableSchemaJson: String,
diff --git 
a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
 
b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index 8c79536bd..497e8b780 100644
--- 
a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++ 
b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -36,6 +36,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
   val inputWaitTime: SQLMetric = metrics("inputWaitTime")
   val outputWaitTime: SQLMetric = metrics("outputWaitTime")
   val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
+  val selected_marks: SQLMetric = metrics("selectedMarks")
   val total_marks_pk: SQLMetric = metrics("totalMarksPk")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
@@ -56,6 +57,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
         metricsData.getSteps.forEach(
           step => {
             selected_marks_pk += step.selectedMarksPk
+            selected_marks += step.selectedMarks
             total_marks_pk += step.totalMarksPk
           })
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 9111bea7f..95119f842 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -128,9 +128,11 @@ class ClickhouseOptimisticTransaction(
             metadata,
             tableV2.dataBaseName,
             tableV2.tableName,
-            output,
             tableV2.orderByKeyOption,
             tableV2.lowCardKeyOption,
+            tableV2.minmaxIndexKeyOption,
+            tableV2.bfIndexKeyOption,
+            tableV2.setIndexKeyOption,
             tableV2.primaryKeyOption,
             tableV2.clickhouseTableConfigs,
             tableV2.partitionColumns
@@ -144,6 +146,9 @@ class ClickhouseOptimisticTransaction(
           // scalastyle:on deltahadoopconfiguration
           orderByKeyOption = tableV2.orderByKeyOption,
           lowCardKeyOption = tableV2.lowCardKeyOption,
+          minmaxIndexKeyOption = tableV2.minmaxIndexKeyOption,
+          bfIndexKeyOption = tableV2.bfIndexKeyOption,
+          setIndexKeyOption = tableV2.setIndexKeyOption,
           primaryKeyOption = tableV2.primaryKeyOption,
           partitionColumns = partitioningColumns,
           bucketSpec = tableV2.bucketOption,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index e06a01edf..92d12c05f 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -114,18 +114,34 @@ class ClickHouseTableV2(
   }
 
   lazy val lowCardKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("lowCardKey")
+  }
+
+  lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("minmaxIndexKey")
+  }
+
+  lazy val bfIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("bloomfilterIndexKey")
+  }
+
+  lazy val setIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("setIndexKey")
+  }
+
+  private def getCommaSeparatedColumns(keyName: String) = {
     val tableProperties = properties()
-    if (tableProperties.containsKey("lowCardKey")) {
-      if (tableProperties.get("lowCardKey").nonEmpty) {
-        val lowCardKeys = 
tableProperties.get("lowCardKey").split(",").map(_.trim).toSeq
-        lowCardKeys.foreach(
+    if (tableProperties.containsKey(keyName)) {
+      if (tableProperties.get(keyName).nonEmpty) {
+        val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
+        keys.foreach(
           s => {
             if (s.contains(".")) {
               throw new IllegalStateException(
-                s"lowCardKey $s can not contain '.' (not support nested column 
yet)")
+                s"$keyName $s can not contain '.' (not support nested column 
yet)")
             }
           })
-        Some(lowCardKeys.map(s => s.toLowerCase()))
+        Some(keys.map(s => s.toLowerCase()))
       } else {
         None
       }
@@ -259,12 +275,15 @@ class ClickHouseTableV2(
       meta,
       dataBaseName,
       tableName,
-      Seq.empty[Attribute],
       orderByKeyOption,
       lowCardKeyOption,
+      minmaxIndexKeyOption,
+      bfIndexKeyOption,
+      setIndexKeyOption,
       primaryKeyOption,
       clickhouseTableConfigs,
-      partitionColumns)
+      partitionColumns
+    )
   }
   def cacheThis(): Unit = {
     deltaLog2Table.put(deltaLog, this)
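
The getCommaSeparatedColumns helper factored out above now backs all four key options
(lowCardKey, minmaxIndexKey, bloomfilterIndexKey, setIndexKey). As a rough illustration
of its behaviour, with hypothetical property values:

    // 'minmaxIndexKey' = 'L_RECEIPTDATE, l_shipdate' => Some(Seq("l_receiptdate", "l_shipdate")) (trimmed, lower-cased)
    // 'minmaxIndexKey' = ''                          => None
    // 'minmaxIndexKey' = 'a.b'                       => IllegalStateException (nested columns are not supported yet)
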
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index a7ac2ce16..7d202b4d1 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -74,6 +74,21 @@ object MergeTreePartsPartitionsUtil extends Logging {
       case None => ""
     }
 
+    val minmaxIndexKey = table.minmaxIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+
+    val bfIndexKey = table.bfIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+
+    val setIndexKey = table.setIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+
     val tableSchemaJson = ConverterUtils.convertNamedStructJson(table.schema())
 
     // bucket table
@@ -92,6 +107,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
         partitions,
         orderByKey,
         lowCardKey,
+        minmaxIndexKey,
+        bfIndexKey,
+        setIndexKey,
         primaryKey,
         table.clickhouseTableConfigs,
         sparkSession
@@ -109,6 +127,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
         partitions,
         orderByKey,
         lowCardKey,
+        minmaxIndexKey,
+        bfIndexKey,
+        setIndexKey,
         primaryKey,
         table.clickhouseTableConfigs,
         sparkSession
@@ -129,6 +150,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
       partitions: ArrayBuffer[InputPartition],
       orderByKey: String,
       lowCardKey: String,
+      minmaxIndexKey: String,
+      bfIndexKey: String,
+      setIndexKey: String,
       primaryKey: String,
       clickhouseTableConfigs: Map[String, String],
       sparkSession: SparkSession): Unit = {
@@ -214,6 +238,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
           absoluteTablePath,
           orderByKey,
           lowCardKey,
+          minmaxIndexKey,
+          bfIndexKey,
+          setIndexKey,
           primaryKey,
           currentFiles.toArray,
           tableSchemaJson,
@@ -256,6 +283,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
       partitions: ArrayBuffer[InputPartition],
       orderByKey: String,
       lowCardKey: String,
+      minmaxIndexKey: String,
+      bfIndexKey: String,
+      setIndexKey: String,
       primaryKey: String,
       clickhouseTableConfigs: Map[String, String],
       sparkSession: SparkSession): Unit = {
@@ -318,6 +348,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
             absoluteTablePath,
             orderByKey,
             lowCardKey,
+            minmaxIndexKey,
+            bfIndexKey,
+            setIndexKey,
             primaryKey,
             currentFiles.toArray,
             tableSchemaJson,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index d05945a81..6175f76cb 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -68,10 +68,12 @@ class CHMergeTreeWriterInjects extends 
GlutenFormatWriterInjectsBase {
       tableName: String,
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       partitionColumns: Seq[String],
       tableSchema: StructType,
-      dataSchema: Seq[Attribute],
       clickhouseTableConfigs: Map[String, String],
       context: TaskAttemptContext,
       nativeConf: JMap[String, String]): OutputWriter = {
@@ -83,11 +85,14 @@ class CHMergeTreeWriterInjects extends 
GlutenFormatWriterInjectsBase {
       tableName,
       orderByKeyOption,
       lowCardKeyOption,
+      minmaxIndexKeyOption,
+      bfIndexKeyOption,
+      setIndexKeyOption,
       primaryKeyOption,
       partitionColumns,
       ConverterUtils.convertNamedStructJson(tableSchema),
       clickhouseTableConfigs,
-      dataSchema
+      tableSchema.toAttributes // use table schema instead of data schema
     )
 
     val datasourceJniWrapper = new CHDatasourceJniWrapper()
@@ -119,17 +124,22 @@ class CHMergeTreeWriterInjects extends 
GlutenFormatWriterInjectsBase {
 
 object CHMergeTreeWriterInjects {
 
+  // scalastyle:off argcount
   def genMergeTreeWriteRel(
       path: String,
       database: String,
       tableName: String,
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       partitionColumns: Seq[String],
       tableSchemaJson: String,
       clickhouseTableConfigs: Map[String, String],
       output: Seq[Attribute]): PlanWithSplitInfo = {
+    // scalastyle:on argcount
     val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
     val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
     val columnTypeNodes = output.map {
@@ -150,6 +160,18 @@ object CHMergeTreeWriterInjects {
       case Some(keys) => keys.mkString(",")
       case None => ""
     }
+    val minmaxIndexKey = minmaxIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+    val bfIndexKey = bfIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+    val setIndexKey = setIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
 
     val substraitContext = new SubstraitContext
     val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
@@ -161,6 +183,9 @@ object CHMergeTreeWriterInjects {
       "",
       orderByKey,
       lowCardKey,
+      minmaxIndexKey,
+      bfIndexKey,
+      setIndexKey,
       primaryKey,
       new JList[String](),
       new JList[JLong](),
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index 874a4ede3..90f01e744 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -57,6 +57,9 @@ object MergeTreeFileFormatWriter extends Logging {
       hadoopConf: Configuration,
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
@@ -71,6 +74,9 @@ object MergeTreeFileFormatWriter extends Logging {
     hadoopConf = hadoopConf,
     orderByKeyOption = orderByKeyOption,
     lowCardKeyOption = lowCardKeyOption,
+    minmaxIndexKeyOption = minmaxIndexKeyOption,
+    bfIndexKeyOption = bfIndexKeyOption,
+    setIndexKeyOption = setIndexKeyOption,
     primaryKeyOption = primaryKeyOption,
     partitionColumns = partitionColumns,
     bucketSpec = bucketSpec,
@@ -89,6 +95,9 @@ object MergeTreeFileFormatWriter extends Logging {
       hadoopConf: Configuration,
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
diff --git 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala
 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala
new file mode 100644
index 000000000..73462780c
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.SparkConf
+
+import java.io.File
+
+class GlutenClickHouseTPCHNotNullSkipIndexSuite extends 
GlutenClickHouseTPCHAbstractSuite {
+
+  override protected val tablesPath: String = basePath + "/tpch-data-ch"
+  override protected val tpchQueries: String = rootPath + 
"queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + 
"mergetree-queries-output"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "sort")
+      .set("spark.io.compression.codec", "SNAPPY")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+//      .set("spark.ui.enabled", "true")
+//      
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline", 
"true")
+//      
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"debug")
+  }
+
+  test("test simple minmax index") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_minmax;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax
+                 |(
+                 | l_orderkey      bigint not null,
+                 | l_partkey       bigint not null,
+                 | l_suppkey       bigint not null,
+                 | l_linenumber    bigint not null,
+                 | l_quantity      double not null,
+                 | l_extendedprice double not null,
+                 | l_discount      double not null,
+                 | l_tax           double not null,
+                 | l_returnflag    string not null,
+                 | l_linestatus    string not null,
+                 | l_shipdate      date not null,
+                 | l_commitdate    date not null,
+                 | l_receiptdate   date not null,
+                 | l_shipinstruct  string not null,
+                 | l_shipmode      string not null,
+                 | l_comment       string not null
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_minmax'
+                 |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_minmax
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+              |select count(*) from lineitem_mergetree_minmax  where 
l_receiptdate = '1998-12-27'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 1)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_minmax")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(!partDir.listFiles().exists(p => p.getName.contains("null")))
+    assert(
+      partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+  }
+
+  test("test simple bloom filter index") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_bf;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+             CREATE TABLE IF NOT EXISTS lineitem_mergetree_bf
+                 |(
+                 | l_orderkey      bigint not null,
+                 | l_partkey       bigint not null,
+                 | l_suppkey       bigint not null,
+                 | l_linenumber    bigint not null,
+                 | l_quantity      double not null,
+                 | l_extendedprice double not null,
+                 | l_discount      double not null,
+                 | l_tax           double not null,
+                 | l_returnflag    string not null,
+                 | l_linestatus    string not null,
+                 | l_shipdate      date not null,
+                 | l_commitdate    date not null,
+                 | l_receiptdate   date not null,
+                 | l_shipinstruct  string not null,
+                 | l_shipmode      string not null,
+                 | l_comment       string not null
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_bf'
+                 |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_bf
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+             select count(*) from lineitem_mergetree_bf  where l_orderkey = 
'600000'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 2)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_bf")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(!partDir.listFiles().exists(p => p.getName.contains("null")))
+    assert(
+      partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))
+  }
+
+  test("test simple set index") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_set;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+               CREATE TABLE IF NOT EXISTS lineitem_mergetree_set
+                 |(
+                 | l_orderkey      bigint not null,
+                 | l_partkey       bigint not null,
+                 | l_suppkey       bigint not null,
+                 | l_linenumber    bigint not null,
+                 | l_quantity      double not null,
+                 | l_extendedprice double not null,
+                 | l_discount      double not null,
+                 | l_tax           double not null,
+                 | l_returnflag    string not null,
+                 | l_linestatus    string not null,
+                 | l_shipdate      date not null,
+                 | l_commitdate    date not null,
+                 | l_receiptdate   date not null,
+                 | l_shipinstruct  string not null,
+                 | l_shipmode      string not null,
+                 | l_comment       string not null
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_set'
+                 |TBLPROPERTIES('setIndexKey'='l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_set
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+             select count(*) from lineitem_mergetree_set  where l_orderkey = 
'600000'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 2)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_set")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(!partDir.listFiles().exists(p => p.getName.contains("null")))
+    assert(partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__set_l_orderkey.idx")))
+  }
+
+  test("test not null dataset inserted into nullable schema") {
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_minmax2;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax2
+                 |(
+               l_orderkey      bigint ,
+                 | l_partkey       bigint ,
+                 | l_suppkey       bigint ,
+                 | l_linenumber    bigint ,
+                 | l_quantity      double ,
+                 | l_extendedprice double ,
+                 | l_discount      double ,
+                 | l_tax           double ,
+                 | l_returnflag    string ,
+                 | l_linestatus    string ,
+                 | l_shipdate      date ,
+                 | l_commitdate    date ,
+                 | l_receiptdate   date ,
+                 | l_shipinstruct  string ,
+                 | l_shipmode      string ,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_minmax2'
+                 |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_minmax2
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+              |select count(*) from lineitem_mergetree_minmax2  where 
l_receiptdate = '1998-12-27'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 1)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_minmax2")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(partDir.listFiles().exists(p => p.getName.contains("null")))
+    assert(
+      partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+  }
+}
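
A note on what the assertions above check: selectedMarks == 1 verifies that the skip
index pruned the point query down to a single granule/mark, the skp_idx_* file check
verifies that the index was materialised in the written part on disk, and the
getName.contains("null") check verifies whether null maps were written, i.e. whether
the part was stored with nullable columns.
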
diff --git 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala
 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala
new file mode 100644
index 000000000..27fa79018
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.SparkConf
+
+import java.io.File
+
+class GlutenClickHouseTPCHNullableSkipIndexSuite extends 
GlutenClickHouseTPCHAbstractSuite {
+
+  override protected val createNullableTables = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data-ch"
+  override protected val tpchQueries: String = rootPath + 
"queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + 
"mergetree-queries-output"
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "sort")
+      .set("spark.io.compression.codec", "SNAPPY")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+//      .set("spark.ui.enabled", "true")
+//      
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline", 
"true")
+//      
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"debug")
+  }
+
+  test("test simple minmax index") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_minmax;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_minmax'
+                 |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_minmax
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+              |select count(*) from lineitem_mergetree_minmax  where 
l_receiptdate = '1998-12-27'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 1)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_minmax")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(
+      partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+  }
+
+  test("test simple bloom filter index") {
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_bf;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bf
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_bf'
+                 |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_bf
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+              |select count(*) from lineitem_mergetree_bf  where l_orderkey = 
'600000'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 2)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_bf")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(
+      partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))
+
+  }
+
+  test("test simple set index") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_set;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+               CREATE TABLE IF NOT EXISTS lineitem_mergetree_set
+                 |(
+                 | l_orderkey      bigint ,
+                 | l_partkey       bigint ,
+                 | l_suppkey       bigint ,
+                 | l_linenumber    bigint ,
+                 | l_quantity      double ,
+                 | l_extendedprice double ,
+                 | l_discount      double ,
+                 | l_tax           double ,
+                 | l_returnflag    string ,
+                 | l_linestatus    string ,
+                 | l_shipdate      date ,
+                 | l_commitdate    date ,
+                 | l_receiptdate   date ,
+                 | l_shipinstruct  string ,
+                 | l_shipmode      string ,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_set'
+                 |TBLPROPERTIES('setIndexKey'='l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_set
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+              |select count(*) from lineitem_mergetree_set  where l_orderkey = 
'600000'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 2)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_set")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__set_l_orderkey.idx")))
+  }
+
+  test("test nullable dataset inserted into not null schema") {
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_minmax2;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax2
+                 |(
+                 | l_orderkey      bigint not null,
+                 | l_partkey       bigint not null,
+                 | l_suppkey       bigint not null,
+                 | l_linenumber    bigint not null,
+                 | l_quantity      double not null,
+                 | l_extendedprice double not null,
+                 | l_discount      double not null,
+                 | l_tax           double not null,
+                 | l_returnflag    string not null,
+                 | l_linestatus    string not null,
+                 | l_shipdate      date not null,
+                 | l_commitdate    date not null,
+                 | l_receiptdate   date not null,
+                 | l_shipinstruct  string not null,
+                 | l_shipmode      string not null,
+                 | l_comment       string not null
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_mergetree_minmax2'
+                 |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_minmax2
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val df = spark
+      .sql(s"""
+              |select count(*) from lineitem_mergetree_minmax2  where 
l_receiptdate = '1998-12-27'
+              |""".stripMargin)
+
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    assert(scanExec.size == 1)
+    val mergetreeScan = scanExec(0)
+    val ret = df.collect()
+    assert(ret.apply(0).get(0) == 1)
+    val marks = mergetreeScan.metrics("selectedMarks").value
+    assert(marks == 1)
+
+    val directory = new File(s"$basePath/lineitem_mergetree_minmax2")
+    // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+    val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+    assert(!partDir.listFiles().exists(p => p.getName.contains("null")))
+    assert(
+      partDir.listFiles().exists(p => 
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+  }
+
+  // TODO:
+  // 1. automatically check visited granules (effectiveness of the index)
+  // 2. the set index is implemented but not encouraged, because we do not cap the set size by default
+  // 3. test minmax/bf/set indexes on every type (e.g. bloom filter on date32 does not work yet)
+  // 4. complex cases where one column has several index types, or one index type spans many columns
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
index e6910a430..53fcf3cbc 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
@@ -59,6 +59,9 @@ class MixedAffinitySuite extends QueryTest with 
SharedSparkSession {
         "",
         "",
         "",
+        "",
+        "",
+        "",
         Array(file),
         "",
         Map.empty)
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp 
b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index c5122905e..39198b87e 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -24,12 +24,77 @@
 #include <google/protobuf/util/json_util.h>
 #include <rapidjson/rapidjson.h>
 #include <rapidjson/document.h>
+#include <Poco/StringTokenizer.h>
 
 using namespace DB;
 
 namespace local_engine
 {
-std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const 
DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &table)
+
+// set skip index for each column if specified
+void setSecondaryIndex(
+    const DB::NamesAndTypesList & columns,
+    ContextPtr context,
+    const MergeTreeTable & table,
+    std::shared_ptr<DB::StorageInMemoryMetadata> metadata)
+{
+    std::unordered_set<std::string> minmax_index_cols;
+    std::unordered_set<std::string> bf_index_cols;
+    std::unordered_set<std::string> set_index_cols;
+    {
+        Poco::StringTokenizer tokenizer(table.minmax_index_key, ",");
+        for (const auto & token : tokenizer)
+            minmax_index_cols.insert(token);
+    }
+    {
+        Poco::StringTokenizer tokenizer(table.bf_index_key, ",");
+        for (const auto & token : tokenizer)
+            bf_index_cols.insert(token);
+    }
+    {
+        Poco::StringTokenizer tokenizer(table.set_index_key, ",");
+        for (const auto & token : tokenizer)
+            set_index_cols.insert(token);
+    }
+
+    std::stringstream ss;
+    bool first = true;
+    for (const auto & column : columns)
+    {
+        if (minmax_index_cols.contains(column.name))
+        {
+            if (!first)
+                ss << ", ";
+            else
+                first = false;
+            ss << "_minmax_" << column.name << " " << column.name <<  " TYPE 
minmax GRANULARITY 1";
+        }
+
+        if (bf_index_cols.contains(column.name))
+        {
+            if (!first)
+                ss << ", ";
+            else
+                first = false;
+            ss << "_bloomfilter_"  << column.name << " " << column.name << " 
TYPE bloom_filter GRANULARITY 1";
+        }
+
+        if (set_index_cols.contains(column.name))
+        {
+            if (!first)
+                ss << ", ";
+            else
+                first = false;
+            ss << "_set_" << column.name << " " << column.name << " TYPE 
set(0) GRANULARITY 1";
+        }
+    }
+    metadata->setSecondaryIndices(IndicesDescription::parse(ss.str(), 
metadata->getColumns(), context));
+}
+
+std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(
+    const DB::NamesAndTypesList & columns,
+    ContextPtr context,
+    const MergeTreeTable & table)
 {
     std::shared_ptr<DB::StorageInMemoryMetadata> metadata = 
std::make_shared<DB::StorageInMemoryMetadata>();
     ColumnsDescription columns_description;
@@ -38,6 +103,9 @@ std::shared_ptr<DB::StorageInMemoryMetadata> 
buildMetaData(const DB::NamesAndTyp
         columns_description.add(ColumnDescription(item.name, item.type));
     }
     metadata->setColumns(std::move(columns_description));
+
+    setSecondaryIndex(columns, context, table, metadata);
+
     metadata->partition_key.expression_list_ast = 
std::make_shared<ASTExpressionList>();
     metadata->sorting_key = KeyDescription::parse(table.order_by_key, 
metadata->getColumns(), context);
     if (table.primary_key.empty())
@@ -85,7 +153,6 @@ void parseTableConfig(MergeTreeTableSettings & settings, 
String config_json)
 
 MergeTreeTable parseMergeTreeTableString(const std::string & info)
 {
-
     ReadBufferFromString in(info);
     assertString("MergeTree;", in);
     MergeTreeTable table;
@@ -106,6 +173,12 @@ MergeTreeTable parseMergeTreeTableString(const std::string 
& info)
     }
     readString(table.low_card_key, in);
     assertChar('\n', in);
+    readString(table.minmax_index_key, in);
+    assertChar('\n', in);
+    readString(table.bf_index_key, in);
+    assertChar('\n', in);
+    readString(table.set_index_key, in);
+    assertChar('\n', in);
     readString(table.relative_path, in);
     assertChar('\n', in);
     readString(table.absolute_path, in);
@@ -139,13 +212,13 @@ std::unordered_set<String> MergeTreeTable::getPartNames() 
const
 RangesInDataParts MergeTreeTable::extractRange(DataPartsVector parts_vector) 
const
 {
     std::unordered_map<String, DataPartPtr> name_index;
-    std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) 
{name_index.emplace(part->name, part);});
+    std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) { 
name_index.emplace(part->name, part); });
     RangesInDataParts ranges_in_data_parts;
 
     std::ranges::transform(
         parts,
         std::inserter(ranges_in_data_parts, ranges_in_data_parts.end()),
-        [&](const MergeTreePart& part)
+        [&](const MergeTreePart & part)
         {
             RangesInDataPart ranges_in_data_part;
             ranges_in_data_part.data_part = name_index.at(part.name);
@@ -155,5 +228,4 @@ RangesInDataParts 
MergeTreeTable::extractRange(DataPartsVector parts_vector) con
         });
     return ranges_in_data_parts;
 }
-
-}
+}
\ No newline at end of file
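
To illustrate the expression setSecondaryIndex assembles above: with the hypothetical
properties minmaxIndexKey = 'l_receiptdate' and bloomfilterIndexKey = 'l_orderkey', the
string handed to IndicesDescription::parse would read roughly

    _minmax_l_receiptdate l_receiptdate TYPE minmax GRANULARITY 1, _bloomfilter_l_orderkey l_orderkey TYPE bloom_filter GRANULARITY 1

which ClickHouse then materialises as the per-part skp_idx__minmax_*.idx2 and
skp_idx__bloomfilter_*.idx files the new test suites assert on.
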
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h 
b/cpp-ch/local-engine/Common/MergeTreeTool.h
index bde632f0d..d8ca9d34e 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.h
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.h
@@ -56,6 +56,9 @@ struct MergeTreeTable
     substrait::NamedStruct schema;
     std::string order_by_key;
     std::string low_card_key;
+    std::string minmax_index_key;
+    std::string bf_index_key;
+    std::string set_index_key;
     std::string primary_key = "";
     std::string relative_path;
     std::string absolute_path;
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp 
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 82a64b999..f1381a1f7 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -38,7 +38,6 @@ extern const int NO_SUCH_DATA_PART;
 extern const int LOGICAL_ERROR;
 extern const int UNKNOWN_FUNCTION;
 extern const int UNKNOWN_TYPE;
-
 }
 }
 
@@ -69,7 +68,10 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
     table.ParseFromString(extension_table.detail().value());
     auto merge_tree_table = 
local_engine::parseMergeTreeTableString(table.value());
     DB::Block header;
-    header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key);
+
+    header = TypeParser::buildBlockFromNamedStruct(
+        merge_tree_table.schema,
+        merge_tree_table.low_card_key);
     auto names_and_types_list = header.getNamesAndTypesList();
     auto storage_factory = StorageMergeTreeFactory::instance();
     auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);
@@ -161,7 +163,8 @@ MergeTreeRelParser::parseReadRel(
         throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
     auto read_step = query_context.custom_storage_merge_tree->reader.readFromParts(
         selected_parts,
-        /* alter_conversions = */ {},
+        /* alter_conversions = */
+        {},
         names_and_types_list.getNames(),
         query_context.storage_snapshot,
         *query_info,
@@ -176,6 +179,7 @@ MergeTreeRelParser::parseReadRel(
         source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions, storage_prewhere_info->prewhere_column_name);
         source_step_with_filter->applyFilters();
     }
+
     query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
     steps.emplace_back(read_step.get());
     query_plan->addStep(std::move(read_step));
@@ -266,7 +270,10 @@ void MergeTreeRelParser::parseToAction(ActionsDAGPtr & filter_action, const subs
 }
 
 void MergeTreeRelParser::analyzeExpressions(
-    Conditions & res, const substrait::Expression & rel, std::set<Int64> & pk_positions, Block & block)
+    Conditions & res,
+    const substrait::Expression & rel,
+    std::set<Int64> & pk_positions,
+    Block & block)
 {
     if (rel.has_scalar_function() && getCHFunctionName(rel.scalar_function()) == "and")
     {
@@ -378,5 +385,4 @@ String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarF
         throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unsupported substrait function on mergetree prewhere parser: {}", func_name);
     return it->second;
 }
-
-}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 2449fa4e5..eec31213a 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -120,9 +120,12 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const
             if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree*>(step))
             {
                 auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk;
+                auto selected_marks = read_mergetree->getAnalysisResult().selected_marks;
                 auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk;
                 writer.Key("selected_marks_pk");
                 writer.Uint64(selected_marks_pk);
+                writer.Key("selected_marks");
+                writer.Uint64(selected_marks);
                 writer.Key("total_marks_pk");
                 writer.Uint64(total_marks_pk);
             }
diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp b/cpp-ch/local-engine/Parser/TypeParser.cpp
index 3ffa0b0d9..12c23e606 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.cpp
+++ b/cpp-ch/local-engine/Parser/TypeParser.cpp
@@ -238,7 +238,9 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
 }
 
 
-DB::Block TypeParser::buildBlockFromNamedStruct(const substrait::NamedStruct & struct_, const std::string & low_card_cols)
+DB::Block TypeParser::buildBlockFromNamedStruct(
+    const substrait::NamedStruct & struct_,
+    const std::string & low_card_cols)
 {
     std::unordered_set<std::string> low_card_columns;
     Poco::StringTokenizer tokenizer(low_card_cols, ",");
diff --git a/cpp-ch/local-engine/Parser/TypeParser.h b/cpp-ch/local-engine/Parser/TypeParser.h
index 55420ee1a..c687c3024 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.h
+++ b/cpp-ch/local-engine/Parser/TypeParser.h
@@ -24,36 +24,38 @@
 
 namespace local_engine
 {
-class TypeParser
-{
-public:
-    TypeParser() = default;
-    ~TypeParser() = default;
+    class TypeParser
+    {
+    public:
+        TypeParser() = default;
+        ~TypeParser() = default;
 
-    static String getCHTypeName(const String & spark_type_name);
+        static String getCHTypeName(const String& spark_type_name);
 
-    static DB::DataTypePtr getCHTypeByName(const String & spark_type_name);
+        static DB::DataTypePtr getCHTypeByName(const String& spark_type_name);
 
-    /// When parsing named structure, we need the field names.
-    static DB::DataTypePtr parseType(const substrait::Type & substrait_type, std::list<String> * field_names);
+        /// When parsing named structure, we need the field names.
+        static DB::DataTypePtr parseType(const substrait::Type& substrait_type, std::list<String>* field_names);
 
-    inline static DB::DataTypePtr parseType(const substrait::Type & substrait_type)
-    {
-        return parseType(substrait_type, nullptr);
-    }
+    inline static DB::DataTypePtr parseType(const substrait::Type& substrait_type)
+        {
+            return parseType(substrait_type, nullptr);
+        }
+
+        // low_card_cols is in format of "cola,colb". Currently does not support nested columns to be LowCardinality.
+        static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct& struct_,
+                                                   const std::string& low_card_cols = "");
 
-    // low_card_cols is in format of "cola,colb". Currently does not nested column to be LowCardinality.
-    static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct & struct_,  const std::string& low_card_cols = "");
+        /// Build block from substrait NamedStruct without DFS rules, different from buildBlockFromNamedStruct
+        static DB::Block buildBlockFromNamedStructWithoutDFS(const substrait::NamedStruct& struct_);
 
-    /// Build block from substrait NamedStruct without DFS rules, different from buildBlockFromNamedStruct
-    static DB::Block buildBlockFromNamedStructWithoutDFS(const substrait::NamedStruct & struct_);
+        static bool isTypeMatched(const substrait::Type& substrait_type, const DB::DataTypePtr& ch_type);
+        static bool isTypeMatchedWithNullability(const substrait::Type& substrait_type, const DB::DataTypePtr& ch_type);
 
-    static bool isTypeMatched(const substrait::Type & substrait_type, const DB::DataTypePtr & ch_type);
-    static bool isTypeMatchedWithNullability(const substrait::Type & substrait_type, const DB::DataTypePtr & ch_type);
-private:
-    /// Mapping spark type names to CH type names.
-    static std::unordered_map<String, String> type_names_mapping;
+    private:
+        /// Mapping spark type names to CH type names.
+        static std::unordered_map<String, String> type_names_mapping;
 
-    static DB::DataTypePtr tryWrapNullable(substrait::Type_Nullability nullable, DB::DataTypePtr nested_type);
-};
+        static DB::DataTypePtr tryWrapNullable(substrait::Type_Nullability nullable, DB::DataTypePtr nested_type);
+    };
 }
diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java
index 9dc26215b..086d475b0 100644
--- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java
+++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java
@@ -31,6 +31,9 @@ public class ExtensionTableBuilder {
       String absoluteTablePath,
       String orderByKey,
       String lowCardKey,
+      String minmaxIndexKey,
+      String bfIndexKey,
+      String setIndexKey,
       String primaryKey,
       List<String> partList,
       List<Long> starts,
@@ -47,6 +50,9 @@ public class ExtensionTableBuilder {
         absoluteTablePath,
         orderByKey,
         lowCardKey,
+        minmaxIndexKey,
+        bfIndexKey,
+        setIndexKey,
         primaryKey,
         partList,
         starts,
diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
index f07e6fccb..583ca9549 100644
--- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
+++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
@@ -45,6 +45,9 @@ public class ExtensionTableNode implements SplitInfo {
   private String primaryKey;
 
   private String lowCardKey;
+  private String minmaxIndexKey;
+  private String bfIndexKey;
+  private String setIndexKey;
 
   private List<String> partList;
   private List<Long> starts;
@@ -61,6 +64,9 @@ public class ExtensionTableNode implements SplitInfo {
       String absolutePath,
       String orderByKey,
       String lowCardKey,
+      String minmaxIndexKey,
+      String bfIndexKey,
+      String setIndexKey,
       String primaryKey,
       List<String> partList,
       List<Long> starts,
@@ -82,6 +88,9 @@ public class ExtensionTableNode implements SplitInfo {
     this.tableSchemaJson = tableSchemaJson;
     this.orderByKey = orderByKey;
     this.lowCardKey = lowCardKey;
+    this.minmaxIndexKey = minmaxIndexKey;
+    this.bfIndexKey = bfIndexKey;
+    this.setIndexKey = setIndexKey;
     this.primaryKey = primaryKey;
     this.partList = partList;
     this.starts = starts;
@@ -117,6 +126,9 @@ public class ExtensionTableNode implements SplitInfo {
       extensionTableStr.append(this.primaryKey).append("\n");
     }
     extensionTableStr.append(this.lowCardKey).append("\n");
+    extensionTableStr.append(this.minmaxIndexKey).append("\n");
+    extensionTableStr.append(this.bfIndexKey).append("\n");
+    extensionTableStr.append(this.setIndexKey).append("\n");
     extensionTableStr.append(this.relativePath).append("\n");
     extensionTableStr.append(this.absolutePath).append("\n");
 
diff --git a/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala b/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala
index 856974a0e..3dd2bfaba 100644
--- a/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala
+++ b/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala
@@ -43,10 +43,12 @@ trait GlutenFormatWriterInjects {
       tableName: String,
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       partitionColumns: Seq[String],
       tableSchema: StructType,
-      dataSchema: Seq[Attribute],
       clickhouseTableConfigs: Map[String, String],
       context: TaskAttemptContext,
       nativeConf: java.util.Map[String, String]): OutputWriter = null


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org
