This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 05903d962 [Tools] Support to config hash bucket numbers when copy a table 05903d962 is described below commit 05903d96296f9d06423deaddcfbeb5f22461dc76 Author: xinghuayu007 <1450306...@qq.com> AuthorDate: Thu Jun 9 11:09:16 2022 +0800 [Tools] Support to config hash bucket numbers when copy a table When copying a table to another table, we can create the table with the same schema, but we cannot configure the number of hash buckets in the new table. Why do we need to configure the number of hash buckets? The old table might be configured with a small number of hash buckets but contain a lot of data. When copying the table to a new cluster, we want to use more hash buckets to store the data, and there isn't a way to change the number of hash buckets in the partition schema of an already existing table. Change-Id: I1cec38e5ea09c66bfed20622b85033602da60d41 Reviewed-on: http://gerrit.cloudera.org:8080/18604 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/tools/kudu-tool-test.cc | 117 +++++++++++++++++++++++++++++++++--- src/kudu/tools/table_scanner.cc | 46 +++++++++++++- src/kudu/tools/tool_action_table.cc | 1 + 3 files changed, 155 insertions(+), 9 deletions(-) diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 876a1854e..23601cedf 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -522,6 +522,7 @@ class ToolTest : public KuduTest { string columns; TableCopyMode mode; int32_t create_table_replication_factor; + string create_table_hash_bucket_nums; }; void RunCopyTableCheck(const RunCopyTableCheckArgs& args) { @@ -569,9 +570,11 @@ class ToolTest : public KuduTest { // Execute copy command. 
string stdout; - NO_FATALS(RunActionStdoutString( + string stderr; + Status s = RunActionStdoutStderrString( Substitute("table copy $0 $1 $2 -dst_table=$3 -predicates=$4 -write_type=$5 " - "-create_table=$6 -create_table_replication_factor=$7", + "-create_table=$6 -create_table_replication_factor=$7 " + "-create_table_hash_bucket_nums=$8", cluster_->master()->bound_rpc_addr().ToString(), args.src_table_name, cluster_->master()->bound_rpc_addr().ToString(), @@ -579,8 +582,36 @@ class ToolTest : public KuduTest { args.predicates_json, write_type, create_table, - args.create_table_replication_factor), - &stdout)); + args.create_table_replication_factor, + args.create_table_hash_bucket_nums), + &stdout, &stderr); + if (args.create_table_hash_bucket_nums == "10,aa") { + ASSERT_STR_CONTAINS(stderr, "cannot parse the number of hash buckets."); + return; + } + if (args.create_table_hash_bucket_nums == "10,20,30") { + ASSERT_STR_CONTAINS(stderr, "The count of hash bucket numbers must be equal to the " + "number of hash schema dimensions."); + return; + } + if (args.create_table_hash_bucket_nums == "10") { + ASSERT_STR_CONTAINS(stderr, "The count of hash bucket numbers must be equal to the " + "number of hash schema dimensions."); + return; + } + if (args.create_table_hash_bucket_nums == "10,1") { + ASSERT_STR_CONTAINS(stderr, "The number of hash buckets must not be less than 2."); + return; + } + if (args.create_table_hash_bucket_nums == "10,1") { + ASSERT_STR_CONTAINS(stderr, "The number of hash buckets must not be less than 2."); + return; + } + if (args.create_table_hash_bucket_nums == "10,50") { + ASSERT_STR_CONTAINS(stderr, "There are no hash partitions defined in this table."); + return; + } + ASSERT_TRUE(s.ok()); // Check total count. int64_t total = max<int64_t>(args.max_value - args.min_value + 1, 0); @@ -612,6 +643,32 @@ class ToolTest : public KuduTest { // Replication factor is different when explicitly set it to 3 (default 1). 
if (args.create_table_replication_factor == 3 && HasPrefixString(src_schema[i], "REPLICAS ")) continue; + vector<string> hash_bucket_nums = Split(args.create_table_hash_bucket_nums, + ",", strings::SkipEmpty()); + if (args.create_table_hash_bucket_nums == "10,20" && + HasPrefixString(src_schema[i], "HASH (key_hash0)")) { + ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0", + hash_bucket_nums[0])); + continue; + } + if (args.create_table_hash_bucket_nums == "10,20" && + HasPrefixString(src_schema[i], "HASH (key_hash1, key_hash2)")) { + ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0", + hash_bucket_nums[1])); + continue; + } + if (args.create_table_hash_bucket_nums == "10,2" && + HasPrefixString(src_schema[i], "HASH (key_hash0)")) { + ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0", + hash_bucket_nums[0])); + continue; + } + if (args.create_table_hash_bucket_nums == "10,2" && + HasPrefixString(src_schema[i], "HASH (key_hash1, key_hash2)")) { + ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0", + hash_bucket_nums[1])); + continue; + } ASSERT_EQ(src_schema[i], dst_schema[i]); } } @@ -804,13 +861,57 @@ class ToolTestCopyTableParameterized : } return multi_args; } - case kTestCopyTableComplexSchema: + case kTestCopyTableComplexSchema: { args.columns = kComplexSchemaColumns; args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE; - return { args }; - case kTestCopyUnpartitionedTable: + vector<RunCopyTableCheckArgs> multi_args; + { + auto args_temp = args; + args.create_table_hash_bucket_nums = "10,20"; + multi_args.emplace_back(std::move(args_temp)); + } + { + auto args_temp = args; + args_temp.create_table_hash_bucket_nums = "10,aa"; + multi_args.emplace_back(std::move(args_temp)); + } + { + auto args_temp = args; + args_temp.create_table_hash_bucket_nums = "10,20,30"; + multi_args.emplace_back(std::move(args_temp)); + } + { + auto args_temp = args; + args_temp.create_table_hash_bucket_nums = "10"; + 
multi_args.emplace_back(std::move(args_temp)); + } + { + auto args_temp = args; + args_temp.create_table_hash_bucket_nums = "10,1"; + multi_args.emplace_back(std::move(args_temp)); + } + { + auto args_temp = args; + args_temp.create_table_hash_bucket_nums = "10,2"; + multi_args.emplace_back(std::move(args_temp)); + } + { + auto args_temp = args; + args_temp.create_table_hash_bucket_nums = ""; + multi_args.emplace_back(std::move(args_temp)); + } + return multi_args; + } + case kTestCopyUnpartitionedTable: { args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE; - return {args}; + vector<RunCopyTableCheckArgs> multi_args; + { + auto args_temp = args; + args.create_table_hash_bucket_nums = "10,50"; + multi_args.emplace_back(std::move(args_temp)); + } + return multi_args; + } case kTestCopyTablePredicates: { auto mid = total_rows_ / 2; vector<RunCopyTableCheckArgs> multi_args; diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc index c2c535db9..f60c3a386 100644 --- a/src/kudu/tools/table_scanner.cc +++ b/src/kudu/tools/table_scanner.cc @@ -48,6 +48,7 @@ #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/numbers.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/bitmap.h" @@ -93,6 +94,8 @@ DEFINE_bool(create_table, true, DEFINE_int32(create_table_replication_factor, -1, "The replication factor of the destination table if the table will be created. " "By default, the replication factor of source table will be used."); +DEFINE_string(create_table_hash_bucket_nums, "", + "The number of hash buckets in each hash dimension seperated by comma"); DEFINE_bool(fill_cache, true, "Whether to fill block cache when scanning."); DEFINE_string(predicates, "", @@ -428,11 +431,52 @@ Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table .num_replicas(num_replicas); // Add hash partition schema. 
+ vector<int> hash_bucket_nums; + if (!partition_schema.hash_schema().empty()) { + vector<string> hash_bucket_nums_str = Split(FLAGS_create_table_hash_bucket_nums, + ",", strings::SkipEmpty()); + // FLAGS_create_table_hash_bucket_nums is not defined, set it to -1 defaultly. + if (hash_bucket_nums_str.empty()) { + for (int i = 0; i < partition_schema.hash_schema().size(); i++) { + hash_bucket_nums.push_back(-1); + } + } else { + // If the --create_table_hash_bucket_nums flag is set, the number + // of comma-separated elements must be equal to the number of hash schema dimensions. + if (partition_schema.hash_schema().size() != hash_bucket_nums_str.size()) { + return Status::InvalidArgument("The count of hash bucket numbers must be equal to the " + "number of hash schema dimensions."); + } + for (int i = 0; i < hash_bucket_nums_str.size(); i++) { + int bucket_num = 0; + bool is_number = safe_strto32(hash_bucket_nums_str[i], &bucket_num); + if (!is_number) { + return Status::InvalidArgument(Substitute("'$0': cannot parse the number " + "of hash buckets.", + hash_bucket_nums_str[i])); + } + if (bucket_num < 2) { + return Status::InvalidArgument("The number of hash buckets must not be less than 2."); + } + hash_bucket_nums.push_back(bucket_num); + } + } + } + + if (partition_schema.hash_schema().empty() && + !FLAGS_create_table_hash_bucket_nums.empty()) { + return Status::InvalidArgument("There are no hash partitions defined in this table."); + } + + int i = 0; for (const auto& hash_dimension : partition_schema.hash_schema()) { + int num_buckets = hash_bucket_nums[i] != -1 ? hash_bucket_nums[i] : + hash_dimension.num_buckets; auto hash_columns = convert_column_ids_to_names(hash_dimension.column_ids); table_creator->add_hash_partitions(hash_columns, - hash_dimension.num_buckets, + num_buckets, hash_dimension.seed); + i++; } // Add range partition schema. 
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index 23b260bc7..e68c6f1f7 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -1567,6 +1567,7 @@ unique_ptr<Mode> BuildTableMode() { .AddRequiredParameter({ kTableNameArg, "Name of the source table" }) .AddRequiredParameter({ kDestMasterAddressesArg, kDestMasterAddressesArgDesc }) .AddOptionalParameter("create_table") + .AddOptionalParameter("create_table_hash_bucket_nums") .AddOptionalParameter("create_table_replication_factor") .AddOptionalParameter("dst_table") .AddOptionalParameter("num_threads")