This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new e64faa7d0 feat(make_idempotent): support making `check_and_set`
request idempotent in `pegasus_write_service` (#2239)
e64faa7d0 is described below
commit e64faa7d03f836e5212874bbbfb8f900341897d7
Author: Dan Wang <[email protected]>
AuthorDate: Mon Apr 28 20:52:49 2025 +0800
feat(make_idempotent): support making `check_and_set` request idempotent in
`pegasus_write_service` (#2239)
https://github.com/apache/incubator-pegasus/issues/2197
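Implement higher-level APIs `make_idempotent()` and `put()` on
`pegasus_write_service` for `check_and_set` requests. Both of them
internally call the corresponding APIs provided by `pegasus_write_service::impl`.
Two metrics are also introduced to measure how long it takes to make
`incr` and `check_and_set` requests idempotent. As a result, the
per-replica field `_make_incr_idempotent_duration_ns` is removed, and
`incr_latency_ns` no longer includes the translation time.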
---
src/server/pegasus_write_service.cpp | 77 ++++++++++++++++++++++++++++--------
src/server/pegasus_write_service.h | 40 +++++++++----------
2 files changed, 80 insertions(+), 37 deletions(-)
diff --git a/src/server/pegasus_write_service.cpp
b/src/server/pegasus_write_service.cpp
index c7232aacc..dfea32601 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -81,6 +81,24 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kRequests,
"The number of CHECK_AND_MUTATE requests");
+METRIC_DEFINE_percentile_int64(replica,
+ make_incr_idempotent_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The duration that an incr request is made
idempotent, "
+ "including reading the current value from
storage engine, "
+ "increasing it by a given amount and
translating the incr "
+ "request into the single-put request. Only used
for the "
+ "primary replicas");
+
+METRIC_DEFINE_percentile_int64(replica,
+ make_check_and_set_idempotent_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The duration that a check_and_set request is
made "
+ "idempotent, including reading the check value
from "
+ "storage engine, validating the check
conditions and "
+ "translating the check_and_set request into the
single-put "
+ "request. Only used for the primary replicas");
+
METRIC_DEFINE_percentile_int64(replica,
put_latency_ns,
dsn::metric_unit::kNanoSeconds,
@@ -152,7 +170,6 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
_server(server),
_impl(new impl(server)),
_batch_start_time(0),
- _make_incr_idempotent_duration_ns(0),
_cu_calculator(server->_cu_calculator.get()),
METRIC_VAR_INIT_replica(put_requests),
METRIC_VAR_INIT_replica(multi_put_requests),
@@ -161,6 +178,8 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(incr_requests),
METRIC_VAR_INIT_replica(check_and_set_requests),
METRIC_VAR_INIT_replica(check_and_mutate_requests),
+ METRIC_VAR_INIT_replica(make_incr_idempotent_latency_ns),
+ METRIC_VAR_INIT_replica(make_check_and_set_idempotent_latency_ns),
METRIC_VAR_INIT_replica(put_latency_ns),
METRIC_VAR_INIT_replica(multi_put_latency_ns),
METRIC_VAR_INIT_replica(remove_latency_ns),
@@ -173,7 +192,8 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(dup_lagging_writes),
_put_batch_size(0),
_remove_batch_size(0),
- _incr_batch_size(0)
+ _incr_batch_size(0),
+ _check_and_set_batch_size(0)
{
}
@@ -217,22 +237,16 @@ int pegasus_write_service::make_idempotent(const
dsn::apps::incr_request &req,
dsn::apps::incr_response &err_resp,
dsn::apps::update_request &update)
{
- const uint64_t start_time = dsn_now_ns();
-
- const int err = _impl->make_idempotent(req, err_resp, update);
-
- // Calculate the duration that an incr request is translated into an
idempotent put request.
- _make_incr_idempotent_duration_ns = dsn_now_ns() - start_time;
+ METRIC_VAR_AUTO_LATENCY(make_incr_idempotent_latency_ns);
- return err;
+ return _impl->make_idempotent(req, err_resp, update);
}
int pegasus_write_service::put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::incr_response &resp)
{
- // The total latency should also include the duration of the translation.
- METRIC_VAR_AUTO_LATENCY(incr_latency_ns, dsn_now_ns() -
_make_incr_idempotent_duration_ns);
+ METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
METRIC_VAR_INCREMENT(incr_requests);
const int err = _impl->put(ctx, update, resp);
@@ -260,6 +274,33 @@ int pegasus_write_service::incr(int64_t decree,
return err;
}
+int pegasus_write_service::make_idempotent(const
dsn::apps::check_and_set_request &req,
+ dsn::apps::check_and_set_response
&err_resp,
+ dsn::apps::update_request &update)
+{
+ METRIC_VAR_AUTO_LATENCY(make_check_and_set_idempotent_latency_ns);
+
+ return _impl->make_idempotent(req, err_resp, update);
+}
+
+int pegasus_write_service::put(const db_write_context &ctx,
+ const dsn::apps::update_request &update,
+ const dsn::apps::check_and_set_request &req,
+ dsn::apps::check_and_set_response &resp)
+{
+ METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
+ METRIC_VAR_INCREMENT(check_and_set_requests);
+
+ const int err = _impl->put(ctx, update, resp);
+
+ if (_server->is_primary()) {
+ _cu_calculator->add_check_and_set_cu(
+ resp.error, req.hash_key, req.check_sort_key, req.set_sort_key,
req.set_value);
+ }
+
+ return err;
+}
+
int pegasus_write_service::check_and_set(int64_t decree,
const
dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response
&resp)
@@ -325,10 +366,12 @@ int pegasus_write_service::batch_put(const
db_write_context &ctx,
// request (i.e. the atomic write).
if (update.type == dsn::apps::update_type::UT_INCR) {
++_incr_batch_size;
+ } else if (update.type == dsn::apps::update_type::UT_CHECK_AND_SET) {
+ ++_check_and_set_batch_size;
}
}
- int err = _impl->batch_put(ctx, update, resp);
+ const int err = _impl->batch_put(ctx, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_put_cu(resp.error, update.key, update.value);
@@ -344,7 +387,7 @@ int pegasus_write_service::batch_remove(int64_t decree,
CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after
batch_prepare");
++_remove_batch_size;
- int err = _impl->batch_remove(decree, key, resp);
+ const int err = _impl->batch_remove(decree, key, resp);
if (_server->is_primary()) {
_cu_calculator->add_remove_cu(resp.error, key);
@@ -357,7 +400,7 @@ int pegasus_write_service::batch_commit(int64_t decree)
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_commit must be called after
batch_prepare");
- int err = _impl->batch_commit(decree);
+ const int err = _impl->batch_commit(decree);
batch_finish();
return err;
}
@@ -389,10 +432,10 @@ void pegasus_write_service::batch_finish()
UPDATE_WRITE_BATCH_METRICS(put);
UPDATE_WRITE_BATCH_METRICS(remove);
- // Since the duration of translation is unknown for both possible
situations where these
- // put requests are actually translated from atomic requests (see comments
in batch_put()),
- // there's no need to add `_make_incr_idempotent_duration_ns` to the total
latency.
+ // These put requests are translated from atomic requests. See comments in
batch_put()
+ // for the two possible situations where we are now.
UPDATE_WRITE_BATCH_METRICS(incr);
+ UPDATE_WRITE_BATCH_METRICS(check_and_set);
_batch_start_time = 0;
diff --git a/src/server/pegasus_write_service.h
b/src/server/pegasus_write_service.h
index d69838a29..3939a6a6e 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -151,6 +151,19 @@ public:
// Write a non-idempotent INCR record.
int incr(int64_t decree, const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp);
+ // Translate a CHECK_AND_SET request into an idempotent PUT request. Only
called by
+ // primary replicas.
+ int make_idempotent(const dsn::apps::check_and_set_request &req,
+ dsn::apps::check_and_set_response &err_resp,
+ dsn::apps::update_request &update);
+
+ // Write an idempotent CHECK_AND_SET record (i.e. a PUT record) and reply
to the client
+ // with CHECK_AND_SET response. Only called by primary replicas.
+ int put(const db_write_context &ctx,
+ const dsn::apps::update_request &update,
+ const dsn::apps::check_and_set_request &req,
+ dsn::apps::check_and_set_response &resp);
+
// Write CHECK_AND_SET record.
int check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
@@ -217,26 +230,6 @@ private:
uint64_t _batch_start_time;
- // Only used for primary replica to calculate the duration that an incr
request from
- // the client is translated into an idempotent put request before appended
to plog,
- // including reading the current value from RocksDB and incrementing it by
a given
- // amount.
- //
- // This variable is defined as per-replica rather than per-request, for
the reason
- // that the current design for implementing idempotence is to make sure
there is only
- // one atomic request being processed in the write pipeline for each
replica. This
- // pipeline consists of the following stages:
- // (1) read the current value from RocksDB and built the idempotent
request based on
- // it;
- // (2) append the corresponding mutation to plog;
- // (3) broadcast the prepare requests;
- // (4) apply the result for atomic operation back to RocksDB ultimately.
- // For a request, this variable will be set in stage (1) and read in stage
(4); since
- // there is only one request in the pipeline, this variable is guaranteed
not to be
- // set for another request before stage (4) is finished. Therefore, it is
safe to
- // define this variable as per-replica.
- uint64_t _make_incr_idempotent_duration_ns;
-
capacity_unit_calculator *_cu_calculator;
METRIC_VAR_DECLARE_counter(put_requests);
@@ -247,6 +240,9 @@ private:
METRIC_VAR_DECLARE_counter(check_and_set_requests);
METRIC_VAR_DECLARE_counter(check_and_mutate_requests);
+ METRIC_VAR_DECLARE_percentile_int64(make_incr_idempotent_latency_ns);
+
METRIC_VAR_DECLARE_percentile_int64(make_check_and_set_idempotent_latency_ns);
+
METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(remove_latency_ns);
@@ -269,6 +265,10 @@ private:
// in batch applied into RocksDB for metrics.
uint32_t _incr_batch_size;
+ // Measure the size of check_and_set requests (with each translated into
an idempotent put
+ // request) in batch applied into RocksDB for metrics.
+ uint32_t _check_and_set_batch_size;
+
// TODO(wutao1): add metrics for failed rpc.
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]