This is an automated email from the ASF dual-hosted git repository.
zzcclp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7669052792 [GLUTEN-12075][CH] Add a metric to indicate whether it is
Parquet Reader V3 for the CH Backend (#12076)
7669052792 is described below
commit 76690527922861ba0bff12b46a92142f66ef2746
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue May 12 18:56:12 2026 +0800
[GLUTEN-12075][CH] Add a metric to indicate whether it is Parquet Reader V3
for the CH Backend (#12076)
Add a file source scan metric for the CH Backend that indicates whether
Parquet Reader V3 was used. It is displayed as
`Is it the CH Parquet Reader V3 (greater than 0)`: a value greater than 0 means V3.
---
.../org/apache/gluten/metrics/MetricsStep.java | 12 +++++++
.../backendsapi/clickhouse/CHMetricsApi.scala | 5 ++-
.../metrics/FileSourceScanMetricsUpdater.scala | 2 ++
.../GlutenClickhouseFunctionSuite.scala | 38 +++++++++++++++++++++-
cpp-ch/local-engine/Common/BlockTypeUtils.cpp | 12 +++++++
cpp-ch/local-engine/Common/BlockTypeUtils.h | 1 +
cpp-ch/local-engine/Parser/RelMetric.cpp | 5 +++
.../Parser/RelParsers/ReadRelParser.cpp | 12 ++++++-
.../Storages/SubstraitSource/ParquetFormatFile.cpp | 12 -------
.../Storages/SubstraitSource/ParquetFormatFile.h | 2 --
10 files changed, 84 insertions(+), 17 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
index dc8424de82..b11550ffe3 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
@@ -24,6 +24,10 @@ public class MetricsStep {
protected String name;
protected String description;
+
+ @JsonProperty("parquet_reader_version")
+ protected long parquetReaderVersion;
+
protected List<MetricsProcessor> processors;
@JsonProperty("total_marks_pk")
@@ -170,4 +174,12 @@ public class MetricsStep {
public void setParquetMetadataCacheMisses(long parquetMetadataCacheMisses) {
this.parquetMetadataCacheMisses = parquetMetadataCacheMisses;
}
+
+  /**
+   * Returns the parquet reader version recorded for this metrics step.
+   *
+   * <p>The backing field is bound to the {@code parquet_reader_version} JSON property.
+   *
+   * @return the stored parquet reader version
+   */
+  public long getParquetReaderVersion() {
+    return this.parquetReaderVersion;
+  }
+
+  /**
+   * Stores the parquet reader version for this metrics step.
+   *
+   * @param parquetReaderVersion value to keep; bound to the {@code parquet_reader_version} JSON
+   *     property
+   */
+  public void setParquetReaderVersion(long parquetReaderVersion) {
+    this.parquetReaderVersion = parquetReaderVersion;
+  }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 33368185f2..08d224f4f3 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -174,7 +174,10 @@ class CHMetricsApi extends MetricsApi with Logging with
LogLevelUtil {
"Number of times parquet metadata has been found in the cache"),
"parquetMetadataCacheMisses" -> SQLMetrics.createMetric(
sparkContext,
- "Number of times parquet metadata has not been found in the cache")
+ "Number of times parquet metadata has not been found in the cache"),
+ "isParquetReaderV3" -> SQLMetrics.createMetric(
+ sparkContext,
+ "Is it the CH Parquet Reader V3 (greater than 0)")
)
override def genFileSourceScanTransformerMetricsUpdater(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index 7af6602013..156103adb7 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -46,6 +46,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond")
val parquetMetadataCacheHits: SQLMetric = metrics("parquetMetadataCacheHits")
val parquetMetadataCacheMisses: SQLMetric =
metrics("parquetMetadataCacheMisses")
+ val isParquetReaderV3: SQLMetric = metrics("isParquetReaderV3")
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -75,6 +76,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
missCacheMillisecond += step.missCacheMillisecond
parquetMetadataCacheHits += step.parquetMetadataCacheHits
parquetMetadataCacheMisses += step.parquetMetadataCacheMisses
+ isParquetReaderV3 += step.parquetReaderVersion
})
MetricsUtil.updateExtraTimeMetric(
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
index e09c71c107..2a796e02c9 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.execution.compatibility
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{ParquetSuite, ProjectExecTransformer}
+import org.apache.gluten.execution.{FileSourceScanExecTransformerBase,
ParquetSuite, ProjectExecTransformer}
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding,
NullPropagation}
@@ -530,4 +530,40 @@ class GlutenClickhouseFunctionSuite extends ParquetSuite {
)
}
+  test("GLUTEN-12075: Add a metric to indicate whether it is Parquet Reader V3") {
+    // Case 1: the table has a map column, and the scan is expected to report the old
+    // Parquet reader, i.e. the `isParquetReaderV3` metric stays at 0.
+    withTable("test_12075") {
+      sql("create table test_12075(a map<string, int>) using parquet")
+      sql("insert into test_12075 values(map('a', 1, 'b', 2))")
+      val mapColumnQuery =
+        """
+          |select cast(a as string) from test_12075
+          |""".stripMargin
+      compareResultsAgainstVanillaSpark(
+        mapColumnQuery,
+        true,
+        df =>
+          // Only file source scan nodes carry the metric; every other node is ignored.
+          getExecutedPlan(df).foreach {
+            case scan: FileSourceScanExecTransformerBase =>
+              // Using the old Parquet Reader
+              assert(scan.metrics("isParquetReaderV3").value == 0)
+            case _ => // do nothing
+          }
+      )
+    }
+
+    // Case 2: the lineitem scan is expected to use Parquet Reader V3, so the metric
+    // must be greater than 0.
+    val lineitemQuery =
+      """
+        |SELECT * FROM lineitem
+        |WHERE l_orderkey in (1, 2, l_partkey, l_suppkey, l_linenumber)
+        |""".stripMargin
+    runQueryAndCompare(lineitemQuery) {
+      df =>
+        getExecutedPlan(df).foreach {
+          case scan: FileSourceScanExecTransformerBase =>
+            // Using the Parquet Reader V3
+            assert(scan.metrics("isParquetReaderV3").value > 0)
+          case _ => // do nothing
+        }
+    }
+  }
+
}
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
index e669e84a1b..99a729f958 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
@@ -56,4 +56,16 @@ DB::DataTypePtr wrapNullableType(bool nullable,
DB::DataTypePtr nested_type)
return nested_type;
}
+/// Reports whether every column of `header` has a flat (non-nested) type.
+///
+/// A column counts as nested when its type, after `removeNullable` has been applied,
+/// is an Array, a Map or a Tuple. A header without columns yields true.
+bool onlyHasFlatType(const DB::Block & header)
+{
+    const auto is_nested = [](const DB::ColumnWithTypeAndName & column)
+    {
+        // Inspect the type underneath Nullable so that Nullable(Array(...)) etc. are caught too.
+        const DB::DataTypePtr unwrapped = removeNullable(column.type);
+        const DB::WhichDataType which(unwrapped);
+        return isArray(which) || isMap(which) || isTuple(which);
+    };
+    return std::ranges::none_of(header, is_nested);
+}
+
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.h
b/cpp-ch/local-engine/Common/BlockTypeUtils.h
index cfb717d268..e1d490e03f 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.h
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.h
@@ -103,6 +103,7 @@ inline DB::ColumnWithTypeAndName toColumnType(const
DB::NameAndTypePair & type)
DB::Block toSampleBlock(const RowType & type);
RowType blockToRowType(const DB::Block & header);
DB::DataTypePtr wrapNullableType(bool nullable, DB::DataTypePtr nested_type);
+bool onlyHasFlatType(const DB::Block & header);
inline DB::DataTypePtr wrapNullableType(DB::DataTypePtr nested_type)
{
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 98dd10f61c..c636175fce 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -153,6 +153,11 @@ void RelMetric::serialize(Writer<StringBuffer> & writer,
bool) const
writer.String(step->getName().c_str());
writer.Key("description");
writer.String(step->getStepDescription().data());
+ writer.Key("parquet_reader_version");
+ if (step->getStepDescription() == "ParquetReaderV3")
+ writer.String("1");
+ else
+ writer.String("0");
writer.Key("processors");
writer.StartArray();
for (const auto & processor : step->getProcessors())
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 455219e432..00b8062d77 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -19,6 +19,7 @@
#include <memory>
#include <Core/Block.h>
#include <Core/Settings.h>
+#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <Operator/BlocksBufferPoolTransform.h>
@@ -27,6 +28,7 @@
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <Storages/Parquet/ParquetMeta.h>
#include <Storages/SourceFromJavaIter.h>
#include <Storages/SourceFromRange.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
@@ -182,6 +184,11 @@ QueryPlanStepPtr
ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
{
auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
+ auto format_settings = getFormatSettings(getContext());
+ // TODO: check with the Delta DV
+ bool readRowIndex = ParquetVirtualMeta::hasMetaColumns(header);
+ bool onlyFlatType = onlyHasFlatType(header);
+
substrait::ReadRel::LocalFiles local_files;
if (rel.has_local_files())
local_files = rel.local_files();
@@ -194,7 +201,10 @@ QueryPlanStepPtr
ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
auto source = std::make_shared<SubstraitFileSource>(getContext(), header,
local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(getContext(),
std::move(source_pipe), "substrait local files");
- source_step->setStepDescription("read local files");
+ if (format_settings.parquet.use_native_reader_v3 && !readRowIndex &&
onlyFlatType)
+ source_step->setStepDescription("ParquetReaderV3");
+ else
+ source_step->setStepDescription("ParquetReader");
if (rel.has_filter())
{
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 9c97fed9fd..3a0f5d79c1 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -283,17 +283,5 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
}
}
-bool ParquetFormatFile::onlyHasFlatType(const Block & header)
-{
- return std::ranges::all_of(
- header,
- [](ColumnWithTypeAndName const & col)
- {
- const DataTypePtr type_not_nullable = removeNullable(col.type);
- const WhichDataType which(type_not_nullable);
- return !isArray(which) && !isMap(which) && !isTuple(which);
- });
-}
-
}
#endif
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 587b788f79..96159ef41a 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -44,8 +44,6 @@ public:
String getFileFormat() const override { return "Parquet"; }
- static bool onlyHasFlatType(const DB::Block & header);
-
void initialize(const ColumnIndexFilterPtr &) override;
private:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]