This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 05903d962 [Tools] Support to config hash bucket numbers when copy a table
05903d962 is described below

commit 05903d96296f9d06423deaddcfbeb5f22461dc76
Author: xinghuayu007 <1450306...@qq.com>
AuthorDate: Thu Jun 9 11:09:16 2022 +0800

    [Tools] Support to config hash bucket numbers when copy a table
    
    When copying a table to another table, the tool can create the
    destination table with the same schema, but it could not configure
    the number of hash buckets in the new table. Why does the number of
    hash buckets need to be configurable? The source table might have
    been created with a small number of hash buckets yet hold a lot of
    data, and when copying it to a new cluster we want the copy to use
    more hash buckets to spread that data. There is no way to change
    the number of hash buckets in the partition schema of an already
    existing table, so they must be set when the destination table is
    created (see the example invocation below).
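
    For illustration, an invocation using the new flag might look like
    the following (master addresses and table names are placeholders,
    not taken from this change):

      kudu table copy <src-master-addrs> <src-table> <dst-master-addrs> \
          -dst_table=<dst-table> \
          -create_table=true \
          -create_table_hash_bucket_nums=10,20

    Here "10,20" requests 10 buckets for the first hash dimension and
    20 for the second. The number of comma-separated values must match
    the number of hash schema dimensions of the source table, each
    value must be at least 2, and leaving the flag empty keeps the
    source table's bucket counts.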
    
    Change-Id: I1cec38e5ea09c66bfed20622b85033602da60d41
    Reviewed-on: http://gerrit.cloudera.org:8080/18604
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/tools/kudu-tool-test.cc    | 113 +++++++++++++++++++++++++++++++++---
 src/kudu/tools/table_scanner.cc     |  46 +++++++++++++-
 src/kudu/tools/tool_action_table.cc |   1 +
 3 files changed, 151 insertions(+), 9 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 876a1854e..23601cedf 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -522,6 +522,7 @@ class ToolTest : public KuduTest {
     string columns;
     TableCopyMode mode;
     int32_t create_table_replication_factor;
+    string create_table_hash_bucket_nums;
   };
 
   void RunCopyTableCheck(const RunCopyTableCheckArgs& args) {
@@ -569,9 +570,11 @@ class ToolTest : public KuduTest {
 
     // Execute copy command.
     string stdout;
-    NO_FATALS(RunActionStdoutString(
+    string stderr;
+    Status s = RunActionStdoutStderrString(
                 Substitute("table copy $0 $1 $2 -dst_table=$3 -predicates=$4 
-write_type=$5 "
-                           "-create_table=$6 
-create_table_replication_factor=$7",
+                           "-create_table=$6 
-create_table_replication_factor=$7 "
+                           "-create_table_hash_bucket_nums=$8",
                            cluster_->master()->bound_rpc_addr().ToString(),
                            args.src_table_name,
                            cluster_->master()->bound_rpc_addr().ToString(),
@@ -579,8 +582,32 @@
                            args.predicates_json,
                            write_type,
                            create_table,
-                           args.create_table_replication_factor),
-                &stdout));
+                           args.create_table_replication_factor,
+                           args.create_table_hash_bucket_nums),
+                &stdout, &stderr);
+    if (args.create_table_hash_bucket_nums == "10,aa") {
+      ASSERT_STR_CONTAINS(stderr, "cannot parse the number of hash buckets.");
+      return;
+    }
+    if (args.create_table_hash_bucket_nums == "10,20,30") {
+      ASSERT_STR_CONTAINS(stderr, "The count of hash bucket numbers must be 
equal to the "
+                                  "number of hash schema dimensions.");
+      return;
+    }
+    if (args.create_table_hash_bucket_nums == "10") {
+      ASSERT_STR_CONTAINS(stderr, "The count of hash bucket numbers must be 
equal to the "
+                                  "number of hash schema dimensions.");
+      return;
+    }
+    if (args.create_table_hash_bucket_nums == "10,1") {
+      ASSERT_STR_CONTAINS(stderr, "The number of hash buckets must not be less 
than 2.");
+      return;
+    }
+    if (args.create_table_hash_bucket_nums == "10,1") {
+      ASSERT_STR_CONTAINS(stderr, "The number of hash buckets must not be less 
than 2.");
+      return;
+    }
+    if (args.create_table_hash_bucket_nums == "10,50") {
+      ASSERT_STR_CONTAINS(stderr, "There are no hash partitions defined in 
this table.");
+      return;
+    }
+    ASSERT_TRUE(s.ok());
 
     // Check total count.
     int64_t total = max<int64_t>(args.max_value - args.min_value + 1, 0);
@@ -612,6 +643,32 @@ class ToolTest : public KuduTest {
        // Replication factor is different when explicitly set it to 3 (default 1).
         if (args.create_table_replication_factor == 3 &&
             HasPrefixString(src_schema[i], "REPLICAS ")) continue;
+        vector<string> hash_bucket_nums = Split(args.create_table_hash_bucket_nums,
+                                            ",", strings::SkipEmpty());
+        if (args.create_table_hash_bucket_nums == "10,20" &&
+            HasPrefixString(src_schema[i], "HASH (key_hash0)")) {
+          ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+                                                        hash_bucket_nums[0]));
+          continue;
+        }
+        if (args.create_table_hash_bucket_nums == "10,20" &&
+            HasPrefixString(src_schema[i], "HASH (key_hash1, key_hash2)")) {
+          ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+                                                        hash_bucket_nums[1]));
+          continue;
+        }
+        if (args.create_table_hash_bucket_nums == "10,2" &&
+            HasPrefixString(src_schema[i], "HASH (key_hash0)")) {
+          ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+                                                        hash_bucket_nums[0]));
+          continue;
+        }
+        if (args.create_table_hash_bucket_nums == "10,2" &&
+            HasPrefixString(src_schema[i], "HASH (key_hash1, key_hash2)")) {
+          ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+                                                        hash_bucket_nums[1]));
+          continue;
+        }
         ASSERT_EQ(src_schema[i], dst_schema[i]);
       }
     }
@@ -804,13 +861,57 @@ class ToolTestCopyTableParameterized :
         }
         return multi_args;
       }
-      case kTestCopyTableComplexSchema:
+      case kTestCopyTableComplexSchema: {
         args.columns = kComplexSchemaColumns;
         args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
-        return { args };
-      case kTestCopyUnpartitionedTable:
+        vector<RunCopyTableCheckArgs> multi_args;
+        {
+          auto args_temp = args;
+          args.create_table_hash_bucket_nums = "10,20";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        {
+          auto args_temp = args;
+          args_temp.create_table_hash_bucket_nums = "10,aa";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        {
+          auto args_temp = args;
+          args_temp.create_table_hash_bucket_nums = "10,20,30";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        {
+          auto args_temp = args;
+          args_temp.create_table_hash_bucket_nums = "10";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        {
+          auto args_temp = args;
+          args_temp.create_table_hash_bucket_nums = "10,1";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        {
+          auto args_temp = args;
+          args_temp.create_table_hash_bucket_nums = "10,2";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        {
+          auto args_temp = args;
+          args_temp.create_table_hash_bucket_nums = "";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        return multi_args;
+      }
+      case kTestCopyUnpartitionedTable: {
         args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
-        return {args};
+        vector<RunCopyTableCheckArgs> multi_args;
+        {
+          auto args_temp = args;
+          args.create_table_hash_bucket_nums = "10,50";
+          multi_args.emplace_back(std::move(args_temp));
+        }
+        return multi_args;
+      }
       case kTestCopyTablePredicates: {
         auto mid = total_rows_ / 2;
         vector<RunCopyTableCheckArgs> multi_args;
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index c2c535db9..f60c3a386 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -48,6 +48,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/bitmap.h"
@@ -93,6 +94,8 @@ DEFINE_bool(create_table, true,
 DEFINE_int32(create_table_replication_factor, -1,
              "The replication factor of the destination table if the table 
will be created. "
              "By default, the replication factor of source table will be 
used.");
+DEFINE_string(create_table_hash_bucket_nums, "",
+              "The number of hash buckets in each hash dimension seperated by 
comma");
 DEFINE_bool(fill_cache, true,
             "Whether to fill block cache when scanning.");
 DEFINE_string(predicates, "",
@@ -428,11 +431,52 @@ Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table
       .num_replicas(num_replicas);
 
   // Add hash partition schema.
+  vector<int> hash_bucket_nums;
+  if (!partition_schema.hash_schema().empty()) {
+    vector<string> hash_bucket_nums_str = Split(FLAGS_create_table_hash_bucket_nums,
+                                                ",", strings::SkipEmpty());
+    // If FLAGS_create_table_hash_bucket_nums is not set, use -1 for each dimension by default.
+    if (hash_bucket_nums_str.empty()) {
+      for (int i = 0; i < partition_schema.hash_schema().size(); i++) {
+        hash_bucket_nums.push_back(-1);
+      }
+    } else {
+      // If the --create_table_hash_bucket_nums flag is set, the number
+      // of comma-separated elements must be equal to the number of hash schema dimensions.
+      if (partition_schema.hash_schema().size() != hash_bucket_nums_str.size()) {
+        return Status::InvalidArgument("The count of hash bucket numbers must 
be equal to the "
+                                       "number of hash schema dimensions.");
+      }
+      for (int i = 0; i < hash_bucket_nums_str.size(); i++) {
+        int bucket_num = 0;
+        bool is_number = safe_strto32(hash_bucket_nums_str[i], &bucket_num);
+        if (!is_number) {
+          return Status::InvalidArgument(Substitute("'$0': cannot parse the 
number "
+                                                    "of hash buckets.",
+                                                    hash_bucket_nums_str[i]));
+        }
+        if (bucket_num < 2) {
+          return Status::InvalidArgument("The number of hash buckets must not 
be less than 2.");
+        }
+        hash_bucket_nums.push_back(bucket_num);
+      }
+    }
+  }
+
+  if (partition_schema.hash_schema().empty() &&
+      !FLAGS_create_table_hash_bucket_nums.empty()) {
+    return Status::InvalidArgument("There are no hash partitions defined in 
this table.");
+  }
+
+  int i = 0;
   for (const auto& hash_dimension : partition_schema.hash_schema()) {
+    int num_buckets = hash_bucket_nums[i] != -1 ? hash_bucket_nums[i] :
+                                                  hash_dimension.num_buckets;
     auto hash_columns = convert_column_ids_to_names(hash_dimension.column_ids);
     table_creator->add_hash_partitions(hash_columns,
-                                       hash_dimension.num_buckets,
+                                       num_buckets,
                                        hash_dimension.seed);
+    i++;
   }
 
   // Add range partition schema.
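
For readers who want the validation added to table_scanner.cc above in
one self-contained piece, below is a minimal sketch of the same checks.
It is illustrative only: the function name is made up, std::strtol
stands in for Kudu's safe_strto32(), and a bool plus an error string
stand in for kudu::Status.

    #include <cstdlib>
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    // Parse a comma-separated flag value like "10,20" into one bucket
    // count per hash dimension. An empty flag yields -1 per dimension,
    // meaning "keep the bucket count of the source table".
    bool ParseHashBucketNums(const std::string& flag, size_t num_dimensions,
                             std::vector<int>* nums, std::string* error) {
      if (flag.empty()) {
        nums->assign(num_dimensions, -1);
        return true;
      }
      if (num_dimensions == 0) {
        *error = "There are no hash partitions defined in this table.";
        return false;
      }
      // Split the flag value on commas.
      std::vector<std::string> parts;
      std::istringstream ss(flag);
      for (std::string part; std::getline(ss, part, ',');) {
        parts.push_back(part);
      }
      if (parts.size() != num_dimensions) {
        *error = "The count of hash bucket numbers must be equal to the "
                 "number of hash schema dimensions.";
        return false;
      }
      for (const std::string& p : parts) {
        char* end = nullptr;
        const long n = std::strtol(p.c_str(), &end, 10);
        if (end == p.c_str() || *end != '\0') {
          *error = "'" + p + "': cannot parse the number of hash buckets.";
          return false;
        }
        if (n < 2) {
          *error = "The number of hash buckets must not be less than 2.";
          return false;
        }
        nums->push_back(static_cast<int>(n));
      }
      return true;
    }

    int main() {
      std::vector<int> nums;
      std::string error;
      // Prints: 'aa': cannot parse the number of hash buckets.
      if (!ParseHashBucketNums("10,aa", 2, &nums, &error)) {
        std::cout << error << std::endl;
      }
      return 0;
    }

As in the hunk above, a value of -1 is later replaced by the source
dimension's num_buckets when add_hash_partitions() is called.
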
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 23b260bc7..e68c6f1f7 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -1567,6 +1567,7 @@ unique_ptr<Mode> BuildTableMode() {
       .AddRequiredParameter({ kTableNameArg, "Name of the source table" })
      .AddRequiredParameter({ kDestMasterAddressesArg, kDestMasterAddressesArgDesc })
       .AddOptionalParameter("create_table")
+      .AddOptionalParameter("create_table_hash_bucket_nums")
       .AddOptionalParameter("create_table_replication_factor")
       .AddOptionalParameter("dst_table")
       .AddOptionalParameter("num_threads")
