This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ddcaaabc [client] KUDU-3351: Add ResourceMetricsPB into WriteResponsePB
0ddcaaabc is described below

commit 0ddcaaabc97c85a4715ae79ff5604feb9b342779
Author: Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Tue Apr 26 14:35:19 2022 -0700

    [client] KUDU-3351: Add ResourceMetricsPB into WriteResponsePB
    
    KUDU-1563 added support for INSERT_IGNORE, UPDATE_IGNORE, and
    DELETE_IGNORE. However, it lacked per-session metrics showing how many
    rows were ignored versus modified. This patch implements those
    per-session metrics by introducing a new ResourceMetricsPB field in
    WriteResponsePB, which is populated in the write responses sent back
    to the client.
    
    Change-Id: I9adefd64b0058c66274a00e1b12334653fcab2b3
    Reviewed-on: http://gerrit.cloudera.org:8080/18451
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/client/batcher.cc                         |  5 ++++
 src/kudu/client/client-test.cc                     | 33 ++++++++++++++++++++++
 src/kudu/client/client.cc                          |  4 +++
 src/kudu/client/client.h                           |  3 ++
 src/kudu/client/resource_metrics.h                 |  1 +
 src/kudu/client/session-internal.cc                | 23 ++++++++++++++-
 src/kudu/client/session-internal.h                 | 11 ++++++++
 .../integration-tests/exactly_once_writes-itest.cc |  8 ++++++
 src/kudu/tablet/ops/write_op.cc                    | 23 ++++++++++++++-
 src/kudu/tablet/ops/write_op.h                     |  5 ++++
 src/kudu/tserver/tserver.proto                     | 24 ++++++++++++++++
 11 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 2db5033b0..cfde2eca3 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -966,6 +966,11 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
     had_errors_ = true;
   }
 
+  // Collect metrics
+  if (sp::shared_ptr<KuduSession> session = weak_session_.lock()) {
+    session->data_->UpdateWriteOpMetrics(rpc.resp().resource_metrics());
+  }
+
   // Remove all the ops from the "in-flight" list. It's essential to do so
   // _after_ adding all errors into the collector, otherwise there might be
   // a race which manifests itself as described at KUDU-1743. Essentially,
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index a7c7e9835..8543fbdc2 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -2936,6 +2936,24 @@ static void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) {
   }
 }
 
+static void DoVerifyMetrics(const KuduSession* session,
+                            int64_t successful_inserts,
+                            int64_t insert_ignore_errors,
+                            int64_t successful_upserts,
+                            int64_t successful_updates,
+                            int64_t update_ignore_errors,
+                            int64_t successful_deletes,
+                            int64_t delete_ignore_errors) {
+  auto metrics = session->GetWriteOpMetrics().Get();
+  ASSERT_EQ(successful_inserts, metrics["successful_inserts"]);
+  ASSERT_EQ(insert_ignore_errors, metrics["insert_ignore_errors"]);
+  ASSERT_EQ(successful_upserts, metrics["successful_upserts"]);
+  ASSERT_EQ(successful_updates, metrics["successful_updates"]);
+  ASSERT_EQ(update_ignore_errors, metrics["update_ignore_errors"]);
+  ASSERT_EQ(successful_deletes, metrics["successful_deletes"]);
+  ASSERT_EQ(delete_ignore_errors, metrics["delete_ignore_errors"]);
+}
+
 TEST_F(ClientTest, TestInsertIgnore) {
   shared_ptr<KuduSession> session = client_->NewSession();
   session->SetTimeoutMillis(10000);
@@ -2945,6 +2963,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
     unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert.release()));
     DoTestVerifyRows(client_table_, 1);
+    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
   }
 
   {
@@ -2952,6 +2971,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
     unique_ptr<KuduInsertIgnore> insert_ignore(BuildTestInsertIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert_ignore.release()));
     DoTestVerifyRows(client_table_, 1);
+    DoVerifyMetrics(session.get(), 1, 1, 0, 0, 0, 0, 0);
   }
 
   {
@@ -2963,6 +2983,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
     ASSERT_OK(insert_ignore->mutable_row()->SetInt32("non_null_with_default", 999));
     ASSERT_OK(session->Apply(insert_ignore.release())); // returns ok but results in no change
     DoTestVerifyRows(client_table_, 1);
+    DoVerifyMetrics(session.get(), 1, 2, 0, 0, 0, 0, 0);
   }
 
   {
@@ -2970,6 +2991,7 @@ TEST_F(ClientTest, TestInsertIgnore) {
     unique_ptr<KuduInsertIgnore> insert_ignore(BuildTestInsertIgnore(client_table_.get(), 2));
     ASSERT_OK(session->Apply(insert_ignore.release()));
     DoTestVerifyRows(client_table_, 2);
+    DoVerifyMetrics(session.get(), 2, 2, 0, 0, 0, 0, 0);
   }
 }
 
@@ -2983,12 +3005,14 @@ TEST_F(ClientTest, TestUpdateIgnore) {
     unique_ptr<KuduUpdateIgnore> update_ignore(BuildTestUpdateIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(update_ignore.release()));
     DoTestVerifyRows(client_table_, 0);
+    DoVerifyMetrics(session.get(), 0, 0, 0, 0, 1, 0, 0);
   }
 
   {
     unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert.release()));
     DoTestVerifyRows(client_table_, 1);
+    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0);
   }
 
   {
@@ -2999,6 +3023,7 @@ TEST_F(ClientTest, TestUpdateIgnore) {
     ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "hello world"));
     ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default", 999));
     ASSERT_OK(session->Apply(update_ignore.release()));
+    DoVerifyMetrics(session.get(), 1, 0, 0, 1, 1, 0, 0);
 
     vector<string> rows;
     KuduScanner scanner(client_table_.get());
@@ -3018,6 +3043,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
     unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert.release()));
     DoTestVerifyRows(client_table_, 1);
+    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
   }
 
   {
@@ -3025,6 +3051,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
     unique_ptr<KuduDeleteIgnore> delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(delete_ignore.release()));
     DoTestVerifyRows(client_table_, 0);
+    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 0);
   }
 
   {
@@ -3032,6 +3059,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
     unique_ptr<KuduDeleteIgnore> delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(delete_ignore.release()));
     DoTestVerifyRows(client_table_, 0);
+    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 1);
   }
 }
 
@@ -4438,6 +4466,7 @@ TEST_F(ClientTest, TestUpsert) {
   // Perform and verify UPSERT which acts as an INSERT.
   ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 1, "original row"));
   FlushSessionOrDie(session);
+  DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0);
 
   {
     vector<string> rows;
@@ -4450,6 +4479,7 @@ TEST_F(ClientTest, TestUpsert) {
   // Perform and verify UPSERT which acts as an UPDATE.
   ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 2, "upserted row"));
   FlushSessionOrDie(session);
+  DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0);
 
   {
     vector<string> rows;
@@ -4468,6 +4498,7 @@ TEST_F(ClientTest, TestUpsert) {
     ASSERT_OK(row->SetInt32("non_null_with_default", 999));
     ASSERT_OK(session->Apply(update.release()));
     FlushSessionOrDie(session);
+    DoVerifyMetrics(session.get(), 0, 0, 2, 1, 0, 0, 0);
   }
   {
     vector<string> rows;
@@ -4481,6 +4512,7 @@ TEST_F(ClientTest, TestUpsert) {
   // column, and therefore should not revert it back to its default.
   ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 3, "upserted row 2"));
   FlushSessionOrDie(session);
+  DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 0, 0);
   {
     vector<string> rows;
     ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
@@ -4492,6 +4524,7 @@ TEST_F(ClientTest, TestUpsert) {
   // Delete the row.
   ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
   FlushSessionOrDie(session);
+  DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 1, 0);
   {
     vector<string> rows;
     ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 2e37713c8..61349527a 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1447,6 +1447,10 @@ KuduClient* KuduSession::client() const {
   return data_->client_.get();
 }
 
+const ResourceMetrics& KuduSession::GetWriteOpMetrics() const {
+  return data_->write_op_metrics_;
+}
+
 ////////////////////////////////////////////////////////////
 // KuduTableAlterer
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 35dab86e2..bf94a7b26 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2566,6 +2566,9 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   /// @return Client for the session: pointer to the associated client object.
   KuduClient* client() const;
 
+  /// @return Cumulative write operation metrics since the beginning of the session.
+  const ResourceMetrics& GetWriteOpMetrics() const;
+
  private:
   class KUDU_NO_EXPORT Data;
 
diff --git a/src/kudu/client/resource_metrics.h b/src/kudu/client/resource_metrics.h
index 015087dab..dff31fedc 100644
--- a/src/kudu/client/resource_metrics.h
+++ b/src/kudu/client/resource_metrics.h
@@ -65,6 +65,7 @@ class KUDU_EXPORT ResourceMetrics {
 
  private:
   friend class KuduScanner;
+  friend class KuduSession;
   class KUDU_NO_EXPORT Data;
   Data* data_;
 };
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index 320ccac18..0cb0be561 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -22,29 +22,37 @@
 #include <utility>
 
 #include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/message.h>
 
 #include "kudu/client/batcher.h"
 #include "kudu/client/callbacks.h"
 #include "kudu/client/error_collector.h"
+#include "kudu/client/resource_metrics-internal.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/logging.h"
 
-
+using google::protobuf::FieldDescriptor;
+using google::protobuf::Reflection;
 using kudu::client::internal::Batcher;
 using kudu::client::internal::ErrorCollector;
 using kudu::client::sp::shared_ptr;
 using kudu::client::sp::weak_ptr;
 using kudu::rpc::Messenger;
+using kudu::tserver::ResourceMetricsPB;
 using std::unique_ptr;
 using strings::Substitute;
 
 namespace kudu {
+
 namespace client {
 
 KuduSession::Data::Data(shared_ptr<KuduClient> client,
@@ -577,5 +585,18 @@ size_t KuduSession::Data::GetBatchersCountForTests() const {
   return batchers_num_;
 }
 
+void KuduSession::Data::UpdateWriteOpMetrics(const ResourceMetricsPB& resource_metrics) {
+  const auto* reflection = resource_metrics.GetReflection();
+  const auto* desc = resource_metrics.GetDescriptor();
+  for (int i = 0; i < desc->field_count(); i++) {
+    const FieldDescriptor* field = desc->field(i);
+    if (reflection->HasField(resource_metrics, field) &&
+        field->cpp_type() == FieldDescriptor::CPPTYPE_INT64) {
+      write_op_metrics_.data_->Increment(StringPiece(field->name()),
+                                         reflection->GetInt64(resource_metrics, field));
+    }
+  }
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index db9de7b83..66962dd9a 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -25,6 +25,7 @@
 #include "kudu/client/batcher.h"
 #include "kudu/client/client.h"
 #include "kudu/client/error_collector.h"
+#include "kudu/client/resource_metrics.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
@@ -40,6 +41,10 @@ namespace rpc {
 class Messenger;
 } // namespace rpc
 
+namespace tserver {
+class ResourceMetricsPB;
+}  // namespace tserver
+
 namespace client {
 
 class KuduStatusCallback;
@@ -160,6 +165,9 @@ class KuduSession::Data {
   // primary key and perform other validations with regard to the column schema.
   Status ValidateWriteOperation(KuduWriteOperation* op) const;
 
+  // Adds given write operation metrics into session's total write operation metrics.
+  void UpdateWriteOpMetrics(const tserver::ResourceMetricsPB& metrics);
+
   // This constant represents a meaningful name for the first argument in
   // expressions like FlushCurrentBatcher(1, cbk): this is the watermark
   // corresponding to 1 byte of data. This watermark level is the minimum
@@ -246,6 +254,9 @@ class KuduSession::Data {
   // returns true only if the upper-level session is a transactional one.
   const TxnId txn_id_;
 
+  // Metrics of all write operations in the session.
+  ResourceMetrics write_op_metrics_;
+
  private:
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index 51a43c209..9dece4513 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -28,6 +28,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/common/wire_protocol.h"
@@ -174,6 +175,13 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(Sockaddr address,
       }
       // If there was no error, store the response.
       if (status.ok()) {
+        if (response.has_resource_metrics()) {
+          // Release resource_metrics because it is not relevant to this test.
+          // We are asserting that all responses are equal, but in case of exactly-once
+          // RPC semantics, metrics in retried requests may all come zeroed out or even
+          // not populated.
+          response.clear_resource_metrics();
+        }
         responses->push_back(response);
         break;
       }
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index b5ae1298b..099e2b782 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -81,11 +81,12 @@ using strings::Substitute;
 namespace kudu {
 namespace tablet {
 
-using pb_util::SecureShortDebugString;
 using consensus::CommitMsg;
 using consensus::DriverType;
 using consensus::ReplicateMsg;
 using consensus::WRITE_OP;
+using pb_util::SecureShortDebugString;
+using tserver::ResourceMetricsPB;
 using tserver::TabletServerErrorPB;
 using tserver::WriteRequestPB;
 using tserver::WriteResponsePB;
@@ -290,6 +291,11 @@ Status WriteOp::Apply(CommitMsg** commit_msg) {
 void WriteOp::Finish(OpResult result) {
   TRACE_EVENT0("op", "WriteOp::Finish");
 
+  if (result == Op::APPLIED) {
+    // Populate response metrics.
+    state()->FillResponseMetrics(type());
+  }
+
   state()->FinishApplyingOrAbort(result);
 
   if (PREDICT_FALSE(result == Op::ABORTED)) {
@@ -655,5 +661,20 @@ string WriteOpState::ToString() const {
                     row_ops_str);
 }
 
+void WriteOpState::FillResponseMetrics(consensus::DriverType type) {
+  const auto& op_m = op_metrics_;
+  tserver::ResourceMetricsPB* resp_metrics = response_->mutable_resource_metrics();
+  resp_metrics->set_successful_inserts(op_m.successful_inserts);
+  resp_metrics->set_insert_ignore_errors(op_m.insert_ignore_errors);
+  resp_metrics->set_successful_upserts(op_m.successful_upserts);
+  resp_metrics->set_successful_updates(op_m.successful_updates);
+  resp_metrics->set_update_ignore_errors(op_m.update_ignore_errors);
+  resp_metrics->set_successful_deletes(op_m.successful_deletes);
+  resp_metrics->set_delete_ignore_errors(op_m.delete_ignore_errors);
+  if (type == consensus::LEADER && external_consistency_mode() == COMMIT_WAIT) {
+    resp_metrics->set_commit_wait_duration_usec(op_m.commit_wait_duration_usec);
+  }
+}
+
 }  // namespace tablet
 }  // namespace kudu
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index ea618873f..5aa11dec7 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -261,6 +261,11 @@ class WriteOpState : public OpState {
   // the partition lock.
   void TransferOrReleasePartitionLock();
 
+  // Copy metrics from 'op_metrics_' into the response's 'resource_metrics'.
+  // Should only be called before FinishApplyingOrAbort() to make sure that 'response_'
+  // has not been released.
+  void FillResponseMetrics(consensus::DriverType type);
+
  private:
   // Releases all the row locks acquired by this op.
   void ReleaseRowLocks();
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 82616b0f8..a6145b63b 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -185,6 +185,14 @@ message WriteResponsePB {
   // The timestamp chosen by the server for this write.
   // TODO KUDU-611 propagate timestamps with server signature.
   optional fixed64 timestamp = 3;
+
+  // The write operation metrics of this RPC.
+  // This metrics contains the number of successful/unsuccessful operation
+  // ('successful_inserts' to 'delete_ignore_errors' fields in ResourceMetricsPB message)
+  // related to the associated write request.
+  // Additionally, contains 'commit_wait_duration_usec' metric if the responding server is
+  // a LEADER and the request external_consistency_mode is COMMIT_WAIT.
+  optional ResourceMetricsPB resource_metrics = 4;
 }
 
 // A list tablets request
@@ -404,6 +412,22 @@ message ResourceMetricsPB {
   optional int64 cpu_user_nanos = 6;
   // Total elapsed CPU system time in nanoseconds for all scan rpc requests for this scanner.
   optional int64 cpu_system_nanos = 7;
+  // Total number of successful INSERT/INSERT_IGNORE operation.
+  optional int64 successful_inserts = 8;
+  // Total number of unsuccessful INSERT_IGNORE operation.
+  optional int64 insert_ignore_errors = 9;
+  // Total number of successful UPSERT operation.
+  optional int64 successful_upserts = 10;
+  // Total number of successful UPDATE/UPDATE_IGNORE operation.
+  optional int64 successful_updates = 11;
+  // Total number of unsuccessful UPDATE_IGNORE operation.
+  optional int64 update_ignore_errors = 12;
+  // Total number of successful DELETE operation.
+  optional int64 successful_deletes = 13;
+  // Total number of unsuccessful DELETE_IGNORE operation.
+  optional int64 delete_ignore_errors = 14;
+  // Total observed commit wait duration in microseconds.
+  optional int64 commit_wait_duration_usec = 15;
 }
 
 message ScanResponsePB {

Reply via email to