This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 34a88dddd9 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105) (#7809)
34a88dddd9 is described below
commit 34a88dddd98d248ee18141cba7b170ec092c82e8
Author: Kyligence Git <[email protected]>
AuthorDate: Tue Nov 5 04:56:58 2024 -0600
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105) (#7809)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105)
* Fix build broken by upstream change https://github.com/ClickHouse/ClickHouse/pull/71261
* Fix build broken by upstream change https://github.com/ClickHouse/ClickHouse/pull/68682
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
---
cpp-ch/clickhouse.version | 4 +--
.../Parser/RelParsers/CrossRelParser.cpp | 21 ++++++++++--
.../Parser/RelParsers/JoinRelParser.cpp | 40 ++++++++++++++++++----
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 14 +++++---
cpp-ch/local-engine/tests/gtest_ch_join.cpp | 8 +++--
5 files changed, 69 insertions(+), 18 deletions(-)
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 445cd99068..a4dd7eb5f2 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20241101
-CH_COMMIT=7cd7bb8ece2
\ No newline at end of file
+CH_BRANCH=rebase_ch/20241105
+CH_COMMIT=500e1e35c0b
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
index 4fef282fe4..2e7f353156 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
@@ -17,6 +17,7 @@
#include "CrossRelParser.h"
#include <optional>
+#include <Core/Settings.h>
#include <Interpreters/CollectJoinOnKeysVisitor.h>
#include <Interpreters/GraceHashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
@@ -37,6 +38,10 @@
namespace DB
{
+namespace Setting
+{
+extern const SettingsUInt64 max_block_size;
+}
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@@ -194,8 +199,15 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB:
else
{
JoinPtr hash_join = std::make_shared<HashJoin>(table_join,
right->getCurrentHeader().cloneEmpty());
- QueryPlanStepPtr join_step
- = std::make_unique<DB::JoinStep>(left->getCurrentHeader(),
right->getCurrentHeader(), hash_join, 8192, 1, false);
+ QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+ left->getCurrentHeader(),
+ right->getCurrentHeader(),
+ hash_join,
+ context->getSettingsRef()[Setting::max_block_size],
+ 1,
+ /* required_output_ = */ NameSet{},
+ false,
+ /* use_new_analyzer_ = */ false);
join_step->setStepDescription("CROSS_JOIN");
steps.emplace_back(join_step.get());
std::vector<QueryPlanPtr> plans;
@@ -243,7 +255,10 @@ void CrossRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left
for (const auto & col : left.getCurrentHeader().getNames())
left_columns_set.emplace(col);
table_join.setColumnsFromJoinedTable(
- right.getCurrentHeader().getNamesAndTypesList(), left_columns_set,
getUniqueName("right") + ".");
+ right.getCurrentHeader().getNamesAndTypesList(),
+ left_columns_set,
+ getUniqueName("right") + ".",
+ left.getCurrentHeader().getNamesAndTypesList());
// fix right table key duplicate
NamesWithAliases right_table_alias;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
index 0781614bf0..99ec543066 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
@@ -45,6 +45,7 @@ namespace DB
namespace Setting
{
extern const SettingsJoinAlgorithm join_algorithm;
+extern const SettingsUInt64 max_block_size;
}
namespace ErrorCodes
{
@@ -313,8 +314,15 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
JoinPtr smj_join = std::make_shared<FullSortingMergeJoin>(table_join,
right->getCurrentHeader().cloneEmpty(), -1);
MultiEnum<DB::JoinAlgorithm> join_algorithm =
context->getSettingsRef()[Setting::join_algorithm];
- QueryPlanStepPtr join_step
- = std::make_unique<DB::JoinStep>(left->getCurrentHeader(),
right->getCurrentHeader(), smj_join, 8192, 1, false);
+ QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+ left->getCurrentHeader(),
+ right->getCurrentHeader(),
+ smj_join,
+ context->getSettingsRef()[Setting::max_block_size],
+ 1,
+ /* required_output_ = */ NameSet{},
+ false,
+ /* use_new_analyzer_ = */ false);
join_step->setStepDescription("SORT_MERGE_JOIN");
steps.emplace_back(join_step.get());
@@ -382,7 +390,11 @@ void JoinRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left,
NameSet left_columns_set;
for (const auto & col : left.getCurrentHeader().getNames())
left_columns_set.emplace(col);
-
table_join.setColumnsFromJoinedTable(right.getCurrentHeader().getNamesAndTypesList(),
left_columns_set, getUniqueName("right") + ".");
+ table_join.setColumnsFromJoinedTable(
+ right.getCurrentHeader().getNamesAndTypesList(),
+ left_columns_set,
+ getUniqueName("right") + ".",
+ left.getCurrentHeader().getNamesAndTypesList());
// fix right table key duplicate
NamesWithAliases right_table_alias;
@@ -772,8 +784,15 @@ DB::QueryPlanPtr JoinRelParser::buildMultiOnClauseHashJoin(
LOG_INFO(getLogger("JoinRelParser"), "multi join on clauses:\n{}",
DB::TableJoin::formatClauses(table_join->getClauses()));
JoinPtr hash_join = std::make_shared<HashJoin>(table_join,
right_plan->getCurrentHeader());
- QueryPlanStepPtr join_step
- = std::make_unique<DB::JoinStep>(left_plan->getCurrentHeader(),
right_plan->getCurrentHeader(), hash_join, 8192, 1, false);
+ QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+ left_plan->getCurrentHeader(),
+ right_plan->getCurrentHeader(),
+ hash_join,
+ context->getSettingsRef()[Setting::max_block_size],
+ 1,
+ /* required_output_ = */ NameSet{},
+ false,
+ /* use_new_analyzer_ = */ false);
join_step->setStepDescription("Multi join on clause hash join");
steps.emplace_back(join_step.get());
std::vector<QueryPlanPtr> plans;
@@ -806,8 +825,15 @@ DB::QueryPlanPtr JoinRelParser::buildSingleOnClauseHashJoin(
{
hash_join = std::make_shared<HashJoin>(table_join,
right_plan->getCurrentHeader().cloneEmpty());
}
- QueryPlanStepPtr join_step
- = std::make_unique<DB::JoinStep>(left_plan->getCurrentHeader(),
right_plan->getCurrentHeader(), hash_join, 8192, 1, false);
+ QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
+ left_plan->getCurrentHeader(),
+ right_plan->getCurrentHeader(),
+ hash_join,
+ context->getSettingsRef()[Setting::max_block_size],
+ 1,
+ /* required_output_ = */ NameSet{},
+ false,
+ /* use_new_analyzer_ = */ false);
join_step->setStepDescription("HASH_JOIN");
steps.emplace_back(join_step.get());
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 2681ed7c1c..f5f7f95e62 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -28,10 +28,10 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadSettings.h>
-#include <IO/SplittableBzip2ReadBuffer.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
+#include <IO/SplittableBzip2ReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
@@ -321,8 +321,11 @@ public:
DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "",
*file_size}};
auto cache_creator = wrapWithCache(
read_buffer_creator, read_settings, remote_path,
*modified_time, *file_size);
+ size_t buffer_size =
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
+ if (*file_size > 0)
+ buffer_size = std::min(*file_size, buffer_size);
auto cache_hdfs_read =
std::make_unique<DB::ReadBufferFromRemoteFSGather>(
- std::move(cache_creator), stored_objects, read_settings,
nullptr, /* use_external_buffer */ false);
+ std::move(cache_creator), stored_objects, read_settings,
nullptr, /* use_external_buffer */ false, buffer_size);
read_buffer = std::move(cache_hdfs_read);
}
@@ -406,11 +409,14 @@ public:
DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "",
object_size}};
auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
- std::move(cache_creator), stored_objects, read_settings, /*
cache_log */ nullptr, /* use_external_buffer */ true);
+ std::move(cache_creator), stored_objects, read_settings, /*
cache_log */ nullptr, /* use_external_buffer */ true, 0);
auto & pool_reader =
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
+ size_t buffer_size =
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
+ if (object_size > 0)
+ buffer_size = std::min(object_size, buffer_size);
auto async_reader
- =
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl),
pool_reader, read_settings);
+ =
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl),
pool_reader, read_settings, buffer_size);
if (read_settings.remote_fs_prefetch)
async_reader->prefetch(Priority{});
diff --git a/cpp-ch/local-engine/tests/gtest_ch_join.cpp b/cpp-ch/local-engine/tests/gtest_ch_join.cpp
index 52120cede0..2d853b2eba 100644
--- a/cpp-ch/local-engine/tests/gtest_ch_join.cpp
+++ b/cpp-ch/local-engine/tests/gtest_ch_join.cpp
@@ -97,6 +97,10 @@ TEST(TestJoin, simple)
for (const auto & column : join->columnsFromJoinedTable())
join->addJoinedColumn(column);
+ auto columns_from_left_table =
left_plan.getCurrentHeader().getNamesAndTypesList();
+ for (auto & column_from_joined_table : columns_from_left_table)
+ join->setUsedColumn(column_from_joined_table, JoinTableSide::Left);
+
auto left_keys = left.getNamesAndTypesList();
join->addJoinedColumnsAndCorrectTypes(left_keys, true);
std::cerr << "after join:\n";
@@ -122,8 +126,8 @@ TEST(TestJoin, simple)
}
auto hash_join = std::make_shared<HashJoin>(join,
right_plan.getCurrentHeader());
- QueryPlanStepPtr join_step
- = std::make_unique<JoinStep>(left_plan.getCurrentHeader(),
right_plan.getCurrentHeader(), hash_join, 8192, 1, false);
+ QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
+ left_plan.getCurrentHeader(), right_plan.getCurrentHeader(),
hash_join, 8192, 1, NameSet{}, false, false);
std::cerr << "join step:" << join_step->getOutputHeader().dumpStructure()
<< std::endl;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]