This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 28472f041 [tools] KUDU-1945 Print auto-incrementing counter in kudu wal dump 28472f041 is described below commit 28472f041c8eb63f58d1f2075007ef864f8d3f68 Author: Abhishek Chennaka <achenn...@cloudera.com> AuthorDate: Wed Apr 5 18:06:45 2023 -0700 [tools] KUDU-1945 Print auto-incrementing counter in kudu wal dump This patch allows the "kudu wal dump" tool to display auto-incrementing counter value if present in the WAL segment. It displays the counter value present in each INSERT/INSERT_IGNORE replicate op and also populates the corresponding auto incrementing column values for each row when printing rows. Change-Id: I4e807aaef48683ec7c5317eecdedf8e6e15950e2 Reviewed-on: http://gerrit.cloudera.org:8080/19698 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/common/wire_protocol-test-util.h | 11 +++ src/kudu/tools/kudu-tool-test.cc | 147 ++++++++++++++++++++++++++++++ src/kudu/tools/tool_action_common.cc | 15 ++- 3 files changed, 171 insertions(+), 2 deletions(-) diff --git a/src/kudu/common/wire_protocol-test-util.h b/src/kudu/common/wire_protocol-test-util.h index 2fcf03bf6..f3daf8638 100644 --- a/src/kudu/common/wire_protocol-test-util.h +++ b/src/kudu/common/wire_protocol-test-util.h @@ -23,6 +23,7 @@ #include <map> #include <string> +#include "kudu/client/schema.h" #include "kudu/common/partial_row.h" #include "kudu/common/row.h" #include "kudu/common/row_operations.h" @@ -36,6 +37,16 @@ inline Schema GetSimpleTestSchema() { 1); } +inline client::KuduSchema GetAutoIncrementingTestSchema() { + client::KuduSchema kudu_schema; + client::KuduSchemaBuilder b; + b.AddColumn("key")->Type(client::KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey(); + b.AddColumn("int_val")->Type(client::KuduColumnSchema::INT32); + b.AddColumn("string_val")->Type(client::KuduColumnSchema::STRING)->Nullable(); + CHECK_OK(b.Build(&kudu_schema)); + return kudu_schema; +} + inline void RowAppendColumn(KuduPartialRow* row, const std::map<std::string, std::string>& columns) { for (const auto& column : columns) { diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 8cee955c6..c27057007 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -2428,6 +2428,8 @@ TEST_F(ToolTest, TestWalDump) { ASSERT_STR_MATCHES(stdout, "Header:"); ASSERT_STR_MATCHES(stdout, "1\\.1@1"); ASSERT_STR_MATCHES(stdout, "this is a test insert"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id"); ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); ASSERT_STR_MATCHES(stdout, "Footer:"); @@ -2439,6 +2441,8 @@ TEST_F(ToolTest, TestWalDump) { SCOPED_TRACE(stdout); ASSERT_STR_MATCHES(stdout, "Header:"); ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id"); ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert"); ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); @@ -2450,6 +2454,8 @@ TEST_F(ToolTest, TestWalDump) { SCOPED_TRACE(stdout); ASSERT_STR_MATCHES(stdout, "Header:"); ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id"); ASSERT_STR_MATCHES(stdout, "this is a test insert"); ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); ASSERT_STR_MATCHES(stdout, "row_operations \\{"); @@ -2461,6 +2467,8 @@ TEST_F(ToolTest, TestWalDump) { SCOPED_TRACE(stdout); ASSERT_STR_MATCHES(stdout, "Header:"); ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id"); ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert"); ASSERT_STR_MATCHES(stdout, "t<truncated>"); ASSERT_STR_MATCHES(stdout, "row_operations \\{"); @@ -2472,6 +2480,8 @@ TEST_F(ToolTest, TestWalDump) { SCOPED_TRACE(stdout); ASSERT_STR_MATCHES(stdout, "Header:"); ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id"); ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert"); ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); @@ -2483,6 +2493,8 @@ TEST_F(ToolTest, TestWalDump) { SCOPED_TRACE(stdout); ASSERT_STR_NOT_MATCHES(stdout, "Header:"); ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id"); ASSERT_STR_MATCHES(stdout, "this is a test insert"); ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); ASSERT_STR_NOT_MATCHES(stdout, "Footer:"); @@ -2713,6 +2725,141 @@ TEST_F(ToolTest, TestWalDumpWithAlterSchema) { } } +TEST_F(ToolTest, TestWalDumpWithAutoIncrementingColumn) { + const string kTestDir = GetTestPath("test"); + const string kTestTablet = "ffffffffffffffffffffffffffffffff"; + const client::KuduSchema kSchema(GetAutoIncrementingTestSchema()); + const Schema schema = client::KuduSchema::ToSchema(kSchema); + const Schema schema_with_id = SchemaBuilder(client::KuduSchema::ToSchema(kSchema)).Build(); + + FsManager fs(env_, FsManagerOpts(kTestDir)); + ASSERT_OK(fs.CreateInitialFileSystemLayout()); + ASSERT_OK(fs.Open()); + + { + scoped_refptr<Log> log; + ASSERT_OK(Log::Open(LogOptions(), + &fs, + /*file_cache*/nullptr, + kTestTablet, + schema_with_id, + /*schema_version*/0, + /*metric_entity*/nullptr, + &log)); + + OpId opid = consensus::MakeOpId(1, 1); + ReplicateRefPtr replicate = + consensus::make_scoped_refptr_replicate(new ReplicateMsg()); + replicate->get()->set_op_type(consensus::WRITE_OP); + replicate->get()->mutable_id()->CopyFrom(opid); + replicate->get()->set_timestamp(1); + WriteRequestPB* write = replicate->get()->mutable_write_request(); + write->mutable_auto_incrementing_column()->set_auto_incrementing_counter(0x5a); + ASSERT_OK(SchemaToPB(schema, write->mutable_schema())); + AddTestRowToPB(RowOperationsPB::INSERT, schema, + opid.index(), + 0, + "this is a test insert", + write->mutable_row_operations()); + AddTestRowToPB(RowOperationsPB::INSERT, schema, + opid.index(), + 0, + "this is a test insert", + write->mutable_row_operations()); + write->set_tablet_id(kTestTablet); + Synchronizer s; + ASSERT_OK(log->AsyncAppendReplicates({ replicate }, s.AsStatusCallback())); + ASSERT_OK(s.Wait()); + } + + string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1); + string encryption_args; + if (env_->IsEncryptionEnabled()) { + encryption_args = GetEncryptionArgs() + " --instance_file=" + + fs.GetInstanceMetadataPath(kTestDir); + } + string stdout; + for (const auto& args : { Substitute("wal dump $0 $1", wal_path, encryption_args), + Substitute("local_replica dump wals --fs_wal_dir=$0 $1 $2", + kTestDir, kTestTablet, encryption_args) + }) { + SCOPED_TRACE(args); + { + NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=true", + args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_MATCHES(stdout, "Auto Incrementing Counter: 90"); + ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=91"); + ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=92"); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=false", + args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id="); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=pb", + args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id="); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute( + "$0 --print_entries=pb --truncate_data=1", args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id="); + ASSERT_STR_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute( + "$0 --print_entries=id", args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_MATCHES(stdout, "Header:"); + ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter"); + ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id="); + ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>"); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_MATCHES(stdout, "Footer:"); + } + { + NO_FATALS(RunActionStdoutString(Substitute( + "$0 --print_meta=false", args), &stdout)); + SCOPED_TRACE(stdout); + ASSERT_STR_NOT_MATCHES(stdout, "Header:"); + ASSERT_STR_MATCHES(stdout, "1\\.1@1"); + ASSERT_STR_MATCHES(stdout, "Auto Incrementing Counter: 90"); + ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=91"); + ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=92"); + ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{"); + ASSERT_STR_NOT_MATCHES(stdout, "Footer:"); + } + } +} + TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) { constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff"; constexpr const char* const kTestTableId = "test-table"; diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc index f54ac1aee..f8413ee7f 100644 --- a/src/kudu/tools/tool_action_common.cc +++ b/src/kudu/tools/tool_action_common.cc @@ -348,8 +348,15 @@ Status PrintDecodedWriteRequestPB(const string& indent, Arena arena(32 * 1024); RowOperationsPBDecoder dec(&write.row_operations(), &request_schema, &tablet_schema, &arena); vector<DecodedRowOperation> ops; - RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops)); - + if (write.has_auto_incrementing_column()) { + // Define auto_incrementing_counter and use it as in-out parameter during decoding of the ops + // in DecodeOperations(). + int64_t auto_incrementing_counter = + write.auto_incrementing_column().auto_incrementing_counter(); + RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops, &auto_incrementing_counter)); + } else { + RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops)); + } cout << indent << "Tablet: " << write.tablet_id() << endl; cout << indent << "RequestId: " << (request_id ? SecureShortDebugString(*request_id) : "None") << endl; @@ -358,6 +365,10 @@ Status PrintDecodedWriteRequestPB(const string& indent, if (write.has_propagated_timestamp()) { cout << indent << "Propagated TS: " << write.propagated_timestamp() << endl; } + if (write.has_auto_incrementing_column()) { + cout << indent << "Auto Incrementing Counter: " + << write.auto_incrementing_column().auto_incrementing_counter() << endl; + } int i = 0; for (const DecodedRowOperation& op : ops) {