This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 6249c13ae feat(make_idempotent): support making `incr` request 
idempotent in `pegasus_server_write` and `replication_app_base` (#2196)
6249c13ae is described below

commit 6249c13aed066caa3415f8623a6a125235bfddbd
Author: Dan Wang <[email protected]>
AuthorDate: Fri Feb 21 19:32:49 2025 +0800

    feat(make_idempotent): support making `incr` request idempotent in 
`pegasus_server_write` and `replication_app_base` (#2196)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    To support idempotence, a new interface `make_idempotent()` is introduced 
and
    an existing interface `on_batched_write_requests()` is changed for both the 
classes
    `pegasus_server_write` and `replication_app_base`. This is different from 
what we
    have done for `pegasus_write_service` and `pegasus_write_service::impl`, 
both of
    which provide `make_idempotent()` and `put()` by the following PRs:
    - https://github.com/apache/incubator-pegasus/pull/2185
    - https://github.com/apache/incubator-pegasus/pull/2192
    
    `make_idempotent()` for `replication_app_base` is provided as a virtual 
function
    called by primary replicas, implemented internally by `pegasus_server_impl` 
and
    `pegasus_server_write`. `on_batched_write_requests()` for 
`replication_app_base`
    is likewise a virtual function implemented by the same two classes. It is 
changed by
    adding a new parameter `original_request`, which is just the original 
request received
    from the client. If it is non-null, it must be an atomic request (i.e. 
`incr`,
    `check_and_set` and `check_and_mutate`); it is used to decide whether a 
write request
    is atomic and to generate the response corresponding to the atomic write
    request.
---
 .clang-tidy                                      |   2 +-
 build_tools/clang_tidy.py                        |   2 +
 src/redis_protocol/proxy_ut/redis_proxy_test.cpp |  11 +-
 src/replica/mutation.h                           |   5 +
 src/replica/replication_app_base.cpp             |  52 +++---
 src/replica/replication_app_base.h               |  53 ++++--
 src/replica/storage/simple_kv/simple_kv.server.h |  17 +-
 src/replica/test/mock_utils.h                    |   6 +-
 src/rpc/rpc_holder.h                             |   6 +-
 src/rpc/rpc_message.cpp                          |   8 +-
 src/rpc/rpc_message.h                            |  16 +-
 src/rpc/rpc_stream.h                             |  24 +--
 src/rpc/test/rpc_message_test.cpp                |   4 +-
 src/server/pegasus_server_impl.cpp               |  15 +-
 src/server/pegasus_server_impl.h                 |   5 +-
 src/server/pegasus_server_write.cpp              | 197 ++++++++++++++++++-----
 src/server/pegasus_server_write.h                |  33 +++-
 src/server/test/hotkey_collector_test.cpp        |  20 ++-
 src/server/test/pegasus_server_write_test.cpp    |   4 +-
 src/tools/mutation_log_tool.cpp                  |   9 +-
 src/utils/alloc.h                                |   6 +
 21 files changed, 360 insertions(+), 135 deletions(-)

diff --git a/.clang-tidy b/.clang-tidy
index 95dd7616d..deaca38ca 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -20,7 +20,7 @@
 CheckOptions: []
 # Disable some checks that are not useful for us now.
 # They are sorted by names, and should be consistent to 
build_tools/clang_tidy.py.
-Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines
 [...]
+Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-bugprone-sizeof-expression,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-
 [...]
 ExtraArgs:
 ExtraArgsBefore: []
 FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index 8a35e1a5d..b91593026 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -65,6 +65,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
                    "-checks=-bugprone-easily-swappable-parameters,"
                            "-bugprone-lambda-function-name,"
                            "-bugprone-macro-parentheses,"
+                           "-bugprone-sizeof-expression,"
                            "-cert-err58-cpp,"
                            "-concurrency-mt-unsafe,"
                            "-cppcoreguidelines-avoid-c-arrays,"
@@ -78,6 +79,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
                            "-cppcoreguidelines-pro-type-const-cast,"
                            "-cppcoreguidelines-pro-type-union-access,"
                            "-fuchsia-default-arguments-calls,"
+                           "-fuchsia-multiple-inheritance,"
                            "-fuchsia-overloaded-operator,"
                            "-fuchsia-statically-constructed-objects,"
                            
"-google-readability-avoid-underscore-in-googletest-name,"
diff --git a/src/redis_protocol/proxy_ut/redis_proxy_test.cpp 
b/src/redis_protocol/proxy_ut/redis_proxy_test.cpp
index dafa9cb79..d87267f52 100644
--- a/src/redis_protocol/proxy_ut/redis_proxy_test.cpp
+++ b/src/redis_protocol/proxy_ut/redis_proxy_test.cpp
@@ -112,16 +112,15 @@ public:
         _reserved_entry[index]->request.sub_requests = 
std::move(msg.sub_requests);
     }
 
-    static dsn::message_ex *create_message(const char *data)
+    static dsn::message_ex *create_message(const char *data, unsigned int 
length)
     {
         return dsn::message_ex::create_received_request(
-            RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data, 
strlen(data));
+            RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, data, length);
     }
 
-    static dsn::message_ex *create_message(const char *data, int length)
+    static dsn::message_ex *create_message(const char *data)
     {
-        return dsn::message_ex::create_received_request(
-            RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data, 
length);
+        return create_message(data, strlen(data));
     }
 
     static dsn::message_ex *marshalling_array(const redis_request &request)
@@ -379,7 +378,7 @@ TEST_F(proxy_test, test_random_cases)
         offsets.insert(total_body_size);
 
         int last_offset = 0;
-        for (int offset : offsets) {
+        for (auto offset : offsets) {
             dsn::message_ex *msg = 
redis_test_parser::create_message(msg_buffer_ptr + last_offset,
                                                                      offset - 
last_offset);
             ASSERT_TRUE(parse(msg));
diff --git a/src/replica/mutation.h b/src/replica/mutation.h
index 757ce450e..b6b8ecd14 100644
--- a/src/replica/mutation.h
+++ b/src/replica/mutation.h
@@ -153,6 +153,11 @@ public:
     // user requests
     std::vector<dsn::message_ex *> client_requests;
 
+    // The original request received from the client. While making an atomic 
request (incr,
+    // check_and_set and check_and_mutate) idempotent, an extra variable is 
needed to hold
+    // its original request for the purpose of replying to the client.
+    dsn::message_ptr original_request;
+
     // used by pending mutation queue only
     mutation *next;
 
diff --git a/src/replica/replication_app_base.cpp 
b/src/replica/replication_app_base.cpp
index 0c82513dc..dd4471158 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -24,7 +24,6 @@
  * THE SOFTWARE.
  */
 
-#include <alloca.h>
 #include <fmt/core.h>
 #include <rocksdb/env.h>
 #include <rocksdb/status.h>
@@ -47,6 +46,7 @@
 #include "rpc/serialization.h"
 #include "task/task_code.h"
 #include "task/task_spec.h"
+#include "utils/alloc.h"
 #include "utils/autoref_ptr.h"
 #include "utils/binary_reader.h"
 #include "utils/binary_writer.h"
@@ -264,7 +264,8 @@ error_code 
replication_app_base::apply_checkpoint(chkpt_apply_mode mode, const l
 int replication_app_base::on_batched_write_requests(int64_t decree,
                                                     uint64_t timestamp,
                                                     message_ex **requests,
-                                                    int request_length)
+                                                    int request_length,
+                                                    message_ex 
*original_request)
 {
     int storage_error = rocksdb::Status::kOk;
     for (int i = 0; i < request_length; ++i) {
@@ -292,37 +293,42 @@ error_code replication_app_base::apply_mutation(const 
mutation *mu)
     }
 
     bool has_ingestion_request = false;
-    int request_count = static_cast<int>(mu->client_requests.size());
-    message_ex **batched_requests = (message_ex **)alloca(sizeof(message_ex *) 
* request_count);
-    message_ex **faked_requests = (message_ex **)alloca(sizeof(message_ex *) * 
request_count);
+    const int request_count = static_cast<int>(mu->client_requests.size());
+    auto **batched_requests = ALLOC_STACK(message_ex *, request_count);
+    auto **faked_requests = ALLOC_STACK(message_ex *, request_count);
     int batched_count = 0; // write-empties are not included.
     int faked_count = 0;
-    for (int i = 0; i < request_count; i++) {
+    for (int i = 0; i < request_count; ++i) {
         const mutation_update &update = mu->data.updates[i];
-        message_ex *req = mu->client_requests[i];
         LOG_DEBUG_PREFIX("mutation {} #{}: dispatch rpc call {}", mu->name(), 
i, update.code);
-        if (update.code != RPC_REPLICATION_WRITE_EMPTY) {
-            if (req == nullptr) {
-                req = message_ex::create_received_request(
-                    update.code,
-                    (dsn_msg_serialize_format)update.serialization_type,
-                    (void *)update.data.data(),
-                    update.data.length());
-                faked_requests[faked_count++] = req;
-            }
+        if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
+            continue;
+        }
 
-            batched_requests[batched_count++] = req;
-            if (update.code == apps::RPC_RRDB_RRDB_BULK_LOAD) {
-                has_ingestion_request = true;
-            }
+        message_ex *req = mu->client_requests[i];
+        if (req == nullptr) {
+            req = message_ex::create_received_request(
+                update.code,
+                
static_cast<dsn_msg_serialize_format>(update.serialization_type),
+                update.data.data(),
+                update.data.length());
+            faked_requests[faked_count++] = req;
+        }
+
+        batched_requests[batched_count++] = req;
+        if (update.code == apps::RPC_RRDB_RRDB_BULK_LOAD) {
+            has_ingestion_request = true;
         }
     }
 
-    int storage_error = on_batched_write_requests(
-        mu->data.header.decree, mu->data.header.timestamp, batched_requests, 
batched_count);
+    const int storage_error = on_batched_write_requests(mu->data.header.decree,
+                                                        
mu->data.header.timestamp,
+                                                        batched_requests,
+                                                        batched_count,
+                                                        mu->original_request);
 
     // release faked requests
-    for (int i = 0; i < faked_count; i++) {
+    for (int i = 0; i < faked_count; ++i) {
         faked_requests[i]->release_ref();
     }
 
diff --git a/src/replica/replication_app_base.h 
b/src/replica/replication_app_base.h
index 2a88618f6..6549c8d01 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -129,16 +129,16 @@ public:
     static const std::string kDataDir;
     static const std::string kRdbDir;
 
-    virtual ~replication_app_base() {}
+    virtual ~replication_app_base() = default;
 
-    bool is_primary() const;
+    [[nodiscard]] bool is_primary() const;
 
     // Whether this replica is duplicating as master.
-    virtual bool is_duplication_master() const;
+    [[nodiscard]] virtual bool is_duplication_master() const;
     // Whether this replica is duplicating as follower.
-    virtual bool is_duplication_follower() const;
+    [[nodiscard]] virtual bool is_duplication_follower() const;
 
-    const ballot &get_ballot() const;
+    [[nodiscard]] const ballot &get_ballot() const;
 
     //
     // Open the app.
@@ -248,21 +248,50 @@ public:
     // The return type is generated by storage engine, e.g. 
rocksdb::Status::Code, 0 always mean OK.
     virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;
 
+    // Make an atomic request received from the client idempotent. Only called 
by primary replicas.
+    //
+    // Current implementation for atomic requests (incr, check_and_set and 
check_and_mutate) is
+    // not idempotent. This function is used to translate them into requests 
like single put
+    // which is naturally idempotent.
+    //
+    // For the other requests which must be idempotent such as single 
put/remove or non-batch
+    // writes, this function would do nothing.
     //
     // Parameters:
-    //  - timestamp: an incremental timestamp generated for this batch of 
requests.
+    // - request: the original request received from a client.
+    // - new_request: as the output parameter pointing to the resulting 
idempotent request if the
+    // original request is atomic, otherwise keeping unchanged.
     //
-    // The base class gives a naive implementation that just call on_request
-    // repeatedly. Storage engine may override this function to get better 
performance.
+    // Return:
+    // - for an idempotent request, always return rocksdb::Status::kOk.
+    // - for an atomic request, return rocksdb::Status::kOk if succeed in 
making it idempotent;
+    // otherwise, return error code (rocksdb::Status::Code).
+    virtual int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) = 0;
+
+    // Apply batched write requests from a mutation. This is a virtual 
function, and the base class
+    // provides a naive implementation that just calls on_request for each 
request. Storage engines
+    // may override this function to get better performance.
     //
-    // The return type is generated by storage engine, e.g. 
rocksdb::Status::Code, 0 always mean OK.
+    // Parameters:
+    //  - decree: the decree of the mutation which these requests are batched 
into.
+    //  - timestamp: an incremental timestamp generated for this batch of 
requests.
+    //  - requests: the requests to be applied.
+    //  - request_length: the number of the requests.
+    //  - original_request: the original request received from the client. 
Must be an atomic
+    //  request (i.e. incr, check_and_set and check_and_mutate) if non-null, 
and another
+    //  parameter `requests` must hold the idempotent request translated from 
it. Used to
+    //  reply to the client.
+    //
+    // Return rocksdb::Status::kOk or some error code (rocksdb::Status::Code) 
if these requests
+    // failed to be applied by storage engine.
     virtual int on_batched_write_requests(int64_t decree,
                                           uint64_t timestamp,
                                           message_ex **requests,
-                                          int request_length);
+                                          int request_length,
+                                          message_ex *original_request);
 
-    // query compact state.
-    virtual std::string query_compact_state() const = 0;
+    // Query compact state.
+    [[nodiscard]] virtual std::string query_compact_state() const = 0;
 
     // update app envs.
     virtual void update_app_envs(const std::map<std::string, std::string> 
&envs) = 0;
diff --git a/src/replica/storage/simple_kv/simple_kv.server.h 
b/src/replica/storage/simple_kv/simple_kv.server.h
index 6acfc85a0..1fc723cf2 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.h
@@ -39,14 +39,25 @@ namespace application {
 class simple_kv_service : public replication_app_base, public 
storage_serverlet<simple_kv_service>
 {
 public:
-    simple_kv_service(replica *r) : replication_app_base(r) {}
-    virtual ~simple_kv_service() {}
+    explicit simple_kv_service(replica *r) : replication_app_base(r) {}
+    ~simple_kv_service() override = default;
 
-    virtual int on_request(dsn::message_ex *request) override 
WARN_UNUSED_RESULT
+    simple_kv_service(const simple_kv_service &) = delete;
+    simple_kv_service &operator=(const simple_kv_service &) = delete;
+
+    simple_kv_service(simple_kv_service &&) = delete;
+    simple_kv_service &operator=(simple_kv_service &&) = delete;
+
+    int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
     {
         return handle_request(request);
     }
 
+    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) override
+    {
+        return rocksdb::Status::kOk;
+    }
+
 protected:
     // all service handlers to be implemented further
     // RPC_SIMPLE_KV_SIMPLE_KV_READ
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 0ae2dd96a..e8a952810 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -78,7 +78,11 @@ public:
         return ERR_OK;
     }
     int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 
0; }
-    std::string query_compact_state() const { return ""; };
+    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) override
+    {
+        return rocksdb::Status::kOk;
+    }
+    [[nodiscard]] std::string query_compact_state() const override { return 
""; };
 
     // we mock the followings
     void update_app_envs(const std::map<std::string, std::string> &envs) 
override { _envs = envs; }
diff --git a/src/rpc/rpc_holder.h b/src/rpc/rpc_holder.h
index 9d753354c..0ad39a21a 100644
--- a/src/rpc/rpc_holder.h
+++ b/src/rpc/rpc_holder.h
@@ -224,13 +224,17 @@ public:
         dsn_rpc_forward(dsn_request(), addr);
     }
 
+    inline void set_auto_reply(bool auto_reply) { _i->auto_reply = auto_reply; 
}
+
+    inline void enable_auto_reply() { set_auto_reply(true); }
+
     // Returns an rpc_holder that will reply the request after its lifetime 
ends.
     // By default rpc_holder never replies.
     // SEE: serverlet<T>::register_rpc_handler_with_rpc_holder
     static inline rpc_holder auto_reply(message_ex *req)
     {
         rpc_holder rpc(req);
-        rpc._i->auto_reply = true;
+        rpc.enable_auto_reply();
         return rpc;
     }
 
diff --git a/src/rpc/rpc_message.cpp b/src/rpc/rpc_message.cpp
index 4c394f1b9..d28829af7 100644
--- a/src/rpc/rpc_message.cpp
+++ b/src/rpc/rpc_message.cpp
@@ -142,13 +142,13 @@ message_ex *message_ex::create_receive_message(const blob 
&data)
 
 message_ex *message_ex::create_received_request(dsn::task_code code,
                                                 dsn_msg_serialize_format 
format,
-                                                void *buffer,
-                                                int size,
+                                                const char *buffer,
+                                                unsigned int size,
                                                 int thread_hash,
                                                 uint64_t partition_hash)
 {
-    ::dsn::blob bb((const char *)buffer, 0, size);
-    auto msg = 
::dsn::message_ex::create_receive_message_with_standalone_header(bb);
+    dsn::blob bb(buffer, 0, size);
+    auto *msg = 
::dsn::message_ex::create_receive_message_with_standalone_header(bb);
     msg->local_rpc_code = code;
     const char *name = code.to_string();
     strncpy(msg->header->rpc_name, name, sizeof(msg->header->rpc_name) - 1);
diff --git a/src/rpc/rpc_message.h b/src/rpc/rpc_message.h
index 9e1d73717..fbf7df17c 100644
--- a/src/rpc/rpc_message.h
+++ b/src/rpc/rpc_message.h
@@ -174,10 +174,18 @@ public:
 
     static message_ex *create_received_request(dsn::task_code rpc_code,
                                                dsn_msg_serialize_format format,
-                                               void *buffer,
-                                               int size,
-                                               int thread_hash = 0,
-                                               uint64_t partition_hash = 0);
+                                               const char *buffer,
+                                               unsigned int size,
+                                               int thread_hash,
+                                               uint64_t partition_hash);
+
+    static message_ex *create_received_request(dsn::task_code rpc_code,
+                                               dsn_msg_serialize_format format,
+                                               const char *buffer,
+                                               unsigned int size)
+    {
+        return create_received_request(rpc_code, format, buffer, size, 0, 0);
+    }
 
     /// This method is only used for receiving request.
     /// The returned message:
diff --git a/src/rpc/rpc_stream.h b/src/rpc/rpc_stream.h
index ffaa049e2..5d7faae3f 100644
--- a/src/rpc/rpc_stream.h
+++ b/src/rpc/rpc_stream.h
@@ -54,23 +54,27 @@ public:
     void set_read_msg(message_ex *msg)
     {
         _msg = msg;
-        if (nullptr != _msg) {
-            ::dsn::blob bb;
-            CHECK(((::dsn::message_ex *)_msg)->read_next(bb),
-                  "read msg must have one segment of buffer ready");
-            init(std::move(bb));
+        if (_msg == nullptr) {
+            return;
         }
+
+        dsn::blob bb;
+        CHECK(_msg->read_next(bb), "read msg must have one segment of buffer 
ready");
+
+        init(std::move(bb));
     }
 
-    int read(char *buffer, int sz) { return inner_read(buffer, sz); }
+    int read(char *buffer, int sz) override { return inner_read(buffer, sz); }
 
-    int read(blob &blob, int len) { return inner_read(blob, len); }
+    int read(blob &blob, int len) override { return inner_read(blob, len); }
 
-    ~rpc_read_stream()
+    ~rpc_read_stream() override
     {
-        if (_msg) {
-            _msg->read_commit((size_t)(total_size() - get_remaining_size()));
+        if (_msg == nullptr) {
+            return;
         }
+
+        _msg->read_commit(static_cast<size_t>(total_size() - 
get_remaining_size()));
     }
 
 private:
diff --git a/src/rpc/test/rpc_message_test.cpp 
b/src/rpc/test/rpc_message_test.cpp
index 19bde1e2d..f8300e6f9 100644
--- a/src/rpc/test/rpc_message_test.cpp
+++ b/src/rpc/test/rpc_message_test.cpp
@@ -167,8 +167,8 @@ TEST(rpc_message_test, message_ex)
 
         message_ex *receive = message_ex::create_received_request(
             request->local_rpc_code,
-            
(dsn_msg_serialize_format)request->header->context.u.serialize_format,
-            (void *)request->buffers[1].data(),
+            
static_cast<dsn_msg_serialize_format>(request->header->context.u.serialize_format),
+            request->buffers[1].data(),
             request->buffers[1].size(),
             request->header->client.thread_hash,
             request->header->client.partition_hash);
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index 827748efc..90e6ed5b9 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -350,15 +350,24 @@ void pegasus_server_impl::gc_checkpoints(bool 
force_reserve_one)
                     max_d);
 }
 
+int pegasus_server_impl::make_idempotent(dsn::message_ex *request, 
dsn::message_ex **new_request)
+{
+    CHECK_TRUE(_is_open);
+
+    return _server_write->make_idempotent(request, new_request);
+}
+
 int pegasus_server_impl::on_batched_write_requests(int64_t decree,
                                                    uint64_t timestamp,
                                                    dsn::message_ex **requests,
-                                                   int count)
+                                                   int count,
+                                                   dsn::message_ex 
*original_request)
 {
-    CHECK(_is_open, "");
+    CHECK_TRUE(_is_open);
     CHECK_NOTNULL(requests, "");
 
-    return _server_write->on_batched_write_requests(requests, count, decree, 
timestamp);
+    return _server_write->on_batched_write_requests(
+        requests, count, decree, timestamp, original_request);
 }
 
 // Since LOG_ERROR_PREFIX depends on log_prefix(), this method could not be 
declared as static or
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 01c37319d..b23898775 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -147,6 +147,8 @@ public:
     //  - ERR_FILE_OPERATION_FAILED
     ::dsn::error_code stop(bool clear_state) override;
 
+    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request) override;
+
     /// Each of the write request (specifically, the rpc that's configured as 
write, see
     /// option `rpc_request_is_write_operation` in rDSN `task_spec`) will 
first be
     /// replicated to the replicas through the underlying PacificA protocol in 
rDSN, and
@@ -157,7 +159,8 @@ public:
     int on_batched_write_requests(int64_t decree,
                                   uint64_t timestamp,
                                   dsn::message_ex **requests,
-                                  int count) override;
+                                  int count,
+                                  dsn::message_ex *original_request) override;
 
     ::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
     {
diff --git a/src/server/pegasus_server_write.cpp 
b/src/server/pegasus_server_write.cpp
index 77f679ee7..8cf7d1d2e 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -22,6 +22,7 @@
 #include <thrift/transport/TTransportException.h>
 #include <algorithm>
 #include <string_view>
+#include <type_traits>
 #include <utility>
 
 #include "base/pegasus_key_schema.h"
@@ -35,6 +36,7 @@
 #include "rpc/rpc_holder.h"
 #include "rpc/rpc_message.h"
 #include "rrdb/rrdb.code.definition.h"
+#include "runtime/message_utils.h"
 #include "server/pegasus_write_service.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
@@ -57,13 +59,28 @@ 
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
       _write_svc(new pegasus_write_service(server)),
       METRIC_VAR_INIT_replica(corrupt_writes)
 {
+    init_make_idempotent_handlers();
     init_non_batch_write_handlers();
+    init_on_idempotent_handlers();
+}
+
+int pegasus_server_write::make_idempotent(dsn::message_ex *request, 
dsn::message_ex **new_request)
+{
+    auto iter = 
std::as_const(_make_idempotent_handlers).find(request->rpc_code());
+    if (iter != _make_idempotent_handlers.end()) {
+        return iter->second(request, new_request);
+    }
+
+    // Requests that are not in the handlers are considered idempotent, so 
this function
+    // always succeeds for them.
+    return rocksdb::Status::kOk;
 }
 
 int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
                                                     int count,
                                                     int64_t decree,
-                                                    uint64_t timestamp)
+                                                    uint64_t timestamp,
+                                                    dsn::message_ex 
*original_request)
 {
     _write_ctx = db_write_context::create(decree, timestamp);
     _decree = decree;
@@ -76,75 +93,122 @@ int 
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
     }
 
     try {
-        auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
+        auto iter = 
std::as_const(_non_batch_write_handlers).find(requests[0]->rpc_code());
         if (iter != _non_batch_write_handlers.end()) {
             CHECK_EQ(count, 1);
             return iter->second(requests[0]);
         }
     } catch (TTransportException &ex) {
         METRIC_VAR_INCREMENT(corrupt_writes);
-        LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, 
exception = {}",
+        LOG_ERROR_PREFIX("pegasus non-batch write handler failed, from = {}, 
exception = {}",
                          requests[0]->header->from_address,
                          ex.what());
+
+        // The corrupt write is likely to be an attack or a scan for security. 
Since it has
+        // been in plog, just return rocksdb::Status::kOk to ignore it.
+        // See https://github.com/apache/incubator-pegasus/pull/798.
         return rocksdb::Status::kOk;
     }
 
+    if (original_request != nullptr) {
+        // The request is regarded as idempotent once its original request is 
attached.
+        CHECK_EQ(count, 1);
+        return on_idempotent(requests[0], original_request);
+    }
+
     return on_batched_writes(requests, count);
 }
 
+int pegasus_server_write::on_idempotent(dsn::message_ex *request, 
dsn::message_ex *original_request)
+{
+    try {
+        auto iter = 
std::as_const(_on_idempotent_handlers).find(request->rpc_code());
+        if (iter != _on_idempotent_handlers.end()) {
+            return iter->second(request, original_request);
+        }
+    } catch (TTransportException &ex) {
+        METRIC_VAR_INCREMENT(corrupt_writes);
+        LOG_ERROR_PREFIX("pegasus on idempotent handler failed, from = {}, 
exception = {}",
+                         request->header->from_address,
+                         ex.what());
+        // The corrupt write is likely to be an attack or a scan for security. 
Since it has
+        // been in plog, just return rocksdb::Status::kOk to ignore it.
+        // See https://github.com/apache/incubator-pegasus/pull/798.
+        return rocksdb::Status::kOk;
+    }
+
+    CHECK(false, "unsupported idempotent write request: rpc_code={}", 
request->rpc_code());
+    return rocksdb::Status::kNotSupported;
+}
+
 void pegasus_server_write::set_default_ttl(uint32_t ttl) { 
_write_svc->set_default_ttl(ttl); }
 
 int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int 
count)
 {
+    _write_svc->batch_prepare(_decree);
+
     int err = rocksdb::Status::kOk;
-    {
-        _write_svc->batch_prepare(_decree);
-
-        for (int i = 0; i < count; ++i) {
-            CHECK_NOTNULL(requests[i], "request[{}] is null", i);
-
-            // Make sure all writes are batched even if they are failed,
-            // since we need to record the total qps and rpc latencies,
-            // and respond for all RPCs regardless of their result.
-            int local_err = rocksdb::Status::kOk;
-            try {
-                dsn::task_code rpc_code(requests[i]->rpc_code());
-                if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
-                    auto rpc = put_rpc::auto_reply(requests[i]);
-                    local_err = on_single_put_in_batch(rpc);
-                    _put_rpc_batch.emplace_back(std::move(rpc));
-                } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
-                    auto rpc = remove_rpc::auto_reply(requests[i]);
-                    local_err = on_single_remove_in_batch(rpc);
-                    _remove_rpc_batch.emplace_back(std::move(rpc));
-                } else {
-                    if (_non_batch_write_handlers.find(rpc_code) !=
-                        _non_batch_write_handlers.end()) {
-                        LOG_FATAL("rpc code not allow batch: {}", rpc_code);
-                    } else {
-                        LOG_FATAL("rpc code not handled: {}", rpc_code);
-                    }
+    for (int i = 0; i < count; ++i) {
+        CHECK_NOTNULL(requests[i], "request[{}] is null", i);
+
+        // Make sure all writes are batched even if some of them failed, since 
we need to record
+        // the total QPS and RPC latencies, and respond for all RPCs 
regardless of their result.
+        int local_err = rocksdb::Status::kOk;
+        try {
+            dsn::task_code rpc_code(requests[i]->rpc_code());
+            if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
+                // Once this single-put request is found originating from an 
atomic request,
+                // there's no need to reply to the client since now we must be 
in one of the
+                // following situations:
+                // - now we are replaying plog into RocksDB at startup of this 
replica.
+                // - now we are in a secondary replica: just received a 
prepare request and
+                // appended it to plog, now we are applying it into RocksDB.
+                auto rpc = put_rpc(requests[i]);
+                const auto &update = rpc.request();
+                if (!update.__isset.type || update.type == 
dsn::apps::update_type::UT_PUT) {
+                    // We must reply to the client for the plain single-put 
request.
+                    rpc.enable_auto_reply();
+                } else if (update.type == dsn::apps::update_type::UT_INCR) {
+                    // This put request must originate from an incr request 
and never be
+                    // batched in plog.
+                    CHECK_EQ(count, 1);
                 }
-            } catch (TTransportException &ex) {
-                METRIC_VAR_INCREMENT(corrupt_writes);
-                LOG_ERROR_PREFIX("pegasus batch writes handler failed, from = 
{}, exception = {}",
-                                 requests[i]->header->from_address,
-                                 ex.what());
-            }
 
-            if (err == rocksdb::Status::kOk && local_err != 
rocksdb::Status::kOk) {
-                err = local_err;
+                local_err = on_single_put_in_batch(rpc);
+                _put_rpc_batch.emplace_back(std::move(rpc));
+            } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+                auto rpc = remove_rpc::auto_reply(requests[i]);
+                local_err = on_single_remove_in_batch(rpc);
+                _remove_rpc_batch.emplace_back(std::move(rpc));
+            } else {
+                if (_non_batch_write_handlers.find(rpc_code) != 
_non_batch_write_handlers.end()) {
+                    LOG_FATAL("rpc code not allow batch: {}", rpc_code);
+                } else {
+                    LOG_FATAL("rpc code not handled: {}", rpc_code);
+                }
             }
+        } catch (TTransportException &ex) {
+            METRIC_VAR_INCREMENT(corrupt_writes);
+            LOG_ERROR_PREFIX("pegasus batch writes handler failed, from = {}, 
exception = {}",
+                             requests[i]->header->from_address,
+                             ex.what());
+            // The corrupt write is likely to be an attack or a scan for 
security. Since it has
+            // been in plog, just ignore it.
+            // See https://github.com/apache/incubator-pegasus/pull/798.
         }
 
-        if (dsn_unlikely(err != rocksdb::Status::kOk ||
-                         (_put_rpc_batch.empty() && 
_remove_rpc_batch.empty()))) {
-            _write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 
: err);
-        } else {
-            err = _write_svc->batch_commit(_decree);
+        if (err == rocksdb::Status::kOk && local_err != rocksdb::Status::kOk) {
+            err = local_err;
         }
     }
 
+    if (dsn_unlikely(err != rocksdb::Status::kOk ||
+                     (_put_rpc_batch.empty() && _remove_rpc_batch.empty()))) {
+        _write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 : 
err);
+    } else {
+        err = _write_svc->batch_commit(_decree);
+    }
+
     // reply the batched RPCs
     _put_rpc_batch.clear();
     _remove_rpc_batch.clear();
@@ -177,6 +241,29 @@ void pegasus_server_write::request_key_check(int64_t 
decree,
     }
 }
 
+void pegasus_server_write::init_make_idempotent_handlers()
+{
+    _make_idempotent_handlers = {
+        {dsn::apps::RPC_RRDB_RRDB_INCR,
+         [this](dsn::message_ex *request, dsn::message_ex **new_request) -> 
int {
+             auto rpc = incr_rpc(request);
+             dsn::apps::update_request update;
+             // Translate an incr request into a single-put request.
+             const auto err = _write_svc->make_idempotent(rpc.request(), 
rpc.response(), update);
+             if (dsn_likely(err == rocksdb::Status::kOk)) {
+                 // Build the message based on the resulting put request.
+                 *new_request = dsn::from_thrift_request_to_received_message(
+                     update, dsn::apps::RPC_RRDB_RRDB_PUT);
+             } else {
+                 // Once it failed, just reply to the client with error 
immediately.
+                 rpc.enable_auto_reply();
+             }
+
+             return err;
+         }},
+    };
+}
+
 void pegasus_server_write::init_non_batch_write_handlers()
 {
     _non_batch_write_handlers = {
@@ -217,5 +304,29 @@ void pegasus_server_write::init_non_batch_write_handlers()
          }},
     };
 }
+
+void pegasus_server_write::init_on_idempotent_handlers()
+{
+    _on_idempotent_handlers = {
+        {dsn::apps::RPC_RRDB_RRDB_PUT,
+         [this](dsn::message_ex *request, dsn::message_ex *original_request) 
-> int {
+             auto put = put_rpc(request);
+
+             const auto &update = put.request();
+             CHECK(update.__isset.type, "update_request::type is not set for 
idempotent write");
+
+             if (update.type == dsn::apps::update_type::UT_INCR) {
+                 auto incr = incr_rpc::auto_reply(original_request);
+                 return _write_svc->put(_write_ctx, update, incr.response());
+             }
+
+             CHECK(false,
+                   "unsupported update_request::type for idempotent write {}",
+                   static_cast<int>(update.type));
+             return rocksdb::Status::kNotSupported;
+         }},
+    };
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/pegasus_server_write.h 
b/src/server/pegasus_server_write.h
index d2be2d5a0..c0481d5f0 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -45,8 +45,13 @@ class pegasus_server_impl;
 class pegasus_server_write : public dsn::replication::replica_base
 {
 public:
-    pegasus_server_write(pegasus_server_impl *server);
+    explicit pegasus_server_write(pegasus_server_impl *server);
 
+    // See replication_app_base::make_idempotent() for details.
+    int make_idempotent(dsn::message_ex *request, dsn::message_ex 
**new_request);
+
+    // See replication_app_base::on_batched_write_requests() for details.
+    //
     /// \return error code returned by rocksdb, i.e rocksdb::Status::code.
     /// **NOTE**
     /// Error returned is regarded as the failure of replica, thus will trigger
@@ -58,12 +63,16 @@ public:
     int on_batched_write_requests(dsn::message_ex **requests,
                                   int count,
                                   int64_t decree,
-                                  uint64_t timestamp);
+                                  uint64_t timestamp,
+                                  dsn::message_ex *original_request);
 
     void set_default_ttl(uint32_t ttl);
 
 private:
-    /// Delay replying for the batched requests until all of them complete.
+    // Apply the idempotent request and respond to its original request.
+    int on_idempotent(dsn::message_ex *request, dsn::message_ex 
*original_request);
+
+    // Delay replying for the batched requests until all of them complete.
     int on_batched_writes(dsn::message_ex **requests, int count);
 
     int on_single_put_in_batch(put_rpc &rpc)
@@ -84,8 +93,9 @@ private:
     // In verbose mode it will log for every request.
     void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob 
&key);
 
-private:
+    void init_make_idempotent_handlers();
     void init_non_batch_write_handlers();
+    void init_on_idempotent_handlers();
 
     friend class pegasus_server_write_test;
     friend class pegasus_write_service_test;
@@ -99,8 +109,19 @@ private:
     db_write_context _write_ctx;
     int64_t _decree;
 
-    typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> 
non_batch_writes_map;
-    non_batch_writes_map _non_batch_write_handlers;
+    // Handlers that make an atomic request idempotent.
+    using make_idempotent_map =
+        std::map<dsn::task_code, std::function<int(dsn::message_ex *, 
dsn::message_ex **)>>;
+    make_idempotent_map _make_idempotent_handlers;
+
+    // Handlers that process a request that cannot be batched, e.g. multi 
put/remove.
+    using non_batch_write_map = std::map<dsn::task_code, 
std::function<int(dsn::message_ex *)>>;
+    non_batch_write_map _non_batch_write_handlers;
+
+    // Handlers that apply the idempotent request and respond to its original 
request.
+    using on_idempotent_map =
+        std::map<dsn::task_code, std::function<int(dsn::message_ex *, 
dsn::message_ex *)>>;
+    on_idempotent_map _on_idempotent_handlers;
 
     METRIC_VAR_DECLARE_counter(corrupt_writes);
 };
diff --git a/src/server/test/hotkey_collector_test.cpp 
b/src/server/test/hotkey_collector_test.cpp
index 7219c5ca0..688efcb02 100644
--- a/src/server/test/hotkey_collector_test.cpp
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -18,9 +18,12 @@
 #include "server/hotkey_collector.h"
 
 #include <fmt/core.h>
+#include <algorithm>
 #include <chrono>
+#include <cstddef>
 #include <string_view>
 #include <thread>
+#include <vector>
 
 #include "base/pegasus_key_schema.h"
 #include "common/gpid.h"
@@ -257,7 +260,7 @@ public:
         _server->on_detect_hotkey(req, resp);
     }
 
-    get_rpc generate_get_rpc(std::string hash_key)
+    static get_rpc generate_get_rpc(const std::string &hash_key)
     {
         dsn::blob raw_key;
         pegasus_generate_key(raw_key, hash_key, std::string("sortkey"));
@@ -265,7 +268,7 @@ public:
         return rpc;
     }
 
-    dsn::apps::update_request generate_set_req(std::string hash_key)
+    static dsn::apps::update_request generate_set_req(const std::string 
&hash_key)
     {
         dsn::apps::update_request req;
         dsn::blob raw_key;
@@ -377,14 +380,15 @@ TEST_P(hotkey_collector_test, data_completeness)
                      resp);
     ASSERT_EQ(resp.err, dsn::ERR_OK);
 
-    const uint16_t WRITE_REQUEST_COUNT = 1000;
-    dsn::message_ex *writes[WRITE_REQUEST_COUNT];
-    for (int i = 0; i < WRITE_REQUEST_COUNT; i++) {
-        writes[i] = create_put_request(generate_set_req(std::to_string(i)));
+    static const size_t kWriteRequestCount = 1000;
+    std::vector<dsn::message_ex *> writes;
+    writes.reserve(kWriteRequestCount);
+    for (size_t i = 0; i < kWriteRequestCount; ++i) {
+        
writes.push_back(create_put_request(generate_set_req(std::to_string(i))));
     }
-    _server->on_batched_write_requests(int64_t(0), uint64_t(0), writes, 
WRITE_REQUEST_COUNT);
+    _server->on_batched_write_requests(0, 0, writes.data(), 
kWriteRequestCount, nullptr);
 
-    for (int i = 0; i < WRITE_REQUEST_COUNT; i++) {
+    for (size_t i = 0; i < kWriteRequestCount; ++i) {
         auto rpc = generate_get_rpc(std::to_string(i));
         _server->on_get(rpc);
         auto value = rpc.response().value.to_string();
diff --git a/src/server/test/pegasus_server_write_test.cpp 
b/src/server/test/pegasus_server_write_test.cpp
index 72d21eb6a..4b7c6fc84 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -91,8 +91,8 @@ public:
                     writes[i] = pegasus::create_remove_request(key);
                 }
 
-                int err =
-                    _server_write->on_batched_write_requests(writes, 
total_rpc_cnt, decree, 0);
+                int err = _server_write->on_batched_write_requests(
+                    writes, total_rpc_cnt, decree, 0, nullptr);
                 switch (err) {
                 case FAIL_DB_WRITE_BATCH_PUT:
                 case FAIL_DB_WRITE_BATCH_DELETE:
diff --git a/src/tools/mutation_log_tool.cpp b/src/tools/mutation_log_tool.cpp
index ef367e746..10e5fda24 100644
--- a/src/tools/mutation_log_tool.cpp
+++ b/src/tools/mutation_log_tool.cpp
@@ -26,7 +26,6 @@
 
 #include "mutation_log_tool.h"
 
-#include <alloca.h>
 #include <memory>
 #include <vector>
 
@@ -41,6 +40,7 @@
 #include "replica/replica_stub.h"
 #include "rpc/rpc_message.h"
 #include "task/task_spec.h"
+#include "utils/alloc.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/defer.h"
@@ -103,14 +103,13 @@ bool mutation_log_tool::dump(
                                   log_length,
                                   mu->data.updates.size());
             if (callback && !mu->data.updates.empty()) {
-                dsn::message_ex **batched_requests =
-                    (dsn::message_ex **)alloca(sizeof(dsn::message_ex *) * 
mu->data.updates.size());
+                auto **batched_requests = ALLOC_STACK(message_ex *, 
mu->data.updates.size());
                 int batched_count = 0;
                 for (mutation_update &update : mu->data.updates) {
                     dsn::message_ex *req = 
dsn::message_ex::create_received_request(
                         update.code,
-                        (dsn_msg_serialize_format)update.serialization_type,
-                        (void *)update.data.data(),
+                        
static_cast<dsn_msg_serialize_format>(update.serialization_type),
+                        update.data.data(),
                         update.data.length());
                     batched_requests[batched_count++] = req;
                 }
diff --git a/src/utils/alloc.h b/src/utils/alloc.h
index 16045b3b5..fdb799117 100644
--- a/src/utils/alloc.h
+++ b/src/utils/alloc.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <alloca.h>
 #include <algorithm> // IWYU pragma: keep
 #include <cstddef>
 #include <functional>
@@ -85,6 +86,11 @@ cacheline_aligned_ptr<T> 
cacheline_aligned_alloc_array(size_t len, const T &val)
     return array;
 }
 
+// Allocate space for `num` objects of `type` on the stack using alloca(). 
Since the
+// space is allocated within the stack frame of the caller, it can only be 
implemented
+// as a macro.
+#define ALLOC_STACK(type, num) static_cast<type *>(alloca(sizeof(type) * 
(num)))
+
 } // namespace dsn
 
 #endif // CACHELINE_SIZE


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to