This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 154bb16f1 feat(make_idempotent): re-adapt `make_idempotent()` and 
`put()` to support batched single-update requests for atomic writes (#2249)
154bb16f1 is described below

commit 154bb16f1637116673acc803861157ac39e5d4c2
Author: Dan Wang <[email protected]>
AuthorDate: Wed May 28 15:26:43 2025 +0800

    feat(make_idempotent): re-adapt `make_idempotent()` and `put()` to support 
batched single-update requests for atomic writes (#2249)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    Because a `check_and_mutate` request may be converted into multiple single-update
    requests, the `make_idempotent()` and `put()` interfaces for `incr`, `check_and_set`
    and `check_and_mutate` have been re-adapted and better supported through template
    functions.
    
    The number of batched single-operation requests -- corresponding either to the
    batched put/remove requests, or to an atomic write request of incr, check_and_set
    or check_and_mutate -- is now recorded in an array instead of being stored in a
    variable.
    
    In addition, the writers responsible for applying single-update requests to the
    storage engine are now stored in an array instead of a map to improve performance.
---
 src/replica/replica_2pc.cpp                      |  57 +++--
 src/replica/replication_app_base.cpp             |  27 +-
 src/replica/replication_app_base.h               |  14 +-
 src/replica/storage/simple_kv/simple_kv.server.h |   3 +-
 src/replica/test/mock_utils.h                    |   3 +-
 src/server/pegasus_server_impl.cpp               |   7 +-
 src/server/pegasus_server_impl.h                 |   6 +-
 src/server/pegasus_server_write.cpp              | 307 +++++++++++++++--------
 src/server/pegasus_server_write.h                | 129 +++++++---
 src/server/pegasus_write_service.cpp             | 145 ++++++++---
 src/server/pegasus_write_service.h               |  84 +++++--
 src/server/pegasus_write_service_impl.h          |  33 ++-
 src/server/test/pegasus_server_write_test.cpp    |  16 +-
 13 files changed, 575 insertions(+), 256 deletions(-)

diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index d8954e939..fefcded06 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -148,20 +148,20 @@ void replica::on_client_write(dsn::message_ex *request, 
bool ignore_throttling)
     }
 
     if (dsn_unlikely(request->rpc_code() == TASK_CODE_INVALID)) {
-        LOG_ERROR("recv message with invalid RPC code {} from {}, trace_id = 
{}",
-                  request->rpc_code(),
-                  request->header->from_address,
-                  request->header->trace_id);
+        LOG_ERROR_PREFIX("recv message with invalid RPC code {} from {}, 
trace_id = {}",
+                         request->rpc_code(),
+                         request->header->from_address,
+                         request->header->trace_id);
         response_client_write(request, ERR_INVALID_PARAMETERS);
         return;
     }
 
     const auto *spec = task_spec::get(request->rpc_code());
     if (dsn_unlikely(spec == nullptr)) {
-        LOG_ERROR("recv message with unhandled RPC code {} from {}, trace_id = 
{}",
-                  request->rpc_code(),
-                  request->header->from_address,
-                  request->header->trace_id);
+        LOG_ERROR_PREFIX("recv message with unhandled RPC code {} from {}, 
trace_id = {}",
+                         request->rpc_code(),
+                         request->header->from_address,
+                         request->header->trace_id);
         response_client_write(request, ERR_HANDLER_NOT_FOUND);
         return;
     }
@@ -239,10 +239,14 @@ void replica::on_client_write(dsn::message_ex *request, 
bool ignore_throttling)
 bool replica::need_reject_non_idempotent(const task_spec *spec) const
 {
     if (!is_duplication_master()) {
+        // This is not the dup master that needs to duplicate writes to 
followers, thus
+        // non-idempotent requests are accepted.
         return false;
     }
 
     if (_app_info.atomic_idempotent) {
+        // Since the table which this replica belongs to has been configured 
to make
+        // all atomic write requests idempotent, certainly they are accepted.
         return false;
     }
 
@@ -265,37 +269,46 @@ bool replica::need_make_idempotent(message_ex *request) 
const
     }
 
     if (!_app_info.atomic_idempotent) {
+        // The table which this replica belongs to is not configured to make 
all atomic
+        // write requests idempotent.
         return false;
     }
 
     const auto *spec = task_spec::get(request->rpc_code());
-    CHECK_NOTNULL(spec, "RPC code {} not found", request->rpc_code());
+    CHECK_NOTNULL_PREFIX_MSG(spec, "RPC code {} not found", 
request->rpc_code());
 
     return !spec->rpc_request_is_write_idempotent;
 }
 
 int replica::make_idempotent(mutation_ptr &mu)
 {
-    CHECK_TRUE(!mu->client_requests.empty());
+    CHECK_PREFIX_MSG(!mu->client_requests.empty(),
+                     "the mutation should include at least one request");
 
     message_ex *request = mu->client_requests.front();
     if (!need_make_idempotent(request)) {
         return rocksdb::Status::kOk;
     }
 
-    // The original atomic write request must not be batched.
-    CHECK_EQ(mu->client_requests.size(), 1);
+    CHECK_EQ_PREFIX_MSG(
+        mu->client_requests.size(), 1, "the original atomic write request must 
not be batched");
 
-    dsn::message_ex *new_request = nullptr;
-    const int err = _app->make_idempotent(request, &new_request);
-    if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+    std::vector<dsn::message_ex *> new_requests;
+    const int err = _app->make_idempotent(request, new_requests);
+
+    // When the condition checks of `check_and_set` and `check_and_mutate` 
fail, make_idempotent()
+    // would return rocksdb::Status::kTryAgain. Therefore, there is still a 
certain probability
+    // that a status code other than rocksdb::Status::kOk is returned.
+    if (err != rocksdb::Status::kOk) {
         // Once some error occurred, the response with error must have been 
returned to the
         // client during _app->make_idempotent(). Thus do nothing here.
         return err;
     }
 
-    CHECK_NOTNULL(new_request,
-                  "new_request should not be null since its original write 
request must be atomic");
+    CHECK_PREFIX_MSG(
+        !new_requests.empty(),
+        "new_requests should not be empty since its original write request 
must be atomic "
+        "and translated into at least one idempotent request");
 
     // During make_idempotent(), the request has been deserialized (i.e. 
unmarshall() in the
     // constructor of `rpc_holder::internal`). Once deserialize it again, 
assertion would fail for
@@ -304,13 +317,15 @@ int replica::make_idempotent(mutation_ptr &mu)
     // To make it deserializable again to be applied into RocksDB, restore 
read for it.
     request->restore_read();
 
-    // The decree must have not been assigned.
-    CHECK_EQ(mu->get_decree(), invalid_decree);
+    CHECK_EQ_PREFIX_MSG(mu->get_decree(), invalid_decree, "the decree must 
have not been assigned");
 
-    // Create a new mutation to hold the new idempotent request. The old 
mutation holding the
+    // Create a new mutation to hold the new idempotent requests. The old 
mutation holding the
     // original atomic write request will be released automatically.
     mu = new_mutation(invalid_decree, request);
-    mu->add_client_request(new_request);
+    for (dsn::message_ex *new_request : new_requests) {
+        mu->add_client_request(new_request);
+    }
+
     return rocksdb::Status::kOk;
 }
 
diff --git a/src/replica/replication_app_base.cpp 
b/src/replica/replication_app_base.cpp
index d014c9061..40f043dd9 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -265,17 +265,20 @@ error_code 
replication_app_base::apply_checkpoint(chkpt_apply_mode mode, const l
 int replication_app_base::on_batched_write_requests(int64_t decree,
                                                     uint64_t timestamp,
                                                     message_ex **requests,
-                                                    int request_length,
+                                                    uint32_t count,
                                                     message_ex 
*original_request)
 {
     int storage_error = rocksdb::Status::kOk;
-    for (int i = 0; i < request_length; ++i) {
-        int e = on_request(requests[i]);
-        if (e != rocksdb::Status::kOk) {
-            LOG_ERROR_PREFIX("got storage engine error when handler 
request({})",
-                             requests[i]->header->rpc_name);
-            storage_error = e;
+    for (uint32_t i = 0; i < count; ++i) {
+        const int err = on_request(requests[i]);
+        if (err == rocksdb::Status::kOk) {
+            continue;
         }
+
+        LOG_ERROR_PREFIX("got storage engine error when handler request({}): 
error = {}",
+                         requests[i]->header->rpc_name,
+                         err);
+        storage_error = err;
     }
     return storage_error;
 }
@@ -294,12 +297,12 @@ error_code replication_app_base::apply_mutation(const 
mutation_ptr &mu)
     }
 
     bool has_ingestion_request = false;
-    const int request_count = static_cast<int>(mu->client_requests.size());
+    const auto request_count = 
static_cast<uint32_t>(mu->client_requests.size());
     auto **batched_requests = ALLOC_STACK(message_ex *, request_count);
     auto **faked_requests = ALLOC_STACK(message_ex *, request_count);
-    int batched_count = 0; // write-empties are not included.
-    int faked_count = 0;
-    for (int i = 0; i < request_count; ++i) {
+    uint32_t batched_count = 0; // write-empties are not included.
+    uint32_t faked_count = 0;
+    for (uint32_t i = 0; i < request_count; ++i) {
         const mutation_update &update = mu->data.updates[i];
         LOG_DEBUG_PREFIX("mutation {} #{}: dispatch rpc call {}", mu->name(), 
i, update.code);
         if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
@@ -329,7 +332,7 @@ error_code replication_app_base::apply_mutation(const 
mutation_ptr &mu)
                                                         mu->original_request);
 
     // release faked requests
-    for (int i = 0; i < faked_count; ++i) {
+    for (uint32_t i = 0; i < faked_count; ++i) {
         faked_requests[i]->release_ref();
     }
 
diff --git a/src/replica/replication_app_base.h 
b/src/replica/replication_app_base.h
index c44b5d313..329a5e972 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -30,6 +30,7 @@
 #include <cstdint>
 #include <map>
 #include <string>
+#include <vector>
 
 #include "bulk_load_types.h"
 #include "common/json_helper.h"
@@ -254,19 +255,20 @@ public:
     // not idempotent. This function is used to translate them into requests 
like single put
     // which is naturally idempotent.
     //
-    // For the other requests which must be idempotent such as single 
put/remove or non-batch
-    // writes, this function would do nothing.
+    // For the other requests such as single put/remove or non-batch writes 
all of which must be
+    // idempotent, this function would do nothing.
     //
     // Parameters:
     // - request: the original request received from a client.
-    // - new_request: as the output parameter pointing to the resulting 
idempotent request if the
+    // - new_requests: the output parameter, holding the resulting idempotent 
requests if the
     // original request is atomic, otherwise keeping unchanged.
     //
     // Return:
     // - for an idempotent requess always return rocksdb::Status::kOk .
     // - for an atomic request, return rocksdb::Status::kOk if succeed in 
making it idempotent;
     // otherwise, return error code (rocksdb::Status::Code).
-    virtual int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) = 0;
+    virtual int make_idempotent(dsn::message_ex *request,
+                                std::vector<dsn::message_ex *> &new_requests) 
= 0;
 
     // Apply batched write requests from a mutation. This is a virtual 
function, and base class
     // provide a naive implementation that just call on_request for each 
request. Storage engine
@@ -276,7 +278,7 @@ public:
     //  - decree: the decree of the mutation which these requests are batched 
into.
     //  - timestamp: an incremental timestamp generated for this batch of 
requests.
     //  - requests: the requests to be applied.
-    //  - request_length: the number of the requests.
+    //  - count: the number of the requests.
     //  - original_request: the original request received from the client. 
Must be an atomic
     //  request (i.e. incr, check_and_set and check_and_mutate) if non-null, 
and another
     //  parameter `requests` must hold the idempotent request translated from 
it. Used to
@@ -287,7 +289,7 @@ public:
     virtual int on_batched_write_requests(int64_t decree,
                                           uint64_t timestamp,
                                           message_ex **requests,
-                                          int request_length,
+                                          uint32_t count,
                                           message_ex *original_request);
 
     // Query compact state.
diff --git a/src/replica/storage/simple_kv/simple_kv.server.h 
b/src/replica/storage/simple_kv/simple_kv.server.h
index 1fc723cf2..c34aa0e81 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.h
@@ -53,7 +53,8 @@ public:
         return handle_request(request);
     }
 
-    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) override
+    int make_idempotent(dsn::message_ex *request,
+                        std::vector<dsn::message_ex *> &new_requests) override
     {
         return rocksdb::Status::kOk;
     }
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index e8a952810..ab30d9a5c 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -78,7 +78,8 @@ public:
         return ERR_OK;
     }
     int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 
0; }
-    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) override
+    int make_idempotent(dsn::message_ex *request,
+                        std::vector<dsn::message_ex *> &new_requests) override
     {
         return rocksdb::Status::kOk;
     }
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index de9abc7b0..2d13948a9 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -345,17 +345,18 @@ void pegasus_server_impl::gc_checkpoints(bool 
force_reserve_one)
                     max_d);
 }
 
-int pegasus_server_impl::make_idempotent(dsn::message_ex *request, 
dsn::message_ex **new_request)
+int pegasus_server_impl::make_idempotent(dsn::message_ex *request,
+                                         std::vector<dsn::message_ex *> 
&new_requests)
 {
     CHECK_TRUE(_is_open);
 
-    return _server_write->make_idempotent(request, new_request);
+    return _server_write->make_idempotent(request, new_requests);
 }
 
 int pegasus_server_impl::on_batched_write_requests(int64_t decree,
                                                    uint64_t timestamp,
                                                    dsn::message_ex **requests,
-                                                   int count,
+                                                   uint32_t count,
                                                    dsn::message_ex 
*original_request)
 {
     CHECK_TRUE(_is_open);
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index b385137e0..64934ffaa 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -147,7 +147,9 @@ public:
     //  - ERR_FILE_OPERATION_FAILED
     ::dsn::error_code stop(bool clear_state) override;
 
-    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) override;
+    // See replication_app_base::make_idempotent() for details. Only called by 
primary replicas.
+    int make_idempotent(dsn::message_ex *request,
+                        std::vector<dsn::message_ex *> &new_requests) override;
 
     /// Each of the write request (specifically, the rpc that's configured as 
write, see
     /// option `rpc_request_is_write_operation` in rDSN `task_spec`) will 
first be
@@ -159,7 +161,7 @@ public:
     int on_batched_write_requests(int64_t decree,
                                   uint64_t timestamp,
                                   dsn::message_ex **requests,
-                                  int count,
+                                  uint32_t count,
                                   dsn::message_ex *original_request) override;
 
     ::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
diff --git a/src/server/pegasus_server_write.cpp 
b/src/server/pegasus_server_write.cpp
index 583032afa..810746f4f 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -18,7 +18,6 @@
  */
 
 #include <fmt/core.h>
-#include <rocksdb/status.h>
 #include <thrift/transport/TTransportException.h>
 #include <algorithm>
 #include <string_view>
@@ -34,11 +33,9 @@
 #include "pegasus_server_write.h"
 #include "pegasus_utils.h"
 #include "rpc/rpc_holder.h"
-#include "rpc/rpc_message.h"
+#include "rpc/serialization.h"
 #include "rrdb/rrdb.code.definition.h"
-#include "runtime/message_utils.h"
 #include "server/pegasus_write_service.h"
-#include "task/task_spec.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/flags.h"
@@ -52,33 +49,48 @@ METRIC_DEFINE_counter(replica,
 
 DSN_DECLARE_bool(rocksdb_verbose_log);
 
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
 
 pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
     : replica_base(server),
-      _write_svc(new pegasus_write_service(server)),
+      _write_svc(std::make_unique<pegasus_write_service>(server)),
       METRIC_VAR_INIT_replica(corrupt_writes)
 {
     init_make_idempotent_handlers();
     init_non_batch_write_handlers();
-    init_on_idempotent_handlers();
+    init_idempotent_writers();
 }
 
-int pegasus_server_write::make_idempotent(dsn::message_ex *request, 
dsn::message_ex **new_request)
+int pegasus_server_write::make_idempotent(dsn::message_ex *request,
+                                          std::vector<dsn::message_ex *> 
&new_requests)
 {
-    auto iter = 
std::as_const(_make_idempotent_handlers).find(request->rpc_code());
-    if (iter != _make_idempotent_handlers.end()) {
-        return iter->second(request, new_request);
+    const auto make_idempotent_handler =
+        std::as_const(_make_idempotent_handlers).find(request->rpc_code());
+    if (make_idempotent_handler == _make_idempotent_handlers.end()) {
+        // Those requests not in the handlers are considered as idempotent. 
Always be
+        // successful for them.
+        return rocksdb::Status::kOk;
     }
 
-    // Those requests not in the handlers are considered as idempotent. Always 
be successful
-    // for them.
-    return rocksdb::Status::kOk;
+    try {
+        return make_idempotent_handler->second(request, new_requests);
+    } catch (TTransportException &ex) {
+        METRIC_VAR_INCREMENT(corrupt_writes);
+        LOG_ERROR_PREFIX("make idempotent handler for {} failed: from = {}, 
exception = {}",
+                         request->rpc_code(),
+                         request->header->from_address,
+                         ex.what());
+
+        // The corrupt write is likely to be an attack or a scan for security. 
It needs to return
+        // `rocksdb::Status::kCorruption` to inform the caller that 
`make_idempotent()` has failed
+        // and that the subsequent 2PC should not proceed.
+        // See https://github.com/apache/incubator-pegasus/pull/798.
+        return rocksdb::Status::kCorruption;
+    }
 }
 
 int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
-                                                    int count,
+                                                    uint32_t count,
                                                     int64_t decree,
                                                     uint64_t timestamp,
                                                     dsn::message_ex 
*original_request)
@@ -93,64 +105,136 @@ int 
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
         return _write_svc->empty_put(_decree);
     }
 
-    try {
-        auto iter = 
std::as_const(_non_batch_write_handlers).find(requests[0]->rpc_code());
-        if (iter != _non_batch_write_handlers.end()) {
-            CHECK_EQ(count, 1);
-            return iter->second(requests[0]);
-        }
-    } catch (TTransportException &ex) {
-        METRIC_VAR_INCREMENT(corrupt_writes);
-        LOG_ERROR_PREFIX("pegasus non-batch write handler failed, from = {}, 
exception = {}",
-                         requests[0]->header->from_address,
-                         ex.what());
+    const auto non_batch_write_handler =
+        std::as_const(_non_batch_write_handlers).find(requests[0]->rpc_code());
+    if (non_batch_write_handler != _non_batch_write_handlers.end()) {
+        CHECK_EQ_PREFIX_MSG(count,
+                            1,
+                            "there should be only one request for the 
non-batch "
+                            "write {}: from = {}, decree = {}",
+                            requests[0]->rpc_code(),
+                            requests[0]->header->from_address,
+                            _decree);
 
-        // The corrupt write is likely to be an attack or a scan for security. 
Since it has
-        // been in plog, just return rocksdb::Status::kOk to ignore it.
-        // See https://github.com/apache/incubator-pegasus/pull/798.
-        return rocksdb::Status::kOk;
+        try {
+            return non_batch_write_handler->second(requests[0]);
+        } catch (TTransportException &ex) {
+            METRIC_VAR_INCREMENT(corrupt_writes);
+            LOG_ERROR_PREFIX("non-batch write handler for {} failed: from = 
{}, "
+                             "decree = {}, exception = {}",
+                             requests[0]->rpc_code(),
+                             requests[0]->header->from_address,
+                             _decree,
+                             ex.what());
+
+            // The corrupt write is likely to be an attack or a scan for 
security. Since it has
+            // been in plog, just return rocksdb::Status::kOk to ignore it.
+            // See https://github.com/apache/incubator-pegasus/pull/798.
+            return rocksdb::Status::kOk;
+        }
     }
 
     if (original_request != nullptr) {
-        // The request is regarded as idempotent once its original request is 
attached.
-        CHECK_EQ(count, 1);
-        return on_idempotent(requests[0], original_request);
+        // Once `original_request` is set, `requests` must be idempotent and 
translated from
+        // an atomic write request.
+        return apply_idempotent(requests, count, original_request);
     }
 
     return on_batched_writes(requests, count);
 }
 
-int pegasus_server_write::on_idempotent(dsn::message_ex *request, 
dsn::message_ex *original_request)
+int pegasus_server_write::apply_idempotent(dsn::message_ex **requests,
+                                           uint32_t count,
+                                           dsn::message_ex *original_request)
 {
-    try {
-        auto iter = 
std::as_const(_on_idempotent_handlers).find(request->rpc_code());
-        if (iter != _on_idempotent_handlers.end()) {
-            return iter->second(request, original_request);
-        }
-    } catch (TTransportException &ex) {
-        METRIC_VAR_INCREMENT(corrupt_writes);
-        LOG_ERROR_PREFIX("pegasus on idempotent handler failed, from = {}, 
exception = {}",
-                         request->header->from_address,
-                         ex.what());
-        // The corrupt write is likely to be an attack or a scan for security. 
Since it has
-        // been in plog, just return rocksdb::Status::kOk to ignore it.
-        // See https://github.com/apache/incubator-pegasus/pull/798.
-        return rocksdb::Status::kOk;
+    CHECK_GT_PREFIX_MSG(count,
+                        0,
+                        "`requests` should consist of at least one request: "
+                        "original_request = ({}, {}), decree = {}",
+                        original_request->header->from_address,
+                        original_request->rpc_code(),
+                        _decree);
+
+    std::vector<dsn::apps::update_request> updates;
+    updates.reserve(count);
+
+    for (uint32_t i = 0; i < count; ++i) {
+        CHECK_EQ_PREFIX_MSG(requests[i]->rpc_code(),
+                            dsn::apps::RPC_RRDB_RRDB_PUT,
+                            "now all translated idempotent writes are 
RPC_RRDB_RRDB_PUT "
+                            "requests: original_request = ({}, {}), decree = 
{}, "
+                            "count = {}, index = {}",
+                            original_request->header->from_address,
+                            original_request->rpc_code(),
+                            _decree,
+                            count,
+                            i);
+
+        // Preinitialize an empty update.
+        updates.emplace_back();
+
+        // Deserialize the message to create the real idempotent update.
+        //
+        // There is no need to catch TTransportException, since `requests` are 
freshly
+        // generated by serializing the idempotent updates within 
make_idempotent().
+        dsn::unmarshall(requests[i], updates.back());
+
+        CHECK_PREFIX_MSG(
+            updates.back().__isset.type,
+            "update_request::type is not set for {} request: original_request 
= ({}, {}) "
+            "decree = {}, count = {}, index = {},",
+            requests[i]->rpc_code(),
+            original_request->header->from_address,
+            original_request->rpc_code(),
+            _decree,
+            count,
+            i);
     }
 
-    CHECK(false, "unsupported idempotent write request: rpc_code={}", 
request->rpc_code());
-    return rocksdb::Status::kNotSupported;
+    const auto update_type = static_cast<uint32_t>(updates.front().type);
+    CHECK_LT_PREFIX_MSG(update_type,
+                        _idempotent_writers.size(),
+                        "unsupported update_request::type {} for idempotent 
writer: "
+                        "original_request = ({}, {}), decree = {}, count = {}",
+                        update_type,
+                        original_request->header->from_address,
+                        original_request->rpc_code(),
+                        _decree,
+                        count);
+
+    // If the corresponding writer is retrieved using 
original_request->rpc_code(), a map
+    // would be needed. However, by choosing to obtain the writer based on the 
type of the
+    // first idempotent update, the writers can be stored in an array instead, 
which offers
+    // better performance than a map.
+    const auto writer = _idempotent_writers[update_type];
+    CHECK_PREFIX_MSG(writer,
+                     "idempotent writer for update_request::type {} should not 
be empty: "
+                     "original_request = ({}, {}), decree = {}, count = {}",
+                     update_type,
+                     original_request->header->from_address,
+                     original_request->rpc_code(),
+                     _decree,
+                     count);
+
+    // There is no need to catch TTransportException, since `original_request` 
has just
+    // been successfully deserialized in make_idempotent().
+    return writer(updates, original_request);
 }
 
 void pegasus_server_write::set_default_ttl(uint32_t ttl) { 
_write_svc->set_default_ttl(ttl); }
 
-int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int 
count)
+int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, 
uint32_t count)
 {
     _write_svc->batch_prepare(_decree);
 
     int err = rocksdb::Status::kOk;
-    for (int i = 0; i < count; ++i) {
-        CHECK_NOTNULL(requests[i], "request[{}] is null", i);
+    for (uint32_t i = 0; i < count; ++i) {
+        CHECK_NOTNULL_PREFIX_MSG(requests[i],
+                                 "batched requests[{}] should not be null: "
+                                 "decree = {}, count = {}",
+                                 i,
+                                 _decree,
+                                 count);
 
         // Make sure all writes are batched even if some of them failed, since 
we need to record
         // the total QPS and RPC latencies, and respond for all RPCs 
regardless of their result.
@@ -169,10 +253,6 @@ int 
pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
                 if (!update.__isset.type || update.type == 
dsn::apps::update_type::UT_PUT) {
                     // We must reply to the client for the plain single-put 
request.
                     rpc.enable_auto_reply();
-                } else if (update.type == dsn::apps::update_type::UT_INCR) {
-                    // This put request must originate from an incr request 
and never be
-                    // batched in plog.
-                    CHECK_EQ(count, 1);
                 }
 
                 local_err = on_single_put_in_batch(rpc);
@@ -183,9 +263,9 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex 
**requests, int coun
                 _remove_rpc_batch.emplace_back(std::move(rpc));
             } else {
                 if (_non_batch_write_handlers.find(rpc_code) != 
_non_batch_write_handlers.end()) {
-                    LOG_FATAL("rpc code not allow batch: {}", rpc_code);
+                    LOG_FATAL_PREFIX("rpc code not allow batch: {}", rpc_code);
                 } else {
-                    LOG_FATAL("rpc code not handled: {}", rpc_code);
+                    LOG_FATAL_PREFIX("rpc code not handled: {}", rpc_code);
                 }
             }
         } catch (TTransportException &ex) {
@@ -246,26 +326,16 @@ void pegasus_server_write::init_make_idempotent_handlers()
 {
     _make_idempotent_handlers = {
         {dsn::apps::RPC_RRDB_RRDB_INCR,
-         [this](dsn::message_ex *request, dsn::message_ex **new_request) -> 
int {
-             auto rpc = incr_rpc(request);
-             dsn::apps::update_request update;
-             // Translate an incr request into a single-put request.
-             const auto err = _write_svc->make_idempotent(rpc.request(), 
rpc.response(), update);
-             if (dsn_likely(err == rocksdb::Status::kOk)) {
-                 // Build the message based on the resulting put request.
-                 *new_request = dsn::from_thrift_request_to_received_message(
-                     update,
-                     dsn::apps::RPC_RRDB_RRDB_PUT,
-                     request->header->client.thread_hash,
-                     request->header->client.partition_hash,
-                     static_cast<dsn::dsn_msg_serialize_format>(
-                         request->header->context.u.serialize_format));
-             } else {
-                 // Once it failed, just reply to the client with error 
immediately.
-                 rpc.enable_auto_reply();
-             }
-
-             return err;
+         [this](dsn::message_ex *request, std::vector<dsn::message_ex *> 
&new_requests) -> int {
+             return make_idempotent<incr_rpc>(request, new_requests);
+         }},
+        {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
+         [this](dsn::message_ex *request, std::vector<dsn::message_ex *> 
&new_requests) -> int {
+             return make_idempotent<check_and_set_rpc>(request, new_requests);
+         }},
+        {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
+         [this](dsn::message_ex *request, std::vector<dsn::message_ex *> 
&new_requests) -> int {
+             return make_idempotent<check_and_mutate_rpc>(request, 
new_requests);
          }},
     };
 }
@@ -311,28 +381,63 @@ void pegasus_server_write::init_non_batch_write_handlers()
     };
 }
 
-void pegasus_server_write::init_on_idempotent_handlers()
+void pegasus_server_write::init_idempotent_writers()
 {
-    _on_idempotent_handlers = {
-        {dsn::apps::RPC_RRDB_RRDB_PUT,
-         [this](dsn::message_ex *request, dsn::message_ex *original_request) 
-> int {
-             auto put = put_rpc(request);
-
-             const auto &update = put.request();
-             CHECK(update.__isset.type, "update_request::type is not set for 
idempotent write");
-
-             if (update.type == dsn::apps::update_type::UT_INCR) {
-                 auto incr = incr_rpc::auto_reply(original_request);
-                 return _write_svc->put(_write_ctx, update, incr.response());
-             }
-
-             CHECK(false,
-                   "unsupported update_request::type for idempotent write {}",
-                   static_cast<int>(update.type));
-             return rocksdb::Status::kNotSupported;
-         }},
+    _idempotent_writers = {
+        nullptr, // No writer for UT_PUT.
+        [this](const std::vector<dsn::apps::update_request> &updates,
+               dsn::message_ex *original_request) -> int {
+            // The type of the only update in `updates` is UT_INCR, thus 
choose the writer
+            // for incr.
+            return put_incr(updates, original_request);
+        },
+        [this](const std::vector<dsn::apps::update_request> &updates,
+               dsn::message_ex *original_request) -> int {
+            // The type of the only update in `updates` is UT_CHECK_AND_SET, 
thus choose the
+            // writer for check_and_set.
+            return put_check_and_set(updates, original_request);
+        },
+        [this](const std::vector<dsn::apps::update_request> &updates,
+               dsn::message_ex *original_request) -> int {
+            // The type of the first update in batched `updates` is 
UT_CHECK_AND_MUTATE_PUT,
+            // thus choose the writer for check_and_mutate.
+            return put_check_and_mutate(updates, original_request);
+        },
+        [this](const std::vector<dsn::apps::update_request> &updates,
+               dsn::message_ex *original_request) -> int {
+            // The type of the first update in batched `updates` is 
UT_CHECK_AND_MUTATE_REMOVE,
+            // thus choose the writer for check_and_mutate.
+            return put_check_and_mutate(updates, original_request);
+        },
     };
+
+    CHECK_EQ_PREFIX_MSG(dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE + 1,
+                        _idempotent_writers.size(),
+                        "_idempotent_writers does not match 
dsn::apps::update_type");
+}
+
+int pegasus_server_write::put_incr(const 
std::vector<dsn::apps::update_request> &updates,
+                                   dsn::message_ex *original_request)
+{
+    CHECK_EQ_PREFIX(updates.size(), 1);
+    CHECK_EQ_PREFIX(original_request->rpc_code(), 
dsn::apps::RPC_RRDB_RRDB_INCR);
+    return put<incr_rpc>(updates, original_request);
+}
+
+int pegasus_server_write::put_check_and_set(const 
std::vector<dsn::apps::update_request> &updates,
+                                            dsn::message_ex *original_request)
+{
+    CHECK_EQ_PREFIX(updates.size(), 1);
+    CHECK_EQ_PREFIX(original_request->rpc_code(), 
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET);
+    return put<check_and_set_rpc>(updates, original_request);
+}
+
+int pegasus_server_write::put_check_and_mutate(
+    const std::vector<dsn::apps::update_request> &updates, dsn::message_ex 
*original_request)
+{
+    CHECK_GT_PREFIX(updates.size(), 0);
+    CHECK_EQ_PREFIX(original_request->rpc_code(), 
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE);
+    return put<check_and_mutate_rpc>(updates, original_request);
 }
 
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index c0481d5f0..9680f1a3c 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -19,26 +19,33 @@
 
 #pragma once
 
-#include <stdint.h>
+#include <rocksdb/status.h>
+#include <algorithm>
+#include <array>
+#include <cstdint>
 #include <functional>
 #include <map>
 #include <memory>
 #include <vector>
 
 #include "base/pegasus_rpc_types.h"
+#include "common/replication_other_types.h"
 #include "pegasus_write_service.h"
 #include "replica/replica_base.h"
+#include "rpc/rpc_message.h"
+#include "rrdb/rrdb.code.definition.h"
 #include "rrdb/rrdb_types.h"
+#include "runtime/message_utils.h"
 #include "task/task_code.h"
+#include "task/task_spec.h"
 #include "utils/metrics.h"
 
 namespace dsn {
 class blob;
-class message_ex;
 } // namespace dsn
 
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
+
 class pegasus_server_impl;
 
 /// This class implements the interface of 
`pegasus_sever_impl::on_batched_write_requests`.
@@ -47,21 +54,21 @@ class pegasus_server_write : public dsn::replication::replica_base
 public:
     explicit pegasus_server_write(pegasus_server_impl *server);
 
-    // See replication_app_base::make_idempotent() for details.
-    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request);
+    // See replication_app_base::make_idempotent() for details. Only called by primary replicas.
+    int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex 
*> &new_requests);
 
     // See replication_app_base::on_batched_write_requests() for details.
     //
-    /// \return error code returned by rocksdb, i.e rocksdb::Status::code.
-    /// **NOTE**
-    /// Error returned is regarded as the failure of replica, thus will trigger
-    /// cluster membership changes. Make sure no error is returned because of
-    /// invalid user argument.
-    /// As long as the returned error is rocksdb::Status::kOk, the operation 
is guaranteed to be
-    /// successfully applied into rocksdb, which means an empty_put will be 
called
-    /// even if there's no write.
+    // **NOTE**
+    // An error code other than rocksdb::Status::kOk would be regarded as the failure of the
+    // replica, leading to cluster membership changes. Make sure no errors occur due to
+    // invalid parameters.
+    //
+    // As long as the returned error is rocksdb::Status::kOk, the write requests are guaranteed
+    // to be applied into RocksDB successfully, which means empty_put() will be called even if
+    // there's no write.
     int on_batched_write_requests(dsn::message_ex **requests,
-                                  int count,
+                                  uint32_t count,
                                   int64_t decree,
                                   uint64_t timestamp,
                                   dsn::message_ex *original_request);
@@ -69,11 +76,72 @@ public:
     void set_default_ttl(uint32_t ttl);
 
 private:
-    // Apply the idempotent request and respond to its original request.
-    int on_idempotent(dsn::message_ex *request, dsn::message_ex 
*original_request);
+    // Used to call make_idempotent() for each type (specified by TRpcHolder) 
of atomic write.
+    // Only called by primary replicas.
+    template <typename TRpcHolder>
+    int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex 
*> &new_requests)
+    {
+        auto rpc = TRpcHolder(request);
+
+        // Translate an atomic request into one or multiple idempotent 
single-update requests.
+        std::vector<dsn::apps::update_request> updates;
+        const int err = _write_svc->make_idempotent(rpc.request(), 
rpc.response(), updates);
+
+        // When the condition checks of `check_and_set` and `check_and_mutate` fail,
+        // make_idempotent() would return rocksdb::Status::kTryAgain. Therefore, there is
+        // still a certain probability that a status code other than rocksdb::Status::kOk
+        // is returned.
+        if (err != rocksdb::Status::kOk) {
+            // Once it failed, just reply to the client with error immediately.
+            rpc.enable_auto_reply();
+            return err;
+        }
+
+        // Build new messages based on the generated idempotent updates.
+        new_requests.clear();
+        for (const auto &update : updates) {
+            
new_requests.push_back(dsn::from_thrift_request_to_received_message(
+                update,
+                dsn::apps::RPC_RRDB_RRDB_PUT,
+                request->header->client.thread_hash,
+                request->header->client.partition_hash,
+                static_cast<dsn::dsn_msg_serialize_format>(
+                    request->header->context.u.serialize_format)));
+        }
+
+        return rocksdb::Status::kOk;
+    }
+
+    // Apply the idempotent updates `requests` into storage engine and respond to the original
+    // atomic write request `original_request`. Both of `requests` and `original_request` should
+    // not be null, while `count` should always be > 0.
+    int
+    apply_idempotent(dsn::message_ex **requests, uint32_t count, 
dsn::message_ex *original_request);
+
+    // Apply the batched (one or multiple) single-update requests into the 
storage engine.
+    // Only called by primary replicas.
+    template <typename TRpcHolder>
+    inline int put(const std::vector<dsn::apps::update_request> &updates,
+                   dsn::message_ex *original_request)
+    {
+        // Enable auto reply, since in primary replicas we need to reply to the client with
+        // the response to the original atomic write request after the idempotent updates
+        // were applied into the storage engine.
+        auto rpc = TRpcHolder::auto_reply(original_request);
+        return _write_svc->put(_write_ctx, updates, rpc.request(), 
rpc.response());
+    }
+
+    // Following functions are the writers that apply the idempotent updates 
translated from
+    // incr, check_and_set and check_and_mutate requests and then respond to 
them.
+    int put_incr(const std::vector<dsn::apps::update_request> &updates,
+                 dsn::message_ex *original_request);
+    int put_check_and_set(const std::vector<dsn::apps::update_request> 
&updates,
+                          dsn::message_ex *original_request);
+    int put_check_and_mutate(const std::vector<dsn::apps::update_request> 
&updates,
+                             dsn::message_ex *original_request);
 
     // Delay replying for the batched requests until all of them complete.
-    int on_batched_writes(dsn::message_ex **requests, int count);
+    int on_batched_writes(dsn::message_ex **requests, uint32_t count);
 
     int on_single_put_in_batch(put_rpc &rpc)
     {
@@ -95,7 +163,7 @@ private:
 
     void init_make_idempotent_handlers();
     void init_non_batch_write_handlers();
-    void init_on_idempotent_handlers();
+    void init_idempotent_writers();
 
     friend class pegasus_server_write_test;
     friend class pegasus_write_service_test;
@@ -107,24 +175,27 @@ private:
     std::vector<remove_rpc> _remove_rpc_batch;
 
     db_write_context _write_ctx;
-    int64_t _decree;
+    int64_t _decree{invalid_decree};
 
-    // Handlers that make an atomic request idempotent.
+    // Handlers that translate an atomic write request into one or multiple 
idempotent updates.
     using make_idempotent_map =
-        std::map<dsn::task_code, std::function<int(dsn::message_ex *, 
dsn::message_ex **)>>;
+        std::map<dsn::task_code,
+                 std::function<int(dsn::message_ex *, 
std::vector<dsn::message_ex *> &)>>;
     make_idempotent_map _make_idempotent_handlers;
 
-    // Handlers that process a request could not be batched, e.g. multi 
put/remove.
+    // Handlers that process a write request which could not be batched, e.g. 
multi put/remove.
     using non_batch_write_map = std::map<dsn::task_code, 
std::function<int(dsn::message_ex *)>>;
     non_batch_write_map _non_batch_write_handlers;
 
-    // Handlers that apply the idempotent request and respond to its original 
request.
-    using on_idempotent_map =
-        std::map<dsn::task_code, std::function<int(dsn::message_ex *, 
dsn::message_ex *)>>;
-    on_idempotent_map _on_idempotent_handlers;
+    // Writers that apply idempotent updates and respond to the original atomic write request.
+    // Each dsn::apps::update_type has its corresponding writer, therefore the number of writers
+    // must match the number of dsn::apps::update_type.
+    using idempotent_writer_map = std::array<
+        std::function<int(const std::vector<dsn::apps::update_request> &, 
dsn::message_ex *)>,
+        5>;
+    idempotent_writer_map _idempotent_writers{};
 
     METRIC_VAR_DECLARE_counter(corrupt_writes);
 };
 
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index dfea32601..df3766170 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-#include <string_view>
 #include <fmt/core.h>
 #include <rocksdb/status.h>
-#include <stddef.h>
+#include <algorithm>
 #include <functional>
 #include <set>
+#include <string_view>
 #include <vector>
 
 #include "base/pegasus_rpc_types.h"
@@ -38,13 +38,14 @@
 #include "rrdb/rrdb_types.h"
 #include "runtime/api_layer1.h"
 #include "runtime/message_utils.h"
+#include "server/pegasus_server_impl.h"
 #include "task/async_calls.h"
 #include "task/task_code.h"
-#include "server/pegasus_server_impl.h"
 #include "utils/autoref_ptr.h"
 #include "utils/error_code.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
+#include "utils/ports.h"
 
 METRIC_DEFINE_counter(replica,
                       put_requests,
@@ -99,6 +100,16 @@ METRIC_DEFINE_percentile_int64(replica,
                                "translating the check_and_set request into the 
single-put "
                                "request. Only used for the primary replicas");
 
+METRIC_DEFINE_percentile_int64(replica,
+                               make_check_and_mutate_idempotent_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The duration that a check_and_mutate request 
is made "
+                               "idempotent, including reading the check value 
from "
+                               "storage engine, validating the check 
conditions and "
+                               "translating the check_and_mutate request into 
multiple "
+                               "single-put and single-remove requests. Only 
used for the "
+                               "primary replicas");
+
 METRIC_DEFINE_percentile_int64(replica,
                                put_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
@@ -160,8 +171,8 @@ namespace dsn {
 class blob;
 class message_ex;
 } // namespace dsn
-namespace pegasus {
-namespace server {
+
+namespace pegasus::server {
 
 DEFINE_TASK_CODE(LPC_INGESTION, TASK_PRIORITY_COMMON, THREAD_POOL_INGESTION)
 
@@ -180,6 +191,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       METRIC_VAR_INIT_replica(check_and_mutate_requests),
       METRIC_VAR_INIT_replica(make_incr_idempotent_latency_ns),
       METRIC_VAR_INIT_replica(make_check_and_set_idempotent_latency_ns),
+      METRIC_VAR_INIT_replica(make_check_and_mutate_idempotent_latency_ns),
       METRIC_VAR_INIT_replica(put_latency_ns),
       METRIC_VAR_INIT_replica(multi_put_latency_ns),
       METRIC_VAR_INIT_replica(remove_latency_ns),
@@ -189,11 +201,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
       METRIC_VAR_INIT_replica(dup_requests),
       METRIC_VAR_INIT_replica(dup_time_lag_ms),
-      METRIC_VAR_INIT_replica(dup_lagging_writes),
-      _put_batch_size(0),
-      _remove_batch_size(0),
-      _incr_batch_size(0),
-      _check_and_set_batch_size(0)
+      METRIC_VAR_INIT_replica(dup_lagging_writes)
 {
 }
 
@@ -235,24 +243,25 @@ int pegasus_write_service::multi_remove(int64_t decree,
 
 int pegasus_write_service::make_idempotent(const dsn::apps::incr_request &req,
                                            dsn::apps::incr_response &err_resp,
-                                           dsn::apps::update_request &update)
+                                           
std::vector<dsn::apps::update_request> &updates)
 {
     METRIC_VAR_AUTO_LATENCY(make_incr_idempotent_latency_ns);
 
-    return _impl->make_idempotent(req, err_resp, update);
+    return _impl->make_idempotent(req, err_resp, updates);
 }
 
 int pegasus_write_service::put(const db_write_context &ctx,
-                               const dsn::apps::update_request &update,
+                               const std::vector<dsn::apps::update_request> 
&updates,
+                               const dsn::apps::incr_request &req,
                                dsn::apps::incr_response &resp)
 {
     METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
     METRIC_VAR_INCREMENT(incr_requests);
 
-    const int err = _impl->put(ctx, update, resp);
+    const int err = _impl->put(ctx, updates, resp);
 
     if (_server->is_primary()) {
-        _cu_calculator->add_incr_cu(resp.error, update.key);
+        _cu_calculator->add_incr_cu(resp.error, req.key);
     }
 
     return err;
@@ -276,22 +285,22 @@ int pegasus_write_service::incr(int64_t decree,
 
 int pegasus_write_service::make_idempotent(const 
dsn::apps::check_and_set_request &req,
                                            dsn::apps::check_and_set_response 
&err_resp,
-                                           dsn::apps::update_request &update)
+                                           
std::vector<dsn::apps::update_request> &updates)
 {
     METRIC_VAR_AUTO_LATENCY(make_check_and_set_idempotent_latency_ns);
 
-    return _impl->make_idempotent(req, err_resp, update);
+    return _impl->make_idempotent(req, err_resp, updates);
 }
 
 int pegasus_write_service::put(const db_write_context &ctx,
-                               const dsn::apps::update_request &update,
+                               const std::vector<dsn::apps::update_request> 
&updates,
                                const dsn::apps::check_and_set_request &req,
                                dsn::apps::check_and_set_response &resp)
 {
     METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
     METRIC_VAR_INCREMENT(check_and_set_requests);
 
-    const int err = _impl->put(ctx, update, resp);
+    const int err = _impl->put(ctx, updates, resp);
 
     if (_server->is_primary()) {
         _cu_calculator->add_check_and_set_cu(
@@ -321,6 +330,33 @@ int pegasus_write_service::check_and_set(int64_t decree,
     return err;
 }
 
+int pegasus_write_service::make_idempotent(const 
dsn::apps::check_and_mutate_request &req,
+                                           
dsn::apps::check_and_mutate_response &err_resp,
+                                           
std::vector<dsn::apps::update_request> &updates)
+{
+    METRIC_VAR_AUTO_LATENCY(make_check_and_mutate_idempotent_latency_ns);
+
+    return _impl->make_idempotent(req, err_resp, updates);
+}
+
+int pegasus_write_service::put(const db_write_context &ctx,
+                               const std::vector<dsn::apps::update_request> 
&updates,
+                               const dsn::apps::check_and_mutate_request &req,
+                               dsn::apps::check_and_mutate_response &resp)
+{
+    METRIC_VAR_AUTO_LATENCY(check_and_mutate_latency_ns);
+    METRIC_VAR_INCREMENT(check_and_mutate_requests);
+
+    const int err = _impl->put(ctx, updates, resp);
+
+    if (_server->is_primary()) {
+        _cu_calculator->add_check_and_mutate_cu(
+            resp.error, req.hash_key, req.check_sort_key, req.mutate_list);
+    }
+
+    return err;
+}
+
 int pegasus_write_service::check_and_mutate(int64_t decree,
                                             const 
dsn::apps::check_and_mutate_request &update,
                                             
dsn::apps::check_and_mutate_response &resp)
@@ -350,11 +386,25 @@ int pegasus_write_service::batch_put(const db_write_context &ctx,
                                      const dsn::apps::update_request &update,
                                      dsn::apps::update_response &resp)
 {
+    // `kBatchWriteTypeMap` is used to map dsn::apps::update_type field in the single put
+    // request to batch_write_type, to measure the size of requests in batch for each kind
+    // of write into `_batch_sizes`.
+    static constexpr std::array kBatchWriteTypeMap = {
+        batch_write_type::put,
+        batch_write_type::incr,
+        batch_write_type::check_and_set,
+        batch_write_type::check_and_mutate,
+        batch_write_type::check_and_mutate,
+    };
+    static_assert(dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE + 1 ==
+                      kBatchWriteTypeMap.size(),
+                  "kBatchWriteTypeMap does not match dsn::apps::update_type");
+
     CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after 
batch_prepare");
 
-    if (!update.__isset.type || update.type == dsn::apps::update_type::UT_PUT) 
{
+    if (!update.__isset.type) {
         // This is a general single-put request.
-        ++_put_batch_size;
+        ++put_batch_size();
     } else {
         // There are only two possible situations for batch_put() where this 
put request
         // originates from an atomic write request:
@@ -364,10 +414,8 @@ int pegasus_write_service::batch_put(const db_write_context &ctx,
         //
         // Though this is a put request, we choose to udapte the metrics of 
its original
         // request (i.e. the atomic write).
-        if (update.type == dsn::apps::update_type::UT_INCR) {
-            ++_incr_batch_size;
-        } else if (update.type == dsn::apps::update_type::UT_CHECK_AND_SET) {
-            ++_check_and_set_batch_size;
+        if (dsn_likely(update.type < kBatchWriteTypeMap.size())) {
+            ++batch_size(kBatchWriteTypeMap.at(update.type));
         }
     }
 
@@ -386,7 +434,8 @@ int pegasus_write_service::batch_remove(int64_t decree,
 {
     CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after 
batch_prepare");
 
-    ++_remove_batch_size;
+    ++remove_batch_size();
+
     const int err = _impl->batch_remove(decree, key, resp);
 
     if (_server->is_primary()) {
@@ -418,28 +467,43 @@ void pegasus_write_service::set_default_ttl(uint32_t ttl) { _impl->set_default_t
 
 void pegasus_write_service::batch_finish()
 {
-#define UPDATE_WRITE_BATCH_METRICS(op)                                         
                    \
+#define UPDATE_BATCH_METRICS(op, nrequests)                                    
                    \
     do {                                                                       
                    \
-        METRIC_VAR_INCREMENT_BY(op##_requests, 
static_cast<int64_t>(_##op##_batch_size));          \
-        METRIC_VAR_SET(op##_latency_ns, 
static_cast<size_t>(_##op##_batch_size), latency_ns);      \
-        _##op##_batch_size = 0;                                                
                    \
+        const auto __nrequests = (nrequests);                                  
                    \
+        METRIC_VAR_INCREMENT_BY(op##_requests, 
static_cast<int64_t>(__nrequests));                 \
+        METRIC_VAR_SET(op##_latency_ns, static_cast<size_t>(__nrequests), 
latency_ns);             \
     } while (0)
 
-    auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);
+#define UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(op)                              
                    \
+    UPDATE_BATCH_METRICS(op, batch_size(batch_write_type::op))
+
+    const auto latency_ns = static_cast<int64_t>(dsn_now_ns() - 
_batch_start_time);
 
     // Take the latency of executing the entire batch as the latency for processing each
     // request within it, since the latency of each request could not be known.
-    UPDATE_WRITE_BATCH_METRICS(put);
-    UPDATE_WRITE_BATCH_METRICS(remove);
-
-    // These put requests are translated from atomic requests. See comments in 
batch_put()
-    // for the two possible situations where we are now.
-    UPDATE_WRITE_BATCH_METRICS(incr);
-    UPDATE_WRITE_BATCH_METRICS(check_and_set);
+    UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(put);
+    UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(remove);
+
+    // These idempotent updates are translated from the atomic write requests. See comments
+    // in batch_put() for the two possible situations where we are now.
+    //
+    // A batch only contains one update translated from an incr or check_and_set request.
+    // However, a batch may contain multiple updates translated from a check_and_mutate.
+    // Therefore, we need to measure the number of check_and_mutate requests:
+    // - 1 if the number of idempotent updates is at least 1;
+    // - 0 otherwise.
+    UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(incr);
+    UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(check_and_set);
+    UPDATE_BATCH_METRICS(check_and_mutate,
+                         std::min(1U, 
batch_size(batch_write_type::check_and_mutate)));
+
+    // Reset _batch_sizes for next calculation.
+    _batch_sizes.fill(0);
 
     _batch_start_time = 0;
 
-#undef UPDATE_WRITE_BATCH_METRICS
+#undef UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE
+#undef UPDATE_BATCH_METRICS
 }
 
 int pegasus_write_service::duplicate(int64_t decree,
@@ -551,5 +615,4 @@ int pegasus_write_service::ingest_files(int64_t decree,
     return rocksdb::Status::kOk;
 }
 
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 3939a6a6e..c1bfd5b11 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -19,8 +19,11 @@
 
 #pragma once
 
+#include <array>
+#include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <vector>
 
 #include "replica/replica_base.h"
 #include "utils/metrics.h"
@@ -49,8 +52,7 @@ class ingestion_response;
 } // namespace replication
 } // namespace dsn
 
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
 
 // The context of an mutation to the database.
 struct db_write_context
@@ -140,12 +142,13 @@ public:
     // replicas.
     int make_idempotent(const dsn::apps::incr_request &req,
                         dsn::apps::incr_response &err_resp,
-                        dsn::apps::update_request &update);
+                        std::vector<dsn::apps::update_request> &updates);
 
     // Write an idempotent INCR record (i.e. a PUT record) and reply to the 
client with INCR
     // response. Only called by primary replicas.
     int put(const db_write_context &ctx,
-            const dsn::apps::update_request &update,
+            const std::vector<dsn::apps::update_request> &updates,
+            const dsn::apps::incr_request &req,
             dsn::apps::incr_response &resp);
 
     // Write a non-idempotent INCR record.
@@ -155,12 +158,12 @@ public:
     // primary replicas.
     int make_idempotent(const dsn::apps::check_and_set_request &req,
                         dsn::apps::check_and_set_response &err_resp,
-                        dsn::apps::update_request &update);
+                        std::vector<dsn::apps::update_request> &updates);
 
     // Write an idempotent CHECK_AND_SET record (i.e. a PUT record) and reply 
to the client
     // with CHECK_AND_SET response. Only called by primary replicas.
     int put(const db_write_context &ctx,
-            const dsn::apps::update_request &update,
+            const std::vector<dsn::apps::update_request> &updates,
             const dsn::apps::check_and_set_request &req,
             dsn::apps::check_and_set_response &resp);
 
@@ -169,6 +172,20 @@ public:
                       const dsn::apps::check_and_set_request &update,
                       dsn::apps::check_and_set_response &resp);
 
+    // Translate a CHECK_AND_MUTATE request into multiple idempotent PUT 
requests, which are
+    // shared by both single-put and single-remove operations. Only called by 
primary replicas.
+    int make_idempotent(const dsn::apps::check_and_mutate_request &req,
+                        dsn::apps::check_and_mutate_response &err_resp,
+                        std::vector<dsn::apps::update_request> &updates);
+
+    // Write an idempotent CHECK_AND_MUTATE record (i.e. batched PUT records 
including both
+    // single-put and single-remove operations) and reply to the client with 
CHECK_AND_MUTATE
+    // response. Only called by primary replicas.
+    int put(const db_write_context &ctx,
+            const std::vector<dsn::apps::update_request> &updates,
+            const dsn::apps::check_and_mutate_request &req,
+            dsn::apps::check_and_mutate_response &resp);
+
     // Write CHECK_AND_MUTATE record.
     int check_and_mutate(int64_t decree,
                          const dsn::apps::check_and_mutate_request &update,
@@ -217,6 +234,28 @@ private:
     // Finish batch write with metrics such as latencies calculated and some 
states cleared.
     void batch_finish();
 
+    // Used to store the batch size for each type of write into an array, see comments for
+    // `_batch_sizes` for details.
+    enum class batch_write_type : uint32_t
+    {
+        put = 0,
+        remove,
+        incr,
+        check_and_set,
+        check_and_mutate,
+        COUNT,
+    };
+
+    // Read/change the batch size of the writes with the given type.
+    uint32_t &batch_size(batch_write_type type)
+    {
+        return _batch_sizes.at(static_cast<uint32_t>(type));
+    }
+
+    uint32_t &put_batch_size() { return batch_size(batch_write_type::put); }
+
+    uint32_t &remove_batch_size() { return 
batch_size(batch_write_type::remove); }
+
     friend class pegasus_write_service_test;
     friend class PegasusWriteServiceImplTest;
     friend class pegasus_server_write_test;
@@ -232,6 +271,21 @@ private:
 
     capacity_unit_calculator *_cu_calculator;
 
+    // To calculate the metrics such as the number of requests and latency for the writes
+    // allowed to be batched, measure the size of requests in batch applied into RocksDB
+    // for single put, single remove, incr, check_and_set and check_and_mutate, all of which
+    // are contained in batch_write_type. In fact, incr, check_and_set and check_and_mutate
+    // are not batched; the reason why they are contained in batch_write_type is because
+    // all of them are not actually incr, check_and_set, or check_and_mutate themselves, but
+    // rather single put operations that have been transformed from these operations to be
+    // idempotent. Therefore, they essentially all appear in the form of single puts with an
+    // extra field indicating what the original request is.
+    //
+    // Each request of single put, single remove, incr and check_and_set contains only one
+    // write operation, while check_and_mutate may contain multiple operations of single
+    // puts and removes.
+    std::array<uint32_t, static_cast<size_t>(batch_write_type::COUNT)> _batch_sizes{};
+
     METRIC_VAR_DECLARE_counter(put_requests);
     METRIC_VAR_DECLARE_counter(multi_put_requests);
     METRIC_VAR_DECLARE_counter(remove_requests);
@@ -242,6 +296,7 @@ private:
 
     METRIC_VAR_DECLARE_percentile_int64(make_incr_idempotent_latency_ns);
     METRIC_VAR_DECLARE_percentile_int64(make_check_and_set_idempotent_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(make_check_and_mutate_idempotent_latency_ns);
 
     METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
     METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
@@ -255,22 +310,7 @@ private:
     METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
     METRIC_VAR_DECLARE_counter(dup_lagging_writes);
 
-    // Measure the size of single-put requests in batch applied into RocksDB for metrics.
-    uint32_t _put_batch_size;
-
-    // Measure the size of single-remove requests in batch applied into RocksDB for metrics.
-    uint32_t _remove_batch_size;
-
-    // Measure the size of incr requests (with each translated into an idempotent put request)
-    // in batch applied into RocksDB for metrics.
-    uint32_t _incr_batch_size;
-
-    // Measure the size of check_and_set requests (with each translated into an idempotent put
-    // request) in batch applied into RocksDB for metrics.
-    uint32_t _check_and_set_batch_size;
-
     // TODO(wutao1): add metrics for failed rpc.
 };
 
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 1c7c859d6..e8de292a6 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -34,8 +34,7 @@
 #include "utils/string_conv.h"
 #include "utils/strings.h"
 
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
 
 /// internal error codes used for fail injection
 // TODO(yingchun): Use real rocksdb::Status::code.
@@ -260,7 +259,7 @@ public:
               update.key,
               update.value);
 
-        return resp.error;
+        return rocksdb::Status::kOk;
     }
 
     int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp)
@@ -532,12 +531,13 @@ public:
         return rocksdb::Status::kOk;
     }
 
-    // Used to call make_idempotent() for incr and check_and_set to get the idempotent single-put
-    // request which is stored as the unique element of `updates`.
+    // Calling make_idempotent() on either an incr or check_and_set operation produces an
+    // idempotent single-put request, which is also the only element in the output array
+    // `updates`.
     //
     // This interface is provided to ensure consistency between the make_idempotent() interfaces
-    // of incr/check_and_set operations and that of check_and_mutate (both using std::vector for
-    // `updates`), thereby facilitating uniform templated function invocation.
+    // of incr/check_and_set operations and that of check_and_mutate (both using std::vector
+    // for `updates`), thereby facilitating uniform templated function invocation.
     template <typename TRequest, typename TResponse>
     inline int make_idempotent(const TRequest &req,
                                TResponse &err_resp,
@@ -548,6 +548,22 @@ public:
         return make_idempotent(req, err_resp, updates.front());
     }
 
+    // Apply `updates` into storage engine. Actually `updates` contains only one element --
+    // an idempotent single-put request translated from either an incr or check_and_set
+    // operation.
+    //
+    // This interface is provided to ensure consistency between the put() interfaces of
+    // incr/check_and_set operations and that of check_and_mutate (both using std::vector
+    // for `updates`), thereby facilitating uniform templated function invocation.
+    template <typename TResponse>
+    int put(const db_write_context &ctx,
+            const std::vector<dsn::apps::update_request> &updates,
+            TResponse &resp)
+    {
+        CHECK_EQ(updates.size(), 1);
+        return put(ctx, updates.front(), resp);
+    }
+
     // Tranlate a check_and_mutate request into multiple single-put and single-remove requests
     // which are certainly idempotent. Return current status for RocksDB. Only called by primary
     // replicas.
@@ -1278,5 +1294,4 @@ private:
     std::vector<dsn::apps::update_response *> _update_responses;
 };
 
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/test/pegasus_server_write_test.cpp b/src/server/test/pegasus_server_write_test.cpp
index f9ab7b414..cfb1a0ebd 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -75,23 +75,23 @@ public:
                 req.key = key;
                 req.value.assign("value", 0, 5);
 
-                int put_rpc_cnt = dsn::rand::next_u32(1, 10);
-                int remove_rpc_cnt = dsn::rand::next_u32(1, 10);
-                int total_rpc_cnt = put_rpc_cnt + remove_rpc_cnt;
+                const uint32_t put_rpc_cnt = dsn::rand::next_u32(1, 10);
+                const uint32_t remove_rpc_cnt = dsn::rand::next_u32(1, 10);
+                const uint32_t total_rpc_cnt = put_rpc_cnt + remove_rpc_cnt;
                 /**
                  * writes[0] ~ writes[total_rpc_cnt-1] will be released by their corresponding
                  * rpc_holders, which created in on_batched_write_requests. So we don't need to
                  * release them here
                  **/
                 dsn::message_ex *writes[total_rpc_cnt];
-                for (int i = 0; i < put_rpc_cnt; i++) {
+                for (uint32_t i = 0; i < put_rpc_cnt; i++) {
                     writes[i] = pegasus::create_put_request(req);
                 }
-                for (int i = put_rpc_cnt; i < total_rpc_cnt; i++) {
+                for (uint32_t i = put_rpc_cnt; i < total_rpc_cnt; i++) {
                     writes[i] = pegasus::create_remove_request(key);
                 }
 
-                int err = _server_write->on_batched_write_requests(
+                const int err = _server_write->on_batched_write_requests(
                     writes, total_rpc_cnt, decree, 0, nullptr);
                 switch (err) {
                 case FAIL_DB_WRITE_BATCH_PUT:
@@ -106,8 +106,8 @@ public:
                 // make sure everything is cleanup after batch write.
                 ASSERT_TRUE(_server_write->_put_rpc_batch.empty());
                 ASSERT_TRUE(_server_write->_remove_rpc_batch.empty());
-                ASSERT_EQ(_server_write->_write_svc->_put_batch_size, 0);
-                ASSERT_EQ(_server_write->_write_svc->_remove_batch_size, 0);
+                ASSERT_EQ(_server_write->_write_svc->put_batch_size(), 0);
+                ASSERT_EQ(_server_write->_write_svc->remove_batch_size(), 0);
                 ASSERT_EQ(_server_write->_write_svc->_batch_start_time, 0);
                 ASSERT_EQ(_server_write->_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(),
                           0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to