This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.16.x in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.16.x by this push: new f5e06a5 [tool] fix a command bug, cmd: kudu wal dump ... f5e06a5 is described below commit f5e06a556905574ee56a948f969cf235b894b04c Author: shenxingwuying <shenxingwuy...@gmail.com> AuthorDate: Thu Jan 20 23:04:19 2022 +0800 [tool] fix a command bug, cmd: kudu wal dump ... kudu wal dump command will interrupt after reading alter schema entry. Change-Id: I27acc71597d038cafbbe687117bddb1ce16576c0 Reviewed-on: http://gerrit.cloudera.org:8080/18169 Tested-by: Kudu Jenkins Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> Reviewed-by: Andrew Wong <aw...@cloudera.com> (cherry picked from commit 9f164b3c542a2e44c963c43488b4fc5b0d0f0f65) Reviewed-on: http://gerrit.cloudera.org:8080/18260 Tested-by: Alexey Serbin <aser...@cloudera.com> --- src/kudu/common/wire_protocol-test-util.h | 41 +++++- src/kudu/tools/kudu-tool-test.cc | 218 ++++++++++++++++++++++++++++++ src/kudu/tools/tool_action_common.cc | 14 +- 3 files changed, 269 insertions(+), 4 deletions(-) diff --git a/src/kudu/common/wire_protocol-test-util.h b/src/kudu/common/wire_protocol-test-util.h index a5a0412..2fcf03b 100644 --- a/src/kudu/common/wire_protocol-test-util.h +++ b/src/kudu/common/wire_protocol-test-util.h @@ -20,6 +20,7 @@ #include "kudu/common/wire_protocol.h" +#include <map> #include <string> #include "kudu/common/partial_row.h" @@ -35,6 +36,13 @@ inline Schema GetSimpleTestSchema() { 1); } +inline void RowAppendColumn(KuduPartialRow* row, + const std::map<std::string, std::string>& columns) { + for (const auto& column : columns) { + CHECK_OK(row->SetStringCopy(column.first.c_str(), column.second.c_str())); + } +} + inline void AddTestRowWithNullableStringToPB(RowOperationsPB::Type op_type, const Schema& schema, int32_t key, @@ -46,12 +54,32 @@ inline void AddTestRowWithNullableStringToPB(RowOperationsPB::Type op_type, CHECK_OK(row.SetInt32("key", key)); CHECK_OK(row.SetInt32("int_val", int_val)); if (string_val) { - CHECK_OK(row.SetStringCopy("string_val", string_val)); + RowAppendColumn(&row, {{"string_val", std::string(string_val)}}); } RowOperationsPBEncoder enc(ops); enc.Add(op_type, row); } +inline void AddTestRowWithNullableColumnsStringToPB( + RowOperationsPB::Type op_type, const Schema& schema, + int32_t key, int32_t int_val, const char* string_val, + const std::map<std::string, std::string>& columns, + RowOperationsPB* ops) { + DCHECK(schema.initialized()); + KuduPartialRow row(&schema); + CHECK_OK(row.SetInt32("key", key)); + CHECK_OK(row.SetInt32("int_val", int_val)); + if (string_val) { + RowAppendColumn(&row, {{"string_val", std::string(string_val)}}); + } + if (!columns.empty()) { + RowAppendColumn(&row, columns); + } + RowOperationsPBEncoder enc(ops); + enc.Add(op_type, row); +} + + inline void AddTestRowToPB(RowOperationsPB::Type op_type, const Schema& schema, int32_t key, @@ -61,6 +89,17 @@ inline void AddTestRowToPB(RowOperationsPB::Type op_type, AddTestRowWithNullableStringToPB(op_type, schema, key, int_val, string_val.c_str(), ops); } +inline void AddTestRowToPBAppendColumns(RowOperationsPB::Type op_type, + const Schema& schema, int32_t key, + int32_t int_val, + const std::string& string_val, + const std::map<std::string, std::string>& columns, + RowOperationsPB* ops) { + AddTestRowWithNullableColumnsStringToPB(op_type, schema, + key, int_val, string_val.c_str(), + columns, ops); +} + inline void AddTestKeyToPB(RowOperationsPB::Type op_type, const Schema& schema, int32_t key, diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 72898c2..2383b2f 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -2128,6 +2128,224 @@ TEST_F(ToolTest, TestWalDump) { } } +TEST_F(ToolTest, TestWalDumpWithAlterSchema) { + const string kTestDir = GetTestPath("test"); + const string kTestTablet = "ffffffffffffffffffffffffffffffff"; + Schema schema(GetSimpleTestSchema()); + Schema schema_with_ids(SchemaBuilder(schema).Build()); + + FsManager fs(env_, FsManagerOpts(kTestDir)); + ASSERT_OK(fs.CreateInitialFileSystemLayout()); + ASSERT_OK(fs.Open()); + + const std::string kFirstMessage("this is a test insert"); + const std::string kAddColumnName1("addcolumn1_addcolumn1"); + const std::string kAddColumnName2("addcolumn2_addcolumn2"); + const std::string kAddColumnName1Message("insert a record, after alter schema"); + const std::string kAddColumnName2Message("upsert a record, after alter schema"); + { + scoped_refptr<Log> log; + ASSERT_OK(Log::Open(LogOptions(), + &fs, + /*file_cache*/nullptr, + kTestTablet, + schema_with_ids, + 0, // schema_version + /*metric_entity*/nullptr, + &log)); + + std::vector<ReplicateRefPtr> replicates; + { + OpId opid = consensus::MakeOpId(1, 1); + ReplicateRefPtr replicate = + consensus::make_scoped_refptr_replicate(new ReplicateMsg()); + replicate->get()->set_op_type(consensus::WRITE_OP); + replicate->get()->mutable_id()->CopyFrom(opid); + replicate->get()->set_timestamp(1); + WriteRequestPB* write = replicate->get()->mutable_write_request(); + ASSERT_OK(SchemaToPB(schema, write->mutable_schema())); + AddTestRowToPB(RowOperationsPB::INSERT, schema, + opid.index(), 0, kFirstMessage, + write->mutable_row_operations()); + write->set_tablet_id(kTestTablet); + replicates.emplace_back(replicate); + } + { + OpId opid = consensus::MakeOpId(1, 2); + ReplicateRefPtr replicate = + consensus::make_scoped_refptr_replicate(new ReplicateMsg()); + replicate->get()->set_op_type(consensus::ALTER_SCHEMA_OP); + replicate->get()->mutable_id()->CopyFrom(opid); + replicate->get()->set_timestamp(2); + tserver::AlterSchemaRequestPB* alter_schema = + replicate->get()->mutable_alter_schema_request(); + SchemaBuilder schema_builder = SchemaBuilder(schema); + ASSERT_OK(schema_builder.AddColumn(kAddColumnName1, STRING, true, nullptr, nullptr)); + ASSERT_OK(schema_builder.AddColumn(kAddColumnName2, STRING, true, nullptr, nullptr)); + schema = schema_builder.BuildWithoutIds(); + schema_with_ids = SchemaBuilder(schema).Build(); + ASSERT_OK(SchemaToPB(schema_with_ids, alter_schema->mutable_schema())); + alter_schema->set_tablet_id(kTestTablet); + alter_schema->set_schema_version(1); + replicates.emplace_back(replicate); + } + { + OpId opid = consensus::MakeOpId(1, 3); + ReplicateRefPtr replicate = + consensus::make_scoped_refptr_replicate(new ReplicateMsg()); + replicate->get()->set_op_type(consensus::WRITE_OP); + replicate->get()->mutable_id()->CopyFrom(opid); + replicate->get()->set_timestamp(3); + WriteRequestPB* write = replicate->get()->mutable_write_request(); + ASSERT_OK(SchemaToPB(schema, write->mutable_schema())); + AddTestRowToPBAppendColumns(RowOperationsPB::INSERT, schema, + opid.index(), 2, kFirstMessage, + {{kAddColumnName1, kAddColumnName1Message}}, + write->mutable_row_operations()); + write->set_tablet_id(kTestTablet); + replicates.emplace_back(replicate); + } + { + OpId opid = consensus::MakeOpId(1, 4); + ReplicateRefPtr replicate = + consensus::make_scoped_refptr_replicate(new ReplicateMsg()); + replicate->get()->set_op_type(consensus::WRITE_OP); + replicate->get()->mutable_id()->CopyFrom(opid); + replicate->get()->set_timestamp(4); + WriteRequestPB* write = replicate->get()->mutable_write_request(); + ASSERT_OK(SchemaToPB(schema, write->mutable_schema())); + AddTestRowToPBAppendColumns(RowOperationsPB::UPSERT, schema, + 1, 1111, kFirstMessage, + { + {kAddColumnName1, kAddColumnName1Message}, + {kAddColumnName2, kAddColumnName2Message}, + }, + write->mutable_row_operations()); + write->set_tablet_id(kTestTablet); + replicates.emplace_back(replicate); + } + + Synchronizer s; + ASSERT_OK(log->AsyncAppendReplicates(replicates, s.AsStatusCallback())); + ASSERT_OK(s.Wait()); + } + + string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1); + string stdout; + for (const auto& args : { Substitute("wal dump $0", wal_path), + Substitute("local_replica dump wals --fs_wal_dir=$0 $1", + kTestDir, kTestTablet) + }) { + SCOPED_TRACE(args); + for (const auto& print_entries : { "true", "1", "yes", "decoded" }) { + SCOPED_TRACE(print_entries); + NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1", + args, print_entries), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_MATCHES(stdout, "1\\.2@2"); + ASSERT_STR_MATCHES(stdout, "1\\.3@3"); + ASSERT_STR_MATCHES(stdout, "1\\.4@4"); + ASSERT_STR_MATCHES(stdout, kFirstMessage); + ASSERT_STR_MATCHES(stdout, kAddColumnName1); + ASSERT_STR_MATCHES(stdout, "ALTER_SCHEMA_OP"); + ASSERT_STR_MATCHES(stdout, kAddColumnName1Message); + ASSERT_STR_MATCHES(stdout, kAddColumnName2Message); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + for (const auto& print_entries : { "false", "0", "no" }) { + SCOPED_TRACE(print_entries); + NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1", + args, print_entries), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4"); + ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1); + ASSERT_STR_NOT_MATCHES(stdout, "ALTER_SCHEMA_OP"); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=pb", + args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4"); + ASSERT_STR_MATCHES(stdout, kFirstMessage); + ASSERT_STR_MATCHES(stdout, kAddColumnName1); + ASSERT_STR_MATCHES(stdout, "ALTER_SCHEMA_OP"); + ASSERT_STR_MATCHES(stdout, kAddColumnName1Message); + ASSERT_STR_MATCHES(stdout, kAddColumnName2Message); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute( + "$0 --print_entries=pb --truncate_data=1", args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4"); + ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message); + ASSERT_STR_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute( + "$0 --print_entries=id", args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_MATCHES(stdout, "1\\.2@2"); + ASSERT_STR_MATCHES(stdout, "1\\.3@3"); + ASSERT_STR_MATCHES(stdout, "1\\.4@4"); + ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message); + ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute( + "$0 --print_meta=false", args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_NOT_MATCHES(stdout, "Header:"); + ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_MATCHES(stdout, "1\\.2@2"); + ASSERT_STR_MATCHES(stdout, "1\\.3@3"); + ASSERT_STR_MATCHES(stdout, "1\\.4@4"); + ASSERT_STR_MATCHES(stdout, kFirstMessage); + ASSERT_STR_MATCHES(stdout, kAddColumnName1); + ASSERT_STR_MATCHES(stdout, kAddColumnName1Message); + ASSERT_STR_MATCHES(stdout, kAddColumnName2Message); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_NOT_MATCHES(stdout, "Footer:"); + } + } +} + TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) { constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff"; constexpr const char* const kTestTableId = "test-table"; diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc index a89e0b4..b615b71 100644 --- a/src/kudu/tools/tool_action_common.cc +++ b/src/kudu/tools/tool_action_common.cc @@ -74,6 +74,7 @@ #include "kudu/tools/tool.pb.h" // IWYU pragma: keep #include "kudu/tools/tool_action.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/tserver/tserver_admin.pb.h" #include "kudu/tserver/tserver_admin.proxy.h" // IWYU pragma: keep #include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep #include "kudu/util/async_util.h" @@ -345,7 +346,7 @@ Status PrintDecodedWriteRequestPB(const string& indent, return Status::OK(); } -Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) { +Status PrintDecoded(const LogEntryPB& entry, Schema* tablet_schema) { PrintIdOnly(entry); const string indent = "\t"; @@ -356,9 +357,16 @@ Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) { if (replicate.op_type() == consensus::WRITE_OP) { RETURN_NOT_OK(PrintDecodedWriteRequestPB( indent, - tablet_schema, + *tablet_schema, replicate.write_request(), replicate.has_request_id() ? &replicate.request_id() : nullptr)); + } else if (replicate.op_type() == consensus::ALTER_SCHEMA_OP) { + if (!replicate.has_alter_schema_request()) { + LOG(ERROR) << "read an ALTER_SCHEMA_OP log entry, but has no alter_schema_request"; + return Status::RuntimeError("ALTER_SCHEMA_OP log entry has no alter_schema_request"); + } + RETURN_NOT_OK(SchemaFromPB(replicate.alter_schema_request().schema(), tablet_schema)); + cout << indent << SecureShortDebugString(replicate) << endl; } else { cout << indent << SecureShortDebugString(replicate) << endl; } @@ -555,7 +563,7 @@ Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) { cout << "Entry:\n" << SecureDebugString(*entry); } else if (print_type == PRINT_DECODED) { - RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema)); + RETURN_NOT_OK(PrintDecoded(*entry, &tablet_schema)); } else if (print_type == PRINT_ID) { PrintIdOnly(*entry); }