This is an automated email from the ASF dual-hosted git repository.

zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ae0e5df [Tool] Limit table copying speed
20ae0e5df is described below

commit 20ae0e5dfbdf98a2f21cf029cb3501c64fc02108
Author: xinghuayu007 <1450306...@qq.com>
AuthorDate: Tue Jun 18 16:53:46 2024 +0800

    [Tool] Limit table copying speed
    
    Migrating data to another Kudu cluster using the 'kudu table copy'
    CLI command when the data is very large may cause memory and/or
    network bandwidth pressure. To reduce the effect on other services,
    it is better to limit the copying speed.
    
    This patch introduces two parameters:
    --table_copy_throttler_bytes_per_sec and
    --table_copy_throttler_burst_factor to limit the table copying
    speed.
    
    Change-Id: I37d23f6f5158618f91b67528e152cf2ff4cf38f3
    Reviewed-on: http://gerrit.cloudera.org:8080/21527
    Reviewed-by: Zoltan Chovan <zcho...@cloudera.com>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
    Tested-by: Yifan Zhang <chinazhangyi...@163.com>
    Reviewed-by: Yifan Zhang <chinazhangyi...@163.com>
---
 src/kudu/tools/kudu-tool-test.cc    | 53 +++++++++++++++++++++++++++++++++++++
 src/kudu/tools/table_scanner.cc     | 23 ++++++++++++++++
 src/kudu/tools/table_scanner.h      |  3 +++
 src/kudu/tools/tool_action_table.cc |  2 ++
 4 files changed, 81 insertions(+)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index dde8c65c3..8d97f4047 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -52,6 +52,7 @@
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/client/client-test-util.h"
 #include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/value.h"
@@ -77,6 +78,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
@@ -181,6 +183,8 @@ using kudu::client::KuduScanToken;
 using kudu::client::KuduScanTokenBuilder;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableAlterer;
@@ -5986,6 +5990,55 @@ TEST_F(ToolTest, TableScanFaultTolerant) {
   }
 }
 
+TEST_F(ToolTest, TableCopyLimitSpeed) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  NO_FATALS(StartExternalMiniCluster());
+  constexpr const char* const kTableName = "kudu.table.copy.limit_speed.from";
+  constexpr const char* const kNewTableName = "kudu.table.copy.limit_speed.to";
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_num_tablets(1);
+  workload.set_num_replicas(1);
+  workload.Setup();
+  workload.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(workload.rows_inserted(), 20000);
+  });
+  workload.StopAndJoin();
+
+  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  const int64 table_copy_throttler_bytes_per_sec = 10240;
+  const MonoTime start_time = MonoTime::Now();
+  NO_FATALS(RunTool(
+      Substitute("table copy $0 $1 $2 --dst_table=$3 --write_type=upsert "
+                 "--scan_batch_size=1024 "
+                 "--table_copy_throttler_bytes_per_sec=$4 "
+                 "--table_copy_throttler_burst_factor=100",
+                 master_addr, kTableName,
+                 master_addr, kNewTableName,
+                 table_copy_throttler_bytes_per_sec
+                ), nullptr, nullptr));
+  const MonoTime end_time = MonoTime::Now();
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(KuduClientBuilder()
+                .add_master_server_addr(master_addr)
+                .Build(&client));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kNewTableName, &table));
+  KuduScanner scanner(table.get());
+  ASSERT_OK(scanner.Open());
+  KuduScanBatch batch;
+  int64_t data_size = 0;  // Total bytes read back from the destination table.
+  while (scanner.HasMoreRows()) {
+    ASSERT_OK(scanner.NextBatch(&batch));
+    data_size += batch.direct_data().size() + batch.indirect_data().size();
+  }
+  // The average copy rate over the whole run must not exceed the configured limit.
+  ASSERT_LE(data_size / (end_time - start_time).ToSeconds(), table_copy_throttler_bytes_per_sec);
+}
+
 TEST_F(ToolTest, TableCopyFaultTolerant) {
   constexpr const char* const kTableName = 
"kudu.table.copy.fault_tolerant.from";
   constexpr const char* const kNewTableName = 
"kudu.table.copy.fault_tolerant.to";
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 5e33c8182..eebb94a41 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -61,6 +61,7 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/string_case.h"
+#include "kudu/util/throttler.h"
 
 using kudu::client::KuduClient;
 using kudu::client::KuduColumnSchema;
@@ -139,6 +140,15 @@ DEFINE_string(replica_selection, "CLOSEST",
               "Replica selection for scan operations. Acceptable values are: "
               "CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and "
               "KuduClient::LEADER_ONLY correspondingly).");
+DEFINE_int64(table_copy_throttler_bytes_per_sec, 0,
+             "Limit table copying speed. It limits the copying speed of all the tablets "
+             "in one table for one session. The default value is 0, which means not limiting "
+             "the speed. The unit is bytes/second.");
+DEFINE_double(table_copy_throttler_burst_factor, 1.0,
+              "Burst factor for table copy throttling. The maximum rate the throttler allows "
+              "within a token refill period (100ms) equals the burst factor multiplied by the "
+              "base rate (--table_copy_throttler_bytes_per_sec). The default value is 1.0, which "
+              "means the maximum rate is equal to --table_copy_throttler_bytes_per_sec.");
 
 DECLARE_bool(row_count_only);
 DECLARE_int32(num_threads);
@@ -571,6 +581,11 @@ TableScanner::TableScanner(
       scan_batch_size_(-1),
       out_(nullptr) {
   CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
+  if (FLAGS_table_copy_throttler_bytes_per_sec > 0) {
+    throttler_ = std::make_shared<Throttler>(MonoTime::Now(), 0,
+                                             
FLAGS_table_copy_throttler_bytes_per_sec,
+                                             
FLAGS_table_copy_throttler_burst_factor);
+  }
 }
 
 Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
@@ -593,6 +608,14 @@ Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
       count += batch.NumRows();
       total_count_ += batch.NumRows();
       ++next_batch_calls;
+      // Limit table copy speed.
+      if (throttler_) {
+        SCOPED_LOG_SLOW_EXECUTION(WARNING, 1000, "Table copy throttler");
+        while (!throttler_->Take(MonoTime::Now(), 0,
+                                 batch.direct_data().size() + 
batch.indirect_data().size())) {
+          SleepFor(MonoDelta::FromMilliseconds(10));
+        }
+      }
       RETURN_NOT_OK(cb(batch));
     }
     sw.stop();
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 13dd39d39..1eba9f295 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -35,6 +35,8 @@
 #include "kudu/util/threadpool.h"
 
 namespace kudu {
+class Throttler;
+
 namespace tools {
 
 // This class is not thread-safe.
@@ -102,6 +104,7 @@ class TableScanner {
   std::optional<std::string> dst_table_name_;
   int32_t scan_batch_size_;
   std::unique_ptr<ThreadPool> thread_pool_;
+  std::shared_ptr<Throttler> throttler_;
 
   // Protects output to 'out_' so that rows don't get interleaved.
   Mutex output_lock_;
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index b6901d37a..21b8c04e8 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -2029,6 +2029,8 @@ unique_ptr<Mode> BuildTableMode() {
       .AddOptionalParameter("scan_batch_size")
       .AddOptionalParameter("tablets")
       .AddOptionalParameter("write_type")
+      .AddOptionalParameter("table_copy_throttler_bytes_per_sec")
+      .AddOptionalParameter("table_copy_throttler_burst_factor")
       .Build();
 
   unique_ptr<Action> set_extra_config =

Reply via email to