This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 17528c7a8 [Tools] Copy tablets from a remote server in batch 17528c7a8 is described below commit 17528c7a8ab1e44a421d535346e849cfaf37d3d1 Author: xinghuayu007 <1450306...@qq.com> AuthorDate: Thu Jul 14 18:40:50 2022 +0800 [Tools] Copy tablets from a remote server in batch The command: kudu local_replica copy_from_remote only supports to copy one tablet from a remote server once a time. It is not efficient when we need to copy all tablets from a remote server. Therefore this patch supports to copy tablets from a remote server in batch. Change-Id: Ib598142883b8ab958625a4f04648d58ea95f3664 Reviewed-on: http://gerrit.cloudera.org:8080/18732 Tested-by: Kudu Jenkins Reviewed-by: Yingchun Lai <acelyc1112...@gmail.com> --- src/kudu/tablet/tablet_replica.h | 5 +- src/kudu/tools/kudu-tool-test.cc | 39 +++- src/kudu/tools/tool_action_local_replica.cc | 283 +++++++++++++++++----------- 3 files changed, 212 insertions(+), 115 deletions(-) diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h index ead1fb830..099111766 100644 --- a/src/kudu/tablet/tablet_replica.h +++ b/src/kudu/tablet/tablet_replica.h @@ -83,8 +83,7 @@ class ResultTracker; } // namespace rpc namespace tools { -struct RunnerContext; -Status CopyFromLocal(const RunnerContext& context); +class TabletCopier; } // namespace tools namespace tablet { @@ -395,7 +394,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>, void BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback began_txn_cb); private: - friend Status kudu::tools::CopyFromLocal(const kudu::tools::RunnerContext& context); + friend class kudu::tools::TabletCopier; friend class kudu::AlterTableTest; friend class RefCountedThreadSafe<TabletReplica>; friend class TabletReplicaTest; diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 697edc4a3..e568eecff 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ 
b/src/kudu/tools/kudu-tool-test.cc @@ -213,6 +213,7 @@ using kudu::tserver::TabletServerErrorPB; using kudu::tserver::WriteRequestPB; using std::back_inserter; using std::copy; +using std::find; using std::make_pair; using std::map; using std::max; @@ -1237,7 +1238,7 @@ TEST_F(ToolTest, TestModeHelp) { "cmeta.*Operate on a local tablet replica's consensus", "data_size.*Summarize the data size", "dump.*Dump a Kudu filesystem", - "copy_from_remote.*Copy a tablet replica from a remote server", + "copy_from_remote.*Copy tablet replicas from a remote server", "copy_from_local.*Copy tablet replicas from local filesystem", "delete.*Delete tablet replicas from the local filesystem", "list.*Show list of tablet replicas", @@ -1266,7 +1267,7 @@ TEST_F(ToolTest, TestModeHelp) { } { const vector<string> kLocalReplicaCopyFromRemoteRegexes = { - "Copy a tablet replica from a remote server", + "Copy tablet replicas from a remote server", }; NO_FATALS(RunTestHelp("local_replica copy_from_remote --help", kLocalReplicaCopyFromRemoteRegexes)); @@ -8179,6 +8180,40 @@ TEST_F(ToolTest, TestLocalReplicaCopyLocal) { ASSERT_EQ(src_stdout, dst_stdout); } +TEST_F(ToolTest, TestLocalReplicaCopyRemote) { + InternalMiniClusterOptions opts; + opts.num_tablet_servers = 2; + NO_FATALS(StartMiniCluster(std::move(opts))); + NO_FATALS(CreateTableWithFlushedData("table1", mini_cluster_.get(), 3, 1)); + NO_FATALS(CreateTableWithFlushedData("table2", mini_cluster_.get(), 3, 1)); + int source_tserver_tablet_count = mini_cluster_->mini_tablet_server(0)->ListTablets().size(); + int target_tserver_tablet_count_before = mini_cluster_->mini_tablet_server(1) + ->ListTablets().size(); + string tablet_ids_str = JoinStrings(mini_cluster_->mini_tablet_server(0)->ListTablets(), ","); + string source_tserver_rpc_addr = mini_cluster_->mini_tablet_server(0) + ->bound_rpc_addr().ToString(); + string wal_dir = mini_cluster_->mini_tablet_server(1)->options()->fs_opts.wal_root; + string data_dirs = 
JoinStrings(mini_cluster_->mini_tablet_server(1) + ->options()->fs_opts.data_roots, ","); + NO_FATALS(mini_cluster_->mini_tablet_server(1)->Shutdown()); + // Copy tablet replicas from tserver0 to tserver1. + NO_FATALS(RunActionStdoutNone( + Substitute("local_replica copy_from_remote $0 $1 " + "-fs_data_dirs=$2 -fs_wal_dir=$3 -num_threads=3", + tablet_ids_str, + source_tserver_rpc_addr, + data_dirs, + wal_dir))); + NO_FATALS(mini_cluster_->mini_tablet_server(1)->Start()); + const vector<string>& target_tablet_ids = mini_cluster_->mini_tablet_server(1)->ListTablets(); + ASSERT_EQ(source_tserver_tablet_count + target_tserver_tablet_count_before, + target_tablet_ids.size()); + for (string tablet_id : mini_cluster_->mini_tablet_server(0)->ListTablets()) { + ASSERT_TRUE(find(target_tablet_ids.begin(), target_tablet_ids.end(), tablet_id) + != target_tablet_ids.end()); + } +} + TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) { // Local copies are not supported on encrypted severs at this time. 
if (FLAGS_encrypt_data_at_rest) { diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc index 82cc0cfff..afe405837 100644 --- a/src/kudu/tools/tool_action_local_replica.cc +++ b/src/kudu/tools/tool_action_local_replica.cc @@ -168,11 +168,13 @@ using kudu::tablet::TabletMetadata; using kudu::tablet::TabletReplica; using kudu::tserver::LocalTabletCopyClient; using kudu::tserver::RemoteTabletCopyClient; +using kudu::tserver::TabletCopyClient; using kudu::tserver::TabletCopyClientMetrics; using kudu::tserver::TSTabletManager; using std::cout; using std::endl; using std::map; +using std::move; using std::pair; using std::shared_ptr; using std::set; @@ -206,6 +208,160 @@ string Indent(int indent) { } } // anonymous namespace +class TabletCopier { + public: + TabletCopier(set<string> tablet_ids_to_copy, + FsManager* dst_fs_manager, + scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager, + HostPort source_addr) : + tablet_ids_to_copy_(move(tablet_ids_to_copy)), + dst_fs_manager_(dst_fs_manager), + dst_cmeta_manager_(move(dst_cmeta_manager)), + source_addr_(move(source_addr)), + copy_type_(CopyType::FROM_REMOTE) { + } + + TabletCopier(set<string> tablet_ids_to_copy, + FsManager* dst_fs_manager, + scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager, + FsManager* src_fs_manager, + set<string> src_tablet_ids_set) : + tablet_ids_to_copy_(move(tablet_ids_to_copy)), + dst_fs_manager_(dst_fs_manager), + dst_cmeta_manager_(move(dst_cmeta_manager)), + src_fs_manager_(move(src_fs_manager)), + src_tablet_ids_set_(move(src_tablet_ids_set)), + copy_type_(CopyType::FROM_LOCAL) { + } + + ~TabletCopier() = default; + + Status CopyTablets() { + // Prepare to check copy progress. + int total_tablet_count = tablet_ids_to_copy_.size(); + // 'lock' is used for protecting 'copying_replicas', 'failed_tablet_ids' + // and 'succeed_tablet_count'. 
+ simple_spinlock lock; + set<TabletReplica*> copying_replicas; + set<string> failed_tablet_ids; + int succeed_tablet_count = 0; + if (copy_type_ == CopyType::FROM_LOCAL) { + for (auto tablet_id = tablet_ids_to_copy_.begin(); + tablet_id != tablet_ids_to_copy_.end();) { + if (!ContainsKey(src_tablet_ids_set_, *tablet_id)) { + LOG(ERROR) << Substitute("Tablet $0 copy failed: not found in source filesystem.", + *tablet_id); + InsertOrDie(&failed_tablet_ids, *tablet_id); + tablet_id = tablet_ids_to_copy_.erase(tablet_id); + } else { + tablet_id++; + } + } + } + + // Create a thread to obtain copy process periodically. + CountDownLatch latch(1); + scoped_refptr<Thread> check_thread; + RETURN_NOT_OK(Thread::Create("tool-tablet-copy", "check-progress", + [&] () { + while (!latch.WaitFor(MonoDelta::FromSeconds(10))) { + std::lock_guard<simple_spinlock> l(lock); + for (const auto& copying_replica : copying_replicas) { + LOG(INFO) << Substitute("Tablet $0 copy status: $1", + copying_replica->tablet_id(), + copying_replica->last_status()); + } + } + }, &check_thread)); + + // Init TabletCopyClientMetrics. + MetricRegistry metric_registry; + scoped_refptr<MetricEntity> metric_entity( + METRIC_ENTITY_server.Instantiate(&metric_registry, "tool-tablet-copy")); + TabletCopyClientMetrics tablet_copy_client_metrics(metric_entity); + + // Create a thread pool to copy tablets. + std::unique_ptr<ThreadPool> copy_pool; + ThreadPoolBuilder("tool-tablet-copy-pool") + .set_max_threads(FLAGS_num_threads) + .set_min_threads(FLAGS_num_threads) + .Build(&copy_pool); + + shared_ptr<Messenger> messenger; + RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger)); + // Start to copy tablets. + for (const auto& tablet_id : tablet_ids_to_copy_) { + RETURN_NOT_OK(copy_pool->Submit([&]() { + // 'fake_replica' is used for checking copy progress only.
+ scoped_refptr<TabletReplica> fake_replica(new TabletReplica()); + { + std::lock_guard<simple_spinlock> l(lock); + LOG(WARNING) << "Start to copy tablet " << tablet_id; + InsertOrDie(&copying_replicas, fake_replica.get()); + } + Status s; + unique_ptr<TabletCopyClient> client; + if (copy_type_ == CopyType::FROM_REMOTE) { + client.reset(new RemoteTabletCopyClient(tablet_id, dst_fs_manager_, dst_cmeta_manager_, + messenger, &tablet_copy_client_metrics)); + s = client->Start(source_addr_, nullptr); + } else { + CHECK_EQ(copy_type_, CopyType::FROM_LOCAL); + client.reset(new LocalTabletCopyClient(tablet_id, dst_fs_manager_, + dst_cmeta_manager_, /* messenger */ nullptr, + &tablet_copy_client_metrics, src_fs_manager_, + /* tablet_copy_source_metrics */ nullptr)); + s = client->Start(tablet_id, /* meta */ nullptr); + } + s = s.AndThen([&] { + return client->FetchAll(fake_replica); + }).AndThen([&] { + return client->Finish(); + }); + { + std::lock_guard<simple_spinlock> l(lock); + if (!s.ok()) { + InsertOrDie(&failed_tablet_ids, tablet_id); + LOG(ERROR) << Substitute("Tablet $0 copy failed: $1.", tablet_id, s.ToString()); + } else { + succeed_tablet_count++; + LOG(INFO) << Substitute("Tablet $0 copy succeed.", tablet_id); + } + copying_replicas.erase(fake_replica.get()); + + LOG(INFO) << Substitute("$0/$1 tablets, $2 bytes copied, include $3 failed tablets.", + succeed_tablet_count + failed_tablet_ids.size(), + total_tablet_count, + tablet_copy_client_metrics.bytes_fetched->value(), + failed_tablet_ids.size()); + } + return Status::OK(); + })); + } + + copy_pool->Wait(); + copy_pool->Shutdown(); + latch.CountDown(); + check_thread->Join(); + + return Status::OK(); + } + + private: + enum CopyType { + FROM_LOCAL, + FROM_REMOTE, + }; + + set<string> tablet_ids_to_copy_; + FsManager* dst_fs_manager_; + scoped_refptr<consensus::ConsensusMetadataManager> dst_cmeta_manager_; + FsManager* src_fs_manager_; + const set<string> src_tablet_ids_set_; + const HostPort source_addr_; +
CopyType copy_type_; +}; + Status FsInit(bool skip_block_manager, unique_ptr<FsManager>* fs_manager) { FsManagerOpts fs_opts; fs_opts.read_only = true; @@ -423,23 +579,21 @@ Status SetRaftTerm(const RunnerContext& context) { Status CopyFromRemote(const RunnerContext& context) { // Parse the tablet ID and source arguments. - const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg); - const string& rpc_address = FindOrDie(context.required_args, "source"); + const string& tablet_ids_str = FindOrDie(context.required_args, kTabletIdsCsvArg); + set<string> tablet_ids_to_copy = Split(tablet_ids_str, ",", strings::SkipWhitespace()); + if (tablet_ids_to_copy.empty()) + return Status::InvalidArgument("no tablet identifiers provided"); + const string& rpc_address = FindOrDie(context.required_args, "source"); HostPort hp; RETURN_NOT_OK(ParseHostPortString(rpc_address, &hp)); - // Copy the tablet over. FsManager fs_manager(Env::Default(), FsManagerOpts()); RETURN_NOT_OK(fs_manager.Open()); scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager)); - shared_ptr<Messenger> messenger; - RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger)); - RemoteTabletCopyClient client(tablet_id, &fs_manager, cmeta_manager, - messenger, nullptr /* no metrics */); - RETURN_NOT_OK(client.Start(hp, nullptr)); - RETURN_NOT_OK(client.FetchAll(nullptr)); - return client.Finish(); + + TabletCopier copier(move(tablet_ids_to_copy), &fs_manager, move(cmeta_manager), move(hp)); + return copier.CopyTablets(); } Status CopyFromLocal(const RunnerContext& context) { @@ -472,104 +626,12 @@ Status CopyFromLocal(const RunnerContext& context) { RETURN_NOT_OK(src_fs_manager.ListTabletIds(&src_tablet_ids)); set<string> src_tablet_ids_set(src_tablet_ids.begin(), src_tablet_ids.end()); - // Prepare to check copy progress. 
- int total_tablet_count = tablet_ids_to_copy.size(); - // 'lock' is used for protecting 'copying_replicas', 'failed_tablet_ids' - // and 'succeed_tablet_count'. - simple_spinlock lock; - set<TabletReplica*> copying_replicas; - set<string> failed_tablet_ids; - int succeed_tablet_count = 0; - for (auto tablet_id = tablet_ids_to_copy.begin(); - tablet_id != tablet_ids_to_copy.end();) { - if (!ContainsKey(src_tablet_ids_set, *tablet_id)) { - LOG(ERROR) << Substitute("Tablet $0 copy failed: not found in source filesystem.", - *tablet_id); - InsertOrDie(&failed_tablet_ids, *tablet_id); - tablet_id = tablet_ids_to_copy.erase(tablet_id); - } else { - tablet_id++; - } - } - - // Create a thread to obtain copy process periodically. - CountDownLatch latch(1); - scoped_refptr<Thread> check_thread; - RETURN_NOT_OK(Thread::Create("tool-tablet-copy", "check-progress", - [&] () { - while (!latch.WaitFor(MonoDelta::FromSeconds(10))) { - std::lock_guard<simple_spinlock> l(lock); - for (const auto& copying_replica : copying_replicas) { - LOG(INFO) << Substitute("Tablet $0 copy status: $1", - copying_replica->tablet_id(), - copying_replica->last_status()); - } - } - }, &check_thread)); - - // Init TabletCopyClientMetrics. - MetricRegistry metric_registry; - scoped_refptr<MetricEntity> metric_entity( - METRIC_ENTITY_server.Instantiate(&metric_registry, "tool-tablet-copy")); - TabletCopyClientMetrics tablet_copy_client_metrics(metric_entity); - - // Create a thread pool to copy tablets. - std::unique_ptr<ThreadPool> copy_pool; - ThreadPoolBuilder("tool-tablet-copy-pool") - .set_max_threads(FLAGS_num_threads) - .set_min_threads(FLAGS_num_threads) - .Build(&copy_pool); - - // Start to copy tablets. - for (const auto& tablet_id : tablet_ids_to_copy) { - RETURN_NOT_OK(copy_pool->Submit([&]() { - // 'fake_replica' is used for checking copy progress only.
- scoped_refptr<TabletReplica> fake_replica(new TabletReplica()); - { - std::lock_guard<simple_spinlock> l(lock); - LOG(WARNING) << "Start to copy tablet " << tablet_id; - InsertOrDie(&copying_replicas, fake_replica.get()); - } - - LocalTabletCopyClient client(tablet_id, - &dst_fs_manager, - dst_cmeta_manager, - /* messenger */ nullptr, - &tablet_copy_client_metrics, - &src_fs_manager, - /* tablet_copy_source_metrics */ nullptr); - Status s = client.Start(tablet_id, /* meta */ nullptr).AndThen([&] { - return client.FetchAll(fake_replica); - }).AndThen([&] { - return client.Finish(); - }); - - { - std::lock_guard<simple_spinlock> l(lock); - if (!s.ok()) { - InsertOrDie(&failed_tablet_ids, tablet_id); - LOG(ERROR) << Substitute("Tablet $0 copy failed: $1.", tablet_id, s.ToString()); - } else { - succeed_tablet_count++; - LOG(INFO) << Substitute("Tablet $0 copy succeed.", tablet_id); - } - copying_replicas.erase(fake_replica.get()); - - LOG(INFO) << Substitute("$0/$1 tablets, $2 bytes copied, include $3 failed tablets.", - succeed_tablet_count + failed_tablet_ids.size(), - total_tablet_count, - tablet_copy_client_metrics.bytes_fetched->value(), - failed_tablet_ids.size()); - } - })); - } - - copy_pool->Wait(); - copy_pool->Shutdown(); - latch.CountDown(); - check_thread->Join(); - - return Status::OK(); + TabletCopier copier(move(tablet_ids_to_copy), + &dst_fs_manager, + move(dst_cmeta_manager), + &src_fs_manager, + move(src_tablet_ids_set)); + return copier.CopyTablets(); } Status DeleteLocalReplica(const string& tablet_id, @@ -1109,14 +1171,15 @@ unique_ptr<Mode> BuildLocalReplicaMode() { unique_ptr<Action> copy_from_remote = ActionBuilder("copy_from_remote", &CopyFromRemote) - .Description("Copy a tablet replica from a remote server") - .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc }) + .Description("Copy tablet replicas from a remote server") + .AddRequiredParameter({ kTabletIdsCsvArg, kTabletIdsCsvArgDesc }) .AddRequiredParameter({ "source", "Source RPC
address of " "form hostname:port" }) .AddOptionalParameter("fs_data_dirs") .AddOptionalParameter("fs_metadata_dir") .AddOptionalParameter("fs_wal_dir") .AddOptionalParameter("tablet_copy_download_threads_nums_per_session") + .AddOptionalParameter("num_threads") .Build(); unique_ptr<Action> copy_from_local =