This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 8a58c5838 [tools] exit gracefully on errors during table copying 8a58c5838 is described below commit 8a58c5838ec634146a0af46cdb47d4db404aef0a Author: Alexey Serbin <ale...@apache.org> AuthorDate: Thu Mar 2 20:58:43 2023 -0800 [tools] exit gracefully on errors during table copying Prior to this patch, a failure to write a batch of rows into the destination table would lead to a crash while running the `kudu table copy` CLI tool. In addition, the information on errors would not be printed because CheckPendingErrors() was called before Session::Flush(). This patch addresses these issues: * no crashes for non-OK status within TableScanner::CopyTask() * information on errors encountered during Session::Flush() is printed into the output stream This is a follow-up to 28f8e972fcd042a26a502cbb1f1102c487c9398d. Change-Id: I90dc29b5a3fb8334ec1f54425b84d13faeb19cd5 Reviewed-on: http://gerrit.cloudera.org:8080/19575 Reviewed-by: Yingchun Lai <laiyingc...@apache.org> Tested-by: Alexey Serbin <ale...@apache.org> --- src/kudu/tools/table_scanner.cc | 78 +++++++++++++++++++++++++---------------- src/kudu/tools/table_scanner.h | 4 +-- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc index 614cce862..c7ea9744c 100644 --- a/src/kudu/tools/table_scanner.cc +++ b/src/kudu/tools/table_scanner.cc @@ -21,6 +21,7 @@ #include <cstddef> #include <cstdint> #include <cstring> +#include <functional> #include <iomanip> #include <iostream> #include <iterator> @@ -77,6 +78,7 @@ using kudu::client::KuduValue; using kudu::client::KuduWriteOperation; using kudu::iequals; using std::endl; +using std::function; using std::map; using std::nullopt; using std::optional; @@ -557,9 +559,9 @@ TableScanner::TableScanner( CHECK_OK(SetReplicaSelection(FLAGS_replica_selection)); } -Status TableScanner::ScanData(const 
std::vector<kudu::client::KuduScanToken*>& tokens, - const std::function<void(const KuduScanBatch& batch)>& cb) { - for (auto token : tokens) { +Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens, + const function<Status(const KuduScanBatch& batch)>& cb) { + for (const auto* token : tokens) { Stopwatch sw(Stopwatch::THIS_THREAD); sw.start(); @@ -577,7 +579,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t count += batch.NumRows(); total_count_ += batch.NumRows(); ++next_batch_calls; - cb(batch); + RETURN_NOT_OK(cb(batch)); } sw.stop(); @@ -598,6 +600,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t } void TableScanner::ScanTask(const vector<KuduScanToken*>& tokens, Status* thread_status) { + DCHECK(thread_status); *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) { if (out_ && FLAGS_show_values) { MutexLock l(output_lock_); @@ -606,29 +609,41 @@ void TableScanner::ScanTask(const vector<KuduScanToken*>& tokens, Status* thread } out_->flush(); } + return Status::OK(); }); } void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread_status) { +#define TASK_RET_NOT_OK(s) do { \ + const Status& _s = (s); \ + if (PREDICT_FALSE(!_s.ok())) {\ + *thread_status = _s; \ + return; \ + } \ + } while (0) + + DCHECK(thread_status); client::sp::shared_ptr<KuduTable> dst_table; - CHECK_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table)); - const KuduSchema& dst_table_schema = dst_table->schema(); + TASK_RET_NOT_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table)); + const auto& dst_table_schema = dst_table->schema(); // One session per thread. 
client::sp::shared_ptr<KuduSession> session((*dst_client_)->NewSession()); - CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); - CHECK_OK(session->SetErrorBufferSpace(1024)); + TASK_RET_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); + TASK_RET_NOT_OK(session->SetErrorBufferSpace(1024 * 1024)); session->SetTimeoutMillis(FLAGS_timeout_ms); *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) { for (const auto& row : batch) { - CHECK_OK(AddRow(dst_table, dst_table_schema, row, session)); + RETURN_NOT_OK(AddRow(dst_table, dst_table_schema, row, session)); } + // Flush here to make sure all write operations have been sent. + auto s = session->Flush(); CheckPendingErrors(session); - // Flush here to make sure all write operations have been sent, - // and all strings reference to batch are still valid. - CHECK_OK(session->Flush()); + return s; }); + +#undef TASK_RET_NOT_OK } void TableScanner::SetOutput(ostream* out) { @@ -650,12 +665,12 @@ void TableScanner::SetScanBatchSize(int32_t scan_batch_size) { scan_batch_size_ = scan_batch_size; } -Status TableScanner::StartWork(WorkType type) { +Status TableScanner::StartWork(WorkType work_type) { client::sp::shared_ptr<KuduTable> src_table; RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table)); // Create destination table if needed. - if (type == WorkType::kCopy) { + if (work_type == WorkType::kCopy) { RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_)); if (FLAGS_write_type.empty()) { // Create table only. @@ -686,7 +701,7 @@ Status TableScanner::StartWork(WorkType type) { } // Set projection if needed. 
- if (type == WorkType::kScan) { + if (work_type == WorkType::kScan) { const auto project_all = FLAGS_columns == "*" || FLAGS_columns.empty(); if (!project_all || FLAGS_row_count_only) { vector<string> projected_column_names; @@ -704,35 +719,36 @@ Status TableScanner::StartWork(WorkType type) { ElementDeleter deleter(&tokens); RETURN_NOT_OK(builder.Build(&tokens)); + const int num_threads = FLAGS_num_threads; + // Set tablet filter. const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace()); map<int, vector<KuduScanToken*>> thread_tokens; int i = 0; for (auto* token : tokens) { if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) { - thread_tokens[i++ % FLAGS_num_threads].emplace_back(token); + thread_tokens[i++ % num_threads].emplace_back(token); } } - // Initialize statuses for each thread. - vector<Status> thread_statuses(FLAGS_num_threads); - RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool") - .set_max_threads(FLAGS_num_threads) + .set_max_threads(num_threads) .set_idle_timeout(MonoDelta::FromMilliseconds(1)) .Build(&thread_pool_)); - Status end_status = Status::OK(); + // Initialize statuses for each thread. 
+ vector<Status> thread_statuses(num_threads); + Stopwatch sw(Stopwatch::THIS_THREAD); sw.start(); - for (i = 0; i < FLAGS_num_threads; ++i) { + for (i = 0; i < num_threads; ++i) { auto* t_tokens = &thread_tokens[i]; auto* t_status = &thread_statuses[i]; - if (type == WorkType::kScan) { + if (work_type == WorkType::kScan) { RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]() { this->ScanTask(*t_tokens, t_status); })); } else { - CHECK(type == WorkType::kCopy); + DCHECK(work_type == WorkType::kCopy); RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]() { this->CopyTask(*t_tokens, t_status); })); } @@ -748,18 +764,20 @@ Status TableScanner::StartWork(WorkType type) { << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl; } - for (i = 0; i < FLAGS_num_threads; ++i) { - if (!thread_statuses[i].ok()) { + const auto& operation = work_type == WorkType::kScan ? "Scanning" : "Copying"; + Status result_status; + for (const auto& s : thread_statuses) { + if (!s.ok()) { if (out_) { - *out_ << "Scanning failed " << thread_statuses[i].ToString() << endl; + *out_ << operation << " failed: " << s.ToString() << endl; } - if (end_status.ok()) { - end_status = thread_statuses[i]; + if (result_status.ok()) { + result_status = s; } } } - return end_status; + return result_status; } Status TableScanner::StartScan() { diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h index 440b62c64..b8608c64b 100644 --- a/src/kudu/tools/table_scanner.h +++ b/src/kudu/tools/table_scanner.h @@ -90,9 +90,9 @@ class TableScanner { const std::string& selection_str, client::KuduClient::ReplicaSelection* selection); - Status StartWork(WorkType type); + Status StartWork(WorkType work_type); Status ScanData(const std::vector<client::KuduScanToken*>& tokens, - const std::function<void(const client::KuduScanBatch& batch)>& cb); + const std::function<Status(const client::KuduScanBatch& batch)>& cb); void ScanTask(const std::vector<client::KuduScanToken*>& 
tokens, Status* thread_status); void CopyTask(const std::vector<client::KuduScanToken*>& tokens,