This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d4693fd2b perf(make_idempotent): improve performance of idempotent 
writes by reducing deserialization and removing the mapping table for atomic 
write handlers (#2261)
d4693fd2b is described below

commit d4693fd2b2a3d7cd3e52aab5c8c4ee44c06c1292
Author: Dan Wang <[email protected]>
AuthorDate: Tue Jul 1 18:59:59 2025 +0800

    perf(make_idempotent): improve performance of idempotent writes by reducing 
deserialization and removing the mapping table for atomic write handlers (#2261)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    Introduce the `idempotent_writer` class to the primary replica. It caches the
    idempotent single-update requests generated during the `make_idempotent` phase,
    as well as the original atomic write RPC request from the client. This improves
    performance because:
    1. the repeated deserialization of both the single-update requests and the
    original atomic write RPC request is avoided;
    2. `std::variant` is used to hold the applying function and the RPC for each
    atomic write, replacing the mapping table for its handler.
---
 .../duplication/test/replica_duplicator_test.cpp   |  17 +-
 src/replica/idempotent_writer.h                    | 136 ++++++++++++++++
 src/replica/mutation.h                             |  10 +-
 src/replica/replica.cpp                            |   4 +-
 src/replica/replica.h                              |  16 +-
 src/replica/replica_2pc.cpp                        |  17 +-
 src/replica/replication_app_base.cpp               |   5 +-
 src/replica/replication_app_base.h                 |  25 +--
 src/replica/storage/simple_kv/simple_kv.server.h   |   3 +-
 src/replica/test/mock_utils.h                      |   3 +-
 src/rpc/rpc_holder.h                               |  11 +-
 src/server/pegasus_server_impl.cpp                 |  10 +-
 src/server/pegasus_server_impl.h                   |   6 +-
 src/server/pegasus_server_write.cpp                | 176 +++------------------
 src/server/pegasus_server_write.h                  |  66 +++-----
 src/server/pegasus_write_service_impl.h            |  12 +-
 16 files changed, 256 insertions(+), 261 deletions(-)

diff --git a/src/replica/duplication/test/replica_duplicator_test.cpp 
b/src/replica/duplication/test/replica_duplicator_test.cpp
index fc07be00a..37b70f0ae 100644
--- a/src/replica/duplication/test/replica_duplicator_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_test.cpp
@@ -33,23 +33,11 @@
 #include "replica/mutation_log.h"
 #include "replica/test/mock_utils.h"
 #include "runtime/pipeline.h"
-#include "task/task_code.h"
 #include "utils/autoref_ptr.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
-#include "utils/threadpool_code.h"
 
-namespace dsn {
-namespace apps {
-
-// for loading PUT mutations from log file.
-DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, 
::dsn::THREAD_POOL_DEFAULT);
-
-} // namespace apps
-} // namespace dsn
-
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
 
 class replica_duplicator_test : public duplication_test_base
 {
@@ -223,5 +211,4 @@ TEST_P(replica_duplicator_test, prepare_dup)
     ASSERT_EQ(100, last_durable_decree());
 }
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/replica/idempotent_writer.h b/src/replica/idempotent_writer.h
new file mode 100644
index 000000000..59b47db2e
--- /dev/null
+++ b/src/replica/idempotent_writer.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "base/pegasus_rpc_types.h"
+#include "rpc/rpc_message.h"
+#include "rrdb/rrdb_types.h"
+#include "utils/ports.h"
+
+namespace pegasus {
+
+// The `idempotent_writer` class is used by the primary replica to cache the 
idempotent
+// single-update requests generated during the "make_idempotent" phase, as 
well as the original
+// atomic write RPC request from the client. Later, during the 2PC process, it 
can directly
+// apply the cached single-update requests to the storage engine and 
automatically respond to
+// the client based on the cached atomic write RPC request.
+//
+// With `idempotent_writer`, the extra and unnecessary deserialization of the 
idempotent write
+// requests and the original atomic write request can be avoided.
+class idempotent_writer
+{
+public:
+    template <typename TRpcHolder>
+    using apply_func_t =
+        std::function<int(const std::vector<dsn::apps::update_request> &, 
const TRpcHolder &)>;
+
+    // Parameters for the constructor:
+    // - original_rpc: the RPC holder with the deserialized original request, 
restricted to the
+    // atomic write RPC requests (i.e. incr, check_and_set or 
check_and_mutate).
+    // - apply_func: the user-defined function that applies the idempotent 
single-update requests
+    // and builds the response to the client.
+    // - updates: the idempotent single-update requests translated from the 
original atomic write
+    // request.
+    template <typename TRpcHolder,
+              std::enable_if_t<std::disjunction_v<std::is_same<TRpcHolder, 
incr_rpc>,
+                                                  std::is_same<TRpcHolder, 
check_and_set_rpc>,
+                                                  std::is_same<TRpcHolder, 
check_and_mutate_rpc>>,
+                               int> = 0>
+    idempotent_writer(TRpcHolder &&original_rpc,
+                      apply_func_t<TRpcHolder> &&apply_func,
+                      std::vector<dsn::apps::update_request> &&updates)
+        : _apply_runner(std::in_place_type<apply_runner<TRpcHolder>>,
+                        std::forward<TRpcHolder>(original_rpc),
+                        std::move(apply_func)),
+          _updates(std::move(updates))
+    {
+    }
+
+    ~idempotent_writer() = default;
+
+    // Return the serialized message of the original RPC request, which should 
never be null.
+    [[nodiscard]] dsn::message_ex *request() const
+    {
+        return std::visit([](auto &runner) { return runner.rpc.dsn_request(); 
}, _apply_runner);
+    }
+
+    // Apply single-update requests to the storage engine, and automatically 
respond to the
+    // client -- it won't respond until the internal holder of the RPC is 
destructed. Return
+    // rocksdb::Status::kOk if succeed, otherwise some error code 
(rocksdb::Status::Code).
+    [[nodiscard]] int apply() const
+    {
+        return std::visit(
+            [this](auto &runner) {
+                // Enable automatic reply to the client no matter whether it 
would succeed
+                // or some error might occur.
+                runner.rpc.enable_auto_reply();
+
+                return runner.func(_updates, runner.rpc);
+            },
+            _apply_runner);
+    }
+
+private:
+    template <typename TRpcHolder>
+    struct apply_runner
+    {
+        apply_runner(TRpcHolder &&original_rpc, apply_func_t<TRpcHolder> 
&&apply_func)
+            : rpc(std::forward<TRpcHolder>(original_rpc)), 
func(std::move(apply_func))
+        {
+            // Disable automatic reply to make sure we won't respond to the 
client until we
+            // are ready to apply idempotent single-update requests to the 
storage engine,
+            // because:
+            // 1. Automatic reply only makes sense after the response is 
ready, so it should
+            // be enabled right before applying to the storage engine.
+            // 2. Before applying to the storage engine, an error might occur 
and the client
+            // could be proactively replied to externally (e.g., via 
reply_with_error()),
+            // so automatic reply needs to be disabled during this stage.
+            rpc.disable_auto_reply();
+        }
+
+        // Holds the original RPC request and the response to it.
+        const TRpcHolder rpc;
+
+        // The user-defined function that applies idempotent single-update 
requests to the
+        // storage engine.
+        const apply_func_t<TRpcHolder> func;
+    };
+
+    using apply_runner_t = std::variant<apply_runner<incr_rpc>,
+                                        apply_runner<check_and_set_rpc>,
+                                        apply_runner<check_and_mutate_rpc>>;
+
+    const apply_runner_t _apply_runner;
+
+    // The idempotent single-update requests that are translated from the 
original atomic
+    // write request.
+    const std::vector<dsn::apps::update_request> _updates;
+
+    DISALLOW_COPY_AND_ASSIGN(idempotent_writer);
+    DISALLOW_MOVE_AND_ASSIGN(idempotent_writer);
+};
+
+using idempotent_writer_ptr = std::unique_ptr<idempotent_writer>;
+
+} // namespace pegasus
diff --git a/src/replica/mutation.h b/src/replica/mutation.h
index e88a90dce..7246a5e66 100644
--- a/src/replica/mutation.h
+++ b/src/replica/mutation.h
@@ -42,6 +42,7 @@
 #include "common/replication_common.h"
 #include "common/replication_other_types.h"
 #include "consensus_types.h"
+#include "replica/idempotent_writer.h"
 #include "rpc/rpc_message.h"
 #include "runtime/api_layer1.h"
 #include "task/task.h"
@@ -195,10 +196,11 @@ public:
     // This field is only used by primary replicas.
     bool is_blocking_candidate{false};
 
-    // The original request received from the client. While making an atomic 
request (incr,
-    // check_and_set and check_and_mutate) idempotent, an extra variable is 
needed to hold
-    // its original request for the purpose of replying to the client.
-    dsn::message_ptr original_request;
+    // While an atomic write request (i.e. incr, check_and_set or 
check_and_mutate) is received
+    // and required to be idempotent, a data structure is needed to hold the 
original RPC and
+    // the idempotent single-update requests until all idempotent requests 
have been applied to
+    // the storage engine and the client has been responded to.
+    pegasus::idempotent_writer_ptr idem_writer;
 
     std::shared_ptr<dsn::utils::latency_tracer> _tracer;
 
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index a775b02e2..7b16fa2da 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -583,10 +583,10 @@ mutation_ptr replica::new_mutation(decree d, bool 
is_blocking_candidate)
     return mu;
 }
 
-mutation_ptr replica::new_mutation(decree d, dsn::message_ex *original_request)
+mutation_ptr replica::new_mutation(decree d, pegasus::idempotent_writer_ptr 
&&idem_writer)
 {
     auto mu = new_mutation(d);
-    mu->original_request = original_request;
+    mu->idem_writer = std::move(idem_writer);
     return mu;
 }
 
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 50e0105c2..1f5e8dae2 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -48,6 +48,7 @@
 #include "prepare_list.h"
 #include "ranger/access_type.h"
 #include "replica/backup/cold_backup_context.h"
+#include "replica/idempotent_writer.h"
 #include "replica/replica_base.h"
 #include "replica_context.h"
 #include "rpc/rpc_message.h"
@@ -165,9 +166,6 @@ class replica : public serverlet<replica>, public 
ref_counter, public replica_ba
 public:
     ~replica() override;
 
-    DISALLOW_COPY_AND_ASSIGN(replica);
-    DISALLOW_MOVE_AND_ASSIGN(replica);
-
     // return true when the mutation is valid for the current replica
     bool replay_mutation(mutation_ptr &mu, bool is_private);
     void reset_prepare_list_after_replay();
@@ -393,15 +391,16 @@ private:
     // Return the newly created mutation.
     mutation_ptr new_mutation(decree d, bool is_blocking_candidate);
 
-    // Create a new mutation with specified decree and the original atomic 
write request,
-    // which is used to build the response to the client.
+    // Create a new mutation with specified decree and idempotent writer.
     //
     // Parameters:
     // - d: invalid_decree, or the real decree assigned to this mutation.
-    // - original_request: the original request of the atomic write.
+    // - idem_writer: the data structure that applies the idempotent requests 
to the storage
+    // engine and automatically responds to the atomic write request (i.e. 
incr, check_and_set
+    // or check_and_mutate).
     //
     // Return the newly created mutation.
-    mutation_ptr new_mutation(decree d, dsn::message_ex *original_request);
+    mutation_ptr new_mutation(decree d, pegasus::idempotent_writer_ptr 
&&idem_writer);
 
     // initialization
     replica(replica_stub *stub,
@@ -904,6 +903,9 @@ private:
     bool _allow_ingest_behind{false};
     // Indicate where the storage engine data is corrupted and unrecoverable.
     bool _data_corrupted{false};
+
+    DISALLOW_COPY_AND_ASSIGN(replica);
+    DISALLOW_MOVE_AND_ASSIGN(replica);
 };
 
 using replica_ptr = dsn::ref_ptr<replica>;
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index fefcded06..5d8dc5adb 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -52,6 +52,7 @@
 #include "mutation_log.h"
 #include "ranger/access_type.h"
 #include "replica.h"
+#include "replica/idempotent_writer.h"
 #include "replica/prepare_list.h"
 #include "replica/replica_context.h"
 #include "replica/replication_app_base.h"
@@ -294,7 +295,8 @@ int replica::make_idempotent(mutation_ptr &mu)
         mu->client_requests.size(), 1, "the original atomic write request must 
not be batched");
 
     std::vector<dsn::message_ex *> new_requests;
-    const int err = _app->make_idempotent(request, new_requests);
+    pegasus::idempotent_writer_ptr idem_writer;
+    const int err = _app->make_idempotent(request, new_requests, idem_writer);
 
     // When the condition checks of `check_and_set` and `check_and_mutate` 
fail, make_idempotent()
     // would return rocksdb::Status::kTryAgain. Therefore, there is still a 
certain probability
@@ -310,18 +312,13 @@ int replica::make_idempotent(mutation_ptr &mu)
         "new_requests should not be empty since its original write request 
must be atomic "
         "and translated into at least one idempotent request");
 
-    // During make_idempotent(), the request has been deserialized (i.e. 
unmarshall() in the
-    // constructor of `rpc_holder::internal`). Once deserialize it again, 
assertion would fail for
-    // set_read_msg() in the constructor of `rpc_read_stream`.
-    //
-    // To make it deserializable again to be applied into RocksDB, restore 
read for it.
-    request->restore_read();
+    CHECK_PREFIX_MSG(idem_writer, "idem_writer should not be empty");
 
     CHECK_EQ_PREFIX_MSG(mu->get_decree(), invalid_decree, "the decree must 
have not been assigned");
 
     // Create a new mutation to hold the new idempotent requests. The old 
mutation holding the
     // original atomic write request will be released automatically.
-    mu = new_mutation(invalid_decree, request);
+    mu = new_mutation(invalid_decree, std::move(idem_writer));
     for (dsn::message_ex *new_request : new_requests) {
         mu->add_client_request(new_request);
     }
@@ -471,8 +468,8 @@ void replica::init_prepare(mutation_ptr &mu, bool 
reconciliation, bool pop_all_c
 void replica::reply_with_error(const mutation_ptr &mu, const error_code &err)
 {
     // Respond to the original atomic request if it is non-null. And it could 
never be batched.
-    if (mu->original_request != nullptr) {
-        response_client_write(mu->original_request, err);
+    if (mu->idem_writer) {
+        response_client_write(mu->idem_writer->request(), err);
         return;
     }
 
diff --git a/src/replica/replication_app_base.cpp 
b/src/replica/replication_app_base.cpp
index 40f043dd9..fd871fbd6 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -40,6 +40,7 @@
 #include "consensus_types.h"
 #include "dsn.layer2_types.h"
 #include "replica.h"
+#include "replica/idempotent_writer.h"
 #include "replica/mutation.h"
 #include "replica/replication_app_base.h"
 #include "rpc/rpc_message.h"
@@ -266,7 +267,7 @@ int replication_app_base::on_batched_write_requests(int64_t 
decree,
                                                     uint64_t timestamp,
                                                     message_ex **requests,
                                                     uint32_t count,
-                                                    message_ex 
*original_request)
+                                                    
pegasus::idempotent_writer_ptr &&idem_writer)
 {
     int storage_error = rocksdb::Status::kOk;
     for (uint32_t i = 0; i < count; ++i) {
@@ -329,7 +330,7 @@ error_code replication_app_base::apply_mutation(const 
mutation_ptr &mu)
                                                         
mu->data.header.timestamp,
                                                         batched_requests,
                                                         batched_count,
-                                                        mu->original_request);
+                                                        
std::move(mu->idem_writer));
 
     // release faked requests
     for (uint32_t i = 0; i < faked_count; ++i) {
diff --git a/src/replica/replication_app_base.h 
b/src/replica/replication_app_base.h
index 329a5e972..5842e66c0 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -37,6 +37,7 @@
 #include "common/replication_other_types.h"
 #include "metadata_types.h"
 #include "mutation.h"
+#include "replica/idempotent_writer.h"
 #include "replica/replica_base.h"
 #include "replica_admin_types.h"
 #include "utils/error_code.h"
@@ -262,27 +263,31 @@ public:
     // - request: the original request received from a client.
     // - new_requests: the output parameter, holding the resulting idempotent 
requests if the
     // original request is atomic, otherwise keeping unchanged.
+    // - idem_writer: the output parameter, holding the resulting idempotent 
writer.
     //
     // Return:
     // - for an idempotent requess always return rocksdb::Status::kOk .
     // - for an atomic request, return rocksdb::Status::kOk if succeed in 
making it idempotent;
     // otherwise, return error code (rocksdb::Status::Code).
     virtual int make_idempotent(dsn::message_ex *request,
-                                std::vector<dsn::message_ex *> &new_requests) 
= 0;
+                                std::vector<dsn::message_ex *> &new_requests,
+                                pegasus::idempotent_writer_ptr &idem_writer) = 
0;
 
     // Apply batched write requests from a mutation. This is a virtual 
function, and base class
     // provide a naive implementation that just call on_request for each 
request. Storage engine
     // may override this function to get better performance.
     //
     // Parameters:
-    //  - decree: the decree of the mutation which these requests are batched 
into.
-    //  - timestamp: an incremental timestamp generated for this batch of 
requests.
-    //  - requests: the requests to be applied.
-    //  - count: the number of the requests.
-    //  - original_request: the original request received from the client. 
Must be an atomic
-    //  request (i.e. incr, check_and_set and check_and_mutate) if non-null, 
and another
-    //  parameter `requests` must hold the idempotent request translated from 
it. Used to
-    //  reply to the client.
+    // - decree: the decree of the mutation which these requests are batched 
into.
+    // - timestamp: an incremental timestamp generated for this batch of 
requests.
+    // - requests: the requests to be applied.
+    // - count: the number of the requests.
+    // - idem_writer: if non-null, holds the original atomic write request (i.e. incr,
+    // check_and_set or check_and_mutate) received from the client together with the
+    // idempotent requests translated from it. It is used to apply the idempotent requests
+    // to the storage engine and automatically respond to the client, with the parameter
+    // `requests` holding the serialized idempotent requests and the parameter `count`
+    // recording the number.
     //
     // Return rocksdb::Status::kOk or some error code (rocksdb::Status::Code) 
if these requests
     // failed to be applied by storage engine.
@@ -290,7 +295,7 @@ public:
                                           uint64_t timestamp,
                                           message_ex **requests,
                                           uint32_t count,
-                                          message_ex *original_request);
+                                          pegasus::idempotent_writer_ptr 
&&idem_writer);
 
     // Query compact state.
     [[nodiscard]] virtual std::string query_compact_state() const = 0;
diff --git a/src/replica/storage/simple_kv/simple_kv.server.h 
b/src/replica/storage/simple_kv/simple_kv.server.h
index c34aa0e81..8ec386250 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.h
@@ -54,7 +54,8 @@ public:
     }
 
     int make_idempotent(dsn::message_ex *request,
-                        std::vector<dsn::message_ex *> &new_requests) override
+                        std::vector<dsn::message_ex *> &new_requests,
+                        pegasus::idempotent_writer_ptr &idem_writer) override
     {
         return rocksdb::Status::kOk;
     }
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index ab30d9a5c..3899b5da8 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -79,7 +79,8 @@ public:
     }
     int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 
0; }
     int make_idempotent(dsn::message_ex *request,
-                        std::vector<dsn::message_ex *> &new_requests) override
+                        std::vector<dsn::message_ex *> &new_requests,
+                        pegasus::idempotent_writer_ptr &idem_writer) override
     {
         return rocksdb::Status::kOk;
     }
diff --git a/src/rpc/rpc_holder.h b/src/rpc/rpc_holder.h
index 0ad39a21a..3dcc569ae 100644
--- a/src/rpc/rpc_holder.h
+++ b/src/rpc/rpc_holder.h
@@ -224,9 +224,11 @@ public:
         dsn_rpc_forward(dsn_request(), addr);
     }
 
-    inline void set_auto_reply(bool auto_reply) { _i->auto_reply = auto_reply; 
}
+    [[nodiscard]] inline bool is_auto_reply() const { return _i->auto_reply; }
 
-    inline void enable_auto_reply() { set_auto_reply(true); }
+    inline void enable_auto_reply() const { set_auto_reply(true); }
+
+    inline void disable_auto_reply() const { set_auto_reply(false); }
 
     // Returns an rpc_holder that will reply the request after its lifetime 
ends.
     // By default rpc_holder never replies.
@@ -276,6 +278,8 @@ public:
     friend bool operator<(const rpc_holder &lhs, const rpc_holder &rhs) { 
return lhs._i < rhs._i; }
 
 private:
+    inline void set_auto_reply(bool auto_reply) const { _i->auto_reply = 
auto_reply; }
+
     friend class rpc_holder_test;
 
     struct internal
@@ -333,6 +337,9 @@ private:
         dsn::error_code rpc_error = dsn::ERR_OK;
 
         bool auto_reply;
+
+        DISALLOW_COPY_AND_ASSIGN(internal);
+        DISALLOW_MOVE_AND_ASSIGN(internal);
     };
 
     std::shared_ptr<internal> _i;
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index 2d13948a9..1ba03fb16 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -61,6 +61,7 @@
 #include "pegasus_rpc_types.h"
 #include "pegasus_server_write.h"
 #include "replica_admin_types.h"
+#include "replica/idempotent_writer.h"
 #include "rpc/rpc_message.h"
 #include "rrdb/rrdb.code.definition.h"
 #include "rrdb/rrdb_types.h"
@@ -346,24 +347,25 @@ void pegasus_server_impl::gc_checkpoints(bool 
force_reserve_one)
 }
 
 int pegasus_server_impl::make_idempotent(dsn::message_ex *request,
-                                         std::vector<dsn::message_ex *> 
&new_requests)
+                                         std::vector<dsn::message_ex *> 
&new_requests,
+                                         idempotent_writer_ptr &idem_writer)
 {
     CHECK_TRUE(_is_open);
 
-    return _server_write->make_idempotent(request, new_requests);
+    return _server_write->make_idempotent(request, new_requests, idem_writer);
 }
 
 int pegasus_server_impl::on_batched_write_requests(int64_t decree,
                                                    uint64_t timestamp,
                                                    dsn::message_ex **requests,
                                                    uint32_t count,
-                                                   dsn::message_ex 
*original_request)
+                                                   idempotent_writer_ptr 
&&idem_writer)
 {
     CHECK_TRUE(_is_open);
     CHECK_NOTNULL(requests, "");
 
     return _server_write->on_batched_write_requests(
-        requests, count, decree, timestamp, original_request);
+        requests, count, decree, timestamp, std::move(idem_writer));
 }
 
 // Since LOG_ERROR_PREFIX depends on log_prefix(), this method could not be 
declared as static or
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 64934ffaa..e64eb1da1 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -45,6 +45,7 @@
 #include "pegasus_utils.h"
 #include "pegasus_value_schema.h"
 #include "range_read_limiter.h"
+#include "replica/idempotent_writer.h"
 #include "replica/replication_app_base.h"
 #include "task/task.h"
 #include "task/task_tracker.h"
@@ -149,7 +150,8 @@ public:
 
     // See replication_app_base::make_idempotent() for details. Only called by 
primary replicas.
     int make_idempotent(dsn::message_ex *request,
-                        std::vector<dsn::message_ex *> &new_requests) override;
+                        std::vector<dsn::message_ex *> &new_requests,
+                        idempotent_writer_ptr &idem_writer) override;
 
     /// Each of the write request (specifically, the rpc that's configured as 
write, see
     /// option `rpc_request_is_write_operation` in rDSN `task_spec`) will 
first be
@@ -162,7 +164,7 @@ public:
                                   uint64_t timestamp,
                                   dsn::message_ex **requests,
                                   uint32_t count,
-                                  dsn::message_ex *original_request) override;
+                                  idempotent_writer_ptr &&idem_writer) 
override;
 
     ::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
     {
diff --git a/src/server/pegasus_server_write.cpp 
b/src/server/pegasus_server_write.cpp
index 810746f4f..b4db97427 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -32,8 +32,8 @@
 #include "pegasus_server_impl.h"
 #include "pegasus_server_write.h"
 #include "pegasus_utils.h"
+#include "replica/idempotent_writer.h"
 #include "rpc/rpc_holder.h"
-#include "rpc/serialization.h"
 #include "rrdb/rrdb.code.definition.h"
 #include "server/pegasus_write_service.h"
 #include "utils/autoref_ptr.h"
@@ -58,11 +58,11 @@ 
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
 {
     init_make_idempotent_handlers();
     init_non_batch_write_handlers();
-    init_idempotent_writers();
 }
 
 int pegasus_server_write::make_idempotent(dsn::message_ex *request,
-                                          std::vector<dsn::message_ex *> 
&new_requests)
+                                          std::vector<dsn::message_ex *> 
&new_requests,
+                                          idempotent_writer_ptr &idem_writer)
 {
     const auto make_idempotent_handler =
         std::as_const(_make_idempotent_handlers).find(request->rpc_code());
@@ -73,7 +73,7 @@ int pegasus_server_write::make_idempotent(dsn::message_ex 
*request,
     }
 
     try {
-        return make_idempotent_handler->second(request, new_requests);
+        return make_idempotent_handler->second(request, new_requests, 
idem_writer);
     } catch (TTransportException &ex) {
         METRIC_VAR_INCREMENT(corrupt_writes);
         LOG_ERROR_PREFIX("make idempotent handler for {} failed: from = {}, 
exception = {}",
@@ -93,7 +93,7 @@ int 
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
                                                     uint32_t count,
                                                     int64_t decree,
                                                     uint64_t timestamp,
-                                                    dsn::message_ex 
*original_request)
+                                                    idempotent_writer_ptr 
&&idem_writer)
 {
     _write_ctx = db_write_context::create(decree, timestamp);
     _decree = decree;
@@ -134,93 +134,18 @@ int 
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
         }
     }
 
-    if (original_request != nullptr) {
-        // Once `original_request` is set, `requests` must be idempotent and 
translated from
-        // an atomic write request.
-        return apply_idempotent(requests, count, original_request);
+    if (idem_writer) {
+        // Pass ownership down the call stack layer by layer, and ultimately 
transfer it to
+        // the local variable `writer`, so that upon destruction it can 
automatically reply
+        // to the client and release the memory occupied by the atomic write 
RPC and its
+        // idempotent single-update requests.
+        const idempotent_writer_ptr writer(std::move(idem_writer));
+        return writer->apply();
     }
 
     return on_batched_writes(requests, count);
 }
 
-int pegasus_server_write::apply_idempotent(dsn::message_ex **requests,
-                                           uint32_t count,
-                                           dsn::message_ex *original_request)
-{
-    CHECK_GT_PREFIX_MSG(count,
-                        0,
-                        "`requests` should consist of at least one request: "
-                        "original_request = ({}, {}), decree = {}",
-                        original_request->header->from_address,
-                        original_request->rpc_code(),
-                        _decree);
-
-    std::vector<dsn::apps::update_request> updates;
-    updates.reserve(count);
-
-    for (uint32_t i = 0; i < count; ++i) {
-        CHECK_EQ_PREFIX_MSG(requests[i]->rpc_code(),
-                            dsn::apps::RPC_RRDB_RRDB_PUT,
-                            "now all translated idempotent writes are 
RPC_RRDB_RRDB_PUT "
-                            "requests: original_request = ({}, {}), decree = 
{}, "
-                            "count = {}, index = {}",
-                            original_request->header->from_address,
-                            original_request->rpc_code(),
-                            _decree,
-                            count,
-                            i);
-
-        // Preinitialize an empty update.
-        updates.emplace_back();
-
-        // Deserialize the message to create the real idempotent update.
-        //
-        // There is no need to catch TTransportException, since `requests` are 
freshly
-        // generated by serializing the idempotent updates within 
make_idempotent().
-        dsn::unmarshall(requests[i], updates.back());
-
-        CHECK_PREFIX_MSG(
-            updates.back().__isset.type,
-            "update_request::type is not set for {} request: original_request 
= ({}, {}) "
-            "decree = {}, count = {}, index = {},",
-            requests[i]->rpc_code(),
-            original_request->header->from_address,
-            original_request->rpc_code(),
-            _decree,
-            count,
-            i);
-    }
-
-    const auto update_type = static_cast<uint32_t>(updates.front().type);
-    CHECK_LT_PREFIX_MSG(update_type,
-                        _idempotent_writers.size(),
-                        "unsupported update_request::type {} for idempotent 
writer: "
-                        "original_request = ({}, {}), decree = {}, count = {}",
-                        update_type,
-                        original_request->header->from_address,
-                        original_request->rpc_code(),
-                        _decree,
-                        count);
-
-    // If the corresponding writer is retrieved using 
original_request->rpc_code(), a map
-    // would be needed. However, by choosing to obtain the writer based on the 
type of the
-    // first idempotent update, the writers can be stored in an array instead, 
which offers
-    // better performance than a map.
-    const auto writer = _idempotent_writers[update_type];
-    CHECK_PREFIX_MSG(writer,
-                     "idempotent writer for update_request::type {} should not 
be empty: "
-                     "original_request = ({}, {}), decree = {}, count = {}",
-                     update_type,
-                     original_request->header->from_address,
-                     original_request->rpc_code(),
-                     _decree,
-                     count);
-
-    // There is no need to catch TTransportException, since `original_request` 
has just
-    // been successfully deserialized in make_idempotent().
-    return writer(updates, original_request);
-}
-
 void pegasus_server_write::set_default_ttl(uint32_t ttl) { 
_write_svc->set_default_ttl(ttl); }
 
 int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, 
uint32_t count)
@@ -326,16 +251,22 @@ void pegasus_server_write::init_make_idempotent_handlers()
 {
     _make_idempotent_handlers = {
         {dsn::apps::RPC_RRDB_RRDB_INCR,
-         [this](dsn::message_ex *request, std::vector<dsn::message_ex *> 
&new_requests) -> int {
-             return make_idempotent<incr_rpc>(request, new_requests);
+         [this](dsn::message_ex *request,
+                std::vector<dsn::message_ex *> &new_requests,
+                idempotent_writer_ptr &idem_writer) -> int {
+             return make_idempotent<incr_rpc>(request, new_requests, 
idem_writer);
          }},
         {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
-         [this](dsn::message_ex *request, std::vector<dsn::message_ex *> 
&new_requests) -> int {
-             return make_idempotent<check_and_set_rpc>(request, new_requests);
+         [this](dsn::message_ex *request,
+                std::vector<dsn::message_ex *> &new_requests,
+                idempotent_writer_ptr &idem_writer) -> int {
+             return make_idempotent<check_and_set_rpc>(request, new_requests, 
idem_writer);
          }},
         {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
-         [this](dsn::message_ex *request, std::vector<dsn::message_ex *> 
&new_requests) -> int {
-             return make_idempotent<check_and_mutate_rpc>(request, 
new_requests);
+         [this](dsn::message_ex *request,
+                std::vector<dsn::message_ex *> &new_requests,
+                idempotent_writer_ptr &idem_writer) -> int {
+             return make_idempotent<check_and_mutate_rpc>(request, 
new_requests, idem_writer);
          }},
     };
 }
@@ -381,63 +312,4 @@ void pegasus_server_write::init_non_batch_write_handlers()
     };
 }
 
-void pegasus_server_write::init_idempotent_writers()
-{
-    _idempotent_writers = {
-        nullptr, // No writer for UT_PUT.
-        [this](const std::vector<dsn::apps::update_request> &updates,
-               dsn::message_ex *original_request) -> int {
-            // The type of the only update in `updates` is UT_INCR, thus 
choose the writer
-            // for incr.
-            return put_incr(updates, original_request);
-        },
-        [this](const std::vector<dsn::apps::update_request> &updates,
-               dsn::message_ex *original_request) -> int {
-            // The type of the only update in `updates` is UT_CHECK_AND_SET, 
thus choose the
-            // writer for check_and_set.
-            return put_check_and_set(updates, original_request);
-        },
-        [this](const std::vector<dsn::apps::update_request> &updates,
-               dsn::message_ex *original_request) -> int {
-            // The type of the first update in batched `updates` is 
UT_CHECK_AND_MUTATE_PUT,
-            // thus choose the writer for check_and_mutate.
-            return put_check_and_mutate(updates, original_request);
-        },
-        [this](const std::vector<dsn::apps::update_request> &updates,
-               dsn::message_ex *original_request) -> int {
-            // The type of the first update in batched `updates` is 
UT_CHECK_AND_MUTATE_REMOVE,
-            // thus choose the writer for check_and_mutate.
-            return put_check_and_mutate(updates, original_request);
-        },
-    };
-
-    CHECK_EQ_PREFIX_MSG(dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE + 1,
-                        _idempotent_writers.size(),
-                        "_idempotent_writers does not match 
dsn::apps::update_type");
-}
-
-int pegasus_server_write::put_incr(const 
std::vector<dsn::apps::update_request> &updates,
-                                   dsn::message_ex *original_request)
-{
-    CHECK_EQ_PREFIX(updates.size(), 1);
-    CHECK_EQ_PREFIX(original_request->rpc_code(), 
dsn::apps::RPC_RRDB_RRDB_INCR);
-    return put<incr_rpc>(updates, original_request);
-}
-
-int pegasus_server_write::put_check_and_set(const 
std::vector<dsn::apps::update_request> &updates,
-                                            dsn::message_ex *original_request)
-{
-    CHECK_EQ_PREFIX(updates.size(), 1);
-    CHECK_EQ_PREFIX(original_request->rpc_code(), 
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET);
-    return put<check_and_set_rpc>(updates, original_request);
-}
-
-int pegasus_server_write::put_check_and_mutate(
-    const std::vector<dsn::apps::update_request> &updates, dsn::message_ex 
*original_request)
-{
-    CHECK_GT_PREFIX(updates.size(), 0);
-    CHECK_EQ_PREFIX(original_request->rpc_code(), 
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE);
-    return put<check_and_mutate_rpc>(updates, original_request);
-}
-
 } // namespace pegasus::server
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index 9680f1a3c..6d578c389 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -21,16 +21,17 @@
 
 #include <rocksdb/status.h>
 #include <algorithm>
-#include <array>
 #include <cstdint>
 #include <functional>
 #include <map>
 #include <memory>
+#include <utility>
 #include <vector>
 
 #include "base/pegasus_rpc_types.h"
 #include "common/replication_other_types.h"
 #include "pegasus_write_service.h"
+#include "replica/idempotent_writer.h"
 #include "replica/replica_base.h"
 #include "rpc/rpc_message.h"
 #include "rrdb/rrdb.code.definition.h"
@@ -55,7 +56,9 @@ public:
     explicit pegasus_server_write(pegasus_server_impl *server);
 
     // See replication_app_base::make_idempotent() for details. Only called by primary replicas.
-    int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex 
*> &new_requests);
+    int make_idempotent(dsn::message_ex *request,
+                        std::vector<dsn::message_ex *> &new_requests,
+                        idempotent_writer_ptr &idem_writer);
 
     // See replication_app_base::on_batched_write_requests() for details.
     //
@@ -71,7 +74,7 @@ public:
                                   uint32_t count,
                                   int64_t decree,
                                   uint64_t timestamp,
-                                  dsn::message_ex *original_request);
+                                  idempotent_writer_ptr &&idem_writer);
 
     void set_default_ttl(uint32_t ttl);
 
@@ -79,7 +82,9 @@ private:
     // Used to call make_idempotent() for each type (specified by TRpcHolder) of atomic write.
     // Only called by primary replicas.
     template <typename TRpcHolder>
-    int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex 
*> &new_requests)
+    int make_idempotent(dsn::message_ex *request,
+                        std::vector<dsn::message_ex *> &new_requests,
+                        idempotent_writer_ptr &idem_writer)
     {
         auto rpc = TRpcHolder(request);
 
@@ -109,37 +114,18 @@ private:
                     request->header->context.u.serialize_format)));
         }
 
-        return rocksdb::Status::kOk;
-    }
-
-    // Apply the idempotent updates `requests` into storage engine and respond 
to the original
-    // atomic write request `original_request`. Both of `requests` and 
`original_request` should
-    // not be null, while `count` should always be > 0.
-    int
-    apply_idempotent(dsn::message_ex **requests, uint32_t count, 
dsn::message_ex *original_request);
+        idem_writer = std::make_unique<pegasus::idempotent_writer>(
+            std::move(rpc),
+            typename pegasus::idempotent_writer::template 
apply_func_t<TRpcHolder>(
+                [this](const std::vector<dsn::apps::update_request> &updates,
+                       const TRpcHolder &rpc) -> int {
+                    return _write_svc->put(_write_ctx, updates, rpc.request(), 
rpc.response());
+                }),
+            std::move(updates));
 
-    // Apply the batched (one or multiple) single-update requests into the 
storage engine.
-    // Only called by primary replicas.
-    template <typename TRpcHolder>
-    inline int put(const std::vector<dsn::apps::update_request> &updates,
-                   dsn::message_ex *original_request)
-    {
-        // Enable auto reply, since in primary replicas we need to reply to 
the client with
-        // the response to the original atomic write request after the 
idempotent updates
-        // were applied into the storage engine.
-        auto rpc = TRpcHolder::auto_reply(original_request);
-        return _write_svc->put(_write_ctx, updates, rpc.request(), 
rpc.response());
+        return rocksdb::Status::kOk;
     }
 
-    // Following functions are the writers that apply the idempotent updates 
translated from
-    // incr, check_and_set and check_and_mutate requests and then respond to 
them.
-    int put_incr(const std::vector<dsn::apps::update_request> &updates,
-                 dsn::message_ex *original_request);
-    int put_check_and_set(const std::vector<dsn::apps::update_request> 
&updates,
-                          dsn::message_ex *original_request);
-    int put_check_and_mutate(const std::vector<dsn::apps::update_request> 
&updates,
-                             dsn::message_ex *original_request);
-
     // Delay replying for the batched requests until all of them complete.
     int on_batched_writes(dsn::message_ex **requests, uint32_t count);
 
@@ -163,7 +149,6 @@ private:
 
     void init_make_idempotent_handlers();
     void init_non_batch_write_handlers();
-    void init_idempotent_writers();
 
     friend class pegasus_server_write_test;
     friend class pegasus_write_service_test;
@@ -178,23 +163,16 @@ private:
     int64_t _decree{invalid_decree};
 
     // Handlers that translate an atomic write request into one or multiple idempotent updates.
-    using make_idempotent_map =
-        std::map<dsn::task_code,
-                 std::function<int(dsn::message_ex *, 
std::vector<dsn::message_ex *> &)>>;
+    using make_idempotent_map = std::map<
+        dsn::task_code,
+        std::function<int(
+            dsn::message_ex *, std::vector<dsn::message_ex *> &, 
idempotent_writer_ptr &)>>;
     make_idempotent_map _make_idempotent_handlers;
 
     // Handlers that process a write request which could not be batched, e.g. multi put/remove.
     using non_batch_write_map = std::map<dsn::task_code, 
std::function<int(dsn::message_ex *)>>;
     non_batch_write_map _non_batch_write_handlers;
 
-    // Writers that apply idempotent updates and respond to the original 
atomic write request.
-    // Each dsn::apps::update_type has its corresponding writer, therefore the 
number of writers
-    // must match the number of dsn::apps::update_type.
-    using idempotent_writer_map = std::array<
-        std::function<int(const std::vector<dsn::apps::update_request> &, 
dsn::message_ex *)>,
-        5>;
-    idempotent_writer_map _idempotent_writers{};
-
     METRIC_VAR_DECLARE_counter(corrupt_writes);
 };
 
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index e8de292a6..8bdf0d976 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -254,10 +254,10 @@ public:
         }
 
         // Shouldn't fail to parse since the value must be a valid int64.
-        CHECK(dsn::buf2int64(update.value.to_string_view(), resp.new_value),
-              "invalid int64 value for put idempotent incr: key={}, value={}",
-              update.key,
-              update.value);
+        CHECK_PREFIX_MSG(dsn::buf2int64(update.value.to_string_view(), 
resp.new_value),
+                         "invalid int64 value for put idempotent incr: key={}, 
value={}",
+                         update.key,
+                         update.value);
 
         return rocksdb::Status::kOk;
     }
@@ -560,7 +560,7 @@ public:
             const std::vector<dsn::apps::update_request> &updates,
             TResponse &resp)
     {
-        CHECK_EQ(updates.size(), 1);
+        CHECK_EQ_PREFIX(updates.size(), 1);
         return put(ctx, updates.front(), resp);
     }
 
@@ -678,6 +678,8 @@ public:
             const std::vector<dsn::apps::update_request> &updates,
             dsn::apps::check_and_mutate_response &resp)
     {
+        CHECK_GT_PREFIX(updates.size(), 0);
+
         make_basic_response(ctx.decree, resp);
 
         // Copy check_value's fields from the first idempotent request to the check_and_mutate


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to