This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 28472f041 [tools] KUDU-1945 Print auto-incrementing counter in kudu wal dump
28472f041 is described below

commit 28472f041c8eb63f58d1f2075007ef864f8d3f68
Author: Abhishek Chennaka <achenn...@cloudera.com>
AuthorDate: Wed Apr 5 18:06:45 2023 -0700

    [tools] KUDU-1945 Print auto-incrementing counter in kudu wal dump
    
    This patch allows the "kudu wal dump" tool to display the auto-incrementing
    counter value if it is present in the WAL segment. It displays the counter
    value present in each INSERT/INSERT_IGNORE replicate op and also populates
    the corresponding auto-incrementing column values for each row when
    printing rows.
    
    Change-Id: I4e807aaef48683ec7c5317eecdedf8e6e15950e2
    Reviewed-on: http://gerrit.cloudera.org:8080/19698
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/common/wire_protocol-test-util.h |  11 +++
 src/kudu/tools/kudu-tool-test.cc          | 147 ++++++++++++++++++++++++++++++
 src/kudu/tools/tool_action_common.cc      |  15 ++-
 3 files changed, 171 insertions(+), 2 deletions(-)

diff --git a/src/kudu/common/wire_protocol-test-util.h b/src/kudu/common/wire_protocol-test-util.h
index 2fcf03bf6..f3daf8638 100644
--- a/src/kudu/common/wire_protocol-test-util.h
+++ b/src/kudu/common/wire_protocol-test-util.h
@@ -23,6 +23,7 @@
 #include <map>
 #include <string>
 
+#include "kudu/client/schema.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
 #include "kudu/common/row_operations.h"
@@ -36,6 +37,16 @@ inline Schema GetSimpleTestSchema() {
                 1);
 }
 
+inline client::KuduSchema GetAutoIncrementingTestSchema() {
+  client::KuduSchema kudu_schema;
+  client::KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(client::KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
+  b.AddColumn("int_val")->Type(client::KuduColumnSchema::INT32);
+  b.AddColumn("string_val")->Type(client::KuduColumnSchema::STRING)->Nullable();
+  CHECK_OK(b.Build(&kudu_schema));
+  return kudu_schema;
+}
+
 inline void RowAppendColumn(KuduPartialRow* row,
                             const std::map<std::string, std::string>& columns) {
   for (const auto& column : columns) {
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 8cee955c6..c27057007 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2428,6 +2428,8 @@ TEST_F(ToolTest, TestWalDump) {
       ASSERT_STR_MATCHES(stdout, "Header:");
       ASSERT_STR_MATCHES(stdout, "1\\.1@1");
       ASSERT_STR_MATCHES(stdout, "this is a test insert");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
       ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
       ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
       ASSERT_STR_MATCHES(stdout, "Footer:");
@@ -2439,6 +2441,8 @@ TEST_F(ToolTest, TestWalDump) {
       SCOPED_TRACE(stdout);
       ASSERT_STR_MATCHES(stdout, "Header:");
       ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
       ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert");
       ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
       ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
@@ -2450,6 +2454,8 @@ TEST_F(ToolTest, TestWalDump) {
       SCOPED_TRACE(stdout);
       ASSERT_STR_MATCHES(stdout, "Header:");
       ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
       ASSERT_STR_MATCHES(stdout, "this is a test insert");
       ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
       ASSERT_STR_MATCHES(stdout, "row_operations \\{");
@@ -2461,6 +2467,8 @@ TEST_F(ToolTest, TestWalDump) {
       SCOPED_TRACE(stdout);
       ASSERT_STR_MATCHES(stdout, "Header:");
       ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
       ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert");
       ASSERT_STR_MATCHES(stdout, "t<truncated>");
       ASSERT_STR_MATCHES(stdout, "row_operations \\{");
@@ -2472,6 +2480,8 @@ TEST_F(ToolTest, TestWalDump) {
       SCOPED_TRACE(stdout);
       ASSERT_STR_MATCHES(stdout, "Header:");
       ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
       ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert");
       ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
       ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
@@ -2483,6 +2493,8 @@ TEST_F(ToolTest, TestWalDump) {
       SCOPED_TRACE(stdout);
       ASSERT_STR_NOT_MATCHES(stdout, "Header:");
       ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
       ASSERT_STR_MATCHES(stdout, "this is a test insert");
       ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
       ASSERT_STR_NOT_MATCHES(stdout, "Footer:");
@@ -2713,6 +2725,141 @@ TEST_F(ToolTest, TestWalDumpWithAlterSchema) {
   }
 }
 
+TEST_F(ToolTest, TestWalDumpWithAutoIncrementingColumn) {
+  const string kTestDir = GetTestPath("test");
+  const string kTestTablet = "ffffffffffffffffffffffffffffffff";
+  const client::KuduSchema kSchema(GetAutoIncrementingTestSchema());
+  const Schema schema = client::KuduSchema::ToSchema(kSchema);
+  const Schema schema_with_id = SchemaBuilder(client::KuduSchema::ToSchema(kSchema)).Build();
+
+  FsManager fs(env_, FsManagerOpts(kTestDir));
+  ASSERT_OK(fs.CreateInitialFileSystemLayout());
+  ASSERT_OK(fs.Open());
+
+  {
+    scoped_refptr<Log> log;
+    ASSERT_OK(Log::Open(LogOptions(),
+                        &fs,
+                        /*file_cache*/nullptr,
+                        kTestTablet,
+                        schema_with_id,
+                        /*schema_version*/0,
+                        /*metric_entity*/nullptr,
+                        &log));
+
+    OpId opid = consensus::MakeOpId(1, 1);
+    ReplicateRefPtr replicate =
+        consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+    replicate->get()->set_op_type(consensus::WRITE_OP);
+    replicate->get()->mutable_id()->CopyFrom(opid);
+    replicate->get()->set_timestamp(1);
+    WriteRequestPB* write = replicate->get()->mutable_write_request();
+    write->mutable_auto_incrementing_column()->set_auto_incrementing_counter(0x5a);
+    ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+    AddTestRowToPB(RowOperationsPB::INSERT, schema,
+                   opid.index(),
+                   0,
+                   "this is a test insert",
+                   write->mutable_row_operations());
+    AddTestRowToPB(RowOperationsPB::INSERT, schema,
+                   opid.index(),
+                   0,
+                   "this is a test insert",
+                   write->mutable_row_operations());
+    write->set_tablet_id(kTestTablet);
+    Synchronizer s;
+    ASSERT_OK(log->AsyncAppendReplicates({ replicate }, s.AsStatusCallback()));
+    ASSERT_OK(s.Wait());
+  }
+
+  string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1);
+  string encryption_args;
+  if (env_->IsEncryptionEnabled()) {
+    encryption_args = GetEncryptionArgs() + " --instance_file=" +
+        fs.GetInstanceMetadataPath(kTestDir);
+  }
+  string stdout;
+  for (const auto& args : { Substitute("wal dump $0 $1", wal_path, encryption_args),
+                            Substitute("local_replica dump wals --fs_wal_dir=$0 $1 $2",
+                                       kTestDir, kTestTablet, encryption_args)
+  }) {
+    SCOPED_TRACE(args);
+    {
+      NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=true",
+                                                 args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_MATCHES(stdout, "Auto Incrementing Counter: 90");
+      ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=91");
+      ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=92");
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=false",
+                                                 args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=pb",
+                                                 args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute(
+          "$0 --print_entries=pb --truncate_data=1", args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+      ASSERT_STR_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute(
+          "$0 --print_entries=id", args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+      ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute(
+          "$0 --print_meta=false", args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_NOT_MATCHES(stdout, "Header:");
+      ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_MATCHES(stdout, "Auto Incrementing Counter: 90");
+      ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=91");
+      ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=92");
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_NOT_MATCHES(stdout, "Footer:");
+    }
+  }
+}
+
 TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
   constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
   constexpr const char* const kTestTableId = "test-table";
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index f54ac1aee..f8413ee7f 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -348,8 +348,15 @@ Status PrintDecodedWriteRequestPB(const string& indent,
   Arena arena(32 * 1024);
   RowOperationsPBDecoder dec(&write.row_operations(), &request_schema, &tablet_schema, &arena);
   vector<DecodedRowOperation> ops;
-  RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
-
+  if (write.has_auto_incrementing_column()) {
+    // Define auto_incrementing_counter and use it as in-out parameter during decoding of the ops
+    // in DecodeOperations().
+    int64_t auto_incrementing_counter =
+        write.auto_incrementing_column().auto_incrementing_counter();
+    RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops, &auto_incrementing_counter));
+  } else {
+    RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
+  }
   cout << indent << "Tablet: " << write.tablet_id() << endl;
   cout << indent << "RequestId: "
       << (request_id ? SecureShortDebugString(*request_id) : "None") << endl;
@@ -358,6 +365,10 @@ Status PrintDecodedWriteRequestPB(const string& indent,
   if (write.has_propagated_timestamp()) {
     cout << indent << "Propagated TS: " << write.propagated_timestamp() << endl;
   }
+  if (write.has_auto_incrementing_column()) {
+    cout << indent << "Auto Incrementing Counter: "
+        << write.auto_incrementing_column().auto_incrementing_counter() << endl;
+  }
 
   int i = 0;
   for (const DecodedRowOperation& op : ops) {

Reply via email to