This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new cd71c5b0a feat(make_idempotent): support making `incr` request
idempotent in `pegasus_write_service` (#2192)
cd71c5b0a is described below
commit cd71c5b0a541b261a1369de2b3b43012950f08cf
Author: Dan Wang <[email protected]>
AuthorDate: Mon Feb 17 15:59:13 2025 +0800
feat(make_idempotent): support making `incr` request idempotent in
`pegasus_write_service` (#2192)
In https://github.com/apache/incubator-pegasus/pull/2185, we've implemented
two APIs
`make_idempotent()` and `put()` in `pegasus_write_service::impl` to support
idempotent
`incr` operations. As the higher-level abstraction, `pegasus_write_service`
is now also able to process
idempotent `incr` requests. It implements `make_idempotent()` and
`put()` by calling
the APIs provided by `pegasus_write_service::impl` internally. Similarly, both
APIs are in turn
provided for use by a higher-level abstraction.
---
idl/rrdb.thrift | 2 +-
src/server/pegasus_write_service.cpp | 78 ++++++++++++++++++++++++++++-----
src/server/pegasus_write_service.h | 47 ++++++++++++++++++--
src/server/pegasus_write_service_impl.h | 6 +--
src/utils/metrics.h | 3 ++
5 files changed, 118 insertions(+), 18 deletions(-)
diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index c0534785d..712de1ec7 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -86,7 +86,7 @@ struct update_request
// single-put request from the one translated from a non-idempotent atomic
write request:
// - a general single-put request, if `type` is UT_PUT or not set by
default as it's
// optional, or
- // - a put request translated from a non-idempotent incr request, if
`type` is UT_INCR.
+ // - a put request translated from an incr request, if `type` is UT_INCR.
4:optional update_type type;
}
diff --git a/src/server/pegasus_write_service.cpp
b/src/server/pegasus_write_service.cpp
index af2e07302..c7232aacc 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -152,6 +152,7 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
_server(server),
_impl(new impl(server)),
_batch_start_time(0),
+ _make_incr_idempotent_duration_ns(0),
_cu_calculator(server->_cu_calculator.get()),
METRIC_VAR_INIT_replica(put_requests),
METRIC_VAR_INIT_replica(multi_put_requests),
@@ -171,7 +172,8 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(dup_time_lag_ms),
METRIC_VAR_INIT_replica(dup_lagging_writes),
_put_batch_size(0),
- _remove_batch_size(0)
+ _remove_batch_size(0),
+ _incr_batch_size(0)
{
}
@@ -211,6 +213,37 @@ int pegasus_write_service::multi_remove(int64_t decree,
return err;
}
+int pegasus_write_service::make_idempotent(const dsn::apps::incr_request &req,
+ dsn::apps::incr_response &err_resp,
+ dsn::apps::update_request &update)
+{
+ const uint64_t start_time = dsn_now_ns();
+
+ const int err = _impl->make_idempotent(req, err_resp, update);
+
+ // Record how long it took to translate the incr request into an
idempotent put request.
+ _make_incr_idempotent_duration_ns = dsn_now_ns() - start_time;
+
+ return err;
+}
+
+int pegasus_write_service::put(const db_write_context &ctx,
+ const dsn::apps::update_request &update,
+ dsn::apps::incr_response &resp)
+{
+ // The total latency should also include the duration of the translation.
+ METRIC_VAR_AUTO_LATENCY(incr_latency_ns, dsn_now_ns() -
_make_incr_idempotent_duration_ns);
+ METRIC_VAR_INCREMENT(incr_requests);
+
+ const int err = _impl->put(ctx, update, resp);
+
+ if (_server->is_primary()) {
+ _cu_calculator->add_incr_cu(resp.error, update.key);
+ }
+
+ return err;
+}
+
int pegasus_write_service::incr(int64_t decree,
const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp)
@@ -278,7 +311,23 @@ int pegasus_write_service::batch_put(const
db_write_context &ctx,
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after
batch_prepare");
- ++_put_batch_size;
+ if (!update.__isset.type || update.type == dsn::apps::update_type::UT_PUT)
{
+ // This is a general single-put request.
+ ++_put_batch_size;
+ } else {
+ // There are only two possible situations for batch_put() where this
put request
+ // originates from an atomic write request:
+ // - now we are replaying plog into RocksDB at startup of this replica.
+ // - now we are in a secondary replica: just received a prepare
request and appended
+ // it to plog, now we are applying it into RocksDB.
+ //
+ // Though this is a put request, we choose to update the metrics of
its original
+ // request (i.e. the atomic write).
+ if (update.type == dsn::apps::update_type::UT_INCR) {
+ ++_incr_batch_size;
+ }
+ }
+
int err = _impl->batch_put(ctx, update, resp);
if (_server->is_primary()) {
@@ -309,7 +358,7 @@ int pegasus_write_service::batch_commit(int64_t decree)
CHECK_GT_MSG(_batch_start_time, 0, "batch_commit must be called after
batch_prepare");
int err = _impl->batch_commit(decree);
- clear_up_batch_states();
+ batch_finish();
return err;
}
@@ -319,14 +368,14 @@ void pegasus_write_service::batch_abort(int64_t decree,
int err)
CHECK(err, "must abort on non-zero err");
_impl->batch_abort(decree, err);
- clear_up_batch_states();
+ batch_finish();
}
void pegasus_write_service::set_default_ttl(uint32_t ttl) {
_impl->set_default_ttl(ttl); }
-void pegasus_write_service::clear_up_batch_states()
+void pegasus_write_service::batch_finish()
{
-#define PROCESS_WRITE_BATCH(op)
\
+#define UPDATE_WRITE_BATCH_METRICS(op)
\
do {
\
METRIC_VAR_INCREMENT_BY(op##_requests,
static_cast<int64_t>(_##op##_batch_size)); \
METRIC_VAR_SET(op##_latency_ns,
static_cast<size_t>(_##op##_batch_size), latency_ns); \
@@ -335,20 +384,27 @@ void pegasus_write_service::clear_up_batch_states()
auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);
- PROCESS_WRITE_BATCH(put);
- PROCESS_WRITE_BATCH(remove);
+ // Take the latency of executing the entire batch as the latency for
processing each
+ // request within it, since the latency of each request could not be known.
+ UPDATE_WRITE_BATCH_METRICS(put);
+ UPDATE_WRITE_BATCH_METRICS(remove);
+
+ // Since the duration of translation is unknown for both possible
situations where these
+ // put requests are actually translated from atomic requests (see comments
in batch_put()),
+ // there's no need to add `_make_incr_idempotent_duration_ns` to the total
latency.
+ UPDATE_WRITE_BATCH_METRICS(incr);
_batch_start_time = 0;
-#undef PROCESS_WRITE_BATCH
+#undef UPDATE_WRITE_BATCH_METRICS
}
int pegasus_write_service::duplicate(int64_t decree,
- const dsn::apps::duplicate_request
&requests,
+ const dsn::apps::duplicate_request
&update,
dsn::apps::duplicate_response &resp)
{
// Verifies the cluster_id.
- for (const auto &request : requests.entries) {
+ for (const auto &request : update.entries) {
if
(!dsn::replication::is_dup_cluster_id_configured(request.cluster_id)) {
resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint("request cluster id is unconfigured");
diff --git a/src/server/pegasus_write_service.h
b/src/server/pegasus_write_service.h
index 2142792a1..d69838a29 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -136,7 +136,19 @@ public:
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp);
- // Write INCR record.
+ // Translate an INCR request into an idempotent PUT request. Only called
by primary
+ // replicas.
+ int make_idempotent(const dsn::apps::incr_request &req,
+ dsn::apps::incr_response &err_resp,
+ dsn::apps::update_request &update);
+
+ // Write an idempotent INCR record (i.e. a PUT record) and reply to the
client with INCR
+ // response. Only called by primary replicas.
+ int put(const db_write_context &ctx,
+ const dsn::apps::update_request &update,
+ dsn::apps::incr_response &resp);
+
+ // Write a non-idempotent INCR record.
int incr(int64_t decree, const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp);
// Write CHECK_AND_SET record.
@@ -167,6 +179,8 @@ public:
// Add PUT record in batch write.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not
committed.
+ // When called by secondary replicas, this put request may be translated
from an incr
+ // request for idempotence.
int batch_put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp);
@@ -187,7 +201,8 @@ public:
void set_default_ttl(uint32_t ttl);
private:
- void clear_up_batch_states();
+ // Finish batch write with metrics such as latencies calculated and some
states cleared.
+ void batch_finish();
friend class pegasus_write_service_test;
friend class PegasusWriteServiceImplTest;
@@ -202,6 +217,26 @@ private:
uint64_t _batch_start_time;
+ // Only used for primary replica to calculate the duration that an incr
request from
+ // the client is translated into an idempotent put request before appended
to plog,
+ // including reading the current value from RocksDB and incrementing it by
a given
+ // amount.
+ //
+ // This variable is defined as per-replica rather than per-request, for
the reason
+ // that the current design for implementing idempotence is to make sure
there is only
+ // one atomic request being processed in the write pipeline for each
replica. This
+ // pipeline consists of the following stages:
+ // (1) read the current value from RocksDB and build the idempotent
request based on
+ // it;
+ // (2) append the corresponding mutation to plog;
+ // (3) broadcast the prepare requests;
+ // (4) apply the result for atomic operation back to RocksDB ultimately.
+ // For a request, this variable will be set in stage (1) and read in stage
(4); since
+ // there is only one request in the pipeline, this variable is guaranteed
not to be
+ // set for another request before stage (4) is finished. Therefore, it is
safe to
+ // define this variable as per-replica.
+ uint64_t _make_incr_idempotent_duration_ns;
+
capacity_unit_calculator *_cu_calculator;
METRIC_VAR_DECLARE_counter(put_requests);
@@ -224,10 +259,16 @@ private:
METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
METRIC_VAR_DECLARE_counter(dup_lagging_writes);
- // Record batch size for put and remove requests.
+ // Measure the size of single-put requests in batch applied into RocksDB
for metrics.
uint32_t _put_batch_size;
+
+ // Measure the size of single-remove requests in batch applied into
RocksDB for metrics.
uint32_t _remove_batch_size;
+ // Measure the size of incr requests (with each translated into an
idempotent put request)
+ // in batch applied into RocksDB for metrics.
+ uint32_t _incr_batch_size;
+
// TODO(wutao1): add metrics for failed rpc.
};
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index ff0f2bfac..494a384e0 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -171,8 +171,8 @@ public:
return resp.error;
}
- // Tranlate an incr request which is certainly non-idempotent into a
single-put request
- // which is certainly idempotent. Return current status for RocksDB.
+ // Translate an incr request into a single-put request which is certainly
idempotent.
+ // Return current status for RocksDB. Only called by primary replicas.
int make_idempotent(const dsn::apps::incr_request &req,
dsn::apps::incr_response &err_resp,
dsn::apps::update_request &update)
@@ -229,7 +229,7 @@ public:
}
// Apply single-put request translated from incr request into RocksDB, and
build response
- // for incr. Return current status for RocksDB.
+ // for incr. Return current status for RocksDB. Only called by primary
replicas.
int put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::incr_response &resp)
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index b399d9313..25545ba7e 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -1342,6 +1342,9 @@ public:
_samples.get()[index & (_sample_size - 1)] = val;
}
+ // Set the same value n times, used to treat a single value as the
result of multiple
+ // observations, e.g. taking the latency of executing the entire batch as
the latency for
+ // processing each request within it (see
pegasus_write_service::batch_finish()).
void set(size_t n, const value_type &val)
{
for (size_t i = 0; i < n; ++i) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]