This is an automated email from the ASF dual-hosted git repository.

zzcclp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7669052792 [GLUTEN-12075][CH] Add a metric to indicate whether it is Parquet Reader V3 for the CH Backend (#12076)
7669052792 is described below

commit 76690527922861ba0bff12b46a92142f66ef2746
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue May 12 18:56:12 2026 +0800

    [GLUTEN-12075][CH] Add a metric to indicate whether it is Parquet Reader V3 for the CH Backend (#12076)
    
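    Add a file-scan metric that indicates whether the CH Backend used Parquet Reader V3.
    It is displayed as `Is it the CH Parquet Reader V3 (greater than 0)`: a value greater than 0 means Reader V3 was used, and 0 means the old Parquet reader was used.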
---
 .../org/apache/gluten/metrics/MetricsStep.java     | 12 +++++++
 .../backendsapi/clickhouse/CHMetricsApi.scala      |  5 ++-
 .../metrics/FileSourceScanMetricsUpdater.scala     |  2 ++
 .../GlutenClickhouseFunctionSuite.scala            | 38 +++++++++++++++++++++-
 cpp-ch/local-engine/Common/BlockTypeUtils.cpp      | 12 +++++++
 cpp-ch/local-engine/Common/BlockTypeUtils.h        |  1 +
 cpp-ch/local-engine/Parser/RelMetric.cpp           |  5 +++
 .../Parser/RelParsers/ReadRelParser.cpp            | 12 ++++++-
 .../Storages/SubstraitSource/ParquetFormatFile.cpp | 12 -------
 .../Storages/SubstraitSource/ParquetFormatFile.h   |  2 --
 10 files changed, 84 insertions(+), 17 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java 
b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
index dc8424de82..b11550ffe3 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
@@ -24,6 +24,10 @@ public class MetricsStep {
 
   protected String name;
   protected String description;
+
+  @JsonProperty("parquet_reader_version")
+  protected long parquetReaderVersion;
+
   protected List<MetricsProcessor> processors;
 
   @JsonProperty("total_marks_pk")
@@ -170,4 +174,12 @@ public class MetricsStep {
   public void setParquetMetadataCacheMisses(long parquetMetadataCacheMisses) {
     this.parquetMetadataCacheMisses = parquetMetadataCacheMisses;
   }
+
+  public long getParquetReaderVersion() { // Returns the field mapped to the "parquet_reader_version" JSON property.
+    return parquetReaderVersion;
+  }
+
+  public void setParquetReaderVersion(long parquetReaderVersion) { // Stores the value for the field mapped to the "parquet_reader_version" JSON property.
+    this.parquetReaderVersion = parquetReaderVersion;
+  }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 33368185f2..08d224f4f3 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -174,7 +174,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
         "Number of times parquet metadata has been found in the cache"),
       "parquetMetadataCacheMisses" -> SQLMetrics.createMetric(
         sparkContext,
-        "Number of times parquet metadata has not been found in the cache")
+        "Number of times parquet metadata has not been found in the cache"),
+      "isParquetReaderV3" -> SQLMetrics.createMetric(
+        sparkContext,
+        "Is it the CH Parquet Reader V3 (greater than 0)")
     )
 
   override def genFileSourceScanTransformerMetricsUpdater(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index 7af6602013..156103adb7 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -46,6 +46,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
   val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond")
   val parquetMetadataCacheHits: SQLMetric = metrics("parquetMetadataCacheHits")
   val parquetMetadataCacheMisses: SQLMetric = 
metrics("parquetMetadataCacheMisses")
+  val isParquetReaderV3: SQLMetric = metrics("isParquetReaderV3")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
     // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -75,6 +76,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
             missCacheMillisecond += step.missCacheMillisecond
             parquetMetadataCacheHits += step.parquetMetadataCacheHits
             parquetMetadataCacheMisses += step.parquetMetadataCacheMisses
+            isParquetReaderV3 += step.parquetReaderVersion
           })
 
         MetricsUtil.updateExtraTimeMetric(
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
index e09c71c107..2a796e02c9 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution.compatibility
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{ParquetSuite, ProjectExecTransformer}
+import org.apache.gluten.execution.{FileSourceScanExecTransformerBase, 
ParquetSuite, ProjectExecTransformer}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, 
NullPropagation}
@@ -530,4 +530,40 @@ class GlutenClickhouseFunctionSuite extends ParquetSuite {
     )
   }
 
+  test("GLUTEN-12075: Add a metric to indicate whether it is Parquet Reader 
V3") { // Checks the isParquetReaderV3 scan metric: 0 for a table with a map column, > 0 for the lineitem query.
+    withTable("test_12075") {
+      sql("create table test_12075(a map<string, int>) using parquet") // The only column is a map, i.e. not a flat type.
+      sql("insert into test_12075 values(map('a', 1, 'b', 2))")
+      compareResultsAgainstVanillaSpark(
+        """
+          |select cast(a as string) from test_12075
+          |""".stripMargin,
+        true,
+        {
+          df =>
+            getExecutedPlan(df).map { // NOTE(review): map is used only for its side effects; if no node matches the scan case, nothing is asserted.
+              case plan if 
plan.isInstanceOf[FileSourceScanExecTransformerBase] =>
+                // Using the old Parquet Reader, so the metric is expected to stay at 0
+                assert(plan.metrics("isParquetReaderV3").value == 0)
+              case _ => // do nothing
+            }
+        }
+      )
+    }
+
+    runQueryAndCompare(
+      """
+        |SELECT * FROM lineitem
+        |WHERE l_orderkey in (1, 2, l_partkey, l_suppkey, l_linenumber)
+        |""".stripMargin
+    )(
+      df =>
+        getExecutedPlan(df).map { // NOTE(review): same pattern as above; passes vacuously when no scan node is found.
+          case plan if plan.isInstanceOf[FileSourceScanExecTransformerBase] =>
+            // Using the Parquet Reader V3 (presumably lineitem has only flat columns; confirm against the suite's table definition)
+            assert(plan.metrics("isParquetReaderV3").value > 0)
+          case _ => // do nothing
+        })
+  }
+
 }
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp 
b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
index e669e84a1b..99a729f958 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
@@ -56,4 +56,16 @@ DB::DataTypePtr wrapNullableType(bool nullable, 
DB::DataTypePtr nested_type)
     return nested_type;
 }
 
+bool onlyHasFlatType(const DB::Block & header) // Returns true when no column of `header` has an Array, Map or Tuple type (an empty header yields true).
+{
+    return std::ranges::all_of(
+        header,
+        [](DB::ColumnWithTypeAndName const & col)
+        {
+            const DB::DataTypePtr type_not_nullable = removeNullable(col.type); // Classify Nullable(T) by its nested type T.
+            const DB::WhichDataType which(type_not_nullable);
+            return !isArray(which) && !isMap(which) && !isTuple(which); // Only the column's top-level type is inspected here.
+        });
+}
+
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.h 
b/cpp-ch/local-engine/Common/BlockTypeUtils.h
index cfb717d268..e1d490e03f 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.h
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.h
@@ -103,6 +103,7 @@ inline DB::ColumnWithTypeAndName toColumnType(const 
DB::NameAndTypePair & type)
 DB::Block toSampleBlock(const RowType & type);
 RowType blockToRowType(const DB::Block & header);
 DB::DataTypePtr wrapNullableType(bool nullable, DB::DataTypePtr nested_type);
+bool onlyHasFlatType(const DB::Block & header);
 
 inline DB::DataTypePtr wrapNullableType(DB::DataTypePtr nested_type)
 {
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp 
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 98dd10f61c..c636175fce 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -153,6 +153,11 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, 
bool) const
             writer.String(step->getName().c_str());
             writer.Key("description");
             writer.String(step->getStepDescription().data());
+            writer.Key("parquet_reader_version");
+            if (step->getStepDescription() == "ParquetReaderV3")
+                writer.String("1");
+            else
+                writer.String("0");
             writer.Key("processors");
             writer.StartArray();
             for (const auto & processor : step->getProcessors())
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 455219e432..00b8062d77 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -19,6 +19,7 @@
 #include <memory>
 #include <Core/Block.h>
 #include <Core/Settings.h>
+#include <Formats/FormatFactory.h>
 #include <IO/ReadBufferFromString.h>
 #include <Interpreters/Context.h>
 #include <Operator/BlocksBufferPoolTransform.h>
@@ -27,6 +28,7 @@
 #include <Parser/SubstraitParserUtils.h>
 #include <Parser/TypeParser.h>
 #include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <Storages/Parquet/ParquetMeta.h>
 #include <Storages/SourceFromJavaIter.h>
 #include <Storages/SourceFromRange.h>
 #include <Storages/SubstraitSource/SubstraitFileSource.h>
@@ -182,6 +184,11 @@ QueryPlanStepPtr 
ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
 {
     auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
 
+    auto format_settings = getFormatSettings(getContext());
+    // TODO: check with the Delta DV
+    bool readRowIndex = ParquetVirtualMeta::hasMetaColumns(header);
+    bool onlyFlatType = onlyHasFlatType(header);
+
     substrait::ReadRel::LocalFiles local_files;
     if (rel.has_local_files())
         local_files = rel.local_files();
@@ -194,7 +201,10 @@ QueryPlanStepPtr 
ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
     auto source = std::make_shared<SubstraitFileSource>(getContext(), header, 
local_files);
     auto source_pipe = Pipe(source);
     auto source_step = std::make_unique<SubstraitFileSourceStep>(getContext(), 
std::move(source_pipe), "substrait local files");
-    source_step->setStepDescription("read local files");
+    if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && 
onlyFlatType)
+        source_step->setStepDescription("ParquetReaderV3");
+    else
+        source_step->setStepDescription("ParquetReader");
 
     if (rel.has_filter())
     {
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 9c97fed9fd..3a0f5d79c1 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -283,17 +283,5 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
     }
 }
 
-bool ParquetFormatFile::onlyHasFlatType(const Block & header)
-{
-    return std::ranges::all_of(
-        header,
-        [](ColumnWithTypeAndName const & col)
-        {
-            const DataTypePtr type_not_nullable = removeNullable(col.type);
-            const WhichDataType which(type_not_nullable);
-            return !isArray(which) && !isMap(which) && !isTuple(which);
-        });
-}
-
 }
 #endif
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 587b788f79..96159ef41a 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -44,8 +44,6 @@ public:
 
     String getFileFormat() const override { return "Parquet"; }
 
-    static bool onlyHasFlatType(const DB::Block & header);
-
     void initialize(const ColumnIndexFilterPtr &) override;
 
 private:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to