This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 6249c13ae feat(make_idempotent): support making `incr` request
idempotent in `pegasus_server_write` and `replication_app_base` (#2196)
6249c13ae is described below
commit 6249c13aed066caa3415f8623a6a125235bfddbd
Author: Dan Wang <[email protected]>
AuthorDate: Fri Feb 21 19:32:49 2025 +0800
feat(make_idempotent): support making `incr` request idempotent in
`pegasus_server_write` and `replication_app_base` (#2196)
https://github.com/apache/incubator-pegasus/issues/2197
To support idempotence, a new interface `make_idempotent()` is introduced
and an existing interface `on_batched_write_requests()` is changed for both
classes `pegasus_server_write` and `replication_app_base`. This is different
from what we have done for `pegasus_write_service` and
`pegasus_write_service::impl`, both of which provide `make_idempotent()` and
`put()` by the following PRs:
- https://github.com/apache/incubator-pegasus/pull/2185
- https://github.com/apache/incubator-pegasus/pull/2192
`make_idempotent()` for `replication_app_base` is provided as a virtual
function called by primary replicas, implemented internally by
`pegasus_server_impl` and `pegasus_server_write`. The same goes for
`on_batched_write_requests()` of `replication_app_base`, which is changed by
adding a new parameter `original_request`. This parameter is just the
original request received from the client. If it is non-null, it must be an
atomic request (i.e. `incr`, `check_and_set` or `check_and_mutate`); it is
used to decide whether a write request is atomic and to generate the
response corresponding to the atomic write request.
---
.clang-tidy | 2 +-
build_tools/clang_tidy.py | 2 +
src/redis_protocol/proxy_ut/redis_proxy_test.cpp | 11 +-
src/replica/mutation.h | 5 +
src/replica/replication_app_base.cpp | 52 +++---
src/replica/replication_app_base.h | 53 ++++--
src/replica/storage/simple_kv/simple_kv.server.h | 17 +-
src/replica/test/mock_utils.h | 6 +-
src/rpc/rpc_holder.h | 6 +-
src/rpc/rpc_message.cpp | 8 +-
src/rpc/rpc_message.h | 16 +-
src/rpc/rpc_stream.h | 24 +--
src/rpc/test/rpc_message_test.cpp | 4 +-
src/server/pegasus_server_impl.cpp | 15 +-
src/server/pegasus_server_impl.h | 5 +-
src/server/pegasus_server_write.cpp | 197 ++++++++++++++++++-----
src/server/pegasus_server_write.h | 33 +++-
src/server/test/hotkey_collector_test.cpp | 20 ++-
src/server/test/pegasus_server_write_test.cpp | 4 +-
src/tools/mutation_log_tool.cpp | 9 +-
src/utils/alloc.h | 6 +
21 files changed, 360 insertions(+), 135 deletions(-)
diff --git a/.clang-tidy b/.clang-tidy
index 95dd7616d..deaca38ca 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -20,7 +20,7 @@
CheckOptions: []
# Disable some checks that are not useful for us now.
# They are sorted by names, and should be consistent to
build_tools/clang_tidy.py.
-Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines
[...]
+Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-bugprone-sizeof-expression,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-
[...]
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index 8a35e1a5d..b91593026 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -65,6 +65,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
"-checks=-bugprone-easily-swappable-parameters,"
"-bugprone-lambda-function-name,"
"-bugprone-macro-parentheses,"
+ "-bugprone-sizeof-expression,"
"-cert-err58-cpp,"
"-concurrency-mt-unsafe,"
"-cppcoreguidelines-avoid-c-arrays,"
@@ -78,6 +79,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
"-cppcoreguidelines-pro-type-const-cast,"
"-cppcoreguidelines-pro-type-union-access,"
"-fuchsia-default-arguments-calls,"
+ "-fuchsia-multiple-inheritance,"
"-fuchsia-overloaded-operator,"
"-fuchsia-statically-constructed-objects,"
"-google-readability-avoid-underscore-in-googletest-name,"
diff --git a/src/redis_protocol/proxy_ut/redis_proxy_test.cpp
b/src/redis_protocol/proxy_ut/redis_proxy_test.cpp
index dafa9cb79..d87267f52 100644
--- a/src/redis_protocol/proxy_ut/redis_proxy_test.cpp
+++ b/src/redis_protocol/proxy_ut/redis_proxy_test.cpp
@@ -112,16 +112,15 @@ public:
_reserved_entry[index]->request.sub_requests =
std::move(msg.sub_requests);
}
- static dsn::message_ex *create_message(const char *data)
+ static dsn::message_ex *create_message(const char *data, unsigned int
length)
{
return dsn::message_ex::create_received_request(
- RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data,
strlen(data));
+ RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, data, length);
}
- static dsn::message_ex *create_message(const char *data, int length)
+ static dsn::message_ex *create_message(const char *data)
{
- return dsn::message_ex::create_received_request(
- RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data,
length);
+ return create_message(data, strlen(data));
}
static dsn::message_ex *marshalling_array(const redis_request &request)
@@ -379,7 +378,7 @@ TEST_F(proxy_test, test_random_cases)
offsets.insert(total_body_size);
int last_offset = 0;
- for (int offset : offsets) {
+ for (auto offset : offsets) {
dsn::message_ex *msg =
redis_test_parser::create_message(msg_buffer_ptr + last_offset,
offset -
last_offset);
ASSERT_TRUE(parse(msg));
diff --git a/src/replica/mutation.h b/src/replica/mutation.h
index 757ce450e..b6b8ecd14 100644
--- a/src/replica/mutation.h
+++ b/src/replica/mutation.h
@@ -153,6 +153,11 @@ public:
// user requests
std::vector<dsn::message_ex *> client_requests;
+ // The original request received from the client. While making an atomic
request (incr,
+ // check_and_set and check_and_mutate) idempotent, an extra variable is
needed to hold
+ // its original request for the purpose of replying to the client.
+ dsn::message_ptr original_request;
+
// used by pending mutation queue only
mutation *next;
diff --git a/src/replica/replication_app_base.cpp
b/src/replica/replication_app_base.cpp
index 0c82513dc..dd4471158 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -24,7 +24,6 @@
* THE SOFTWARE.
*/
-#include <alloca.h>
#include <fmt/core.h>
#include <rocksdb/env.h>
#include <rocksdb/status.h>
@@ -47,6 +46,7 @@
#include "rpc/serialization.h"
#include "task/task_code.h"
#include "task/task_spec.h"
+#include "utils/alloc.h"
#include "utils/autoref_ptr.h"
#include "utils/binary_reader.h"
#include "utils/binary_writer.h"
@@ -264,7 +264,8 @@ error_code
replication_app_base::apply_checkpoint(chkpt_apply_mode mode, const l
int replication_app_base::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
message_ex **requests,
- int request_length)
+ int request_length,
+ message_ex
*original_request)
{
int storage_error = rocksdb::Status::kOk;
for (int i = 0; i < request_length; ++i) {
@@ -292,37 +293,42 @@ error_code replication_app_base::apply_mutation(const
mutation *mu)
}
bool has_ingestion_request = false;
- int request_count = static_cast<int>(mu->client_requests.size());
- message_ex **batched_requests = (message_ex **)alloca(sizeof(message_ex *)
* request_count);
- message_ex **faked_requests = (message_ex **)alloca(sizeof(message_ex *) *
request_count);
+ const int request_count = static_cast<int>(mu->client_requests.size());
+ auto **batched_requests = ALLOC_STACK(message_ex *, request_count);
+ auto **faked_requests = ALLOC_STACK(message_ex *, request_count);
int batched_count = 0; // write-empties are not included.
int faked_count = 0;
- for (int i = 0; i < request_count; i++) {
+ for (int i = 0; i < request_count; ++i) {
const mutation_update &update = mu->data.updates[i];
- message_ex *req = mu->client_requests[i];
LOG_DEBUG_PREFIX("mutation {} #{}: dispatch rpc call {}", mu->name(),
i, update.code);
- if (update.code != RPC_REPLICATION_WRITE_EMPTY) {
- if (req == nullptr) {
- req = message_ex::create_received_request(
- update.code,
- (dsn_msg_serialize_format)update.serialization_type,
- (void *)update.data.data(),
- update.data.length());
- faked_requests[faked_count++] = req;
- }
+ if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
+ continue;
+ }
- batched_requests[batched_count++] = req;
- if (update.code == apps::RPC_RRDB_RRDB_BULK_LOAD) {
- has_ingestion_request = true;
- }
+ message_ex *req = mu->client_requests[i];
+ if (req == nullptr) {
+ req = message_ex::create_received_request(
+ update.code,
+
static_cast<dsn_msg_serialize_format>(update.serialization_type),
+ update.data.data(),
+ update.data.length());
+ faked_requests[faked_count++] = req;
+ }
+
+ batched_requests[batched_count++] = req;
+ if (update.code == apps::RPC_RRDB_RRDB_BULK_LOAD) {
+ has_ingestion_request = true;
}
}
- int storage_error = on_batched_write_requests(
- mu->data.header.decree, mu->data.header.timestamp, batched_requests,
batched_count);
+ const int storage_error = on_batched_write_requests(mu->data.header.decree,
+
mu->data.header.timestamp,
+ batched_requests,
+ batched_count,
+ mu->original_request);
// release faked requests
- for (int i = 0; i < faked_count; i++) {
+ for (int i = 0; i < faked_count; ++i) {
faked_requests[i]->release_ref();
}
diff --git a/src/replica/replication_app_base.h
b/src/replica/replication_app_base.h
index 2a88618f6..6549c8d01 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -129,16 +129,16 @@ public:
static const std::string kDataDir;
static const std::string kRdbDir;
- virtual ~replication_app_base() {}
+ virtual ~replication_app_base() = default;
- bool is_primary() const;
+ [[nodiscard]] bool is_primary() const;
// Whether this replica is duplicating as master.
- virtual bool is_duplication_master() const;
+ [[nodiscard]] virtual bool is_duplication_master() const;
// Whether this replica is duplicating as follower.
- virtual bool is_duplication_follower() const;
+ [[nodiscard]] virtual bool is_duplication_follower() const;
- const ballot &get_ballot() const;
+ [[nodiscard]] const ballot &get_ballot() const;
//
// Open the app.
@@ -248,21 +248,50 @@ public:
// The return type is generated by storage engine, e.g.
rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;
+ // Make an atomic request received from the client idempotent. Only called
by primary replicas.
+ //
+ // Current implementation for atomic requests (incr, check_and_set and
check_and_mutate) is
+ // not idempotent. This function is used to translate them into requests
like single put
+ // which is naturally idempotent.
+ //
+ // For the other requests which must be idempotent such as single
put/remove or non-batch
+ // writes, this function would do nothing.
//
// Parameters:
- // - timestamp: an incremental timestamp generated for this batch of
requests.
+ // - request: the original request received from a client.
+ // - new_request: as the output parameter pointing to the resulting
idempotent request if the
+ // original request is atomic, otherwise keeping unchanged.
//
- // The base class gives a naive implementation that just call on_request
- // repeatedly. Storage engine may override this function to get better
performance.
+ // Return:
+ // - for an idempotent request, always return rocksdb::Status::kOk.
+ // - for an atomic request, return rocksdb::Status::kOk if succeed in
making it idempotent;
+ // otherwise, return error code (rocksdb::Status::Code).
+ virtual int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) = 0;
+
+ // Apply batched write requests from a mutation. This is a virtual function, and the base class
+ // provides a naive implementation that just calls on_request for each request. Storage engine
+ // may override this function to get better performance.
//
- // The return type is generated by storage engine, e.g.
rocksdb::Status::Code, 0 always mean OK.
+ // Parameters:
+ // - decree: the decree of the mutation which these requests are batched
into.
+ // - timestamp: an incremental timestamp generated for this batch of
requests.
+ // - requests: the requests to be applied.
+ // - request_length: the number of the requests.
+ // - original_request: the original request received from the client.
Must be an atomic
+ // request (i.e. incr, check_and_set and check_and_mutate) if non-null,
and another
+ // parameter `requests` must hold the idempotent request translated from
it. Used to
+ // reply to the client.
+ //
+ // Return rocksdb::Status::kOk or some error code (rocksdb::Status::Code)
if these requests
+ // failed to be applied by storage engine.
virtual int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
message_ex **requests,
- int request_length);
+ int request_length,
+ message_ex *original_request);
- // query compact state.
- virtual std::string query_compact_state() const = 0;
+ // Query compact state.
+ [[nodiscard]] virtual std::string query_compact_state() const = 0;
// update app envs.
virtual void update_app_envs(const std::map<std::string, std::string>
&envs) = 0;
diff --git a/src/replica/storage/simple_kv/simple_kv.server.h
b/src/replica/storage/simple_kv/simple_kv.server.h
index 6acfc85a0..1fc723cf2 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.h
@@ -39,14 +39,25 @@ namespace application {
class simple_kv_service : public replication_app_base, public
storage_serverlet<simple_kv_service>
{
public:
- simple_kv_service(replica *r) : replication_app_base(r) {}
- virtual ~simple_kv_service() {}
+ explicit simple_kv_service(replica *r) : replication_app_base(r) {}
+ ~simple_kv_service() override = default;
- virtual int on_request(dsn::message_ex *request) override
WARN_UNUSED_RESULT
+ simple_kv_service(const simple_kv_service &) = delete;
+ simple_kv_service &operator=(const simple_kv_service &) = delete;
+
+ simple_kv_service(simple_kv_service &&) = delete;
+ simple_kv_service &operator=(simple_kv_service &&) = delete;
+
+ int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
{
return handle_request(request);
}
+ int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) override
+ {
+ return rocksdb::Status::kOk;
+ }
+
protected:
// all service handlers to be implemented further
// RPC_SIMPLE_KV_SIMPLE_KV_READ
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 0ae2dd96a..e8a952810 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -78,7 +78,11 @@ public:
return ERR_OK;
}
int on_request(message_ex *request) override WARN_UNUSED_RESULT { return
0; }
- std::string query_compact_state() const { return ""; };
+ int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) override
+ {
+ return rocksdb::Status::kOk;
+ }
+ [[nodiscard]] std::string query_compact_state() const override { return
""; };
// we mock the followings
void update_app_envs(const std::map<std::string, std::string> &envs)
override { _envs = envs; }
diff --git a/src/rpc/rpc_holder.h b/src/rpc/rpc_holder.h
index 9d753354c..0ad39a21a 100644
--- a/src/rpc/rpc_holder.h
+++ b/src/rpc/rpc_holder.h
@@ -224,13 +224,17 @@ public:
dsn_rpc_forward(dsn_request(), addr);
}
+ inline void set_auto_reply(bool auto_reply) { _i->auto_reply = auto_reply;
}
+
+ inline void enable_auto_reply() { set_auto_reply(true); }
+
// Returns an rpc_holder that will reply the request after its lifetime
ends.
// By default rpc_holder never replies.
// SEE: serverlet<T>::register_rpc_handler_with_rpc_holder
static inline rpc_holder auto_reply(message_ex *req)
{
rpc_holder rpc(req);
- rpc._i->auto_reply = true;
+ rpc.enable_auto_reply();
return rpc;
}
diff --git a/src/rpc/rpc_message.cpp b/src/rpc/rpc_message.cpp
index 4c394f1b9..d28829af7 100644
--- a/src/rpc/rpc_message.cpp
+++ b/src/rpc/rpc_message.cpp
@@ -142,13 +142,13 @@ message_ex *message_ex::create_receive_message(const blob
&data)
message_ex *message_ex::create_received_request(dsn::task_code code,
dsn_msg_serialize_format
format,
- void *buffer,
- int size,
+ const char *buffer,
+ unsigned int size,
int thread_hash,
uint64_t partition_hash)
{
- ::dsn::blob bb((const char *)buffer, 0, size);
- auto msg =
::dsn::message_ex::create_receive_message_with_standalone_header(bb);
+ dsn::blob bb(buffer, 0, size);
+ auto *msg =
::dsn::message_ex::create_receive_message_with_standalone_header(bb);
msg->local_rpc_code = code;
const char *name = code.to_string();
strncpy(msg->header->rpc_name, name, sizeof(msg->header->rpc_name) - 1);
diff --git a/src/rpc/rpc_message.h b/src/rpc/rpc_message.h
index 9e1d73717..fbf7df17c 100644
--- a/src/rpc/rpc_message.h
+++ b/src/rpc/rpc_message.h
@@ -174,10 +174,18 @@ public:
static message_ex *create_received_request(dsn::task_code rpc_code,
dsn_msg_serialize_format format,
- void *buffer,
- int size,
- int thread_hash = 0,
- uint64_t partition_hash = 0);
+ const char *buffer,
+ unsigned int size,
+ int thread_hash,
+ uint64_t partition_hash);
+
+ static message_ex *create_received_request(dsn::task_code rpc_code,
+ dsn_msg_serialize_format format,
+ const char *buffer,
+ unsigned int size)
+ {
+ return create_received_request(rpc_code, format, buffer, size, 0, 0);
+ }
/// This method is only used for receiving request.
/// The returned message:
diff --git a/src/rpc/rpc_stream.h b/src/rpc/rpc_stream.h
index ffaa049e2..5d7faae3f 100644
--- a/src/rpc/rpc_stream.h
+++ b/src/rpc/rpc_stream.h
@@ -54,23 +54,27 @@ public:
void set_read_msg(message_ex *msg)
{
_msg = msg;
- if (nullptr != _msg) {
- ::dsn::blob bb;
- CHECK(((::dsn::message_ex *)_msg)->read_next(bb),
- "read msg must have one segment of buffer ready");
- init(std::move(bb));
+ if (_msg == nullptr) {
+ return;
}
+
+ dsn::blob bb;
+ CHECK(_msg->read_next(bb), "read msg must have one segment of buffer
ready");
+
+ init(std::move(bb));
}
- int read(char *buffer, int sz) { return inner_read(buffer, sz); }
+ int read(char *buffer, int sz) override { return inner_read(buffer, sz); }
- int read(blob &blob, int len) { return inner_read(blob, len); }
+ int read(blob &blob, int len) override { return inner_read(blob, len); }
- ~rpc_read_stream()
+ ~rpc_read_stream() override
{
- if (_msg) {
- _msg->read_commit((size_t)(total_size() - get_remaining_size()));
+ if (_msg == nullptr) {
+ return;
}
+
+ _msg->read_commit(static_cast<size_t>(total_size() -
get_remaining_size()));
}
private:
diff --git a/src/rpc/test/rpc_message_test.cpp
b/src/rpc/test/rpc_message_test.cpp
index 19bde1e2d..f8300e6f9 100644
--- a/src/rpc/test/rpc_message_test.cpp
+++ b/src/rpc/test/rpc_message_test.cpp
@@ -167,8 +167,8 @@ TEST(rpc_message_test, message_ex)
message_ex *receive = message_ex::create_received_request(
request->local_rpc_code,
-
(dsn_msg_serialize_format)request->header->context.u.serialize_format,
- (void *)request->buffers[1].data(),
+
static_cast<dsn_msg_serialize_format>(request->header->context.u.serialize_format),
+ request->buffers[1].data(),
request->buffers[1].size(),
request->header->client.thread_hash,
request->header->client.partition_hash);
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 827748efc..90e6ed5b9 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -350,15 +350,24 @@ void pegasus_server_impl::gc_checkpoints(bool
force_reserve_one)
max_d);
}
+int pegasus_server_impl::make_idempotent(dsn::message_ex *request,
dsn::message_ex **new_request)
+{
+ CHECK_TRUE(_is_open);
+
+ return _server_write->make_idempotent(request, new_request);
+}
+
int pegasus_server_impl::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
- int count)
+ int count,
+ dsn::message_ex
*original_request)
{
- CHECK(_is_open, "");
+ CHECK_TRUE(_is_open);
CHECK_NOTNULL(requests, "");
- return _server_write->on_batched_write_requests(requests, count, decree,
timestamp);
+ return _server_write->on_batched_write_requests(
+ requests, count, decree, timestamp, original_request);
}
// Since LOG_ERROR_PREFIX depends on log_prefix(), this method could not be
declared as static or
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 01c37319d..b23898775 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -147,6 +147,8 @@ public:
// - ERR_FILE_OPERATION_FAILED
::dsn::error_code stop(bool clear_state) override;
+ int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) override;
+
/// Each of the write request (specifically, the rpc that's configured as
write, see
/// option `rpc_request_is_write_operation` in rDSN `task_spec`) will
first be
/// replicated to the replicas through the underlying PacificA protocol in
rDSN, and
@@ -157,7 +159,8 @@ public:
int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
- int count) override;
+ int count,
+ dsn::message_ex *original_request) override;
::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
{
diff --git a/src/server/pegasus_server_write.cpp
b/src/server/pegasus_server_write.cpp
index 77f679ee7..8cf7d1d2e 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -22,6 +22,7 @@
#include <thrift/transport/TTransportException.h>
#include <algorithm>
#include <string_view>
+#include <type_traits>
#include <utility>
#include "base/pegasus_key_schema.h"
@@ -35,6 +36,7 @@
#include "rpc/rpc_holder.h"
#include "rpc/rpc_message.h"
#include "rrdb/rrdb.code.definition.h"
+#include "runtime/message_utils.h"
#include "server/pegasus_write_service.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
@@ -57,13 +59,28 @@
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
_write_svc(new pegasus_write_service(server)),
METRIC_VAR_INIT_replica(corrupt_writes)
{
+ init_make_idempotent_handlers();
init_non_batch_write_handlers();
+ init_on_idempotent_handlers();
+}
+
+int pegasus_server_write::make_idempotent(dsn::message_ex *request,
dsn::message_ex **new_request)
+{
+ auto iter =
std::as_const(_make_idempotent_handlers).find(request->rpc_code());
+ if (iter != _make_idempotent_handlers.end()) {
+ return iter->second(request, new_request);
+ }
+
+ // Those requests not in the handlers are considered as idempotent. Always
be successful
+ // for them.
+ return rocksdb::Status::kOk;
}
int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
int count,
int64_t decree,
- uint64_t timestamp)
+ uint64_t timestamp,
+ dsn::message_ex
*original_request)
{
_write_ctx = db_write_context::create(decree, timestamp);
_decree = decree;
@@ -76,75 +93,122 @@ int
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
}
try {
- auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
+ auto iter =
std::as_const(_non_batch_write_handlers).find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
CHECK_EQ(count, 1);
return iter->second(requests[0]);
}
} catch (TTransportException &ex) {
METRIC_VAR_INCREMENT(corrupt_writes);
- LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {},
exception = {}",
+ LOG_ERROR_PREFIX("pegasus non-batch write handler failed, from = {},
exception = {}",
requests[0]->header->from_address,
ex.what());
+
+ // The corrupt write is likely to be an attack or a scan for security.
Since it has
+ // been in plog, just return rocksdb::Status::kOk to ignore it.
+ // See https://github.com/apache/incubator-pegasus/pull/798.
return rocksdb::Status::kOk;
}
+ if (original_request != nullptr) {
+ // The request is regarded as idempotent once its original request is
attached.
+ CHECK_EQ(count, 1);
+ return on_idempotent(requests[0], original_request);
+ }
+
return on_batched_writes(requests, count);
}
+int pegasus_server_write::on_idempotent(dsn::message_ex *request,
dsn::message_ex *original_request)
+{
+ try {
+ auto iter =
std::as_const(_on_idempotent_handlers).find(request->rpc_code());
+ if (iter != _on_idempotent_handlers.end()) {
+ return iter->second(request, original_request);
+ }
+ } catch (TTransportException &ex) {
+ METRIC_VAR_INCREMENT(corrupt_writes);
+ LOG_ERROR_PREFIX("pegasus on idempotent handler failed, from = {},
exception = {}",
+ request->header->from_address,
+ ex.what());
+ // The corrupt write is likely to be an attack or a scan for security.
Since it has
+ // been in plog, just return rocksdb::Status::kOk to ignore it.
+ // See https://github.com/apache/incubator-pegasus/pull/798.
+ return rocksdb::Status::kOk;
+ }
+
+ CHECK(false, "unsupported idempotent write request: rpc_code={}",
request->rpc_code());
+ return rocksdb::Status::kNotSupported;
+}
+
void pegasus_server_write::set_default_ttl(uint32_t ttl) {
_write_svc->set_default_ttl(ttl); }
int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int
count)
{
+ _write_svc->batch_prepare(_decree);
+
int err = rocksdb::Status::kOk;
- {
- _write_svc->batch_prepare(_decree);
-
- for (int i = 0; i < count; ++i) {
- CHECK_NOTNULL(requests[i], "request[{}] is null", i);
-
- // Make sure all writes are batched even if they are failed,
- // since we need to record the total qps and rpc latencies,
- // and respond for all RPCs regardless of their result.
- int local_err = rocksdb::Status::kOk;
- try {
- dsn::task_code rpc_code(requests[i]->rpc_code());
- if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
- auto rpc = put_rpc::auto_reply(requests[i]);
- local_err = on_single_put_in_batch(rpc);
- _put_rpc_batch.emplace_back(std::move(rpc));
- } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
- auto rpc = remove_rpc::auto_reply(requests[i]);
- local_err = on_single_remove_in_batch(rpc);
- _remove_rpc_batch.emplace_back(std::move(rpc));
- } else {
- if (_non_batch_write_handlers.find(rpc_code) !=
- _non_batch_write_handlers.end()) {
- LOG_FATAL("rpc code not allow batch: {}", rpc_code);
- } else {
- LOG_FATAL("rpc code not handled: {}", rpc_code);
- }
+ for (int i = 0; i < count; ++i) {
+ CHECK_NOTNULL(requests[i], "request[{}] is null", i);
+
+ // Make sure all writes are batched even if some of them failed, since
we need to record
+ // the total QPS and RPC latencies, and respond for all RPCs
regardless of their result.
+ int local_err = rocksdb::Status::kOk;
+ try {
+ dsn::task_code rpc_code(requests[i]->rpc_code());
+ if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
+ // Once this single-put request is found originating from an
atomic request,
+ // there's no need to reply to the client since now we must be
in one of the
+ // following situations:
+ // - now we are replaying plog into RocksDB at startup of this
replica.
+ // - now we are in a secondary replica: just received a
prepare request and
+ // appended it to plog, now we are applying it into RocksDB.
+ auto rpc = put_rpc(requests[i]);
+ const auto &update = rpc.request();
+ if (!update.__isset.type || update.type ==
dsn::apps::update_type::UT_PUT) {
+ // We must reply to the client for the plain single-put
request.
+ rpc.enable_auto_reply();
+ } else if (update.type == dsn::apps::update_type::UT_INCR) {
+ // This put request must originate from an incr request
and never be
+ // batched in plog.
+ CHECK_EQ(count, 1);
}
- } catch (TTransportException &ex) {
- METRIC_VAR_INCREMENT(corrupt_writes);
- LOG_ERROR_PREFIX("pegasus batch writes handler failed, from =
{}, exception = {}",
- requests[i]->header->from_address,
- ex.what());
- }
- if (err == rocksdb::Status::kOk && local_err !=
rocksdb::Status::kOk) {
- err = local_err;
+ local_err = on_single_put_in_batch(rpc);
+ _put_rpc_batch.emplace_back(std::move(rpc));
+ } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
+ auto rpc = remove_rpc::auto_reply(requests[i]);
+ local_err = on_single_remove_in_batch(rpc);
+ _remove_rpc_batch.emplace_back(std::move(rpc));
+ } else {
+ if (_non_batch_write_handlers.find(rpc_code) !=
_non_batch_write_handlers.end()) {
+ LOG_FATAL("rpc code not allow batch: {}", rpc_code);
+ } else {
+ LOG_FATAL("rpc code not handled: {}", rpc_code);
+ }
}
+ } catch (TTransportException &ex) {
+ METRIC_VAR_INCREMENT(corrupt_writes);
+ LOG_ERROR_PREFIX("pegasus batch writes handler failed, from = {},
exception = {}",
+ requests[i]->header->from_address,
+ ex.what());
+ // The corrupt write is likely to be an attack or a scan for
security. Since it has
+ // been in plog, just ignore it.
+ // See https://github.com/apache/incubator-pegasus/pull/798.
}
- if (dsn_unlikely(err != rocksdb::Status::kOk ||
- (_put_rpc_batch.empty() &&
_remove_rpc_batch.empty()))) {
- _write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1
: err);
- } else {
- err = _write_svc->batch_commit(_decree);
+ if (err == rocksdb::Status::kOk && local_err != rocksdb::Status::kOk) {
+ err = local_err;
}
}
+ if (dsn_unlikely(err != rocksdb::Status::kOk ||
+ (_put_rpc_batch.empty() && _remove_rpc_batch.empty()))) {
+ _write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 :
err);
+ } else {
+ err = _write_svc->batch_commit(_decree);
+ }
+
// reply the batched RPCs
_put_rpc_batch.clear();
_remove_rpc_batch.clear();
@@ -177,6 +241,29 @@ void pegasus_server_write::request_key_check(int64_t
decree,
}
}
+void pegasus_server_write::init_make_idempotent_handlers()
+{
+ _make_idempotent_handlers = {
+ {dsn::apps::RPC_RRDB_RRDB_INCR,
+ [this](dsn::message_ex *request, dsn::message_ex **new_request) ->
int {
+ auto rpc = incr_rpc(request);
+ dsn::apps::update_request update;
+ // Translate an incr request into a single-put request.
+ const auto err = _write_svc->make_idempotent(rpc.request(),
rpc.response(), update);
+ if (dsn_likely(err == rocksdb::Status::kOk)) {
+ // Build the message based on the resulting put request.
+ *new_request = dsn::from_thrift_request_to_received_message(
+ update, dsn::apps::RPC_RRDB_RRDB_PUT);
+ } else {
+ // Once it failed, just reply to the client with error
immediately.
+ rpc.enable_auto_reply();
+ }
+
+ return err;
+ }},
+ };
+}
+
void pegasus_server_write::init_non_batch_write_handlers()
{
_non_batch_write_handlers = {
@@ -217,5 +304,29 @@ void pegasus_server_write::init_non_batch_write_handlers()
}},
};
}
+
+void pegasus_server_write::init_on_idempotent_handlers()
+{
+ _on_idempotent_handlers = {
+ {dsn::apps::RPC_RRDB_RRDB_PUT,
+ [this](dsn::message_ex *request, dsn::message_ex *original_request)
-> int {
+ auto put = put_rpc(request);
+
+ const auto &update = put.request();
+ CHECK(update.__isset.type, "update_request::type is not set for
idempotent write");
+
+ if (update.type == dsn::apps::update_type::UT_INCR) {
+ auto incr = incr_rpc::auto_reply(original_request);
+ return _write_svc->put(_write_ctx, update, incr.response());
+ }
+
+ CHECK(false,
+ "unsupported update_request::type for idempotent write {}",
+ static_cast<int>(update.type));
+ return rocksdb::Status::kNotSupported;
+ }},
+ };
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/pegasus_server_write.h
b/src/server/pegasus_server_write.h
index d2be2d5a0..c0481d5f0 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -45,8 +45,13 @@ class pegasus_server_impl;
class pegasus_server_write : public dsn::replication::replica_base
{
public:
- pegasus_server_write(pegasus_server_impl *server);
+ explicit pegasus_server_write(pegasus_server_impl *server);
+ // See replication_app_base::make_idempotent() for details.
+ int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request);
+
+ // See replication_app_base::on_batched_write_requests() for details.
+ //
/// \return error code returned by rocksdb, i.e rocksdb::Status::code.
/// **NOTE**
/// Error returned is regarded as the failure of replica, thus will trigger
@@ -58,12 +63,16 @@ public:
int on_batched_write_requests(dsn::message_ex **requests,
int count,
int64_t decree,
- uint64_t timestamp);
+ uint64_t timestamp,
+ dsn::message_ex *original_request);
void set_default_ttl(uint32_t ttl);
private:
- /// Delay replying for the batched requests until all of them complete.
+ // Apply the idempotent request and respond to its original request.
+ int on_idempotent(dsn::message_ex *request, dsn::message_ex
*original_request);
+
+ // Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn::message_ex **requests, int count);
int on_single_put_in_batch(put_rpc &rpc)
@@ -84,8 +93,9 @@ private:
// In verbose mode it will log for every request.
void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob
&key);
-private:
+ void init_make_idempotent_handlers();
void init_non_batch_write_handlers();
+ void init_on_idempotent_handlers();
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
@@ -99,8 +109,19 @@ private:
db_write_context _write_ctx;
int64_t _decree;
- typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>>
non_batch_writes_map;
- non_batch_writes_map _non_batch_write_handlers;
+ // Handlers that make an atomic request idempotent.
+ using make_idempotent_map =
+ std::map<dsn::task_code, std::function<int(dsn::message_ex *,
dsn::message_ex **)>>;
+ make_idempotent_map _make_idempotent_handlers;
+
+ // Handlers that process requests that cannot be batched, e.g. multi put/remove.
+ using non_batch_write_map = std::map<dsn::task_code,
std::function<int(dsn::message_ex *)>>;
+ non_batch_write_map _non_batch_write_handlers;
+
+ // Handlers that apply the idempotent request and respond to its original request.
+ using on_idempotent_map =
+ std::map<dsn::task_code, std::function<int(dsn::message_ex *,
dsn::message_ex *)>>;
+ on_idempotent_map _on_idempotent_handlers;
METRIC_VAR_DECLARE_counter(corrupt_writes);
};
diff --git a/src/server/test/hotkey_collector_test.cpp
b/src/server/test/hotkey_collector_test.cpp
index 7219c5ca0..688efcb02 100644
--- a/src/server/test/hotkey_collector_test.cpp
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -18,9 +18,12 @@
#include "server/hotkey_collector.h"
#include <fmt/core.h>
+#include <algorithm>
#include <chrono>
+#include <cstddef>
#include <string_view>
#include <thread>
+#include <vector>
#include "base/pegasus_key_schema.h"
#include "common/gpid.h"
@@ -257,7 +260,7 @@ public:
_server->on_detect_hotkey(req, resp);
}
- get_rpc generate_get_rpc(std::string hash_key)
+ static get_rpc generate_get_rpc(const std::string &hash_key)
{
dsn::blob raw_key;
pegasus_generate_key(raw_key, hash_key, std::string("sortkey"));
@@ -265,7 +268,7 @@ public:
return rpc;
}
- dsn::apps::update_request generate_set_req(std::string hash_key)
+ static dsn::apps::update_request generate_set_req(const std::string
&hash_key)
{
dsn::apps::update_request req;
dsn::blob raw_key;
@@ -377,14 +380,15 @@ TEST_P(hotkey_collector_test, data_completeness)
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
- const uint16_t WRITE_REQUEST_COUNT = 1000;
- dsn::message_ex *writes[WRITE_REQUEST_COUNT];
- for (int i = 0; i < WRITE_REQUEST_COUNT; i++) {
- writes[i] = create_put_request(generate_set_req(std::to_string(i)));
+ static const size_t kWriteRequestCount = 1000;
+ std::vector<dsn::message_ex *> writes;
+ writes.reserve(kWriteRequestCount);
+ for (size_t i = 0; i < kWriteRequestCount; ++i) {
+
writes.push_back(create_put_request(generate_set_req(std::to_string(i))));
}
- _server->on_batched_write_requests(int64_t(0), uint64_t(0), writes,
WRITE_REQUEST_COUNT);
+ _server->on_batched_write_requests(0, 0, writes.data(),
kWriteRequestCount, nullptr);
- for (int i = 0; i < WRITE_REQUEST_COUNT; i++) {
+ for (size_t i = 0; i < kWriteRequestCount; ++i) {
auto rpc = generate_get_rpc(std::to_string(i));
_server->on_get(rpc);
auto value = rpc.response().value.to_string();
diff --git a/src/server/test/pegasus_server_write_test.cpp
b/src/server/test/pegasus_server_write_test.cpp
index 72d21eb6a..4b7c6fc84 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -91,8 +91,8 @@ public:
writes[i] = pegasus::create_remove_request(key);
}
- int err =
- _server_write->on_batched_write_requests(writes,
total_rpc_cnt, decree, 0);
+ int err = _server_write->on_batched_write_requests(
+ writes, total_rpc_cnt, decree, 0, nullptr);
switch (err) {
case FAIL_DB_WRITE_BATCH_PUT:
case FAIL_DB_WRITE_BATCH_DELETE:
diff --git a/src/tools/mutation_log_tool.cpp b/src/tools/mutation_log_tool.cpp
index ef367e746..10e5fda24 100644
--- a/src/tools/mutation_log_tool.cpp
+++ b/src/tools/mutation_log_tool.cpp
@@ -26,7 +26,6 @@
#include "mutation_log_tool.h"
-#include <alloca.h>
#include <memory>
#include <vector>
@@ -41,6 +40,7 @@
#include "replica/replica_stub.h"
#include "rpc/rpc_message.h"
#include "task/task_spec.h"
+#include "utils/alloc.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/defer.h"
@@ -103,14 +103,13 @@ bool mutation_log_tool::dump(
log_length,
mu->data.updates.size());
if (callback && !mu->data.updates.empty()) {
- dsn::message_ex **batched_requests =
- (dsn::message_ex **)alloca(sizeof(dsn::message_ex *) *
mu->data.updates.size());
+ auto **batched_requests = ALLOC_STACK(message_ex *,
mu->data.updates.size());
int batched_count = 0;
for (mutation_update &update : mu->data.updates) {
dsn::message_ex *req =
dsn::message_ex::create_received_request(
update.code,
- (dsn_msg_serialize_format)update.serialization_type,
- (void *)update.data.data(),
+
static_cast<dsn_msg_serialize_format>(update.serialization_type),
+ update.data.data(),
update.data.length());
batched_requests[batched_count++] = req;
}
diff --git a/src/utils/alloc.h b/src/utils/alloc.h
index 16045b3b5..fdb799117 100644
--- a/src/utils/alloc.h
+++ b/src/utils/alloc.h
@@ -17,6 +17,7 @@
#pragma once
+#include <alloca.h>
#include <algorithm> // IWYU pragma: keep
#include <cstddef>
#include <functional>
@@ -85,6 +86,11 @@ cacheline_aligned_ptr<T>
cacheline_aligned_alloc_array(size_t len, const T &val)
return array;
}
+// Allocate space with `num` objects of `type` from stack by alloca() function. Since the
+// space is allocated within the stack frame of the caller, it could only be implemented
+// by a macro.
+//
+// NOTE:
+// - The returned memory is raw: the macro only casts the pointer, so no constructor is
+//   run and the contents are uninitialized.
+// - Per alloca(), the space is released automatically when the calling function returns;
+//   it must never be freed explicitly or used after that point.
+// - Nothing limits `sizeof(type) * (num)`; a large `num` can overflow the stack.
+#define ALLOC_STACK(type, num) static_cast<type *>(alloca(sizeof(type) * (num)))
+
+
} // namespace dsn
#endif // CACHELINE_SIZE
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]