This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 34a88dddd9 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105) (#7809)
34a88dddd9 is described below

commit 34a88dddd98d248ee18141cba7b170ec092c82e8
Author: Kyligence Git <[email protected]>
AuthorDate: Tue Nov 5 04:56:58 2024 -0600

    [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105) (#7809)
    
    * [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105)
    
    * Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/71261
    
    * Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/68682
    
    ---------
    
    Co-authored-by: kyligence-git <[email protected]>
    Co-authored-by: Chang Chen <[email protected]>
---
 cpp-ch/clickhouse.version                          |  4 +--
 .../Parser/RelParsers/CrossRelParser.cpp           | 21 ++++++++++--
 .../Parser/RelParsers/JoinRelParser.cpp            | 40 ++++++++++++++++++----
 .../Storages/SubstraitSource/ReadBufferBuilder.cpp | 14 +++++---
 cpp-ch/local-engine/tests/gtest_ch_join.cpp        |  8 +++--
 5 files changed, 69 insertions(+), 18 deletions(-)

diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 445cd99068..a4dd7eb5f2 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20241101
-CH_COMMIT=7cd7bb8ece2
\ No newline at end of file
+CH_BRANCH=rebase_ch/20241105
+CH_COMMIT=500e1e35c0b
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
index 4fef282fe4..2e7f353156 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
@@ -17,6 +17,7 @@
 #include "CrossRelParser.h"
 #include <optional>
 
+#include <Core/Settings.h>
 #include <Interpreters/CollectJoinOnKeysVisitor.h>
 #include <Interpreters/GraceHashJoin.h>
 #include <Interpreters/HashJoin/HashJoin.h>
@@ -37,6 +38,10 @@
 
 namespace DB
 {
+namespace Setting
+{
+extern const SettingsUInt64 max_block_size;
+}
 namespace ErrorCodes
 {
 extern const int LOGICAL_ERROR;
@@ -194,8 +199,15 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB:
     else
     {
         JoinPtr hash_join = std::make_shared<HashJoin>(table_join, 
right->getCurrentHeader().cloneEmpty());
-        QueryPlanStepPtr join_step
-            = std::make_unique<DB::JoinStep>(left->getCurrentHeader(), 
right->getCurrentHeader(), hash_join, 8192, 1, false);
+        QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+            left->getCurrentHeader(),
+            right->getCurrentHeader(),
+            hash_join,
+            context->getSettingsRef()[Setting::max_block_size],
+            1,
+            /* required_output_ = */ NameSet{},
+            false,
+            /* use_new_analyzer_ = */ false);
         join_step->setStepDescription("CROSS_JOIN");
         steps.emplace_back(join_step.get());
         std::vector<QueryPlanPtr> plans;
@@ -243,7 +255,10 @@ void CrossRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left
     for (const auto & col : left.getCurrentHeader().getNames())
         left_columns_set.emplace(col);
     table_join.setColumnsFromJoinedTable(
-        right.getCurrentHeader().getNamesAndTypesList(), left_columns_set, 
getUniqueName("right") + ".");
+        right.getCurrentHeader().getNamesAndTypesList(),
+        left_columns_set,
+        getUniqueName("right") + ".",
+        left.getCurrentHeader().getNamesAndTypesList());
 
     // fix right table key duplicate
     NamesWithAliases right_table_alias;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
index 0781614bf0..99ec543066 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
@@ -45,6 +45,7 @@ namespace DB
 namespace Setting
 {
 extern const SettingsJoinAlgorithm join_algorithm;
+extern const SettingsUInt64 max_block_size;
 }
 namespace ErrorCodes
 {
@@ -313,8 +314,15 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
 
         JoinPtr smj_join = std::make_shared<FullSortingMergeJoin>(table_join, 
right->getCurrentHeader().cloneEmpty(), -1);
         MultiEnum<DB::JoinAlgorithm> join_algorithm = 
context->getSettingsRef()[Setting::join_algorithm];
-        QueryPlanStepPtr join_step
-            = std::make_unique<DB::JoinStep>(left->getCurrentHeader(), 
right->getCurrentHeader(), smj_join, 8192, 1, false);
+        QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+            left->getCurrentHeader(),
+            right->getCurrentHeader(),
+            smj_join,
+            context->getSettingsRef()[Setting::max_block_size],
+            1,
+            /* required_output_ = */ NameSet{},
+            false,
+            /* use_new_analyzer_ = */ false);
 
         join_step->setStepDescription("SORT_MERGE_JOIN");
         steps.emplace_back(join_step.get());
@@ -382,7 +390,11 @@ void JoinRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left,
     NameSet left_columns_set;
     for (const auto & col : left.getCurrentHeader().getNames())
         left_columns_set.emplace(col);
-    
table_join.setColumnsFromJoinedTable(right.getCurrentHeader().getNamesAndTypesList(),
 left_columns_set, getUniqueName("right") + ".");
+    table_join.setColumnsFromJoinedTable(
+        right.getCurrentHeader().getNamesAndTypesList(),
+        left_columns_set,
+        getUniqueName("right") + ".",
+        left.getCurrentHeader().getNamesAndTypesList());
 
     // fix right table key duplicate
     NamesWithAliases right_table_alias;
@@ -772,8 +784,15 @@ DB::QueryPlanPtr JoinRelParser::buildMultiOnClauseHashJoin(
     LOG_INFO(getLogger("JoinRelParser"), "multi join on clauses:\n{}", 
DB::TableJoin::formatClauses(table_join->getClauses()));
 
     JoinPtr hash_join = std::make_shared<HashJoin>(table_join, 
right_plan->getCurrentHeader());
-    QueryPlanStepPtr join_step
-        = std::make_unique<DB::JoinStep>(left_plan->getCurrentHeader(), 
right_plan->getCurrentHeader(), hash_join, 8192, 1, false);
+    QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+        left_plan->getCurrentHeader(),
+        right_plan->getCurrentHeader(),
+        hash_join,
+        context->getSettingsRef()[Setting::max_block_size],
+        1,
+        /* required_output_ = */ NameSet{},
+        false,
+        /* use_new_analyzer_ = */ false);
     join_step->setStepDescription("Multi join on clause hash join");
     steps.emplace_back(join_step.get());
     std::vector<QueryPlanPtr> plans;
@@ -806,8 +825,15 @@ DB::QueryPlanPtr JoinRelParser::buildSingleOnClauseHashJoin(
     {
         hash_join = std::make_shared<HashJoin>(table_join, 
right_plan->getCurrentHeader().cloneEmpty());
     }
-    QueryPlanStepPtr join_step
-        = std::make_unique<DB::JoinStep>(left_plan->getCurrentHeader(), 
right_plan->getCurrentHeader(), hash_join, 8192, 1, false);
+    QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+        left_plan->getCurrentHeader(),
+        right_plan->getCurrentHeader(),
+        hash_join,
+        context->getSettingsRef()[Setting::max_block_size],
+        1,
+        /* required_output_ = */ NameSet{},
+        false,
+        /* use_new_analyzer_ = */ false);
 
     join_step->setStepDescription("HASH_JOIN");
     steps.emplace_back(join_step.get());
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 2681ed7c1c..f5f7f95e62 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -28,10 +28,10 @@
 #include <IO/ReadBufferFromFile.h>
 #include <IO/ReadBufferFromS3.h>
 #include <IO/ReadSettings.h>
-#include <IO/SplittableBzip2ReadBuffer.h>
 #include <IO/S3/getObjectInfo.h>
 #include <IO/S3Common.h>
 #include <IO/SeekableReadBuffer.h>
+#include <IO/SplittableBzip2ReadBuffer.h>
 #include <Interpreters/Cache/FileCache.h>
 #include <Interpreters/Cache/FileCacheFactory.h>
 #include <Interpreters/Cache/FileCacheSettings.h>
@@ -321,8 +321,11 @@ public:
             DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "", 
*file_size}};
             auto cache_creator = wrapWithCache(
                 read_buffer_creator, read_settings, remote_path, 
*modified_time, *file_size);
+            size_t buffer_size = 
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
+            if (*file_size > 0)
+                buffer_size = std::min(*file_size, buffer_size);
             auto cache_hdfs_read = 
std::make_unique<DB::ReadBufferFromRemoteFSGather>(
-                std::move(cache_creator), stored_objects, read_settings, 
nullptr, /* use_external_buffer */ false);
+                std::move(cache_creator), stored_objects, read_settings, 
nullptr, /* use_external_buffer */ false, buffer_size);
             read_buffer = std::move(cache_hdfs_read);
         }
 
@@ -406,11 +409,14 @@ public:
 
         DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "", 
object_size}};
         auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
-            std::move(cache_creator), stored_objects, read_settings, /* 
cache_log */ nullptr, /* use_external_buffer */ true);
+            std::move(cache_creator), stored_objects, read_settings, /* 
cache_log */ nullptr, /* use_external_buffer */ true, 0);
 
         auto & pool_reader = 
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
+        size_t buffer_size = 
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
+        if (object_size > 0)
+            buffer_size = std::min(object_size, buffer_size);
         auto async_reader
-            = 
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), 
pool_reader, read_settings);
+            = 
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), 
pool_reader, read_settings, buffer_size);
 
         if (read_settings.remote_fs_prefetch)
             async_reader->prefetch(Priority{});
diff --git a/cpp-ch/local-engine/tests/gtest_ch_join.cpp b/cpp-ch/local-engine/tests/gtest_ch_join.cpp
index 52120cede0..2d853b2eba 100644
--- a/cpp-ch/local-engine/tests/gtest_ch_join.cpp
+++ b/cpp-ch/local-engine/tests/gtest_ch_join.cpp
@@ -97,6 +97,10 @@ TEST(TestJoin, simple)
     for (const auto & column : join->columnsFromJoinedTable())
         join->addJoinedColumn(column);
 
+    auto columns_from_left_table = 
left_plan.getCurrentHeader().getNamesAndTypesList();
+    for (auto & column_from_joined_table : columns_from_left_table)
+        join->setUsedColumn(column_from_joined_table, JoinTableSide::Left);
+
     auto left_keys = left.getNamesAndTypesList();
     join->addJoinedColumnsAndCorrectTypes(left_keys, true);
     std::cerr << "after join:\n";
@@ -122,8 +126,8 @@ TEST(TestJoin, simple)
     }
     auto hash_join = std::make_shared<HashJoin>(join, 
right_plan.getCurrentHeader());
 
-    QueryPlanStepPtr join_step
-        = std::make_unique<JoinStep>(left_plan.getCurrentHeader(), 
right_plan.getCurrentHeader(), hash_join, 8192, 1, false);
+    QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
+        left_plan.getCurrentHeader(), right_plan.getCurrentHeader(), 
hash_join, 8192, 1, NameSet{}, false, false);
 
     std::cerr << "join step:" << join_step->getOutputHeader().dumpStructure() 
<< std::endl;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to