This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/migrate-metrics-dev by this
push:
new 16c9766da feat(new_metrics): migrate replica-level metrics for
pegasus_server_impl (part 1) (#1374)
16c9766da is described below
commit 16c9766da7cbcc03d947aa0497d6ecc91a874fb5
Author: Dan Wang <[email protected]>
AuthorDate: Tue Mar 7 14:32:56 2023 +0800
feat(new_metrics): migrate replica-level metrics for pegasus_server_impl
(part 1) (#1374)
This PR migrates the replica-level metrics of `pegasus_server_impl` to the new
metrics framework (https://github.com/apache/incubator-pegasus/issues/1333).
Since `pegasus_server_impl` has many replica-level metrics, this PR is part 1
of the migration; the remaining metrics (all of which are gauges) will be
migrated in later PRs.
---
src/server/pegasus_server_impl.cpp | 250 ++++++++++++++-------------
src/server/pegasus_server_impl.h | 32 ++--
src/server/pegasus_server_impl_init.cpp | 126 +++++++-------
src/server/pegasus_write_service_impl.h | 5 +-
src/server/rocksdb_wrapper.cpp | 6 +-
src/server/rocksdb_wrapper.h | 2 +-
src/server/test/pegasus_server_impl_test.cpp | 4 +-
src/utils/metrics.h | 16 +-
src/utils/test/metrics_test.cpp | 2 -
src/utils/time_utils.h | 2 +-
10 files changed, 242 insertions(+), 203 deletions(-)
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 3b9688e35..b83f48882 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -259,13 +259,50 @@ int
pegasus_server_impl::on_batched_write_requests(int64_t decree,
return _server_write->on_batched_write_requests(requests, count, decree,
timestamp);
}
+// Since LOG_ERROR_PREFIX depends on log_prefix(), this method could not be
declared as static or
+// with anonymous namespace.
+void pegasus_server_impl::log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const dsn::blob &hash_key,
+ const dsn::blob &sort_key) const
+{
+ LOG_ERROR_PREFIX("rocksdb data expired for {} from {}: hash_key = \"{}\",
sort_key = \"{}\"",
+ op,
+ addr,
+ pegasus::utils::c_escape_string(hash_key),
+ pegasus::utils::c_escape_string(sort_key));
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const dsn::blob &key) const
+{
+ dsn::blob hash_key, sort_key;
+ pegasus_restore_key(key, hash_key, sort_key);
+ log_expired_data(op, addr, hash_key, sort_key);
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const rocksdb::Slice &key) const
+{
+ dsn::blob raw_key(key.data(), 0, key.size());
+ log_expired_data(op, addr, raw_key);
+}
+
+#define LOG_EXPIRED_DATA_IF_VERBOSE(...)
\
+ do {
\
+ if (dsn_unlikely(_verbose_log)) {
\
+ log_expired_data(__FUNCTION__, rpc.remote_address(),
##__VA_ARGS__); \
+ }
\
+ } while (0)
+
void pegasus_server_impl::on_get(get_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_get_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(get_requests);
- const auto &key = rpc.request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -277,21 +314,20 @@ void pegasus_server_impl::on_get(get_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(get_latency_ns);
+
+ const auto &key = rpc.request();
rocksdb::Slice skey(key.data(), key.length());
std::string value;
rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey,
&value);
if (status.ok()) {
if (check_if_record_expired(utils::epoch_now(), value)) {
- _pfc_recent_expire_count->increment();
- if (_verbose_log) {
- LOG_ERROR_PREFIX("rocksdb data expired for get from {}",
rpc.remote_address());
- }
+ METRIC_VAR_INCREMENT(read_expired_values);
+ LOG_EXPIRED_DATA_IF_VERBOSE(key);
status = rocksdb::Status::NotFound();
}
- }
-
- if (!status.ok()) {
+ } else {
if (_verbose_log) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
@@ -314,7 +350,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
usleep(10 * 1000);
#endif
- uint64_t time_used = dsn_now_ns() - start_time;
+ auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(get_latency_ns);
if (is_get_abnormal(time_used, value.size())) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
@@ -327,7 +363,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
status.ToString(),
value.size(),
time_used);
- _pfc_recent_abnormal_count->increment();
+ METRIC_VAR_INCREMENT(abnormal_read_requests);
}
resp.error = status.code();
@@ -336,17 +372,14 @@ void pegasus_server_impl::on_get(get_rpc rpc)
}
_cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value);
- _pfc_get_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_multi_get_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(multi_get_requests);
- const auto &request = rpc.request();
- dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -358,6 +391,10 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(multi_get_latency_ns);
+
+ const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
if (!is_filter_type_supported(request.sort_key_filter_type)) {
LOG_ERROR_PREFIX(
"invalid argument for multi_get from {}: sort key filter type {}
not supported",
@@ -365,7 +402,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key,
resp.kvs);
- _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
return;
}
@@ -452,8 +488,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
}
resp.error = rocksdb::Status::kOk;
_cu_calculator->add_multi_get_cu(req, resp.error,
request.hash_key, resp.kvs);
- _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
-
return;
}
@@ -656,7 +690,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
for (int i = 0; i < keys.size(); i++) {
rocksdb::Status &status = statuses[i];
std::string &value = values[i];
- // print log
if (!status.ok()) {
if (_verbose_log) {
LOG_ERROR_PREFIX("rocksdb get failed for multi_get from
{}: hash_key = \"{}\", "
@@ -670,41 +703,38 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
rpc.remote_address(),
status.ToString());
}
- }
- // check ttl
- if (status.ok()) {
- uint32_t expire_ts =
pegasus_extract_expire_ts(_pegasus_data_version, value);
- if (expire_ts > 0 && expire_ts <= epoch_now) {
- expire_count++;
- if (_verbose_log) {
- LOG_ERROR_PREFIX("rocksdb data expired for multi_get
from {}",
- rpc.remote_address());
- }
- status = rocksdb::Status::NotFound();
- }
- }
- // extract value
- if (status.ok()) {
- // check if exceed limit
- if (count >= max_kv_count || size >= max_kv_size) {
- exceed_limit = true;
- break;
- }
- ::dsn::apps::key_value kv;
- kv.key = request.sort_keys[i];
- if (!request.no_value) {
- pegasus_extract_user_data(_pegasus_data_version,
std::move(value), kv.value);
+
+ if (status.IsNotFound()) {
+ continue;
}
- count++;
- size += kv.key.length() + kv.value.length();
- resp.kvs.emplace_back(std::move(kv));
- }
- // if error occurred
- if (!status.ok() && !status.IsNotFound()) {
+
error_occurred = true;
final_status = status;
break;
}
+
+ // check ttl
+ if (check_if_record_expired(epoch_now, value)) {
+ expire_count++;
+ LOG_EXPIRED_DATA_IF_VERBOSE(request.hash_key,
request.sort_keys[i]);
+ status = rocksdb::Status::NotFound();
+ continue;
+ }
+
+ // check if exceed limit
+ if (dsn_unlikely(count >= max_kv_count || size >= max_kv_size)) {
+ exceed_limit = true;
+ break;
+ }
+
+ ::dsn::apps::key_value kv;
+ kv.key = request.sort_keys[i];
+ if (!request.no_value) {
+ pegasus_extract_user_data(_pegasus_data_version,
std::move(value), kv.value);
+ }
+ count++;
+ size += kv.key.length() + kv.value.length();
+ resp.kvs.emplace_back(std::move(kv));
}
if (error_occurred) {
@@ -722,7 +752,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
usleep(10 * 1000);
#endif
- uint64_t time_used = dsn_now_ns() - start_time;
+ auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(multi_get_latency_ns);
if (is_multi_get_abnormal(time_used, size, iteration_count)) {
LOG_WARNING_PREFIX(
"rocksdb abnormal multi_get from {}: hash_key = {}, "
@@ -748,25 +778,20 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
expire_count,
filter_count,
time_used);
- _pfc_recent_abnormal_count->increment();
+ METRIC_VAR_INCREMENT(abnormal_read_requests);
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
- if (filter_count > 0) {
- _pfc_recent_filter_count->add(filter_count);
- }
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+ METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key,
resp.kvs);
- _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_batch_get_qps->increment();
- int64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(batch_get_requests);
auto &response = rpc.response();
response.app_id = _gpid.get_app_id();
@@ -779,13 +804,14 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(batch_get_latency_ns);
+
const auto &request = rpc.request();
if (request.keys.empty()) {
response.error = rocksdb::Status::kInvalidArgument;
LOG_ERROR_PREFIX("Invalid argument for batch_get from {}: 'keys' field
in request is empty",
rpc.remote_address().to_string());
_cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error,
response.data);
- _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
return;
}
@@ -804,6 +830,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
bool error_occurred = false;
int64_t total_data_size = 0;
uint32_t epoch_now = pegasus::utils::epoch_now();
+ uint64_t expire_count = 0;
+
std::vector<std::string> values;
std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts,
keys, &values);
response.data.reserve(request.keys.size());
@@ -819,13 +847,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
if (dsn_likely(status.ok())) {
if (check_if_record_expired(epoch_now, value)) {
- if (_verbose_log) {
- LOG_ERROR_PREFIX(
- "rocksdb data expired for batch_get from {}, hash_key
= {}, sort_key = {}",
- rpc.remote_address().to_string(),
- pegasus::utils::c_escape_string(hash_key),
- pegasus::utils::c_escape_string(sort_key));
- }
+ ++expire_count;
+ LOG_EXPIRED_DATA_IF_VERBOSE(hash_key, sort_key);
continue;
}
@@ -863,7 +886,7 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
response.error = rocksdb::Status::kOk;
}
- int64_t time_used = dsn_now_ns() - start_time;
+ auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(batch_get_latency_ns);
if (is_batch_get_abnormal(time_used, total_data_size,
request.keys.size())) {
LOG_WARNING_PREFIX(
"rocksdb abnormal batch_get from {}: total data size = {}, row
count = {}, "
@@ -872,33 +895,36 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
total_data_size,
request.keys.size(),
time_used / 1000);
- _pfc_recent_abnormal_count->increment();
+ METRIC_VAR_INCREMENT(abnormal_read_requests);
}
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+
_cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error,
response.data);
- _pfc_batch_get_latency->set(time_used);
}
void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
{
- CHECK(_is_open, "");
+ CHECK_TRUE(_is_open);
- _pfc_scan_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ METRIC_VAR_INCREMENT(scan_requests);
- const auto &hash_key = rpc.request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+
if (!_read_size_throttling_controller->available()) {
rpc.error() = dsn::ERR_BUSY;
_counter_recent_read_throttling_reject_count->increment();
return;
}
+ METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
// scan
::dsn::blob start_key, stop_key;
+ const auto &hash_key = rpc.request();
pegasus_generate_key(start_key, hash_key, ::dsn::blob());
pegasus_generate_next_blob(stop_key, hash_key);
rocksdb::Slice start(start_key.data(), start_key.length());
@@ -920,18 +946,14 @@ void
pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
if (check_if_record_expired(epoch_now, it->value())) {
expire_count++;
- if (_verbose_log) {
- LOG_ERROR_PREFIX("rocksdb data expired for sortkey_count from
{}",
- rpc.remote_address());
- }
+ LOG_EXPIRED_DATA_IF_VERBOSE(it->key());
} else {
resp.count++;
}
it->Next();
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
+
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
resp.error = it->status().code();
if (!it->status().ok()) {
@@ -957,7 +979,6 @@ void
pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
}
_cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error,
hash_key);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_ttl(ttl_rpc rpc)
@@ -985,7 +1006,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
if (status.ok()) {
expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
if (check_if_ts_expired(now_ts, expire_ts)) {
- _pfc_recent_expire_count->increment();
+ METRIC_VAR_INCREMENT(read_expired_values);
if (_verbose_log) {
LOG_ERROR_PREFIX("rocksdb data expired for ttl from {}",
rpc.remote_address());
}
@@ -1025,12 +1046,10 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_scan_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(scan_requests);
- const auto &request = rpc.request();
- dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -1042,6 +1061,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc
rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+ const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
if (!is_filter_type_supported(request.hash_key_filter_type)) {
LOG_ERROR_PREFIX(
"invalid argument for get_scanner from {}: hash key filter type {}
not supported",
@@ -1049,10 +1072,9 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc
rpc)
request.hash_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
return;
}
+
if (!is_filter_type_supported(request.sort_key_filter_type)) {
LOG_ERROR_PREFIX(
"invalid argument for get_scanner from {}: sort key filter type {}
not supported",
@@ -1060,8 +1082,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc
rpc)
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
return;
}
@@ -1117,8 +1137,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc
rpc)
}
resp.error = rocksdb::Status::kOk;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
return;
}
@@ -1271,24 +1289,18 @@ void
pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
- if (filter_count > 0) {
- _pfc_recent_filter_count->add(filter_count);
- }
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+ METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_scan(scan_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_scan_qps->increment();
- uint64_t start_time = dsn_now_ns();
- const auto &request = rpc.request();
- dsn::message_ex *req = rpc.dsn_request();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(scan_requests);
+
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -1300,6 +1312,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+ const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
std::unique_ptr<pegasus_scan_context> context =
_context_cache.fetch(request.context_id);
if (context) {
rocksdb::Iterator *it = context->iterator.get();
@@ -1422,18 +1438,14 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
- if (filter_count > 0) {
- _pfc_recent_filter_count->add(filter_count);
- }
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+ METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
+
} else {
resp.error = rocksdb::Status::Code::kNotFound;
}
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_clear_scanner(const int64_t &args) {
_context_cache.fetch(args); }
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index c907f05b4..8ac67652b 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -406,6 +406,15 @@ private:
dsn::replication::manual_compaction_status::type query_compact_status()
const override;
+ // Log expired keys for verbose mode.
+ void log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const dsn::blob &hash_key,
+ const dsn::blob &sort_key) const;
+ void log_expired_data(const char *op, const dsn::rpc_address &addr, const
dsn::blob &key) const;
+ void
+ log_expired_data(const char *op, const dsn::rpc_address &addr, const
rocksdb::Slice &key) const;
+
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
@@ -485,20 +494,19 @@ private:
std::shared_ptr<throttling_controller> _read_size_throttling_controller;
- // perf counters
- ::dsn::perf_counter_wrapper _pfc_get_qps;
- ::dsn::perf_counter_wrapper _pfc_multi_get_qps;
- ::dsn::perf_counter_wrapper _pfc_batch_get_qps;
- ::dsn::perf_counter_wrapper _pfc_scan_qps;
+ METRIC_VAR_DECLARE_counter(get_requests);
+ METRIC_VAR_DECLARE_counter(multi_get_requests);
+ METRIC_VAR_DECLARE_counter(batch_get_requests);
+ METRIC_VAR_DECLARE_counter(scan_requests);
- ::dsn::perf_counter_wrapper _pfc_get_latency;
- ::dsn::perf_counter_wrapper _pfc_multi_get_latency;
- ::dsn::perf_counter_wrapper _pfc_batch_get_latency;
- ::dsn::perf_counter_wrapper _pfc_scan_latency;
+ METRIC_VAR_DECLARE_percentile_int64(get_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(multi_get_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(batch_get_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(scan_latency_ns);
- ::dsn::perf_counter_wrapper _pfc_recent_expire_count;
- ::dsn::perf_counter_wrapper _pfc_recent_filter_count;
- ::dsn::perf_counter_wrapper _pfc_recent_abnormal_count;
+ METRIC_VAR_DECLARE_counter(read_expired_values);
+ METRIC_VAR_DECLARE_counter(read_filtered_values);
+ METRIC_VAR_DECLARE_counter(abnormal_read_requests);
// rocksdb internal statistics
// server level
diff --git a/src/server/pegasus_server_impl_init.cpp
b/src/server/pegasus_server_impl_init.cpp
index 012ca6ac1..41bc65a19 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -31,6 +31,61 @@
#include "pegasus_server_write.h"
#include "hotkey_collector.h"
+METRIC_DEFINE_counter(replica,
+ get_requests,
+ dsn::metric_unit::kRequests,
+ "The number of GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ multi_get_requests,
+ dsn::metric_unit::kRequests,
+ "The number of MULTI_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ batch_get_requests,
+ dsn::metric_unit::kRequests,
+ "The number of BATCH_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ scan_requests,
+ dsn::metric_unit::kRequests,
+ "The number of SCAN requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ get_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ multi_get_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of MULTI_GET requests for each
replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ batch_get_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of BATCH_GET requests for each
replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ scan_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of SCAN requests for each
replica");
+
+METRIC_DEFINE_counter(replica,
+ read_expired_values,
+ dsn::metric_unit::kValues,
+ "The number of expired values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+ read_filtered_values,
+ dsn::metric_unit::kValues,
+ "The number of filtered values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+ abnormal_read_requests,
+ dsn::metric_unit::kRequests,
+ "The number of abnormal read requests for each replica");
+
namespace pegasus {
namespace server {
@@ -195,7 +250,18 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_last_durable_decree(0),
_is_checkpointing(false),
_manual_compact_svc(this),
- _partition_version(0)
+ _partition_version(0),
+ METRIC_VAR_INIT_replica(get_requests),
+ METRIC_VAR_INIT_replica(multi_get_requests),
+ METRIC_VAR_INIT_replica(batch_get_requests),
+ METRIC_VAR_INIT_replica(scan_requests),
+ METRIC_VAR_INIT_replica(get_latency_ns),
+ METRIC_VAR_INIT_replica(multi_get_latency_ns),
+ METRIC_VAR_INIT_replica(batch_get_latency_ns),
+ METRIC_VAR_INIT_replica(scan_latency_ns),
+ METRIC_VAR_INIT_replica(read_expired_values),
+ METRIC_VAR_INIT_replica(read_filtered_values),
+ METRIC_VAR_INIT_replica(abnormal_read_requests)
{
_primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
_gpid = get_gpid();
@@ -601,64 +667,6 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
char name[256];
// register the perf counters
- snprintf(name, 255, "get_qps@%s", str_gpid.c_str());
- _pfc_get_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of GET
request");
-
- snprintf(name, 255, "multi_get_qps@%s", str_gpid.c_str());
- _pfc_multi_get_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of
MULTI_GET request");
-
- snprintf(name, 255, "batch_get_qps@%s", str_gpid.c_str());
- _pfc_batch_get_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of
BATCH_GET request");
-
- snprintf(name, 255, "scan_qps@%s", str_gpid.c_str());
- _pfc_scan_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of SCAN
request");
-
- snprintf(name, 255, "get_latency@%s", str_gpid.c_str());
- _pfc_get_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of GET request");
-
- snprintf(name, 255, "multi_get_latency@%s", str_gpid.c_str());
- _pfc_multi_get_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of
MULTI_GET request");
-
- snprintf(name, 255, "batch_get_latency@%s", str_gpid.c_str());
- _pfc_batch_get_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of
BATCH_GET request");
-
- snprintf(name, 255, "scan_latency@%s", str_gpid.c_str());
- _pfc_scan_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of SCAN
request");
-
- snprintf(name, 255, "recent.expire.count@%s", str_gpid.c_str());
- _pfc_recent_expire_count.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent expired
value read count");
-
- snprintf(name, 255, "recent.filter.count@%s", str_gpid.c_str());
- _pfc_recent_filter_count.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent filtered
value read count");
-
- snprintf(name, 255, "recent.abnormal.count@%s", str_gpid.c_str());
- _pfc_recent_abnormal_count.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent abnormal
read count");
-
snprintf(name, 255, "disk.storage.sst.count@%s", str_gpid.c_str());
_pfc_rdb_sst_count.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the count of
sstable files");
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index 16108b852..d1ecf0664 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -88,8 +88,7 @@ public:
explicit impl(pegasus_server_impl *server)
: replica_base(server),
_primary_address(server->_primary_address),
- _pegasus_data_version(server->_pegasus_data_version),
- _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+ _pegasus_data_version(server->_pegasus_data_version)
{
_rocksdb_wrapper = dsn::make_unique<rocksdb_wrapper>(server);
}
@@ -689,8 +688,6 @@ private:
const std::string _primary_address;
const uint32_t _pegasus_data_version;
- ::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
-
std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;
// for setting update_response.error after committed.
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 4a10a3827..3e95af4ca 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -24,6 +24,8 @@
#include "pegasus_write_service_impl.h"
#include "base/pegasus_value_schema.h"
+METRIC_DECLARE_counter(read_expired_values);
+
namespace pegasus {
namespace server {
@@ -33,7 +35,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
_rd_opts(server->_data_cf_rd_opts),
_meta_cf(server->_meta_cf),
_pegasus_data_version(server->_pegasus_data_version),
- _pfc_recent_expire_count(server->_pfc_recent_expire_count),
+ METRIC_VAR_INIT_replica(read_expired_values),
_default_ttl(0)
{
_write_batch = dsn::make_unique<rocksdb::WriteBatch>();
@@ -55,7 +57,7 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/
db_get_context *ctx)
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version,
ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
- _pfc_recent_expire_count->increment();
+ METRIC_VAR_INCREMENT(read_expired_values);
}
return rocksdb::Status::kOk;
} else if (s.IsNotFound()) {
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index 51a46c7ff..a26fae34c 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -81,7 +81,7 @@ private:
rocksdb::ColumnFamilyHandle *_meta_cf;
const uint32_t _pegasus_data_version;
- dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+ METRIC_VAR_DECLARE_counter(read_expired_values);
volatile uint32_t _default_ttl;
friend class rocksdb_wrapper_test;
diff --git a/src/server/test/pegasus_server_impl_test.cpp
b/src/server/test/pegasus_server_impl_test.cpp
index 16671e4dc..e008b7fad 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -54,7 +54,7 @@ public:
_server->update_app_envs(envs);
// do on_get/on_multi_get operation,
- long before_count =
_server->_pfc_recent_abnormal_count->get_integer_value();
+ auto before_count =
_server->METRIC_VAR_VALUE(abnormal_read_requests);
if (!test.is_multi_get) {
get_rpc rpc(dsn::make_unique<dsn::blob>(test_key),
dsn::apps::RPC_RRDB_RRDB_GET);
_server->on_get(rpc);
@@ -67,7 +67,7 @@ public:
dsn::apps::RPC_RRDB_RRDB_MULTI_GET);
_server->on_multi_get(rpc);
}
- long after_count =
_server->_pfc_recent_abnormal_count->get_integer_value();
+ auto after_count =
_server->METRIC_VAR_VALUE(abnormal_read_requests);
ASSERT_EQ(before_count + test.expect_perf_counter_incr,
after_count);
}
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index f32665f24..f44296ce5 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -157,7 +157,13 @@
#define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica)
// Perform increment-related operations on metrics including gauge and counter.
-#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x)
+#define METRIC_VAR_INCREMENT_BY(name, x)
\
+ do {
\
+ if (x != 0) {
\
+ _##name->increment_by(x);
\
+ }
\
+ } while (0)
+
#define METRIC_VAR_INCREMENT(name) _##name->increment()
// Perform set() operations on metrics including gauge and percentile.
@@ -168,10 +174,15 @@
// such as percentile.
#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
+// Read the current measurement of the metric.
+#define METRIC_VAR_VALUE(name) _##name->value()
+
// Convenient macro that is used to compute latency automatically, which is
dedicated to percentile.
#define METRIC_VAR_AUTO_LATENCY(name, ...)
\
dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__)
+#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name)
__##name##_auto_latency.duration_ns()
+
namespace dsn {
class metric_prototype;
@@ -589,6 +600,7 @@ enum class metric_unit : size_t
kMilliSeconds,
kSeconds,
kRequests,
+ kValues,
kInvalidUnit,
};
@@ -1400,6 +1412,8 @@ public:
}
}
+ inline uint64_t duration_ns() const { return _chrono.duration_ns(); }
+
private:
percentile_ptr<int64_t> _percentile;
utils::chronograph _chrono;
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 9a0d27e86..66cb41c13 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -3121,8 +3121,6 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t
val)
EXPECT_EQ(std::vector<int64_t>(n, val),
METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
}
-#define METRIC_VAR_VALUE(name) _##name->value()
-
#define TEST_METRIC_VAR_INCREMENT(name)
\
do {
\
ASSERT_EQ(0, METRIC_VAR_VALUE(name));
\
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index 18f665535..032bd5166 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -139,7 +139,7 @@ public:
inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
- inline uint64_t duration_ns()
+ inline uint64_t duration_ns() const
{
auto now = dsn_now_ns();
CHECK_GE(now, _start_time_ns);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]