This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new cd71c5b0a feat(make_idempotent): support making `incr` request 
idempotent in `pegasus_write_service` (#2192)
cd71c5b0a is described below

commit cd71c5b0a541b261a1369de2b3b43012950f08cf
Author: Dan Wang <[email protected]>
AuthorDate: Mon Feb 17 15:59:13 2025 +0800

    feat(make_idempotent): support making `incr` request idempotent in 
`pegasus_write_service` (#2192)
    
    In https://github.com/apache/incubator-pegasus/pull/2185, we've implemented 
two APIs
    `make_idempotent()` and `put()` in `pegasus_write_service::impl` to support 
idempotent
    `incr` operations. As the higher-level abstraction, `pegasus_write_service` 
is able to process
    idempotent `incr` requests.  It will also implement `make_idempotent()` and 
`put()` by calling
    APIs provided by `pegasus_write_service::impl` internally. Similarly, both 
APIs are actually also
    provided for higher-level abstraction.
---
 idl/rrdb.thrift                         |  2 +-
 src/server/pegasus_write_service.cpp    | 78 ++++++++++++++++++++++++++++-----
 src/server/pegasus_write_service.h      | 47 ++++++++++++++++++--
 src/server/pegasus_write_service_impl.h |  6 +--
 src/utils/metrics.h                     |  3 ++
 5 files changed, 118 insertions(+), 18 deletions(-)

diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index c0534785d..712de1ec7 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -86,7 +86,7 @@ struct update_request
     // single-put request from the one translated from a non-idempotent atomic 
write request:
     // - a general single-put request, if `type` is UT_PUT or not set by 
default as it's
     // optional, or
-    // - a put request translated from a non-idempotent incr request, if 
`type` is UT_INCR.
+    // - a put request translated from an incr request, if `type` is UT_INCR.
     4:optional update_type type;
 }
 
diff --git a/src/server/pegasus_write_service.cpp 
b/src/server/pegasus_write_service.cpp
index af2e07302..c7232aacc 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -152,6 +152,7 @@ 
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       _server(server),
       _impl(new impl(server)),
       _batch_start_time(0),
+      _make_incr_idempotent_duration_ns(0),
       _cu_calculator(server->_cu_calculator.get()),
       METRIC_VAR_INIT_replica(put_requests),
       METRIC_VAR_INIT_replica(multi_put_requests),
@@ -171,7 +172,8 @@ 
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       METRIC_VAR_INIT_replica(dup_time_lag_ms),
       METRIC_VAR_INIT_replica(dup_lagging_writes),
       _put_batch_size(0),
-      _remove_batch_size(0)
+      _remove_batch_size(0),
+      _incr_batch_size(0)
 {
 }
 
@@ -211,6 +213,37 @@ int pegasus_write_service::multi_remove(int64_t decree,
     return err;
 }
 
+int pegasus_write_service::make_idempotent(const dsn::apps::incr_request &req,
+                                           dsn::apps::incr_response &err_resp,
+                                           dsn::apps::update_request &update)
+{
+    const uint64_t start_time = dsn_now_ns();
+
+    const int err = _impl->make_idempotent(req, err_resp, update);
+
+    // Calculate the duration that an incr request is translated into an 
idempotent put request.
+    _make_incr_idempotent_duration_ns = dsn_now_ns() - start_time;
+
+    return err;
+}
+
+int pegasus_write_service::put(const db_write_context &ctx,
+                               const dsn::apps::update_request &update,
+                               dsn::apps::incr_response &resp)
+{
+    // The total latency should also include the duration of the translation.
+    METRIC_VAR_AUTO_LATENCY(incr_latency_ns, dsn_now_ns() - 
_make_incr_idempotent_duration_ns);
+    METRIC_VAR_INCREMENT(incr_requests);
+
+    const int err = _impl->put(ctx, update, resp);
+
+    if (_server->is_primary()) {
+        _cu_calculator->add_incr_cu(resp.error, update.key);
+    }
+
+    return err;
+}
+
 int pegasus_write_service::incr(int64_t decree,
                                 const dsn::apps::incr_request &update,
                                 dsn::apps::incr_response &resp)
@@ -278,7 +311,23 @@ int pegasus_write_service::batch_put(const 
db_write_context &ctx,
 {
     CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after 
batch_prepare");
 
-    ++_put_batch_size;
+    if (!update.__isset.type || update.type == dsn::apps::update_type::UT_PUT) 
{
+        // This is a general single-put request.
+        ++_put_batch_size;
+    } else {
+        // There are only two possible situations for batch_put() where this 
put request
+        // originates from an atomic write request:
+        // - now we are replaying plog into RocksDB at startup of this replica.
+        // - now we are in a secondary replica: just received a prepare 
request and appended
+        // it to plog, now we are applying it into RocksDB.
+        //
+        // Though this is a put request, we choose to update the metrics of 
its original
+        // request (i.e. the atomic write).
+        if (update.type == dsn::apps::update_type::UT_INCR) {
+            ++_incr_batch_size;
+        }
+    }
+
     int err = _impl->batch_put(ctx, update, resp);
 
     if (_server->is_primary()) {
@@ -309,7 +358,7 @@ int pegasus_write_service::batch_commit(int64_t decree)
     CHECK_GT_MSG(_batch_start_time, 0, "batch_commit must be called after 
batch_prepare");
 
     int err = _impl->batch_commit(decree);
-    clear_up_batch_states();
+    batch_finish();
     return err;
 }
 
@@ -319,14 +368,14 @@ void pegasus_write_service::batch_abort(int64_t decree, 
int err)
     CHECK(err, "must abort on non-zero err");
 
     _impl->batch_abort(decree, err);
-    clear_up_batch_states();
+    batch_finish();
 }
 
 void pegasus_write_service::set_default_ttl(uint32_t ttl) { 
_impl->set_default_ttl(ttl); }
 
-void pegasus_write_service::clear_up_batch_states()
+void pegasus_write_service::batch_finish()
 {
-#define PROCESS_WRITE_BATCH(op)                                                
                    \
+#define UPDATE_WRITE_BATCH_METRICS(op)                                         
                    \
     do {                                                                       
                    \
         METRIC_VAR_INCREMENT_BY(op##_requests, 
static_cast<int64_t>(_##op##_batch_size));          \
         METRIC_VAR_SET(op##_latency_ns, 
static_cast<size_t>(_##op##_batch_size), latency_ns);      \
@@ -335,20 +384,27 @@ void pegasus_write_service::clear_up_batch_states()
 
     auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);
 
-    PROCESS_WRITE_BATCH(put);
-    PROCESS_WRITE_BATCH(remove);
+    // Take the latency of executing the entire batch as the latency for 
processing each
+    // request within it, since the latency of each request could not be known.
+    UPDATE_WRITE_BATCH_METRICS(put);
+    UPDATE_WRITE_BATCH_METRICS(remove);
+
+    // Since the duration of translation is unknown for both possible 
situations where these
+    // put requests are actually translated from atomic requests (see comments 
in batch_put()),
+    // there's no need to add `_make_incr_idempotent_duration_ns` to the total 
latency.
+    UPDATE_WRITE_BATCH_METRICS(incr);
 
     _batch_start_time = 0;
 
-#undef PROCESS_WRITE_BATCH
+#undef UPDATE_WRITE_BATCH_METRICS
 }
 
 int pegasus_write_service::duplicate(int64_t decree,
-                                     const dsn::apps::duplicate_request 
&requests,
+                                     const dsn::apps::duplicate_request 
&update,
                                      dsn::apps::duplicate_response &resp)
 {
     // Verifies the cluster_id.
-    for (const auto &request : requests.entries) {
+    for (const auto &request : update.entries) {
         if 
(!dsn::replication::is_dup_cluster_id_configured(request.cluster_id)) {
             resp.__set_error(rocksdb::Status::kInvalidArgument);
             resp.__set_error_hint("request cluster id is unconfigured");
diff --git a/src/server/pegasus_write_service.h 
b/src/server/pegasus_write_service.h
index 2142792a1..d69838a29 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -136,7 +136,19 @@ public:
                      const dsn::apps::multi_remove_request &update,
                      dsn::apps::multi_remove_response &resp);
 
-    // Write INCR record.
+    // Translate an INCR request into an idempotent PUT request. Only called 
by primary
+    // replicas.
+    int make_idempotent(const dsn::apps::incr_request &req,
+                        dsn::apps::incr_response &err_resp,
+                        dsn::apps::update_request &update);
+
+    // Write an idempotent INCR record (i.e. a PUT record) and reply to the 
client with INCR
+    // response. Only called by primary replicas.
+    int put(const db_write_context &ctx,
+            const dsn::apps::update_request &update,
+            dsn::apps::incr_response &resp);
+
+    // Write a non-idempotent INCR record.
     int incr(int64_t decree, const dsn::apps::incr_request &update, 
dsn::apps::incr_response &resp);
 
     // Write CHECK_AND_SET record.
@@ -167,6 +179,8 @@ public:
     // Add PUT record in batch write.
     // \returns rocksdb::Status::Code.
     // NOTE that `resp` should not be moved or freed while the batch is not 
committed.
+    // When called by secondary replicas, this put request may be translated 
from an incr
+    // request for idempotence.
     int batch_put(const db_write_context &ctx,
                   const dsn::apps::update_request &update,
                   dsn::apps::update_response &resp);
@@ -187,7 +201,8 @@ public:
     void set_default_ttl(uint32_t ttl);
 
 private:
-    void clear_up_batch_states();
+    // Finish batch write with metrics such as latencies calculated and some 
states cleared.
+    void batch_finish();
 
     friend class pegasus_write_service_test;
     friend class PegasusWriteServiceImplTest;
@@ -202,6 +217,26 @@ private:
 
     uint64_t _batch_start_time;
 
+    // Only used for primary replica to calculate the duration that an incr 
request from
+    // the client is translated into an idempotent put request before appended 
to plog,
+    // including reading the current value from RocksDB and incrementing it by 
a given
+    // amount.
+    //
+    // This variable is defined as per-replica rather than per-request, for 
the reason
+    // that the current design for implementing idempotence is to make sure 
there is only
+    // one atomic request being processed in the write pipeline for each 
replica. This
+    // pipeline consists of the following stages:
+    // (1) read the current value from RocksDB and build the idempotent 
request based on
+    // it;
+    // (2) append the corresponding mutation to plog;
+    // (3) broadcast the prepare requests;
+    // (4) apply the result for atomic operation back to RocksDB ultimately.
+    // For a request, this variable will be set in stage (1) and read in stage 
(4); since
+    // there is only one request in the pipeline, this variable is guaranteed 
not to be
+    // set for another request before stage (4) is finished. Therefore, it is 
safe to
+    // define this variable as per-replica.
+    uint64_t _make_incr_idempotent_duration_ns;
+
     capacity_unit_calculator *_cu_calculator;
 
     METRIC_VAR_DECLARE_counter(put_requests);
@@ -224,10 +259,16 @@ private:
     METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
     METRIC_VAR_DECLARE_counter(dup_lagging_writes);
 
-    // Record batch size for put and remove requests.
+    // Measure the size of single-put requests in batch applied into RocksDB 
for metrics.
     uint32_t _put_batch_size;
+
+    // Measure the size of single-remove requests in batch applied into 
RocksDB for metrics.
     uint32_t _remove_batch_size;
 
+    // Measure the size of incr requests (with each translated into an 
idempotent put request)
+    // in batch applied into RocksDB for metrics.
+    uint32_t _incr_batch_size;
+
     // TODO(wutao1): add metrics for failed rpc.
 };
 
diff --git a/src/server/pegasus_write_service_impl.h 
b/src/server/pegasus_write_service_impl.h
index ff0f2bfac..494a384e0 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -171,8 +171,8 @@ public:
         return resp.error;
     }
 
-    // Tranlate an incr request which is certainly non-idempotent into a 
single-put request
-    // which is certainly idempotent. Return current status for RocksDB.
+    // Translate an incr request into a single-put request which is certainly 
idempotent.
+    // Return current status for RocksDB. Only called by primary replicas.
     int make_idempotent(const dsn::apps::incr_request &req,
                         dsn::apps::incr_response &err_resp,
                         dsn::apps::update_request &update)
@@ -229,7 +229,7 @@ public:
     }
 
     // Apply single-put request translated from incr request into RocksDB, and 
build response
-    // for incr. Return current status for RocksDB.
+    // for incr. Return current status for RocksDB. Only called by primary 
replicas.
     int put(const db_write_context &ctx,
             const dsn::apps::update_request &update,
             dsn::apps::incr_response &resp)
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index b399d9313..25545ba7e 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -1342,6 +1342,9 @@ public:
         _samples.get()[index & (_sample_size - 1)] = val;
     }
 
+    // Set the same value for n times, used to treat a single value as the 
result of multiple
+    // observations, e.g. taking the latency of executing the entire batch as 
the latency for
+    // processing each request within it (see 
pegasus_write_service::batch_finish()).
     void set(size_t n, const value_type &val)
     {
         for (size_t i = 0; i < n; ++i) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to