This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new adc1cc64e [tools] update 'kudu table copy' CLI tool adc1cc64e is described below commit adc1cc64e3fa26227b1400ec5a7ede870f03d39d Author: Alexey Serbin <ale...@apache.org> AuthorDate: Mon Mar 6 19:09:34 2023 -0800 [tools] update 'kudu table copy' CLI tool This patch adds 'insert_ignore' and 'upsert_ignore' write operations for the 'kudu table copy' CLI tool. It also updates the implementation of TableScanner::CopyTask() and TableScanner::AddRow() to perform fewer string comparisons. Change-Id: I8061f510710a30019db62627c91fb4caf4a13d27 Reviewed-on: http://gerrit.cloudera.org:8080/19595 Tested-by: Kudu Jenkins Reviewed-by: Yingchun Lai <laiyingc...@apache.org> --- src/kudu/tools/kudu-tool-test.cc | 91 +++++++++++++++++++++++++++---------- src/kudu/tools/table_scanner.cc | 97 ++++++++++++++++++++++++++++++---------- src/kudu/tools/table_scanner.h | 17 +++---- 3 files changed, 146 insertions(+), 59 deletions(-) diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 7714dcad6..92719b6f1 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -514,10 +514,12 @@ class ToolTest : public KuduTest { } enum class TableCopyMode { - INSERT_TO_EXIST_TABLE = 0, - INSERT_TO_NOT_EXIST_TABLE = 1, - UPSERT_TO_EXIST_TABLE = 2, + INSERT_TO_EXISTING_TABLE = 0, + INSERT_TO_NEW_TABLE = 1, + UPSERT_TO_EXISTING_TABLE = 2, COPY_SCHEMA_ONLY = 3, + INSERT_IGNORE_TO_EXISTING_TABLE = 4, + UPSERT_IGNORE_TO_EXISTING_TABLE = 5, }; struct RunCopyTableCheckArgs { @@ -532,7 +534,7 @@ class ToolTest : public KuduTest { }; void RunCopyTableCheck(const RunCopyTableCheckArgs& args) { - const string kDstTableName = "kudu.table.copy.to"; + static constexpr const char* const kDstTableName = "kudu.table.copy.to"; // Prepare command flags, create destination table and write some data if needed. 
string write_type; @@ -541,24 +543,49 @@ class ToolTest : public KuduTest { ww.set_table_name(kDstTableName); ww.set_num_replicas(1); switch (args.mode) { - case TableCopyMode::INSERT_TO_EXIST_TABLE: + case TableCopyMode::INSERT_TO_EXISTING_TABLE: write_type = "insert"; create_table = "false"; // Create the dst table. ww.set_num_write_threads(0); ww.Setup(); break; - case TableCopyMode::INSERT_TO_NOT_EXIST_TABLE: + case TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE: + write_type = "insert_ignore"; + create_table = "false"; + // Create the dst table and write some data to it. + ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); + ww.set_num_write_threads(1); + ww.Setup(); + ww.Start(); + ASSERT_EVENTUALLY([&]() { + ASSERT_GE(ww.rows_inserted(), 100); + }); + ww.StopAndJoin(); + break; + case TableCopyMode::INSERT_TO_NEW_TABLE: write_type = "insert"; create_table = "true"; break; - case TableCopyMode::UPSERT_TO_EXIST_TABLE: + case TableCopyMode::UPSERT_TO_EXISTING_TABLE: write_type = "upsert"; create_table = "false"; // Create the dst table and write some data to it. ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); ww.set_num_write_threads(1); - ww.set_write_batch_size(1); + ww.Setup(); + ww.Start(); + ASSERT_EVENTUALLY([&]() { + ASSERT_GE(ww.rows_inserted(), 100); + }); + ww.StopAndJoin(); + break; + case TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE: + write_type = "upsert_ignore"; + create_table = "false"; + // Create the dst table and write some data to it. + ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); + ww.set_num_write_threads(1); ww.Setup(); ww.Start(); ASSERT_EVENTUALLY([&]() { @@ -628,7 +655,7 @@ class ToolTest : public KuduTest { } // Check schema equals when destination table is created automatically. 
- if (args.mode == TableCopyMode::INSERT_TO_NOT_EXIST_TABLE || + if (args.mode == TableCopyMode::INSERT_TO_NEW_TABLE || args.mode == TableCopyMode::COPY_SCHEMA_ONLY) { vector<string> src_schema; NO_FATALS(RunActionStdoutLines( @@ -698,20 +725,26 @@ class ToolTest : public KuduTest { ASSERT_GE(dst_lines.size(), 1); ASSERT_STR_CONTAINS(*dst_lines.rbegin(), "Total count 0 "); } else { - // Rows scanned from source table can be found in destination table. + // Rows scanned from the source table can be found in the destination table. set<string> sorted_dst_lines(dst_lines.begin(), dst_lines.end()); - for (auto src_line = src_lines.begin(); src_line != src_lines.end();) { - if (src_line->find("key") != string::npos) { - ASSERT_TRUE(ContainsKey(sorted_dst_lines, *src_line)); - sorted_dst_lines.erase(*src_line); + for (auto src_line_it = src_lines.begin(); src_line_it != src_lines.end();) { + if (src_line_it->find("key") != string::npos) { + if (args.mode != TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE) { + ASSERT_TRUE(ContainsKey(sorted_dst_lines, *src_line_it)) << *src_line_it; + } + sorted_dst_lines.erase(*src_line_it); } - src_line = src_lines.erase(src_line); + src_line_it = src_lines.erase(src_line_it); } - // Under all modes except UPSERT_TO_EXIST_TABLE, destination table is empty before - // copying, that means destination table should have no more rows than source table - // after copying. - if (args.mode != TableCopyMode::UPSERT_TO_EXIST_TABLE) { + // Under all modes except for UPSERT_TO_EXISTING_TABLE, + // INSERT_IGNORE_TO_EXISTING_TABLE, UPSERT_IGNORE_TO_EXISTING_TABLE, + // the destination table is empty before copying. That means the + // destination table should not have more rows than the source table + // after copying is complete. 
+ if (args.mode != TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE && + args.mode != TableCopyMode::UPSERT_TO_EXISTING_TABLE && + args.mode != TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE) { for (const auto& dst_line : sorted_dst_lines) { ASSERT_STR_NOT_CONTAINS(dst_line, "key"); } @@ -813,7 +846,9 @@ INSTANTIATE_TEST_SUITE_P(ToolTestKerberosParameterized, ToolTestKerberosParamete enum RunCopyTableCheckArgsType { kTestCopyTableDstTableExist, kTestCopyTableDstTableNotExist, + kTestCopyTableInsertIgnore, kTestCopyTableUpsert, + kTestCopyTableUpsertIgnore, kTestCopyTableSchemaOnly, kTestCopyTableComplexSchema, kTestCopyUnpartitionedTable, @@ -879,16 +914,22 @@ class ToolTestCopyTableParameterized : 1, total_rows_, kSimpleSchemaColumns, - TableCopyMode::INSERT_TO_EXIST_TABLE, + TableCopyMode::INSERT_TO_EXISTING_TABLE, -1 }; switch (test_case_) { case kTestCopyTableDstTableExist: return { args }; case kTestCopyTableDstTableNotExist: - args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE; + args.mode = TableCopyMode::INSERT_TO_NEW_TABLE; + return { args }; + case kTestCopyTableInsertIgnore: + args.mode = TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE; return { args }; case kTestCopyTableUpsert: - args.mode = TableCopyMode::UPSERT_TO_EXIST_TABLE; + args.mode = TableCopyMode::UPSERT_TO_EXISTING_TABLE; + return { args }; + case kTestCopyTableUpsertIgnore: + args.mode = TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE; return { args }; case kTestCopyTableSchemaOnly: { args.mode = TableCopyMode::COPY_SCHEMA_ONLY; @@ -911,7 +952,7 @@ class ToolTestCopyTableParameterized : } case kTestCopyTableComplexSchema: { args.columns = kComplexSchemaColumns; - args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE; + args.mode = TableCopyMode::INSERT_TO_NEW_TABLE; vector<RunCopyTableCheckArgs> multi_args; { auto args_temp = args; @@ -951,7 +992,7 @@ class ToolTestCopyTableParameterized : return multi_args; } case kTestCopyUnpartitionedTable: { - args.mode = 
TableCopyMode::INSERT_TO_NOT_EXIST_TABLE; + args.mode = TableCopyMode::INSERT_TO_NEW_TABLE; vector<RunCopyTableCheckArgs> multi_args; { auto args_temp = args; @@ -1176,7 +1217,9 @@ INSTANTIATE_TEST_SUITE_P(CopyTableParameterized, ToolTestCopyTableParameterized, ::testing::Values(kTestCopyTableDstTableExist, kTestCopyTableDstTableNotExist, + kTestCopyTableInsertIgnore, kTestCopyTableUpsert, + kTestCopyTableUpsertIgnore, kTestCopyTableSchemaOnly, kTestCopyTableComplexSchema, kTestCopyUnpartitionedTable, diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc index c7ea9744c..8178c7379 100644 --- a/src/kudu/tools/table_scanner.cc +++ b/src/kudu/tools/table_scanner.cc @@ -129,9 +129,12 @@ DEFINE_bool(report_scanner_stats, false, DEFINE_bool(show_values, false, "Whether to show values of scanned rows."); DEFINE_string(write_type, "insert", - "How data should be copied to the destination table. Valid values are 'insert', " - "'upsert' or the empty string. If the empty string, data will not be copied " - "(useful when --create_table=true)."); + "Write operation type to use when populating the destination " + "table with the rows from the source table. Choose from " + "'insert', 'insert_ignore', 'upsert', 'upsert_ignore', or an " + "empty string. Empty string means the data isn't going to be " + "copied, which is useful with --create_table=true when just " + "creating the destination table without copying the data."); DEFINE_string(replica_selection, "CLOSEST", "Replica selection for scan operations. 
Acceptable values are: " "CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and " @@ -165,9 +168,20 @@ bool IsFlagValueAcceptable(const char* flag_name, return false; } +constexpr const char* const kWriteTypeInsert = "insert"; +constexpr const char* const kWriteTypeInsertIgnore = "insert_ignore"; +constexpr const char* const kWriteTypeUpsert = "upsert"; +constexpr const char* const kWriteTypeUpsertIgnore = "upsert_ignore"; + bool ValidateWriteType(const char* flag_name, const string& flag_value) { - static const vector<string> kWriteTypes = { "insert", "upsert", "" }; + static const vector<string> kWriteTypes = { + "", + kWriteTypeInsert, + kWriteTypeInsertIgnore, + kWriteTypeUpsert, + kWriteTypeUpsertIgnore, + }; return IsFlagValueAcceptable(flag_name, flag_value, kWriteTypes); } @@ -535,7 +549,7 @@ Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table return Status::OK(); } -void CheckPendingErrors(const client::sp::shared_ptr<KuduSession>& session) { +void CheckPendingErrors(KuduSession* session) { vector<KuduError*> errors; ElementDeleter d(&errors); session->GetPendingErrors(&errors, nullptr); @@ -623,9 +637,24 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread } while (0) DCHECK(thread_status); + KuduWriteOperation::Type op_type; + const auto& op_type_str = FLAGS_write_type; + if (op_type_str == kWriteTypeInsert) { + op_type = KuduWriteOperation::INSERT; + } else if (op_type_str == kWriteTypeInsertIgnore) { + op_type = KuduWriteOperation::INSERT_IGNORE; + } else if (op_type_str == kWriteTypeUpsert) { + op_type = KuduWriteOperation::UPSERT; + } else if (op_type_str == kWriteTypeUpsertIgnore) { + op_type = KuduWriteOperation::UPSERT_IGNORE; + } else { + *thread_status = Status::InvalidArgument(Substitute( + "invalid write operation type: $0", op_type_str)); + return; + } + client::sp::shared_ptr<KuduTable> dst_table; TASK_RET_NOT_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table)); - const 
auto& dst_table_schema = dst_table->schema(); // One session per thread. client::sp::shared_ptr<KuduSession> session((*dst_client_)->NewSession()); @@ -633,13 +662,22 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread TASK_RET_NOT_OK(session->SetErrorBufferSpace(1024 * 1024)); session->SetTimeoutMillis(FLAGS_timeout_ms); - *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) { + // The callback's lambda of ScanData() keeps references to the session and + // the destination table objects, making sure they are alive when the callback + // is invoked. + *thread_status = ScanData(tokens, [table = std::move(dst_table), + session = std::move(session), + op_type] (const KuduScanBatch& batch) { + auto* s_ptr = session.get(); + auto* t_ptr = table.get(); for (const auto& row : batch) { - RETURN_NOT_OK(AddRow(dst_table, dst_table_schema, row, session)); + RETURN_NOT_OK(AddRow(s_ptr, t_ptr, row, op_type)); } - // Flush here to make sure all write operations have been sent. - auto s = session->Flush(); - CheckPendingErrors(session); + // Flush the session to make sure all write operations have been sent + // to the server. If any error happens, CheckPendingErrors() will report + // on them. 
+ auto s = s_ptr->Flush(); + CheckPendingErrors(s_ptr); return s; }); @@ -791,23 +829,34 @@ Status TableScanner::StartCopy() { return StartWork(WorkType::kCopy); } -Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table, - const KuduSchema& table_schema, +Status TableScanner::AddRow(KuduSession* session, + KuduTable* table, const KuduScanBatch::RowPtr& src_row, - const client::sp::shared_ptr<KuduSession>& session) { + KuduWriteOperation::Type write_op_type) { unique_ptr<KuduWriteOperation> write_op; - if (FLAGS_write_type == "insert") { - write_op.reset(table->NewInsert()); - } else if (FLAGS_write_type == "upsert") { - write_op.reset(table->NewUpsert()); - } else { - LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type); + switch (write_op_type) { + case KuduWriteOperation::INSERT: + write_op.reset(table->NewInsert()); + break; + case KuduWriteOperation::INSERT_IGNORE: + write_op.reset(table->NewInsertIgnore()); + break; + case KuduWriteOperation::UPSERT: + write_op.reset(table->NewUpsert()); + break; + case KuduWriteOperation::UPSERT_IGNORE: + write_op.reset(table->NewUpsertIgnore()); + break; + default: + return Status::InvalidArgument( + Substitute("unexpected op type: $0", write_op_type)); + break; // unreachable } - KuduPartialRow* dst_row = write_op->mutable_row(); - size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_); - memcpy(dst_row->row_data_, src_row.row_data_, row_size); - BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(), true); + auto* dst_row = write_op->mutable_row(); + memcpy(dst_row->row_data_, src_row.row_data_, + ContiguousRowHelper::row_size(*src_row.schema_)); + BitmapChangeBits(dst_row->isset_bitmap_, 0, table->schema().num_columns(), true); return session->Apply(write_op.release()); } diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h index b8608c64b..13dd39d39 100644 --- a/src/kudu/tools/table_scanner.h +++ b/src/kudu/tools/table_scanner.h @@ 
-21,24 +21,20 @@ #include <cstdint> #include <functional> #include <iosfwd> -#include <optional> #include <memory> +#include <optional> #include <string> #include <vector> #include "kudu/client/client.h" #include "kudu/client/scan_batch.h" #include "kudu/client/shared_ptr.h" // IWYU pragma: keep +#include "kudu/client/write_op.h" #include "kudu/util/mutex.h" #include "kudu/util/status.h" #include "kudu/util/threadpool.h" namespace kudu { - -namespace client { -class KuduSchema; -} // namespace client - namespace tools { // This class is not thread-safe. @@ -78,11 +74,10 @@ class TableScanner { kCopy }; - static Status AddRow( - const client::sp::shared_ptr<client::KuduTable>& table, - const client::KuduSchema& table_schema, - const client::KuduScanBatch::RowPtr& src_row, - const client::sp::shared_ptr<client::KuduSession>& session); + static Status AddRow(client::KuduSession* session, + client::KuduTable* table, + const client::KuduScanBatch::RowPtr& src_row, + client::KuduWriteOperation::Type write_op_type); // Convert replica selection from string into the KuduClient::ReplicaSelection // enumerator.