This is an automated email from the ASF dual-hosted git repository. zhangyifan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 20ae0e5df [Tool] Limit table copying speed 20ae0e5df is described below commit 20ae0e5dfbdf98a2f21cf029cb3501c64fc02108 Author: xinghuayu007 <1450306...@qq.com> AuthorDate: Tue Jun 18 16:53:46 2024 +0800 [Tool] Limit table copying speed Migrating data to another Kudu cluster using the 'kudu table copy' CLI command when the data is very large may cause memory and/or network bandwidth pressure. To reduce the effect on other services, it is better to limit the copying speed. This patch introduces two parameters: --table_copy_throttler_bytes_per_sec and --table_copy_throttler_burst_factor to limit the table copying speed. Change-Id: I37d23f6f5158618f91b67528e152cf2ff4cf38f3 Reviewed-on: http://gerrit.cloudera.org:8080/21527 Reviewed-by: Zoltan Chovan <zcho...@cloudera.com> Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> Tested-by: Yifan Zhang <chinazhangyi...@163.com> Reviewed-by: Yifan Zhang <chinazhangyi...@163.com> --- src/kudu/tools/kudu-tool-test.cc | 53 +++++++++++++++++++++++++++++++++++++ src/kudu/tools/table_scanner.cc | 23 ++++++++++++++++ src/kudu/tools/table_scanner.h | 3 +++ src/kudu/tools/tool_action_table.cc | 2 ++ 4 files changed, 81 insertions(+) diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index dde8c65c3..8d97f4047 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -52,6 +52,7 @@ #include "kudu/cfile/cfile_writer.h" #include "kudu/client/client-test-util.h" #include "kudu/client/client.h" +#include "kudu/client/scan_batch.h" #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" // IWYU pragma: keep #include "kudu/client/value.h" @@ -77,6 +78,7 @@ #include "kudu/fs/fs_manager.h" #include "kudu/fs/fs_report.h" #include "kudu/fs/log_block_manager.h" +#include "kudu/gutil/integral_types.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stl_util.h" 
@@ -181,6 +183,8 @@ using kudu::client::KuduScanToken; using kudu::client::KuduScanTokenBuilder; using kudu::client::KuduSchema; using kudu::client::KuduSchemaBuilder; +using kudu::client::KuduScanBatch; +using kudu::client::KuduScanner; using kudu::client::KuduSession; using kudu::client::KuduTable; using kudu::client::KuduTableAlterer; @@ -5986,6 +5990,55 @@ TEST_F(ToolTest, TableScanFaultTolerant) { } } +TEST_F(ToolTest, TableCopyLimitSpeed) { + SKIP_IF_SLOW_NOT_ALLOWED(); + + NO_FATALS(StartExternalMiniCluster()); + constexpr const char* const kTableName = "kudu.table.copy.limit_speed.from"; + constexpr const char* const kNewTableName = "kudu.table.copy.limit_speed.to"; + TestWorkload workload(cluster_.get()); + workload.set_table_name(kTableName); + workload.set_num_tablets(1); + workload.set_num_replicas(1); + workload.Setup(); + workload.Start(); + ASSERT_EVENTUALLY([&]() { + ASSERT_GE(workload.rows_inserted(), 20000); + }); + workload.StopAndJoin(); + + const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); + int64 table_copy_throttler_bytes_per_sec = 10240; + MonoTime start_time = MonoTime::Now(); + NO_FATALS(RunTool( + Substitute("table copy $0 $1 $2 --dst_table=$3 --write_type=upsert " + "-scan_batch_size=1024 " + "--table_copy_throttler_bytes_per_sec=$4 " + "--table_copy_throttler_burst_factor=100", + master_addr, kTableName, + master_addr, kNewTableName, + table_copy_throttler_bytes_per_sec + ), nullptr, nullptr)); + MonoTime end_time = MonoTime::Now(); + + shared_ptr<KuduClient> client; + ASSERT_OK(KuduClientBuilder() + .add_master_server_addr(master_addr) + .Build(&client)); + shared_ptr<KuduTable> table; + client->OpenTable(kNewTableName, &table); + KuduScanner scanner(table.get()); + scanner.Open(); + KuduScanBatch batch; + int64_t data_size = 0; + while (scanner.HasMoreRows()) { + ASSERT_OK(scanner.NextBatch(&batch)); + data_size = batch.direct_data().size() + batch.indirect_data().size(); + } + // Table copy speed must be less 
than table_copy_throttler_bytes_per_sec. + ASSERT_LE(data_size / (end_time - start_time).ToSeconds(), table_copy_throttler_bytes_per_sec); +} + TEST_F(ToolTest, TableCopyFaultTolerant) { constexpr const char* const kTableName = "kudu.table.copy.fault_tolerant.from"; constexpr const char* const kNewTableName = "kudu.table.copy.fault_tolerant.to"; diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc index 5e33c8182..eebb94a41 100644 --- a/src/kudu/tools/table_scanner.cc +++ b/src/kudu/tools/table_scanner.cc @@ -61,6 +61,7 @@ #include "kudu/util/slice.h" #include "kudu/util/stopwatch.h" #include "kudu/util/string_case.h" +#include "kudu/util/throttler.h" using kudu::client::KuduClient; using kudu::client::KuduColumnSchema; @@ -139,6 +140,15 @@ DEFINE_string(replica_selection, "CLOSEST", "Replica selection for scan operations. Acceptable values are: " "CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and " "KuduClient::LEADER_ONLY correspondingly)."); +DEFINE_int64(table_copy_throttler_bytes_per_sec, 0, + "Limit table copying speed. It limits the copying speed of all the tablets " + "in one table for one session. The default value is 0, which means not limiting " + "the speed. The unit is bytes/second"); +DEFINE_double(table_copy_throttler_burst_factor, 1.0F, + "Burst factor for table copy throttling. The maximum rate the throttler " + "allows within a token refill period (100ms) equals burst factor multiplied " + "base rate (--table_copy_throttler_bytes_per_sec). 
The default value is 1.0, " + "which means the maximum rate is equal to --table_copy_throttler_bytes_per_sec."); DECLARE_bool(row_count_only); DECLARE_int32(num_threads); @@ -571,6 +581,11 @@ TableScanner::TableScanner( scan_batch_size_(-1), out_(nullptr) { CHECK_OK(SetReplicaSelection(FLAGS_replica_selection)); + if (FLAGS_table_copy_throttler_bytes_per_sec > 0) { + throttler_ = std::make_shared<Throttler>(MonoTime::Now(), 0, + FLAGS_table_copy_throttler_bytes_per_sec, + FLAGS_table_copy_throttler_burst_factor); + } } Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens, @@ -593,6 +608,14 @@ Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens, count += batch.NumRows(); total_count_ += batch.NumRows(); ++next_batch_calls; + // Limit table copy speed. + if (throttler_) { + SCOPED_LOG_SLOW_EXECUTION(WARNING, 1000, "Table copy throttler"); + while (!throttler_->Take(MonoTime::Now(), 0, + batch.direct_data().size() + batch.indirect_data().size())) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + } RETURN_NOT_OK(cb(batch)); } sw.stop(); diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h index 13dd39d39..1eba9f295 100644 --- a/src/kudu/tools/table_scanner.h +++ b/src/kudu/tools/table_scanner.h @@ -35,6 +35,8 @@ #include "kudu/util/threadpool.h" namespace kudu { +class Throttler; + namespace tools { // This class is not thread-safe. @@ -102,6 +104,7 @@ class TableScanner { std::optional<std::string> dst_table_name_; int32_t scan_batch_size_; std::unique_ptr<ThreadPool> thread_pool_; + std::shared_ptr<Throttler> throttler_; // Protects output to 'out_' so that rows don't get interleaved. 
Mutex output_lock_; diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index b6901d37a..21b8c04e8 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -2029,6 +2029,8 @@ unique_ptr<Mode> BuildTableMode() { .AddOptionalParameter("scan_batch_size") .AddOptionalParameter("tablets") .AddOptionalParameter("write_type") + .AddOptionalParameter("table_copy_throttler_bytes_per_sec") + .AddOptionalParameter("table_copy_throttler_burst_factor") .Build(); unique_ptr<Action> set_extra_config =