This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e64faa7d0 feat(make_idempotent): support making `check_and_set` 
request idempotent in `pegasus_write_service` (#2239)
e64faa7d0 is described below

commit e64faa7d03f836e5212874bbbfb8f900341897d7
Author: Dan Wang <[email protected]>
AuthorDate: Mon Apr 28 20:52:49 2025 +0800

    feat(make_idempotent): support making `check_and_set` request idempotent in 
`pegasus_write_service` (#2239)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    Implement higher-level APIs `make_idempotent()` and `put()` on
    `pegasus_write_service` for `check_and_set` requests. Both of them
    internally call the APIs provided by `pegasus_write_service::impl`.
    Two metrics are also introduced to measure how long it takes to make
    `incr` and `check_and_set` requests idempotent.
---
 src/server/pegasus_write_service.cpp | 77 ++++++++++++++++++++++++++++--------
 src/server/pegasus_write_service.h   | 40 +++++++++----------
 2 files changed, 80 insertions(+), 37 deletions(-)

diff --git a/src/server/pegasus_write_service.cpp 
b/src/server/pegasus_write_service.cpp
index c7232aacc..dfea32601 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -81,6 +81,24 @@ METRIC_DEFINE_counter(replica,
                       dsn::metric_unit::kRequests,
                       "The number of CHECK_AND_MUTATE requests");
 
+METRIC_DEFINE_percentile_int64(replica,
+                               make_incr_idempotent_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The duration that an incr request is made 
idempotent, "
+                               "including reading the current value from 
storage engine, "
+                               "increasing it by a given amount and 
translating the incr "
+                               "request into the single-put request. Only used 
for the "
+                               "primary replicas");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               make_check_and_set_idempotent_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The duration that a check_and_set request is 
made "
+                               "idempotent, including reading the check value 
from "
+                               "storage engine, validating the check 
conditions and "
+                               "translating the check_and_set request into the 
single-put "
+                               "request. Only used for the primary replicas");
+
 METRIC_DEFINE_percentile_int64(replica,
                                put_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
@@ -152,7 +170,6 @@ 
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       _server(server),
       _impl(new impl(server)),
       _batch_start_time(0),
-      _make_incr_idempotent_duration_ns(0),
       _cu_calculator(server->_cu_calculator.get()),
       METRIC_VAR_INIT_replica(put_requests),
       METRIC_VAR_INIT_replica(multi_put_requests),
@@ -161,6 +178,8 @@ 
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       METRIC_VAR_INIT_replica(incr_requests),
       METRIC_VAR_INIT_replica(check_and_set_requests),
       METRIC_VAR_INIT_replica(check_and_mutate_requests),
+      METRIC_VAR_INIT_replica(make_incr_idempotent_latency_ns),
+      METRIC_VAR_INIT_replica(make_check_and_set_idempotent_latency_ns),
       METRIC_VAR_INIT_replica(put_latency_ns),
       METRIC_VAR_INIT_replica(multi_put_latency_ns),
       METRIC_VAR_INIT_replica(remove_latency_ns),
@@ -173,7 +192,8 @@ 
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       METRIC_VAR_INIT_replica(dup_lagging_writes),
       _put_batch_size(0),
       _remove_batch_size(0),
-      _incr_batch_size(0)
+      _incr_batch_size(0),
+      _check_and_set_batch_size(0)
 {
 }
 
@@ -217,22 +237,16 @@ int pegasus_write_service::make_idempotent(const 
dsn::apps::incr_request &req,
                                            dsn::apps::incr_response &err_resp,
                                            dsn::apps::update_request &update)
 {
-    const uint64_t start_time = dsn_now_ns();
-
-    const int err = _impl->make_idempotent(req, err_resp, update);
-
-    // Calculate the duration that an incr request is translated into an 
idempotent put request.
-    _make_incr_idempotent_duration_ns = dsn_now_ns() - start_time;
+    METRIC_VAR_AUTO_LATENCY(make_incr_idempotent_latency_ns);
 
-    return err;
+    return _impl->make_idempotent(req, err_resp, update);
 }
 
 int pegasus_write_service::put(const db_write_context &ctx,
                                const dsn::apps::update_request &update,
                                dsn::apps::incr_response &resp)
 {
-    // The total latency should also include the duration of the translation.
-    METRIC_VAR_AUTO_LATENCY(incr_latency_ns, dsn_now_ns() - 
_make_incr_idempotent_duration_ns);
+    METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
     METRIC_VAR_INCREMENT(incr_requests);
 
     const int err = _impl->put(ctx, update, resp);
@@ -260,6 +274,33 @@ int pegasus_write_service::incr(int64_t decree,
     return err;
 }
 
+int pegasus_write_service::make_idempotent(const 
dsn::apps::check_and_set_request &req,
+                                           dsn::apps::check_and_set_response 
&err_resp,
+                                           dsn::apps::update_request &update)
+{
+    METRIC_VAR_AUTO_LATENCY(make_check_and_set_idempotent_latency_ns);
+
+    return _impl->make_idempotent(req, err_resp, update);
+}
+
+int pegasus_write_service::put(const db_write_context &ctx,
+                               const dsn::apps::update_request &update,
+                               const dsn::apps::check_and_set_request &req,
+                               dsn::apps::check_and_set_response &resp)
+{
+    METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
+    METRIC_VAR_INCREMENT(check_and_set_requests);
+
+    const int err = _impl->put(ctx, update, resp);
+
+    if (_server->is_primary()) {
+        _cu_calculator->add_check_and_set_cu(
+            resp.error, req.hash_key, req.check_sort_key, req.set_sort_key, 
req.set_value);
+    }
+
+    return err;
+}
+
 int pegasus_write_service::check_and_set(int64_t decree,
                                          const 
dsn::apps::check_and_set_request &update,
                                          dsn::apps::check_and_set_response 
&resp)
@@ -325,10 +366,12 @@ int pegasus_write_service::batch_put(const 
db_write_context &ctx,
         // request (i.e. the atomic write).
         if (update.type == dsn::apps::update_type::UT_INCR) {
             ++_incr_batch_size;
+        } else if (update.type == dsn::apps::update_type::UT_CHECK_AND_SET) {
+            ++_check_and_set_batch_size;
         }
     }
 
-    int err = _impl->batch_put(ctx, update, resp);
+    const int err = _impl->batch_put(ctx, update, resp);
 
     if (_server->is_primary()) {
         _cu_calculator->add_put_cu(resp.error, update.key, update.value);
@@ -344,7 +387,7 @@ int pegasus_write_service::batch_remove(int64_t decree,
     CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after 
batch_prepare");
 
     ++_remove_batch_size;
-    int err = _impl->batch_remove(decree, key, resp);
+    const int err = _impl->batch_remove(decree, key, resp);
 
     if (_server->is_primary()) {
         _cu_calculator->add_remove_cu(resp.error, key);
@@ -357,7 +400,7 @@ int pegasus_write_service::batch_commit(int64_t decree)
 {
     CHECK_GT_MSG(_batch_start_time, 0, "batch_commit must be called after 
batch_prepare");
 
-    int err = _impl->batch_commit(decree);
+    const int err = _impl->batch_commit(decree);
     batch_finish();
     return err;
 }
@@ -389,10 +432,10 @@ void pegasus_write_service::batch_finish()
     UPDATE_WRITE_BATCH_METRICS(put);
     UPDATE_WRITE_BATCH_METRICS(remove);
 
-    // Since the duration of translation is unknown for both possible 
situations where these
-    // put requests are actually translated from atomic requests (see comments 
in batch_put()),
-    // there's no need to add `_make_incr_idempotent_duration_ns` to the total 
latency.
+    // These put requests are translated from atomic requests. See comments in 
batch_put()
+    // for the two possible situations where we are now.
     UPDATE_WRITE_BATCH_METRICS(incr);
+    UPDATE_WRITE_BATCH_METRICS(check_and_set);
 
     _batch_start_time = 0;
 
diff --git a/src/server/pegasus_write_service.h 
b/src/server/pegasus_write_service.h
index d69838a29..3939a6a6e 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -151,6 +151,19 @@ public:
     // Write a non-idempotent INCR record.
     int incr(int64_t decree, const dsn::apps::incr_request &update, 
dsn::apps::incr_response &resp);
 
+    // Translate a CHECK_AND_SET request into an idempotent PUT request. Only 
called by
+    // primary replicas.
+    int make_idempotent(const dsn::apps::check_and_set_request &req,
+                        dsn::apps::check_and_set_response &err_resp,
+                        dsn::apps::update_request &update);
+
+    // Write an idempotent CHECK_AND_SET record (i.e. a PUT record) and reply 
to the client
+    // with CHECK_AND_SET response. Only called by primary replicas.
+    int put(const db_write_context &ctx,
+            const dsn::apps::update_request &update,
+            const dsn::apps::check_and_set_request &req,
+            dsn::apps::check_and_set_response &resp);
+
     // Write CHECK_AND_SET record.
     int check_and_set(int64_t decree,
                       const dsn::apps::check_and_set_request &update,
@@ -217,26 +230,6 @@ private:
 
     uint64_t _batch_start_time;
 
-    // Only used for primary replica to calculate the duration that an incr 
request from
-    // the client is translated into an idempotent put request before appended 
to plog,
-    // including reading the current value from RocksDB and incrementing it by 
a given
-    // amount.
-    //
-    // This variable is defined as per-replica rather than per-request, for 
the reason
-    // that the current design for implementing idempotence is to make sure 
there is only
-    // one atomic request being processed in the write pipeline for each 
replica. This
-    // pipeline consists of the following stages:
-    // (1) read the current value from RocksDB and built the idempotent 
request based on
-    // it;
-    // (2) append the corresponding mutation to plog;
-    // (3) broadcast the prepare requests;
-    // (4) apply the result for atomic operation back to RocksDB ultimately.
-    // For a request, this variable will be set in stage (1) and read in stage 
(4); since
-    // there is only one request in the pipeline, this variable is guaranteed 
not to be
-    // set for another request before stage (4) is finished. Therefore, it is 
safe to
-    // define this variable as per-replica.
-    uint64_t _make_incr_idempotent_duration_ns;
-
     capacity_unit_calculator *_cu_calculator;
 
     METRIC_VAR_DECLARE_counter(put_requests);
@@ -247,6 +240,9 @@ private:
     METRIC_VAR_DECLARE_counter(check_and_set_requests);
     METRIC_VAR_DECLARE_counter(check_and_mutate_requests);
 
+    METRIC_VAR_DECLARE_percentile_int64(make_incr_idempotent_latency_ns);
+    
METRIC_VAR_DECLARE_percentile_int64(make_check_and_set_idempotent_latency_ns);
+
     METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
     METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
     METRIC_VAR_DECLARE_percentile_int64(remove_latency_ns);
@@ -269,6 +265,10 @@ private:
     // in batch applied into RocksDB for metrics.
     uint32_t _incr_batch_size;
 
+    // Measure the size of check_and_set requests (with each translated into 
an idempotent put
+    // request) in batch applied into RocksDB for metrics.
+    uint32_t _check_and_set_batch_size;
+
     // TODO(wutao1): add metrics for failed rpc.
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to