This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f2ea2fbb92 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250714) (#10178)
f2ea2fbb92 is described below

commit f2ea2fbb92b2722cc8f83f8348c299d7324cd28d
Author: Kyligence Git <[email protected]>
AuthorDate: Mon Jul 14 08:17:34 2025 -0500

    [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250714) (#10178)
    
    * [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250714)
    
    * Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/82949
    
    ---------
    
    Co-authored-by: kyligence-git <[email protected]>
    Co-authored-by: Chang chen <[email protected]>
---
 cpp-ch/clickhouse.version                             |  4 ++--
 .../Storages/Parquet/VectorizedParquetRecordReader.h  |  3 ---
 .../Storages/SubstraitSource/ExcelTextFormatFile.cpp  |  3 ++-
 .../Storages/SubstraitSource/ExcelTextFormatFile.h    |  3 ++-
 .../Storages/SubstraitSource/FileReader.cpp           | 15 ++++++---------
 .../Storages/SubstraitSource/FileReader.h             |  2 +-
 .../Storages/SubstraitSource/FormatFile.h             |  6 ++++--
 .../Storages/SubstraitSource/Iceberg/IcebergReader.h  |  6 ++++++
 .../Storages/SubstraitSource/JSONFormatFile.cpp       |  3 ++-
 .../Storages/SubstraitSource/JSONFormatFile.h         |  3 ++-
 .../Storages/SubstraitSource/ORCFormatFile.cpp        |  6 ++++--
 .../Storages/SubstraitSource/ORCFormatFile.h          |  3 ++-
 .../Storages/SubstraitSource/ParquetFormatFile.cpp    | 13 ++++---------
 .../Storages/SubstraitSource/ParquetFormatFile.h      |  3 ++-
 .../Storages/SubstraitSource/SubstraitFileSource.cpp  | 12 ++++++------
 .../Storages/SubstraitSource/SubstraitFileSource.h    | 13 ++++++++++---
 .../SubstraitSource/SubstraitFileSourceStep.cpp       | 12 +++++++-----
 .../SubstraitSource/SubstraitFileSourceStep.h         |  5 ++---
 .../Storages/SubstraitSource/TextFormatFile.cpp       |  3 ++-
 .../Storages/SubstraitSource/TextFormatFile.h         |  3 ++-
 cpp-ch/local-engine/tests/benchmark_parquet_read.cpp  | 19 +++++++++++++------
 cpp-ch/local-engine/tests/benchmark_spark_row.cpp     |  7 +++++--
 cpp-ch/local-engine/tests/gtest_parquet_read.cpp      |  5 ++++-
 23 files changed, 90 insertions(+), 62 deletions(-)

diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index a92ffc7639..2905a10017 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250707
-CH_COMMIT=e5b9e10de82
+CH_BRANCH=rebase_ch/20250714
+CH_COMMIT=8d128664b55
diff --git a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
index 512cc91bed..ce1a139014 100644
--- a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
+++ b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
@@ -214,9 +214,6 @@ class VectorizedParquetBlockInputFormat final : public 
DB::IInputFormat
 protected:
     void onCancel() noexcept override { is_stopped = 1; }
 
-    // TODO: create ColumnIndexFilter here, currently disable it now.
-    void setKeyCondition(const std::shared_ptr<const DB::KeyCondition> & 
key_condition_) override { }
-
 public:
     VectorizedParquetBlockInputFormat(
         DB::ReadBuffer & in_,
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
index f094bf915d..dd191831c3 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
@@ -62,7 +62,8 @@ bool ExcelTextFormatFile::useThis(const DB::ContextPtr & 
context)
     return settingsEqual(context->getSettingsRef(), USE_EXCEL_PARSER, "true");
 }
 
-FormatFile::InputFormatPtr ExcelTextFormatFile::createInputFormat(const 
DB::Block & header)
+FormatFile::InputFormatPtr
+ExcelTextFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
 {
     auto read_buffer = read_buffer_builder->build(file_info);
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
index 5e70d22eeb..0b92f86219 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
@@ -47,7 +47,8 @@ public:
 
     ~ExcelTextFormatFile() override = default;
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     bool supportSplit() const override { return true; }
     String getFileFormat() const override { return "ExcelText"; }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index e5d1f8d8a0..8de7f83509 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -274,17 +274,12 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
     const FormatFilePtr & file,
     const DB::Block & to_read_header_,
     const DB::Block & output_header_,
-    const std::shared_ptr<const DB::KeyCondition> & key_condition = nullptr,
+    const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag = nullptr,
     const ColumnIndexFilterPtr & column_index_filter = nullptr)
 {
     file->initialize(column_index_filter);
     auto createInputFormat = [&](const DB::Block & new_read_header_) -> 
FormatFile::InputFormatPtr
-    {
-        auto input_format = file->createInputFormat(new_read_header_);
-        if (key_condition && input_format)
-            input_format->inputFormat().setKeyCondition(key_condition);
-        return input_format;
-    };
+    { return file->createInputFormat(new_read_header_, filter_actions_dag); };
 
     if (file->getFileInfo().has_iceberg())
         return iceberg::IcebergReader::create(file, to_read_header_, 
output_header_, createInputFormat);
@@ -316,11 +311,13 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
     return std::make_unique<NormalFileReader>(file, to_read_header_, 
output_header_, input_format);
 }
 }
+
+/// TODO Remove ColumnIndexFilterPtr
 std::unique_ptr<BaseReader> BaseReader::create(
     const FormatFilePtr & current_file,
     const DB::Block & readHeader,
     const DB::Block & outputHeader,
-    const std::shared_ptr<const DB::KeyCondition> & key_condition,
+    const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
     const ColumnIndexFilterPtr & column_index_filter)
 {
     if (!readHeader)
@@ -335,7 +332,7 @@ std::unique_ptr<BaseReader> BaseReader::create(
         }
     }
 
-    return createNormalFileReader(current_file, readHeader, outputHeader, 
key_condition, column_index_filter);
+    return createNormalFileReader(current_file, readHeader, outputHeader, 
filter_actions_dag, column_index_filter);
 }
 
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
index bdb6772ab5..467824610e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
@@ -77,7 +77,7 @@ public:
         const FormatFilePtr & current_file,
         const DB::Block & readHeader,
         const DB::Block & outputHeader,
-        const std::shared_ptr<const DB::KeyCondition> & key_condition,
+        const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
         const ColumnIndexFilterPtr & column_index_filter);
 };
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
index 262aeff6fa..60d28f5bba 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -30,6 +30,7 @@
 
 namespace DB
 {
+class ActionsDAG;
 namespace ErrorCodes
 {
 extern const int NOT_IMPLEMENTED;
@@ -115,7 +116,9 @@ public:
     virtual ~FormatFile() = default;
 
     /// Create a new input format for reading this file
-    virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;
+    virtual InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr)
+        = 0;
 
     /// Spark would split a large file into small segements and read in 
different tasks
     /// If this file doesn't support the split feacture, only the task with 
offset 0 will generate data.
@@ -149,7 +152,6 @@ protected:
     std::map<String, String> partition_values;
     /// partition keys are normalized to lower cases for partition column 
case-insensitive matching
     std::map<String, String> normalized_partition_values;
-    std::shared_ptr<const DB::KeyCondition> key_condition;
     const FileMetaColumns meta_columns;
 
     /// Currently, it is used to read an iceberg format, and initialized in 
the constructor of child class
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
index f8845c1367..ddfd0954db 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
@@ -18,6 +18,12 @@
 
 #include <Storages/SubstraitSource/FileReader.h>
 
+namespace DB
+{
+class ExpressionActions;
+using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
+}
+
 namespace local_engine
 {
 class DeltaDVRoaringBitmapArray;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
index 5cbab2bbda..b5728abbf9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
@@ -31,7 +31,8 @@ JSONFormatFile::JSONFormatFile(
 {
 }
 
-FormatFile::InputFormatPtr JSONFormatFile::createInputFormat(const DB::Block & 
header)
+FormatFile::InputFormatPtr
+JSONFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
 {
     auto read_buffer = 
read_buffer_builder->buildWithCompressionWrapper(file_info);
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
index 97b3a694de..a98af115a2 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
@@ -28,7 +28,8 @@ public:
 
     bool supportSplit() const override { return true; }
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     String getFileFormat() const override { return "JSONEachRow"; }
 };
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
index f78cca5d87..9b87c6528e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
@@ -36,7 +36,8 @@ ORCFormatFile::ORCFormatFile(
 {
 }
 
-FormatFile::InputFormatPtr ORCFormatFile::createInputFormat(const DB::Block & 
header)
+FormatFile::InputFormatPtr
+ORCFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
 {
     auto read_buffer = read_buffer_builder->build(file_info);
 
@@ -70,7 +71,8 @@ FormatFile::InputFormatPtr 
ORCFormatFile::createInputFormat(const DB::Block & he
         format_settings.orc.reader_time_zone_name = mapped_timezone;
     }
     //TODO: support prefetch
-    auto input_format = 
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header, 
format_settings, false, 0);
+    auto parser_group = 
std::make_shared<DB::FormatParserGroup>(context->getSettingsRef(), 1, 
filter_actions_dag, context);
+    auto input_format = 
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header, 
format_settings, false, 0, parser_group);
     return std::make_shared<InputFormat>(std::move(read_buffer), input_format);
 }
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
index b6f6fd2321..0d8beb5d3b 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
@@ -44,7 +44,8 @@ public:
         DB::ContextPtr context_, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr 
read_buffer_builder_);
     ~ORCFormatFile() override = default;
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     std::optional<size_t> getTotalRows() override;
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index a4ccfa93b7..50e84e60f9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -142,7 +142,8 @@ void ParquetFormatFile::initialize(const 
ColumnIndexFilterPtr & filter)
         file_schema = ParquetMetaBuilder::collectFileSchema(context, 
*read_buffer_);
 }
 
-FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const Block & 
header)
+FormatFile::InputFormatPtr
+ParquetFormatFile::createInputFormat(const Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
 {
     assert(read_buffer_);
 
@@ -205,14 +206,8 @@ FormatFile::InputFormatPtr 
ParquetFormatFile::createInputFormat(const Block & he
             // We need to disable fiter push down and read all row groups, so 
that we can get correct row index.
             format_settings.parquet.filter_push_down = false;
         }
-
-        auto input = std::make_shared<ParquetBlockInputFormat>(
-            *read_buffer_,
-            read_header,
-            format_settings,
-            settings[Setting::max_parsing_threads],
-            settings[Setting::max_download_threads],
-            8192);
+        auto parser_group = 
std::make_shared<FormatParserGroup>(context->getSettingsRef(), 1, 
filter_actions_dag, context);
+        auto input = std::make_shared<ParquetBlockInputFormat>(*read_buffer_, 
read_header, format_settings, parser_group, 8192);
         return std::make_shared<ParquetInputFormat>(
             std::move(read_buffer_), input, std::move(provider), 
std::move(read_header), std::move(output_header));
     };
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 2014e1d643..587b788f79 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -35,7 +35,8 @@ public:
         bool use_local_format_);
     ~ParquetFormatFile() override = default;
 
-    InputFormatPtr createInputFormat(const DB::Block & header) override;
+    InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     std::optional<size_t> getTotalRows() override;
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
index a5df941a48..51c041fa3c 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -54,7 +54,7 @@ static DB::Block initReadHeader(const DB::Block & block, 
const FormatFiles & fil
 
 SubstraitFileSource::SubstraitFileSource(
     const DB::ContextPtr & context_, const DB::Block & outputHeader_, const 
substrait::ReadRel::LocalFiles & file_infos)
-    : 
DB::SourceWithKeyCondition(BaseReader::buildRowCountHeader(outputHeader_), 
false)
+    : DB::ISource(BaseReader::buildRowCountHeader(outputHeader_), false)
     , files(initializeFiles(file_infos, context_))
     , outputHeader(outputHeader_)
     , readHeader(initReadHeader(outputHeader, files))
@@ -63,11 +63,11 @@ SubstraitFileSource::SubstraitFileSource(
 
 SubstraitFileSource::~SubstraitFileSource() = default;
 
-void SubstraitFileSource::setKeyCondition(const std::optional<DB::ActionsDAG> 
& filter_actions_dag, DB::ContextPtr context_)
+void SubstraitFileSource::setKeyCondition(const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag_, DB::ContextPtr context_)
 {
-    setKeyConditionImpl(filter_actions_dag, context_, readHeader);
-    if (filter_actions_dag)
-        column_index_filter = 
std::make_shared<ColumnIndexFilter>(filter_actions_dag.value(), context_);
+    assert(filter_actions_dag_);
+    filter_actions_dag = filter_actions_dag_;
+    column_index_filter = 
std::make_shared<ColumnIndexFilter>(*filter_actions_dag, context_);
 }
 
 DB::Chunk SubstraitFileSource::generate()
@@ -105,7 +105,7 @@ bool SubstraitFileSource::tryPrepareReader()
         if (!current_file->supportSplit() && current_file->getStartOffset())
             continue;
 
-        file_reader = BaseReader::create(current_file, readHeader, 
outputHeader, key_condition, column_index_filter);
+        file_reader = BaseReader::create(current_file, readHeader, 
outputHeader, filter_actions_dag, column_index_filter);
         if (file_reader)
             return true;
     }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
index a2280da0d2..b40dd9a82d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -17,9 +17,15 @@
 #pragma once
 
 #include <memory>
-#include <Processors/SourceWithKeyCondition.h>
+#include <Interpreters/Context_fwd.h>
+#include <Processors/ISource.h>
 #include <substrait/algebra.pb.h>
 
+namespace DB
+{
+class ActionsDAG;
+}
+
 namespace local_engine
 {
 class ColumnIndexFilter;
@@ -29,7 +35,7 @@ class FormatFile;
 using FormatFilePtr = std::shared_ptr<FormatFile>;
 using FormatFiles = std::vector<FormatFilePtr>;
 
-class SubstraitFileSource : public DB::SourceWithKeyCondition
+class SubstraitFileSource : public DB::ISource
 {
 public:
     SubstraitFileSource(const DB::ContextPtr & context_, const DB::Block & 
header_, const substrait::ReadRel::LocalFiles & file_infos);
@@ -37,7 +43,7 @@ public:
 
     String getName() const override { return "SubstraitFileSource"; }
 
-    void setKeyCondition(const std::optional<DB::ActionsDAG> & 
filter_actions_dag, DB::ContextPtr context_) override;
+    void setKeyCondition(const std::shared_ptr<const DB::ActionsDAG> & 
filter_actions_dag_, DB::ContextPtr context_);
 
 protected:
     DB::Chunk generate() override;
@@ -54,5 +60,6 @@ private:
 
     std::unique_ptr<BaseReader> file_reader;
     ColumnIndexFilterPtr column_index_filter;
+    std::shared_ptr<const DB::ActionsDAG> filter_actions_dag;
 };
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
index 91e50b4275..220d181226 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-#include <Interpreters/Context_fwd.h>
 #include <Processors/QueryPlan/IQueryPlanStep.h>
 #include <QueryPipeline/Pipe.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
@@ -66,9 +65,12 @@ void 
SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipe
 
 void SubstraitFileSourceStep::applyFilters(const DB::ActionDAGNodes 
added_filter_nodes)
 {
-    filter_actions_dag = 
DB::ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
-    for (const auto & processor : pipe.getProcessors())
-        if (auto * source = dynamic_cast<DB::SourceWithKeyCondition 
*>(processor.get()))
-            source->setKeyCondition(filter_actions_dag, context);
+    SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
+    if (filter_actions_dag)
+    {
+        for (const auto & processor : pipe.getProcessors())
+            if (auto * source = dynamic_cast<SubstraitFileSource 
*>(processor.get()))
+                source->setKeyCondition(filter_actions_dag, context);
+    }
 }
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
index 78fcb767f0..e1175507f5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
@@ -17,11 +17,10 @@
 
 #pragma once
 
+#include <Interpreters/Context_fwd.h>
 #include <Processors/QueryPlan/ReadFromPreparedSource.h>
 #include <Processors/QueryPlan/SourceStepWithFilter.h>
-#include <Storages/MergeTree/KeyCondition.h>
-#include <Interpreters/Context_fwd.h>
-#include <Core/NamesAndTypes.h>
+
 
 namespace local_engine
 {
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
index fcbe650d46..9e40209eea 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
@@ -34,7 +34,8 @@ TextFormatFile::TextFormatFile(
 {
 }
 
-FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block & 
header)
+FormatFile::InputFormatPtr
+TextFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
 {
     auto read_buffer = 
read_buffer_builder->buildWithCompressionWrapper(file_info);
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
index 62e60af4a8..0e6827e653 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
@@ -32,7 +32,8 @@ public:
         DB::ContextPtr context_, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr 
read_buffer_builder_);
     ~TextFormatFile() override = default;
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     DB::NamesAndTypesList getSchema() const
     {
diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
index f9b4693aeb..35721a63f4 100644
--- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
@@ -19,6 +19,7 @@
 #include <DataTypes/DataTypeDate32.h>
 #include <DataTypes/DataTypeString.h>
 #include <IO/ReadBufferFromFile.h>
+#include <Interpreters/JoinInfo.h>
 #include <Processors/Executors/PullingPipelineExecutor.h>
 #include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
 #include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
@@ -31,7 +32,6 @@
 #include <Storages/SubstraitSource/SubstraitFileSource.h>
 #include <Storages/SubstraitSource/substrait_fwd.h>
 #include <benchmark/benchmark.h>
-#include <parquet/arrow/reader.h>
 #include <substrait/plan.pb.h>
 #include <tests/utils/TempFilePath.h>
 #include <tests/utils/gluten_test_util.h>
@@ -87,7 +87,9 @@ void BM_ColumnIndexRead_Old(benchmark::State & state)
     for (auto _ : state)
     {
         ReadBufferFromFilePRead fileReader(file);
-        auto format = std::make_shared<ParquetBlockInputFormat>(fileReader, 
header, format_settings, 1, 1, 8192);
+        auto global_context = local_engine::QueryContext::globalContext();
+        auto parser_group = 
std::make_shared<FormatParserGroup>(global_context->getSettingsRef(), 1, 
nullptr, global_context);
+        auto format = std::make_shared<ParquetBlockInputFormat>(fileReader, 
header, format_settings, parser_group, 8192);
         auto pipeline = QueryPipeline(std::move(format));
         auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
         while (reader->pull(res))
@@ -110,7 +112,9 @@ void BM_ParquetReadDate32(benchmark::State & state)
     for (auto _ : state)
     {
         auto in = std::make_unique<ReadBufferFromFile>(file);
-        auto format = std::make_shared<ParquetBlockInputFormat>(*in, header, 
format_settings, 1, 1, 8192);
+        auto global_context = local_engine::QueryContext::globalContext();
+        auto parser_group = 
std::make_shared<FormatParserGroup>(global_context->getSettingsRef(), 1, 
nullptr, global_context);
+        auto format = std::make_shared<ParquetBlockInputFormat>(*in, header, 
format_settings, parser_group, 8192);
         auto pipeline = QueryPipeline(std::move(format));
         auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
         while (reader->pull(res))
@@ -198,7 +202,7 @@ substrait::ReadRel::LocalFiles createLocalFiles(const 
std::string & filename, co
     return files;
 }
 
-void doRead(const substrait::ReadRel::LocalFiles & files, const 
std::optional<DB::ActionsDAG> & pushDown, const DB::Block & header)
+void doRead(const substrait::ReadRel::LocalFiles & files, const 
std::shared_ptr<const DB::ActionsDAG> & pushDown, const DB::Block & header)
 {
     const auto builder = std::make_unique<DB::QueryPipelineBuilder>();
     const auto source = 
std::make_shared<local_engine::SubstraitFileSource>(local_engine::QueryContext::globalContext(),
 header, files);
@@ -229,7 +233,9 @@ void 
BM_ColumnIndexRead_Filter_ReturnAllResult(benchmark::State & state)
     const std::string filter1 = "l_shipdate is not null AND l_shipdate <= 
toDate32('1998-09-01')";
     const substrait::ReadRel::LocalFiles files = createLocalFiles(filename, 
true);
     const local_engine::RowType schema = 
local_engine::test::readParquetSchema(filename);
-    auto pushDown = local_engine::test::parseFilter(filter1, schema);
+    auto pushDownOpt = local_engine::test::parseFilter(filter1, schema);
+    auto pushDown = pushDownOpt ? std::make_shared<const 
ActionsDAG>(std::move(*pushDownOpt)) : nullptr;
+
     const Block header = {local_engine::toSampleBlock(schema)};
 
     for (auto _ : state)
@@ -246,7 +252,8 @@ void 
BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
     const std::string filter1 = "l_orderkey is not null AND l_orderkey > 
300977829";
     const substrait::ReadRel::LocalFiles files = createLocalFiles(filename, 
true);
     const local_engine::RowType schema = 
local_engine::test::readParquetSchema(filename);
-    auto pushDown = local_engine::test::parseFilter(filter1, schema);
+    auto pushDownOpt = local_engine::test::parseFilter(filter1, schema);
+    auto pushDown = pushDownOpt ? std::make_shared<const 
ActionsDAG>(std::move(*pushDownOpt)) : nullptr;
     const Block header = {local_engine::toSampleBlock(schema)};
 
     for (auto _ : state)
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
index b8592a0d15..789d7e9c34 100644
--- a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
@@ -19,6 +19,7 @@
 #include <Core/Block.h>
 #include <DataTypes/DataTypeFactory.h>
 #include <IO/ReadBufferFromFile.h>
+#include <Interpreters/Context.h>
 #include <Parser/CHColumnToSparkRow.h>
 #include <Parser/SparkRowToCHColumn.h>
 #include <Processors/Executors/PullingPipelineExecutor.h>
@@ -27,7 +28,7 @@
 #include <QueryPipeline/QueryPipeline.h>
 #include <base/types.h>
 #include <benchmark/benchmark.h>
-#include <parquet/arrow/reader.h>
+#include <Common/QueryContext.h>
 
 using namespace DB;
 using namespace local_engine;
@@ -56,7 +57,9 @@ static void readParquetFile(const Block & header, const 
String & file, Block & b
 {
     auto in = std::make_unique<ReadBufferFromFile>(file);
     FormatSettings format_settings;
-    auto format = std::make_shared<ParquetBlockInputFormat>(*in, header, 
format_settings, 1, 1, 8192);
+    auto global_context = QueryContext::globalContext();
+    auto parser_group = 
std::make_shared<FormatParserGroup>(global_context->getSettingsRef(), 1, 
nullptr, global_context);
+    auto format = std::make_shared<ParquetBlockInputFormat>(*in, header, 
format_settings, std::move(parser_group), 8192);
     auto pipeline = QueryPipeline(std::move(format));
     auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
     while (reader->pull(block))
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
index 57b6b86ae0..57b4eca6cf 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
@@ -45,6 +45,7 @@
 #include <tests/utils/gluten_test_util.h>
 #include <Common/BlockTypeUtils.h>
 #include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
 
 using namespace DB;
 using namespace local_engine;
@@ -111,8 +112,10 @@ void readData(const String & path, const std::map<String, 
Field> & fields)
     ReadBufferFromFile in(full_path);
 
     InputFormatPtr format;
+    auto parser_group
+        = 
std::make_shared<FormatParserGroup>(QueryContext::globalContext()->getSettingsRef(),
 1, nullptr, QueryContext::globalContext());
     if constexpr (std::is_same_v<InputFormat, DB::ParquetBlockInputFormat>)
-        format = std::make_shared<InputFormat>(in, header, settings, 1, 1, 
8192);
+        format = std::make_shared<InputFormat>(in, header, settings, 
parser_group, 8192);
     else
         format = std::make_shared<InputFormat>(in, header, settings);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to