This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 0ddcaaabc [client] KUDU-3351: Add ResourceMetricsPB into WriteResponsePB 0ddcaaabc is described below commit 0ddcaaabc97c85a4715ae79ff5604feb9b342779 Author: Riza Suminto <riza.sumi...@cloudera.com> AuthorDate: Tue Apr 26 14:35:19 2022 -0700 [client] KUDU-3351: Add ResourceMetricsPB into WriteResponsePB KUDU-1563 adds support for INSERT_IGNORE, UPDATE_IGNORE, and DELETE_IGNORE. However, it was lacking the per-session metrics about how many rows get ignored vs modified. This patch implements the per-session metrics by introducing a new ResourceMetricsPB field into the WriteResponsePB that's populated in every response sent back to the client. Change-Id: I9adefd64b0058c66274a00e1b12334653fcab2b3 Reviewed-on: http://gerrit.cloudera.org:8080/18451 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/client/batcher.cc | 5 ++++ src/kudu/client/client-test.cc | 33 ++++++++++++++++++++++ src/kudu/client/client.cc | 4 +++ src/kudu/client/client.h | 3 ++ src/kudu/client/resource_metrics.h | 1 + src/kudu/client/session-internal.cc | 23 ++++++++++++++- src/kudu/client/session-internal.h | 11 ++++++++ .../integration-tests/exactly_once_writes-itest.cc | 8 ++++++ src/kudu/tablet/ops/write_op.cc | 23 ++++++++++++++- src/kudu/tablet/ops/write_op.h | 5 ++++ src/kudu/tserver/tserver.proto | 24 ++++++++++++++++ 11 files changed, 138 insertions(+), 2 deletions(-) diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc index 2db5033b0..cfde2eca3 100644 --- a/src/kudu/client/batcher.cc +++ b/src/kudu/client/batcher.cc @@ -966,6 +966,11 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc, had_errors_ = true; } + // Collect metrics + if (sp::shared_ptr<KuduSession> session = weak_session_.lock()) { + session->data_->UpdateWriteOpMetrics(rpc.resp().resource_metrics()); + } + // Remove all the ops from the "in-flight" list. 
It's essential to do so // _after_ adding all errors into the collector, otherwise there might be // a race which manifests itself as described at KUDU-1743. Essentially, diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index a7c7e9835..8543fbdc2 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -2936,6 +2936,24 @@ static void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) { } } +static void DoVerifyMetrics(const KuduSession* session, + int64_t successful_inserts, + int64_t insert_ignore_errors, + int64_t successful_upserts, + int64_t successful_updates, + int64_t update_ignore_errors, + int64_t successful_deletes, + int64_t delete_ignore_errors) { + auto metrics = session->GetWriteOpMetrics().Get(); + ASSERT_EQ(successful_inserts, metrics["successful_inserts"]); + ASSERT_EQ(insert_ignore_errors, metrics["insert_ignore_errors"]); + ASSERT_EQ(successful_upserts, metrics["successful_upserts"]); + ASSERT_EQ(successful_updates, metrics["successful_updates"]); + ASSERT_EQ(update_ignore_errors, metrics["update_ignore_errors"]); + ASSERT_EQ(successful_deletes, metrics["successful_deletes"]); + ASSERT_EQ(delete_ignore_errors, metrics["delete_ignore_errors"]); +} + TEST_F(ClientTest, TestInsertIgnore) { shared_ptr<KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(10000); @@ -2945,6 +2963,7 @@ TEST_F(ClientTest, TestInsertIgnore) { unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1)); ASSERT_OK(session->Apply(insert.release())); DoTestVerifyRows(client_table_, 1); + DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0); } { @@ -2952,6 +2971,7 @@ TEST_F(ClientTest, TestInsertIgnore) { unique_ptr<KuduInsertIgnore> insert_ignore(BuildTestInsertIgnore(client_table_.get(), 1)); ASSERT_OK(session->Apply(insert_ignore.release())); DoTestVerifyRows(client_table_, 1); + DoVerifyMetrics(session.get(), 1, 1, 0, 0, 0, 0, 0); } { @@ -2963,6 +2983,7 @@ TEST_F(ClientTest, 
TestInsertIgnore) { ASSERT_OK(insert_ignore->mutable_row()->SetInt32("non_null_with_default", 999)); ASSERT_OK(session->Apply(insert_ignore.release())); // returns ok but results in no change DoTestVerifyRows(client_table_, 1); + DoVerifyMetrics(session.get(), 1, 2, 0, 0, 0, 0, 0); } { @@ -2970,6 +2991,7 @@ TEST_F(ClientTest, TestInsertIgnore) { unique_ptr<KuduInsertIgnore> insert_ignore(BuildTestInsertIgnore(client_table_.get(), 2)); ASSERT_OK(session->Apply(insert_ignore.release())); DoTestVerifyRows(client_table_, 2); + DoVerifyMetrics(session.get(), 2, 2, 0, 0, 0, 0, 0); } } @@ -2983,12 +3005,14 @@ TEST_F(ClientTest, TestUpdateIgnore) { unique_ptr<KuduUpdateIgnore> update_ignore(BuildTestUpdateIgnore(client_table_.get(), 1)); ASSERT_OK(session->Apply(update_ignore.release())); DoTestVerifyRows(client_table_, 0); + DoVerifyMetrics(session.get(), 0, 0, 0, 0, 1, 0, 0); } { unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1)); ASSERT_OK(session->Apply(insert.release())); DoTestVerifyRows(client_table_, 1); + DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0); } { @@ -2999,6 +3023,7 @@ TEST_F(ClientTest, TestUpdateIgnore) { ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "hello world")); ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default", 999)); ASSERT_OK(session->Apply(update_ignore.release())); + DoVerifyMetrics(session.get(), 1, 0, 0, 1, 1, 0, 0); vector<string> rows; KuduScanner scanner(client_table_.get()); @@ -3018,6 +3043,7 @@ TEST_F(ClientTest, TestDeleteIgnore) { unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1)); ASSERT_OK(session->Apply(insert.release())); DoTestVerifyRows(client_table_, 1); + DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0); } { @@ -3025,6 +3051,7 @@ TEST_F(ClientTest, TestDeleteIgnore) { unique_ptr<KuduDeleteIgnore> delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1)); ASSERT_OK(session->Apply(delete_ignore.release())); 
DoTestVerifyRows(client_table_, 0); + DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 0); } { @@ -3032,6 +3059,7 @@ TEST_F(ClientTest, TestDeleteIgnore) { unique_ptr<KuduDeleteIgnore> delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1)); ASSERT_OK(session->Apply(delete_ignore.release())); DoTestVerifyRows(client_table_, 0); + DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 1); } } @@ -4438,6 +4466,7 @@ TEST_F(ClientTest, TestUpsert) { // Perform and verify UPSERT which acts as an INSERT. ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 1, "original row")); FlushSessionOrDie(session); + DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0); { vector<string> rows; @@ -4450,6 +4479,7 @@ TEST_F(ClientTest, TestUpsert) { // Perform and verify UPSERT which acts as an UPDATE. ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 2, "upserted row")); FlushSessionOrDie(session); + DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0); { vector<string> rows; @@ -4468,6 +4498,7 @@ TEST_F(ClientTest, TestUpsert) { ASSERT_OK(row->SetInt32("non_null_with_default", 999)); ASSERT_OK(session->Apply(update.release())); FlushSessionOrDie(session); + DoVerifyMetrics(session.get(), 0, 0, 2, 1, 0, 0, 0); } { vector<string> rows; @@ -4481,6 +4512,7 @@ TEST_F(ClientTest, TestUpsert) { // column, and therefore should not revert it back to its default. ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 3, "upserted row 2")); FlushSessionOrDie(session); + DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 0, 0); { vector<string> rows; ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows)); @@ -4492,6 +4524,7 @@ TEST_F(ClientTest, TestUpsert) { // Delete the row. 
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1)); FlushSessionOrDie(session); + DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 1, 0); { vector<string> rows; ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows)); diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 2e37713c8..61349527a 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -1447,6 +1447,10 @@ KuduClient* KuduSession::client() const { return data_->client_.get(); } +const ResourceMetrics& KuduSession::GetWriteOpMetrics() const { + return data_->write_op_metrics_; +} + //////////////////////////////////////////////////////////// // KuduTableAlterer //////////////////////////////////////////////////////////// diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 35dab86e2..bf94a7b26 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -2566,6 +2566,9 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession> /// @return Client for the session: pointer to the associated client object. KuduClient* client() const; + /// @return Cumulative write operation metrics since the beginning of the session. 
+ const ResourceMetrics& GetWriteOpMetrics() const; + private: class KUDU_NO_EXPORT Data; diff --git a/src/kudu/client/resource_metrics.h b/src/kudu/client/resource_metrics.h index 015087dab..dff31fedc 100644 --- a/src/kudu/client/resource_metrics.h +++ b/src/kudu/client/resource_metrics.h @@ -65,6 +65,7 @@ class KUDU_EXPORT ResourceMetrics { private: friend class KuduScanner; + friend class KuduSession; class KUDU_NO_EXPORT Data; Data* data_; }; diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc index 320ccac18..0cb0be561 100644 --- a/src/kudu/client/session-internal.cc +++ b/src/kudu/client/session-internal.cc @@ -22,29 +22,37 @@ #include <utility> #include <glog/logging.h> +#include <google/protobuf/descriptor.h> +#include <google/protobuf/message.h> #include "kudu/client/batcher.h" #include "kudu/client/callbacks.h" #include "kudu/client/error_collector.h" +#include "kudu/client/resource_metrics-internal.h" #include "kudu/client/shared_ptr.h" // IWYU pragma: keep #include "kudu/client/write_op.h" #include "kudu/common/partial_row.h" #include "kudu/common/schema.h" #include "kudu/gutil/port.h" +#include "kudu/gutil/strings/stringpiece.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/messenger.h" +#include "kudu/tserver/tserver.pb.h" #include "kudu/util/logging.h" - +using google::protobuf::FieldDescriptor; +using google::protobuf::Reflection; using kudu::client::internal::Batcher; using kudu::client::internal::ErrorCollector; using kudu::client::sp::shared_ptr; using kudu::client::sp::weak_ptr; using kudu::rpc::Messenger; +using kudu::tserver::ResourceMetricsPB; using std::unique_ptr; using strings::Substitute; namespace kudu { + namespace client { KuduSession::Data::Data(shared_ptr<KuduClient> client, @@ -577,5 +585,18 @@ size_t KuduSession::Data::GetBatchersCountForTests() const { return batchers_num_; } +void KuduSession::Data::UpdateWriteOpMetrics(const ResourceMetricsPB& resource_metrics) { + const auto* 
reflection = resource_metrics.GetReflection(); + const auto* desc = resource_metrics.GetDescriptor(); + for (int i = 0; i < desc->field_count(); i++) { + const FieldDescriptor* field = desc->field(i); + if (reflection->HasField(resource_metrics, field) && + field->cpp_type() == FieldDescriptor::CPPTYPE_INT64) { + write_op_metrics_.data_->Increment(StringPiece(field->name()), + reflection->GetInt64(resource_metrics, field)); + } + } +} + } // namespace client } // namespace kudu diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h index db9de7b83..66962dd9a 100644 --- a/src/kudu/client/session-internal.h +++ b/src/kudu/client/session-internal.h @@ -25,6 +25,7 @@ #include "kudu/client/batcher.h" #include "kudu/client/client.h" #include "kudu/client/error_collector.h" +#include "kudu/client/resource_metrics.h" #include "kudu/client/shared_ptr.h" // IWYU pragma: keep #include "kudu/common/txn_id.h" #include "kudu/gutil/macros.h" @@ -40,6 +41,10 @@ namespace rpc { class Messenger; } // namespace rpc +namespace tserver { +class ResourceMetricsPB; +} // namespace tserver + namespace client { class KuduStatusCallback; @@ -160,6 +165,9 @@ class KuduSession::Data { // primary key and perform other validations with regard to the column schema. Status ValidateWriteOperation(KuduWriteOperation* op) const; + // Adds given write operation metrics into session's total write operation metrics. + void UpdateWriteOpMetrics(const tserver::ResourceMetricsPB& metrics); + // This constant represents a meaningful name for the first argument in // expressions like FlushCurrentBatcher(1, cbk): this is the watermark // corresponding to 1 byte of data. This watermark level is the minimum @@ -246,6 +254,9 @@ class KuduSession::Data { // returns true only if the upper-level session is a transactional one. const TxnId txn_id_; + // Metrics of all write operations in the session. 
+ ResourceMetrics write_op_metrics_; + private: FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks); FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector); diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc index 51a43c209..9dece4513 100644 --- a/src/kudu/integration-tests/exactly_once_writes-itest.cc +++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc @@ -28,6 +28,7 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "kudu/common/row_operations.pb.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol-test-util.h" #include "kudu/common/wire_protocol.h" @@ -174,6 +175,13 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(Sockaddr address, } // If there was no error, store the response. if (status.ok()) { + if (response.has_resource_metrics()) { + // Release resource_metrics because it is not relevant to this test. + // We are asserting that all responses are equal, but in case of exactly-once + // RPC semantics, metrics in retried requests may all come zeroed out or even + // not populated. 
+ response.clear_resource_metrics(); + } responses->push_back(response); break; } diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc index b5ae1298b..099e2b782 100644 --- a/src/kudu/tablet/ops/write_op.cc +++ b/src/kudu/tablet/ops/write_op.cc @@ -81,11 +81,12 @@ using strings::Substitute; namespace kudu { namespace tablet { -using pb_util::SecureShortDebugString; using consensus::CommitMsg; using consensus::DriverType; using consensus::ReplicateMsg; using consensus::WRITE_OP; +using pb_util::SecureShortDebugString; +using tserver::ResourceMetricsPB; using tserver::TabletServerErrorPB; using tserver::WriteRequestPB; using tserver::WriteResponsePB; @@ -290,6 +291,11 @@ Status WriteOp::Apply(CommitMsg** commit_msg) { void WriteOp::Finish(OpResult result) { TRACE_EVENT0("op", "WriteOp::Finish"); + if (result == Op::APPLIED) { + // Populate response metrics. + state()->FillResponseMetrics(type()); + } + state()->FinishApplyingOrAbort(result); if (PREDICT_FALSE(result == Op::ABORTED)) { @@ -655,5 +661,20 @@ string WriteOpState::ToString() const { row_ops_str); } +void WriteOpState::FillResponseMetrics(consensus::DriverType type) { + const auto& op_m = op_metrics_; + tserver::ResourceMetricsPB* resp_metrics = response_->mutable_resource_metrics(); + resp_metrics->set_successful_inserts(op_m.successful_inserts); + resp_metrics->set_insert_ignore_errors(op_m.insert_ignore_errors); + resp_metrics->set_successful_upserts(op_m.successful_upserts); + resp_metrics->set_successful_updates(op_m.successful_updates); + resp_metrics->set_update_ignore_errors(op_m.update_ignore_errors); + resp_metrics->set_successful_deletes(op_m.successful_deletes); + resp_metrics->set_delete_ignore_errors(op_m.delete_ignore_errors); + if (type == consensus::LEADER && external_consistency_mode() == COMMIT_WAIT) { + resp_metrics->set_commit_wait_duration_usec(op_m.commit_wait_duration_usec); + } +} + } // namespace tablet } // namespace kudu diff --git 
a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h index ea618873f..5aa11dec7 100644 --- a/src/kudu/tablet/ops/write_op.h +++ b/src/kudu/tablet/ops/write_op.h @@ -261,6 +261,11 @@ class WriteOpState : public OpState { // the partition lock. void TransferOrReleasePartitionLock(); + // Copy metrics from 'op_metrics_' into the response's 'resource_metrics'. + // Should only be called before FinishApplyingOrAbort() to make sure that 'response_' + // has not been released. + void FillResponseMetrics(consensus::DriverType type); + private: // Releases all the row locks acquired by this op. void ReleaseRowLocks(); diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto index 82616b0f8..a6145b63b 100644 --- a/src/kudu/tserver/tserver.proto +++ b/src/kudu/tserver/tserver.proto @@ -185,6 +185,14 @@ message WriteResponsePB { // The timestamp chosen by the server for this write. // TODO KUDU-611 propagate timestamps with server signature. optional fixed64 timestamp = 3; + + // The write operation metrics of this RPC. + // These metrics contain the number of successful/unsuccessful operations + // ('successful_inserts' to 'delete_ignore_errors' fields in ResourceMetricsPB message) + // related to the associated write request. + // Additionally, contains 'commit_wait_duration_usec' metric if the responding server is + // a LEADER and the request external_consistency_mode is COMMIT_WAIT. + optional ResourceMetricsPB resource_metrics = 4; } // A list tablets request @@ -404,6 +412,22 @@ message ResourceMetricsPB { optional int64 cpu_user_nanos = 6; // Total elapsed CPU system time in nanoseconds for all scan rpc requests for this scanner. optional int64 cpu_system_nanos = 7; + // Total number of successful INSERT/INSERT_IGNORE operations. + optional int64 successful_inserts = 8; + // Total number of unsuccessful INSERT_IGNORE operations. + optional int64 insert_ignore_errors = 9; + // Total number of successful UPSERT operations.
+ optional int64 successful_upserts = 10; + // Total number of successful UPDATE/UPDATE_IGNORE operations. + optional int64 successful_updates = 11; + // Total number of unsuccessful UPDATE_IGNORE operations. + optional int64 update_ignore_errors = 12; + // Total number of successful DELETE/DELETE_IGNORE operations. + optional int64 successful_deletes = 13; + // Total number of unsuccessful DELETE_IGNORE operations. + optional int64 delete_ignore_errors = 14; + // Total observed commit wait duration in microseconds. + optional int64 commit_wait_duration_usec = 15; } message ScanResponsePB {