This is an automated email from the ASF dual-hosted git repository.

zzcclp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 93de3c8b41 [GLUTEN-12010][CH] Pass the correct values to 
ParquetInputFormat (#12011)
93de3c8b41 is described below

commit 93de3c8b416880474fb0806debc415037960c39f
Author: Zhichao Zhang <[email protected]>
AuthorDate: Thu Apr 30 11:05:10 2026 +0800

    [GLUTEN-12010][CH] Pass the correct values to ParquetInputFormat (#12011)
    
    Updates the ClickHouse backend’s Parquet read path to use the configured Parquet input settings (rather than hardcoded defaults) when constructing and feeding the Parquet input formats.
    
    Changes:
    
    - Thread format_settings.parquet.max_block_size into the local 
ParquetInputFormat wrapper and use it for row-index-only batch generation.
    - Select min_bytes_for_seek based on whether the underlying read is remote 
vs local, and pass it into the native Parquet input formats.
    - Remove the default ClickHouse setting 
local_engine.settings.log_processors_profiles = true from backend 
initialization.
---
 .../backendsapi/clickhouse/CHListenerApi.scala     |  3 +--
 .../Storages/SubstraitSource/ParquetFormatFile.cpp | 23 ++++++++++++++--------
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 2d21a08ca0..41772f265c 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -90,8 +90,7 @@ class CHListenerApi extends ListenerApi with Logging {
     // Add configs
     import org.apache.gluten.backendsapi.clickhouse.CHConfig._
     conf.setCHConfig(
-      "timezone" -> conf.get("spark.sql.session.timeZone", 
TimeZone.getDefault.getID),
-      "local_engine.settings.log_processors_profiles" -> "true")
+      "timezone" -> conf.get("spark.sql.session.timeZone", 
TimeZone.getDefault.getID))
     conf.setCHSettings("spark_version", SPARK_VERSION)
     if (!conf.contains(RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.key)) {
       // Enable adaptive memory spill scheduler for native by default
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 55a53eeda2..48b67c8337 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -82,6 +82,7 @@ class ParquetInputFormat : public FormatFile::InputFormat
     const Block outputHeader;
     std::unique_ptr<ColumnIndexRowRangesProvider> rowRangesProvider;
     std::optional<VirtualColumnRowIndexReader> row_index_reader;
+    const UInt64 max_block_size;
 
 public:
     ParquetInputFormat(
@@ -89,7 +90,8 @@ public:
         const InputFormatPtr & input_,
         std::unique_ptr<ColumnIndexRowRangesProvider> provider,
         const Block & readHeader_,
-        const Block & outputHeader_)
+        const Block & outputHeader_,
+        const UInt64 max_block_size_)
         : InputFormat(std::move(read_buffer_), input_)
         , readHeader(readHeader_)
         , outputHeader(outputHeader_)
@@ -98,6 +100,7 @@ public:
               outputHeader.columns() > readHeader.columns()
                   ? 
std::make_optional<VirtualColumnRowIndexReader>(*rowRangesProvider, 
getMetaColumnType(outputHeader))
                   : std::nullopt)
+        , max_block_size(max_block_size_)
     {
     }
 
@@ -107,8 +110,7 @@ public:
         {
             assert(outputHeader.columns());
             assert(row_index_reader);
-            // TODO: rebase-25.12, format_settings_.parquet.max_block_size
-            Columns cols{row_index_reader->readBatch(8192)};
+            Columns cols{row_index_reader->readBatch(max_block_size)};
             size_t rows = cols[0]->size();
             return Chunk(std::move(cols), rows);
         }
@@ -178,6 +180,11 @@ ParquetFormatFile::createInputFormat(const Block & header, 
const std::shared_ptr
         metaBuilder.build(*in, *read_header, column_index_filter_.get(), 
should_include_row_group);
     }
 
+    // remote_read_min_bytes_for_seek and 
input_format_parquet_local_file_min_bytes_for_seek in settings
+    const size_t min_bytes_for_seek = read_buffer_builder->isRemote() ? 
context->getReadSettings().remote_read_min_bytes_for_seek : 
format_settings.parquet.local_read_min_bytes_for_seek;
+    // input_format_parquet_max_block_size
+    const UInt64 max_block_size = format_settings.parquet.max_block_size;
+
     column_index_filter_.reset();
 
     if (metaBuilder.readRowGroups.empty())
@@ -188,7 +195,7 @@ ParquetFormatFile::createInputFormat(const Block & header, 
const std::shared_ptr
     auto createVectorizedFormat = [&]() -> InputFormatPtr
     {
         auto input = 
std::make_shared<VectorizedParquetBlockInputFormat>(*read_buffer_, read_header, 
*provider, format_settings);
-        return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), 
input, std::move(provider), *read_header, header);
+        return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), 
input, std::move(provider), *read_header, header, max_block_size);
     };
 
     auto createParquetBlockInputFormat = [&]() -> InputFormatPtr
@@ -212,8 +219,8 @@ ParquetFormatFile::createInputFormat(const Block & header, 
const std::shared_ptr
         auto parser_group = 
std::make_shared<FormatFilterInfo>(filter_actions_dag, context, nullptr, 
nullptr, nullptr);
         auto parser_shared_resources = 
std::make_shared<FormatParserSharedResources>(context->getSettingsRef(), 
/*num_streams_=*/1);
 
-        size_t min_bytes_for_seek = 
format_settings.parquet.local_read_min_bytes_for_seek;
-        // TODO: check whether support complex types
+        // TODO: rebase-25.12, support complex types when there is a nullable 
type
+        // for example: parquet type is Array, requested type is 
Nullable(Array(Nullable(String)))
         if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && 
onlyFlatType)
         {
             LOG_TRACE(
@@ -228,7 +235,7 @@ ParquetFormatFile::createInputFormat(const Block & header, 
const std::shared_ptr
             for (const auto & rg : metaBuilder.readRowGroups)
                 row_group_ids.push_back(static_cast<size_t>(rg.index));
             
input->setBucketsToRead(std::make_shared<ParquetFileBucketInfo>(row_group_ids));
-            return 
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, 
std::move(provider), *read_header, header);
+            return 
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, 
std::move(provider), *read_header, header, max_block_size);
         }
         else
         {
@@ -236,7 +243,7 @@ ParquetFormatFile::createInputFormat(const Block & header, 
const std::shared_ptr
                 &Poco::Logger::get("ParquetFormatFile"),
                 "Using native parquet reader");
             auto input = 
std::make_shared<ParquetBlockInputFormat>(*read_buffer_, read_header, 
format_settings, parser_shared_resources, parser_group, min_bytes_for_seek);
-            return 
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, 
std::move(provider), *read_header, header);
+            return 
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, 
std::move(provider), *read_header, header, max_block_size);
         }
     };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to