This is an automated email from the ASF dual-hosted git repository.
zzcclp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 93de3c8b41 [GLUTEN-12010][CH] Pass the correct values to
ParquetInputFormat (#12011)
93de3c8b41 is described below
commit 93de3c8b416880474fb0806debc415037960c39f
Author: Zhichao Zhang <[email protected]>
AuthorDate: Thu Apr 30 11:05:10 2026 +0800
[GLUTEN-12010][CH] Pass the correct values to ParquetInputFormat (#12011)
Updates the ClickHouse backend’s Parquet read path to use the configured
Parquet input settings, rather than hardcoded defaults, when constructing
the Parquet input formats and when reading batches from them.
Changes:
- Pass format_settings.parquet.max_block_size (input_format_parquet_max_block_size)
into the local ParquetInputFormat wrapper, and use it instead of the hardcoded
8192 as the batch size for row-index-only chunk generation.
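- Choose min_bytes_for_seek according to whether the underlying read is remote
or local: remote reads use remote_read_min_bytes_for_seek from the read
settings, and local reads use parquet.local_read_min_bytes_for_seek from the
format settings. Pass the chosen value to the native Parquet input formats.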
- Stop setting the ClickHouse setting
local_engine.settings.log_processors_profiles to true by default during
backend initialization (CHListenerApi).
---
.../backendsapi/clickhouse/CHListenerApi.scala | 3 +--
.../Storages/SubstraitSource/ParquetFormatFile.cpp | 23 ++++++++++++++--------
2 files changed, 16 insertions(+), 10 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 2d21a08ca0..41772f265c 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -90,8 +90,7 @@ class CHListenerApi extends ListenerApi with Logging {
// Add configs
import org.apache.gluten.backendsapi.clickhouse.CHConfig._
conf.setCHConfig(
- "timezone" -> conf.get("spark.sql.session.timeZone",
TimeZone.getDefault.getID),
- "local_engine.settings.log_processors_profiles" -> "true")
+ "timezone" -> conf.get("spark.sql.session.timeZone",
TimeZone.getDefault.getID))
conf.setCHSettings("spark_version", SPARK_VERSION)
if (!conf.contains(RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.key)) {
// Enable adaptive memory spill scheduler for native by default
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 55a53eeda2..48b67c8337 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -82,6 +82,7 @@ class ParquetInputFormat : public FormatFile::InputFormat
const Block outputHeader;
std::unique_ptr<ColumnIndexRowRangesProvider> rowRangesProvider;
std::optional<VirtualColumnRowIndexReader> row_index_reader;
+ const UInt64 max_block_size;
public:
ParquetInputFormat(
@@ -89,7 +90,8 @@ public:
const InputFormatPtr & input_,
std::unique_ptr<ColumnIndexRowRangesProvider> provider,
const Block & readHeader_,
- const Block & outputHeader_)
+ const Block & outputHeader_,
+ const UInt64 max_block_size_)
: InputFormat(std::move(read_buffer_), input_)
, readHeader(readHeader_)
, outputHeader(outputHeader_)
@@ -98,6 +100,7 @@ public:
outputHeader.columns() > readHeader.columns()
?
std::make_optional<VirtualColumnRowIndexReader>(*rowRangesProvider,
getMetaColumnType(outputHeader))
: std::nullopt)
+ , max_block_size(max_block_size_)
{
}
@@ -107,8 +110,7 @@ public:
{
assert(outputHeader.columns());
assert(row_index_reader);
- // TODO: rebase-25.12, format_settings_.parquet.max_block_size
- Columns cols{row_index_reader->readBatch(8192)};
+ Columns cols{row_index_reader->readBatch(max_block_size)};
size_t rows = cols[0]->size();
return Chunk(std::move(cols), rows);
}
@@ -178,6 +180,11 @@ ParquetFormatFile::createInputFormat(const Block & header,
const std::shared_ptr
metaBuilder.build(*in, *read_header, column_index_filter_.get(),
should_include_row_group);
}
+ // remote_read_min_bytes_for_seek and
input_format_parquet_local_file_min_bytes_for_seek in settings
+ const size_t min_bytes_for_seek = read_buffer_builder->isRemote() ?
context->getReadSettings().remote_read_min_bytes_for_seek :
format_settings.parquet.local_read_min_bytes_for_seek;
+ // input_format_parquet_max_block_size
+ const UInt64 max_block_size = format_settings.parquet.max_block_size;
+
column_index_filter_.reset();
if (metaBuilder.readRowGroups.empty())
@@ -188,7 +195,7 @@ ParquetFormatFile::createInputFormat(const Block & header,
const std::shared_ptr
auto createVectorizedFormat = [&]() -> InputFormatPtr
{
auto input =
std::make_shared<VectorizedParquetBlockInputFormat>(*read_buffer_, read_header,
*provider, format_settings);
- return std::make_shared<ParquetInputFormat>(std::move(read_buffer_),
input, std::move(provider), *read_header, header);
+ return std::make_shared<ParquetInputFormat>(std::move(read_buffer_),
input, std::move(provider), *read_header, header, max_block_size);
};
auto createParquetBlockInputFormat = [&]() -> InputFormatPtr
@@ -212,8 +219,8 @@ ParquetFormatFile::createInputFormat(const Block & header,
const std::shared_ptr
auto parser_group =
std::make_shared<FormatFilterInfo>(filter_actions_dag, context, nullptr,
nullptr, nullptr);
auto parser_shared_resources =
std::make_shared<FormatParserSharedResources>(context->getSettingsRef(),
/*num_streams_=*/1);
- size_t min_bytes_for_seek =
format_settings.parquet.local_read_min_bytes_for_seek;
- // TODO: check whether support complex types
+ // TODO: rebase-25.12, support complex types when there is a nullable
type
+ // for example: parquet type is Array, requested type is
Nullable(Array(Nullable(String)))
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex &&
onlyFlatType)
{
LOG_TRACE(
@@ -228,7 +235,7 @@ ParquetFormatFile::createInputFormat(const Block & header,
const std::shared_ptr
for (const auto & rg : metaBuilder.readRowGroups)
row_group_ids.push_back(static_cast<size_t>(rg.index));
input->setBucketsToRead(std::make_shared<ParquetFileBucketInfo>(row_group_ids));
- return
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input,
std::move(provider), *read_header, header);
+ return
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input,
std::move(provider), *read_header, header, max_block_size);
}
else
{
@@ -236,7 +243,7 @@ ParquetFormatFile::createInputFormat(const Block & header,
const std::shared_ptr
&Poco::Logger::get("ParquetFormatFile"),
"Using native parquet reader");
auto input =
std::make_shared<ParquetBlockInputFormat>(*read_buffer_, read_header,
format_settings, parser_shared_resources, parser_group, min_bytes_for_seek);
- return
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input,
std::move(provider), *read_header, header);
+ return
std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input,
std::move(provider), *read_header, header, max_block_size);
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]