This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f2ea2fbb92 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250714)
(#10178)
f2ea2fbb92 is described below
commit f2ea2fbb92b2722cc8f83f8348c299d7324cd28d
Author: Kyligence Git <[email protected]>
AuthorDate: Mon Jul 14 08:17:34 2025 -0500
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20250714) (#10178)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250714)
* Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/82949
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang chen <[email protected]>
---
cpp-ch/clickhouse.version | 4 ++--
.../Storages/Parquet/VectorizedParquetRecordReader.h | 3 ---
.../Storages/SubstraitSource/ExcelTextFormatFile.cpp | 3 ++-
.../Storages/SubstraitSource/ExcelTextFormatFile.h | 3 ++-
.../Storages/SubstraitSource/FileReader.cpp | 15 ++++++---------
.../Storages/SubstraitSource/FileReader.h | 2 +-
.../Storages/SubstraitSource/FormatFile.h | 6 ++++--
.../Storages/SubstraitSource/Iceberg/IcebergReader.h | 6 ++++++
.../Storages/SubstraitSource/JSONFormatFile.cpp | 3 ++-
.../Storages/SubstraitSource/JSONFormatFile.h | 3 ++-
.../Storages/SubstraitSource/ORCFormatFile.cpp | 6 ++++--
.../Storages/SubstraitSource/ORCFormatFile.h | 3 ++-
.../Storages/SubstraitSource/ParquetFormatFile.cpp | 13 ++++---------
.../Storages/SubstraitSource/ParquetFormatFile.h | 3 ++-
.../Storages/SubstraitSource/SubstraitFileSource.cpp | 12 ++++++------
.../Storages/SubstraitSource/SubstraitFileSource.h | 13 ++++++++++---
.../SubstraitSource/SubstraitFileSourceStep.cpp | 12 +++++++-----
.../SubstraitSource/SubstraitFileSourceStep.h | 5 ++---
.../Storages/SubstraitSource/TextFormatFile.cpp | 3 ++-
.../Storages/SubstraitSource/TextFormatFile.h | 3 ++-
cpp-ch/local-engine/tests/benchmark_parquet_read.cpp | 19 +++++++++++++------
cpp-ch/local-engine/tests/benchmark_spark_row.cpp | 7 +++++--
cpp-ch/local-engine/tests/gtest_parquet_read.cpp | 5 ++++-
23 files changed, 90 insertions(+), 62 deletions(-)
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index a92ffc7639..2905a10017 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250707
-CH_COMMIT=e5b9e10de82
+CH_BRANCH=rebase_ch/20250714
+CH_COMMIT=8d128664b55
diff --git
a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
index 512cc91bed..ce1a139014 100644
--- a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
+++ b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
@@ -214,9 +214,6 @@ class VectorizedParquetBlockInputFormat final : public
DB::IInputFormat
protected:
void onCancel() noexcept override { is_stopped = 1; }
- // TODO: create ColumnIndexFilter here, currently disable it now.
- void setKeyCondition(const std::shared_ptr<const DB::KeyCondition> &
key_condition_) override { }
-
public:
VectorizedParquetBlockInputFormat(
DB::ReadBuffer & in_,
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
index f094bf915d..dd191831c3 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
@@ -62,7 +62,8 @@ bool ExcelTextFormatFile::useThis(const DB::ContextPtr &
context)
return settingsEqual(context->getSettingsRef(), USE_EXCEL_PARSER, "true");
}
-FormatFile::InputFormatPtr ExcelTextFormatFile::createInputFormat(const
DB::Block & header)
+FormatFile::InputFormatPtr
+ExcelTextFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
{
auto read_buffer = read_buffer_builder->build(file_info);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
index 5e70d22eeb..0b92f86219 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
@@ -47,7 +47,8 @@ public:
~ExcelTextFormatFile() override = default;
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
bool supportSplit() const override { return true; }
String getFileFormat() const override { return "ExcelText"; }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index e5d1f8d8a0..8de7f83509 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -274,17 +274,12 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
const FormatFilePtr & file,
const DB::Block & to_read_header_,
const DB::Block & output_header_,
- const std::shared_ptr<const DB::KeyCondition> & key_condition = nullptr,
+ const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag = nullptr,
const ColumnIndexFilterPtr & column_index_filter = nullptr)
{
file->initialize(column_index_filter);
auto createInputFormat = [&](const DB::Block & new_read_header_) ->
FormatFile::InputFormatPtr
- {
- auto input_format = file->createInputFormat(new_read_header_);
- if (key_condition && input_format)
- input_format->inputFormat().setKeyCondition(key_condition);
- return input_format;
- };
+ { return file->createInputFormat(new_read_header_, filter_actions_dag); };
if (file->getFileInfo().has_iceberg())
return iceberg::IcebergReader::create(file, to_read_header_,
output_header_, createInputFormat);
@@ -316,11 +311,13 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
return std::make_unique<NormalFileReader>(file, to_read_header_,
output_header_, input_format);
}
}
+
+/// TODO Remove ColumnIndexFilterPtr
std::unique_ptr<BaseReader> BaseReader::create(
const FormatFilePtr & current_file,
const DB::Block & readHeader,
const DB::Block & outputHeader,
- const std::shared_ptr<const DB::KeyCondition> & key_condition,
+ const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
const ColumnIndexFilterPtr & column_index_filter)
{
if (!readHeader)
@@ -335,7 +332,7 @@ std::unique_ptr<BaseReader> BaseReader::create(
}
}
- return createNormalFileReader(current_file, readHeader, outputHeader,
key_condition, column_index_filter);
+ return createNormalFileReader(current_file, readHeader, outputHeader,
filter_actions_dag, column_index_filter);
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
index bdb6772ab5..467824610e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
@@ -77,7 +77,7 @@ public:
const FormatFilePtr & current_file,
const DB::Block & readHeader,
const DB::Block & outputHeader,
- const std::shared_ptr<const DB::KeyCondition> & key_condition,
+ const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
const ColumnIndexFilterPtr & column_index_filter);
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
index 262aeff6fa..60d28f5bba 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -30,6 +30,7 @@
namespace DB
{
+class ActionsDAG;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
@@ -115,7 +116,9 @@ public:
virtual ~FormatFile() = default;
/// Create a new input format for reading this file
- virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;
+ virtual InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr)
+ = 0;
/// Spark would split a large file into small segements and read in
different tasks
/// If this file doesn't support the split feacture, only the task with
offset 0 will generate data.
@@ -149,7 +152,6 @@ protected:
std::map<String, String> partition_values;
/// partition keys are normalized to lower cases for partition column
case-insensitive matching
std::map<String, String> normalized_partition_values;
- std::shared_ptr<const DB::KeyCondition> key_condition;
const FileMetaColumns meta_columns;
/// Currently, it is used to read an iceberg format, and initialized in
the constructor of child class
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
index f8845c1367..ddfd0954db 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
@@ -18,6 +18,12 @@
#include <Storages/SubstraitSource/FileReader.h>
+namespace DB
+{
+class ExpressionActions;
+using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
+}
+
namespace local_engine
{
class DeltaDVRoaringBitmapArray;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
index 5cbab2bbda..b5728abbf9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
@@ -31,7 +31,8 @@ JSONFormatFile::JSONFormatFile(
{
}
-FormatFile::InputFormatPtr JSONFormatFile::createInputFormat(const DB::Block &
header)
+FormatFile::InputFormatPtr
+JSONFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
{
auto read_buffer =
read_buffer_builder->buildWithCompressionWrapper(file_info);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
index 97b3a694de..a98af115a2 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
@@ -28,7 +28,8 @@ public:
bool supportSplit() const override { return true; }
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
String getFileFormat() const override { return "JSONEachRow"; }
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
index f78cca5d87..9b87c6528e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
@@ -36,7 +36,8 @@ ORCFormatFile::ORCFormatFile(
{
}
-FormatFile::InputFormatPtr ORCFormatFile::createInputFormat(const DB::Block &
header)
+FormatFile::InputFormatPtr
+ORCFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
{
auto read_buffer = read_buffer_builder->build(file_info);
@@ -70,7 +71,8 @@ FormatFile::InputFormatPtr
ORCFormatFile::createInputFormat(const DB::Block & he
format_settings.orc.reader_time_zone_name = mapped_timezone;
}
//TODO: support prefetch
- auto input_format =
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header,
format_settings, false, 0);
+ auto parser_group =
std::make_shared<DB::FormatParserGroup>(context->getSettingsRef(), 1,
filter_actions_dag, context);
+ auto input_format =
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header,
format_settings, false, 0, parser_group);
return std::make_shared<InputFormat>(std::move(read_buffer), input_format);
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
index b6f6fd2321..0d8beb5d3b 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
@@ -44,7 +44,8 @@ public:
DB::ContextPtr context_, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr
read_buffer_builder_);
~ORCFormatFile() override = default;
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
std::optional<size_t> getTotalRows() override;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index a4ccfa93b7..50e84e60f9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -142,7 +142,8 @@ void ParquetFormatFile::initialize(const
ColumnIndexFilterPtr & filter)
file_schema = ParquetMetaBuilder::collectFileSchema(context,
*read_buffer_);
}
-FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const Block &
header)
+FormatFile::InputFormatPtr
+ParquetFormatFile::createInputFormat(const Block & header, const
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
{
assert(read_buffer_);
@@ -205,14 +206,8 @@ FormatFile::InputFormatPtr
ParquetFormatFile::createInputFormat(const Block & he
// We need to disable fiter push down and read all row groups, so
that we can get correct row index.
format_settings.parquet.filter_push_down = false;
}
-
- auto input = std::make_shared<ParquetBlockInputFormat>(
- *read_buffer_,
- read_header,
- format_settings,
- settings[Setting::max_parsing_threads],
- settings[Setting::max_download_threads],
- 8192);
+ auto parser_group =
std::make_shared<FormatParserGroup>(context->getSettingsRef(), 1,
filter_actions_dag, context);
+ auto input = std::make_shared<ParquetBlockInputFormat>(*read_buffer_,
read_header, format_settings, parser_group, 8192);
return std::make_shared<ParquetInputFormat>(
std::move(read_buffer_), input, std::move(provider),
std::move(read_header), std::move(output_header));
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 2014e1d643..587b788f79 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -35,7 +35,8 @@ public:
bool use_local_format_);
~ParquetFormatFile() override = default;
- InputFormatPtr createInputFormat(const DB::Block & header) override;
+ InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
std::optional<size_t> getTotalRows() override;
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
index a5df941a48..51c041fa3c 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -54,7 +54,7 @@ static DB::Block initReadHeader(const DB::Block & block,
const FormatFiles & fil
SubstraitFileSource::SubstraitFileSource(
const DB::ContextPtr & context_, const DB::Block & outputHeader_, const
substrait::ReadRel::LocalFiles & file_infos)
- :
DB::SourceWithKeyCondition(BaseReader::buildRowCountHeader(outputHeader_),
false)
+ : DB::ISource(BaseReader::buildRowCountHeader(outputHeader_), false)
, files(initializeFiles(file_infos, context_))
, outputHeader(outputHeader_)
, readHeader(initReadHeader(outputHeader, files))
@@ -63,11 +63,11 @@ SubstraitFileSource::SubstraitFileSource(
SubstraitFileSource::~SubstraitFileSource() = default;
-void SubstraitFileSource::setKeyCondition(const std::optional<DB::ActionsDAG>
& filter_actions_dag, DB::ContextPtr context_)
+void SubstraitFileSource::setKeyCondition(const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag_, DB::ContextPtr context_)
{
- setKeyConditionImpl(filter_actions_dag, context_, readHeader);
- if (filter_actions_dag)
- column_index_filter =
std::make_shared<ColumnIndexFilter>(filter_actions_dag.value(), context_);
+ assert(filter_actions_dag_);
+ filter_actions_dag = filter_actions_dag_;
+ column_index_filter =
std::make_shared<ColumnIndexFilter>(*filter_actions_dag, context_);
}
DB::Chunk SubstraitFileSource::generate()
@@ -105,7 +105,7 @@ bool SubstraitFileSource::tryPrepareReader()
if (!current_file->supportSplit() && current_file->getStartOffset())
continue;
- file_reader = BaseReader::create(current_file, readHeader,
outputHeader, key_condition, column_index_filter);
+ file_reader = BaseReader::create(current_file, readHeader,
outputHeader, filter_actions_dag, column_index_filter);
if (file_reader)
return true;
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
index a2280da0d2..b40dd9a82d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -17,9 +17,15 @@
#pragma once
#include <memory>
-#include <Processors/SourceWithKeyCondition.h>
+#include <Interpreters/Context_fwd.h>
+#include <Processors/ISource.h>
#include <substrait/algebra.pb.h>
+namespace DB
+{
+class ActionsDAG;
+}
+
namespace local_engine
{
class ColumnIndexFilter;
@@ -29,7 +35,7 @@ class FormatFile;
using FormatFilePtr = std::shared_ptr<FormatFile>;
using FormatFiles = std::vector<FormatFilePtr>;
-class SubstraitFileSource : public DB::SourceWithKeyCondition
+class SubstraitFileSource : public DB::ISource
{
public:
SubstraitFileSource(const DB::ContextPtr & context_, const DB::Block &
header_, const substrait::ReadRel::LocalFiles & file_infos);
@@ -37,7 +43,7 @@ public:
String getName() const override { return "SubstraitFileSource"; }
- void setKeyCondition(const std::optional<DB::ActionsDAG> &
filter_actions_dag, DB::ContextPtr context_) override;
+ void setKeyCondition(const std::shared_ptr<const DB::ActionsDAG> &
filter_actions_dag_, DB::ContextPtr context_);
protected:
DB::Chunk generate() override;
@@ -54,5 +60,6 @@ private:
std::unique_ptr<BaseReader> file_reader;
ColumnIndexFilterPtr column_index_filter;
+ std::shared_ptr<const DB::ActionsDAG> filter_actions_dag;
};
}
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
index 91e50b4275..220d181226 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-#include <Interpreters/Context_fwd.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@@ -66,9 +65,12 @@ void
SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipe
void SubstraitFileSourceStep::applyFilters(const DB::ActionDAGNodes
added_filter_nodes)
{
- filter_actions_dag =
DB::ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
- for (const auto & processor : pipe.getProcessors())
- if (auto * source = dynamic_cast<DB::SourceWithKeyCondition
*>(processor.get()))
- source->setKeyCondition(filter_actions_dag, context);
+ SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
+ if (filter_actions_dag)
+ {
+ for (const auto & processor : pipe.getProcessors())
+ if (auto * source = dynamic_cast<SubstraitFileSource
*>(processor.get()))
+ source->setKeyCondition(filter_actions_dag, context);
+ }
}
}
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
index 78fcb767f0..e1175507f5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
@@ -17,11 +17,10 @@
#pragma once
+#include <Interpreters/Context_fwd.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
-#include <Storages/MergeTree/KeyCondition.h>
-#include <Interpreters/Context_fwd.h>
-#include <Core/NamesAndTypes.h>
+
namespace local_engine
{
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
index fcbe650d46..9e40209eea 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
@@ -34,7 +34,8 @@ TextFormatFile::TextFormatFile(
{
}
-FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block &
header)
+FormatFile::InputFormatPtr
+TextFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
{
auto read_buffer =
read_buffer_builder->buildWithCompressionWrapper(file_info);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
index 62e60af4a8..0e6827e653 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
@@ -32,7 +32,8 @@ public:
DB::ContextPtr context_, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr
read_buffer_builder_);
~TextFormatFile() override = default;
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
DB::NamesAndTypesList getSchema() const
{
diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
index f9b4693aeb..35721a63f4 100644
--- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
@@ -19,6 +19,7 @@
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ReadBufferFromFile.h>
+#include <Interpreters/JoinInfo.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
@@ -31,7 +32,6 @@
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <Storages/SubstraitSource/substrait_fwd.h>
#include <benchmark/benchmark.h>
-#include <parquet/arrow/reader.h>
#include <substrait/plan.pb.h>
#include <tests/utils/TempFilePath.h>
#include <tests/utils/gluten_test_util.h>
@@ -87,7 +87,9 @@ void BM_ColumnIndexRead_Old(benchmark::State & state)
for (auto _ : state)
{
ReadBufferFromFilePRead fileReader(file);
- auto format = std::make_shared<ParquetBlockInputFormat>(fileReader,
header, format_settings, 1, 1, 8192);
+ auto global_context = local_engine::QueryContext::globalContext();
+ auto parser_group =
std::make_shared<FormatParserGroup>(global_context->getSettingsRef(), 1,
nullptr, global_context);
+ auto format = std::make_shared<ParquetBlockInputFormat>(fileReader,
header, format_settings, parser_group, 8192);
auto pipeline = QueryPipeline(std::move(format));
auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
while (reader->pull(res))
@@ -110,7 +112,9 @@ void BM_ParquetReadDate32(benchmark::State & state)
for (auto _ : state)
{
auto in = std::make_unique<ReadBufferFromFile>(file);
- auto format = std::make_shared<ParquetBlockInputFormat>(*in, header,
format_settings, 1, 1, 8192);
+ auto global_context = local_engine::QueryContext::globalContext();
+ auto parser_group =
std::make_shared<FormatParserGroup>(global_context->getSettingsRef(), 1,
nullptr, global_context);
+ auto format = std::make_shared<ParquetBlockInputFormat>(*in, header,
format_settings, parser_group, 8192);
auto pipeline = QueryPipeline(std::move(format));
auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
while (reader->pull(res))
@@ -198,7 +202,7 @@ substrait::ReadRel::LocalFiles createLocalFiles(const
std::string & filename, co
return files;
}
-void doRead(const substrait::ReadRel::LocalFiles & files, const
std::optional<DB::ActionsDAG> & pushDown, const DB::Block & header)
+void doRead(const substrait::ReadRel::LocalFiles & files, const
std::shared_ptr<const DB::ActionsDAG> & pushDown, const DB::Block & header)
{
const auto builder = std::make_unique<DB::QueryPipelineBuilder>();
const auto source =
std::make_shared<local_engine::SubstraitFileSource>(local_engine::QueryContext::globalContext(),
header, files);
@@ -229,7 +233,9 @@ void
BM_ColumnIndexRead_Filter_ReturnAllResult(benchmark::State & state)
const std::string filter1 = "l_shipdate is not null AND l_shipdate <=
toDate32('1998-09-01')";
const substrait::ReadRel::LocalFiles files = createLocalFiles(filename,
true);
const local_engine::RowType schema =
local_engine::test::readParquetSchema(filename);
- auto pushDown = local_engine::test::parseFilter(filter1, schema);
+ auto pushDownOpt = local_engine::test::parseFilter(filter1, schema);
+ auto pushDown = pushDownOpt ? std::make_shared<const
ActionsDAG>(std::move(*pushDownOpt)) : nullptr;
+
const Block header = {local_engine::toSampleBlock(schema)};
for (auto _ : state)
@@ -246,7 +252,8 @@ void
BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
const std::string filter1 = "l_orderkey is not null AND l_orderkey >
300977829";
const substrait::ReadRel::LocalFiles files = createLocalFiles(filename,
true);
const local_engine::RowType schema =
local_engine::test::readParquetSchema(filename);
- auto pushDown = local_engine::test::parseFilter(filter1, schema);
+ auto pushDownOpt = local_engine::test::parseFilter(filter1, schema);
+ auto pushDown = pushDownOpt ? std::make_shared<const
ActionsDAG>(std::move(*pushDownOpt)) : nullptr;
const Block header = {local_engine::toSampleBlock(schema)};
for (auto _ : state)
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
index b8592a0d15..789d7e9c34 100644
--- a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
@@ -19,6 +19,7 @@
#include <Core/Block.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadBufferFromFile.h>
+#include <Interpreters/Context.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/SparkRowToCHColumn.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
@@ -27,7 +28,7 @@
#include <QueryPipeline/QueryPipeline.h>
#include <base/types.h>
#include <benchmark/benchmark.h>
-#include <parquet/arrow/reader.h>
+#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
@@ -56,7 +57,9 @@ static void readParquetFile(const Block & header, const
String & file, Block & b
{
auto in = std::make_unique<ReadBufferFromFile>(file);
FormatSettings format_settings;
- auto format = std::make_shared<ParquetBlockInputFormat>(*in, header,
format_settings, 1, 1, 8192);
+ auto global_context = QueryContext::globalContext();
+ auto parser_group =
std::make_shared<FormatParserGroup>(global_context->getSettingsRef(), 1,
nullptr, global_context);
+ auto format = std::make_shared<ParquetBlockInputFormat>(*in, header,
format_settings, std::move(parser_group), 8192);
auto pipeline = QueryPipeline(std::move(format));
auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
while (reader->pull(block))
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
index 57b6b86ae0..57b4eca6cf 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
@@ -45,6 +45,7 @@
#include <tests/utils/gluten_test_util.h>
#include <Common/BlockTypeUtils.h>
#include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
@@ -111,8 +112,10 @@ void readData(const String & path, const std::map<String,
Field> & fields)
ReadBufferFromFile in(full_path);
InputFormatPtr format;
+ auto parser_group
+ =
std::make_shared<FormatParserGroup>(QueryContext::globalContext()->getSettingsRef(),
1, nullptr, QueryContext::globalContext());
if constexpr (std::is_same_v<InputFormat, DB::ParquetBlockInputFormat>)
- format = std::make_shared<InputFormat>(in, header, settings, 1, 1,
8192);
+ format = std::make_shared<InputFormat>(in, header, settings,
parser_group, 8192);
else
format = std::make_shared<InputFormat>(in, header, settings);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]