This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d4693fd2b perf(make_idempotent): improve performance of idempotent
writes by reducing deserialization and removing the mapping table for atomic
write handlers (#2261)
d4693fd2b is described below
commit d4693fd2b2a3d7cd3e52aab5c8c4ee44c06c1292
Author: Dan Wang <[email protected]>
AuthorDate: Tue Jul 1 18:59:59 2025 +0800
perf(make_idempotent): improve performance of idempotent writes by reducing
deserialization and removing the mapping table for atomic write handlers (#2261)
https://github.com/apache/incubator-pegasus/issues/2197
Introduce the `idempotent_writer` class to the primary replica to cache the
idempotent single-update requests generated during the `make_idempotent`
phase, as well as the original atomic write RPC request from the client, to
improve performance, since:
1. deserializations of both the single-update requests and the original
   atomic write RPC request are reduced;
2. `std::variant` is used to hold the applying function and the RPC for each
   atomic write, instead of the mapping table for its handler.
---
.../duplication/test/replica_duplicator_test.cpp | 17 +-
src/replica/idempotent_writer.h | 136 ++++++++++++++++
src/replica/mutation.h | 10 +-
src/replica/replica.cpp | 4 +-
src/replica/replica.h | 16 +-
src/replica/replica_2pc.cpp | 17 +-
src/replica/replication_app_base.cpp | 5 +-
src/replica/replication_app_base.h | 25 +--
src/replica/storage/simple_kv/simple_kv.server.h | 3 +-
src/replica/test/mock_utils.h | 3 +-
src/rpc/rpc_holder.h | 11 +-
src/server/pegasus_server_impl.cpp | 10 +-
src/server/pegasus_server_impl.h | 6 +-
src/server/pegasus_server_write.cpp | 176 +++------------------
src/server/pegasus_server_write.h | 66 +++-----
src/server/pegasus_write_service_impl.h | 12 +-
16 files changed, 256 insertions(+), 261 deletions(-)
diff --git a/src/replica/duplication/test/replica_duplicator_test.cpp
b/src/replica/duplication/test/replica_duplicator_test.cpp
index fc07be00a..37b70f0ae 100644
--- a/src/replica/duplication/test/replica_duplicator_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_test.cpp
@@ -33,23 +33,11 @@
#include "replica/mutation_log.h"
#include "replica/test/mock_utils.h"
#include "runtime/pipeline.h"
-#include "task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/errors.h"
-#include "utils/threadpool_code.h"
-namespace dsn {
-namespace apps {
-
-// for loading PUT mutations from log file.
-DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT);
-
-} // namespace apps
-} // namespace dsn
-
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
class replica_duplicator_test : public duplication_test_base
{
@@ -223,5 +211,4 @@ TEST_P(replica_duplicator_test, prepare_dup)
ASSERT_EQ(100, last_durable_decree());
}
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/replica/idempotent_writer.h b/src/replica/idempotent_writer.h
new file mode 100644
index 000000000..59b47db2e
--- /dev/null
+++ b/src/replica/idempotent_writer.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "base/pegasus_rpc_types.h"
+#include "rpc/rpc_message.h"
+#include "rrdb/rrdb_types.h"
+#include "utils/ports.h"
+
+namespace pegasus {
+
+// The `idempotent_writer` class is used by the primary replica to cache the
idempotent
+// single-update requests generated during the "make_idempotent" phase, as
well as the original
+// atomic write RPC request from the client. Later, during the 2PC process, it
can directly
+// apply the cached single-update requests to the storage engine and
automatically respond to
+// the client based on the cached atomic write RPC request.
+//
+// With `idempotent_writer`, the extra and unnecessary deserialization of the
idempotent write
+// requests and the original atomic write request can be avoided.
+class idempotent_writer
+{
+public:
+ template <typename TRpcHolder>
+ using apply_func_t =
+ std::function<int(const std::vector<dsn::apps::update_request> &,
const TRpcHolder &)>;
+
+ // Parameters for the constructor:
+ // - original_rpc: the RPC holder with the deserialized original request,
restricted to the
+ // atomic write RPC requests (i.e. incr, check_and_set or
check_and_mutate).
+ // - apply_func: the user-defined function that applies the idempotent
single-update requests
+ // and builds the response to the client.
+ // - updates: the idempotent single-update requests translated from the
original atomic write
+ // request.
+ template <typename TRpcHolder,
+ std::enable_if_t<std::disjunction_v<std::is_same<TRpcHolder,
incr_rpc>,
+ std::is_same<TRpcHolder,
check_and_set_rpc>,
+ std::is_same<TRpcHolder,
check_and_mutate_rpc>>,
+ int> = 0>
+ idempotent_writer(TRpcHolder &&original_rpc,
+ apply_func_t<TRpcHolder> &&apply_func,
+ std::vector<dsn::apps::update_request> &&updates)
+ : _apply_runner(std::in_place_type<apply_runner<TRpcHolder>>,
+ std::forward<TRpcHolder>(original_rpc),
+ std::move(apply_func)),
+ _updates(std::move(updates))
+ {
+ }
+
+ ~idempotent_writer() = default;
+
+ // Return the serialized message of the original RPC request, which should
never be null.
+ [[nodiscard]] dsn::message_ex *request() const
+ {
+ return std::visit([](auto &runner) { return runner.rpc.dsn_request();
}, _apply_runner);
+ }
+
+ // Apply single-update requests to the storage engine, and automatically
respond to the
+ // client -- it won't respond until the internal holder of the RPC is
destructed. Return
+ // rocksdb::Status::kOk if succeed, otherwise some error code
(rocksdb::Status::Code).
+ [[nodiscard]] int apply() const
+ {
+ return std::visit(
+ [this](auto &runner) {
+ // Enable automatic reply to the client no matter whether it
would succeed
+ // or some error might occur.
+ runner.rpc.enable_auto_reply();
+
+ return runner.func(_updates, runner.rpc);
+ },
+ _apply_runner);
+ }
+
+private:
+ template <typename TRpcHolder>
+ struct apply_runner
+ {
+ apply_runner(TRpcHolder &&original_rpc, apply_func_t<TRpcHolder>
&&apply_func)
+ : rpc(std::forward<TRpcHolder>(original_rpc)),
func(std::move(apply_func))
+ {
+ // Disable automatic reply to make sure we won't respond to the
client until we
+ // are ready to apply idempotent single-update requests to the
storage engine,
+ // because:
+ // 1. Automatic reply only makes sense after the response is
ready, so it should
+ // be enabled right before applying to the storage engine.
+ // 2. Before applying to the storage engine, an error might occur
and the client
+ // could be proactively replied to externally (e.g., via
reply_with_error()),
+ // so automatic reply needs to be disabled during this stage.
+ rpc.disable_auto_reply();
+ }
+
+ // Holds the original RPC request and the response to it.
+ const TRpcHolder rpc;
+
+ // The user-defined function that applies idempotent single-update
requests to the
+ // storage engine.
+ const apply_func_t<TRpcHolder> func;
+ };
+
+ using apply_runner_t = std::variant<apply_runner<incr_rpc>,
+ apply_runner<check_and_set_rpc>,
+ apply_runner<check_and_mutate_rpc>>;
+
+ const apply_runner_t _apply_runner;
+
+ // The idempotent single-update requests that are translated from the
original atomic
+ // write request.
+ const std::vector<dsn::apps::update_request> _updates;
+
+ DISALLOW_COPY_AND_ASSIGN(idempotent_writer);
+ DISALLOW_MOVE_AND_ASSIGN(idempotent_writer);
+};
+
+using idempotent_writer_ptr = std::unique_ptr<idempotent_writer>;
+
+} // namespace pegasus
diff --git a/src/replica/mutation.h b/src/replica/mutation.h
index e88a90dce..7246a5e66 100644
--- a/src/replica/mutation.h
+++ b/src/replica/mutation.h
@@ -42,6 +42,7 @@
#include "common/replication_common.h"
#include "common/replication_other_types.h"
#include "consensus_types.h"
+#include "replica/idempotent_writer.h"
#include "rpc/rpc_message.h"
#include "runtime/api_layer1.h"
#include "task/task.h"
@@ -195,10 +196,11 @@ public:
// This field is only used by primary replicas.
bool is_blocking_candidate{false};
- // The original request received from the client. While making an atomic
request (incr,
- // check_and_set and check_and_mutate) idempotent, an extra variable is
needed to hold
- // its original request for the purpose of replying to the client.
- dsn::message_ptr original_request;
+ // While an atomic write request (i.e. incr, check_and_set or
check_and_mutate) is received
+ // and required to be idempotent, a data structure is needed to hold the
original RPC and
+ // the idempotent single-update requests until all idempotent requests
have been applied to
+ // the storage engine and the client has been responded to.
+ pegasus::idempotent_writer_ptr idem_writer;
std::shared_ptr<dsn::utils::latency_tracer> _tracer;
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index a775b02e2..7b16fa2da 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -583,10 +583,10 @@ mutation_ptr replica::new_mutation(decree d, bool
is_blocking_candidate)
return mu;
}
-mutation_ptr replica::new_mutation(decree d, dsn::message_ex *original_request)
+mutation_ptr replica::new_mutation(decree d, pegasus::idempotent_writer_ptr
&&idem_writer)
{
auto mu = new_mutation(d);
- mu->original_request = original_request;
+ mu->idem_writer = std::move(idem_writer);
return mu;
}
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 50e0105c2..1f5e8dae2 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -48,6 +48,7 @@
#include "prepare_list.h"
#include "ranger/access_type.h"
#include "replica/backup/cold_backup_context.h"
+#include "replica/idempotent_writer.h"
#include "replica/replica_base.h"
#include "replica_context.h"
#include "rpc/rpc_message.h"
@@ -165,9 +166,6 @@ class replica : public serverlet<replica>, public
ref_counter, public replica_ba
public:
~replica() override;
- DISALLOW_COPY_AND_ASSIGN(replica);
- DISALLOW_MOVE_AND_ASSIGN(replica);
-
// return true when the mutation is valid for the current replica
bool replay_mutation(mutation_ptr &mu, bool is_private);
void reset_prepare_list_after_replay();
@@ -393,15 +391,16 @@ private:
// Return the newly created mutation.
mutation_ptr new_mutation(decree d, bool is_blocking_candidate);
- // Create a new mutation with specified decree and the original atomic
write request,
- // which is used to build the response to the client.
+ // Create a new mutation with specified decree and idempotent writer.
//
// Parameters:
// - d: invalid_decree, or the real decree assigned to this mutation.
- // - original_request: the original request of the atomic write.
+ // - idem_writer: the data structure that applies the idempotent requests
to the storage
+ // engine and automatically responds to the atomic write request (i.e.
incr, check_and_set
+ // or check_and_mutate).
//
// Return the newly created mutation.
- mutation_ptr new_mutation(decree d, dsn::message_ex *original_request);
+ mutation_ptr new_mutation(decree d, pegasus::idempotent_writer_ptr
&&idem_writer);
// initialization
replica(replica_stub *stub,
@@ -904,6 +903,9 @@ private:
bool _allow_ingest_behind{false};
// Indicate where the storage engine data is corrupted and unrecoverable.
bool _data_corrupted{false};
+
+ DISALLOW_COPY_AND_ASSIGN(replica);
+ DISALLOW_MOVE_AND_ASSIGN(replica);
};
using replica_ptr = dsn::ref_ptr<replica>;
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index fefcded06..5d8dc5adb 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -52,6 +52,7 @@
#include "mutation_log.h"
#include "ranger/access_type.h"
#include "replica.h"
+#include "replica/idempotent_writer.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
@@ -294,7 +295,8 @@ int replica::make_idempotent(mutation_ptr &mu)
mu->client_requests.size(), 1, "the original atomic write request must
not be batched");
std::vector<dsn::message_ex *> new_requests;
- const int err = _app->make_idempotent(request, new_requests);
+ pegasus::idempotent_writer_ptr idem_writer;
+ const int err = _app->make_idempotent(request, new_requests, idem_writer);
// When the condition checks of `check_and_set` and `check_and_mutate`
fail, make_idempotent()
// would return rocksdb::Status::kTryAgain. Therefore, there is still a
certain probability
@@ -310,18 +312,13 @@ int replica::make_idempotent(mutation_ptr &mu)
"new_requests should not be empty since its original write request
must be atomic "
"and translated into at least one idempotent request");
- // During make_idempotent(), the request has been deserialized (i.e.
unmarshall() in the
- // constructor of `rpc_holder::internal`). Once deserialize it again,
assertion would fail for
- // set_read_msg() in the constructor of `rpc_read_stream`.
- //
- // To make it deserializable again to be applied into RocksDB, restore
read for it.
- request->restore_read();
+ CHECK_PREFIX_MSG(idem_writer, "idem_writer should not be empty");
CHECK_EQ_PREFIX_MSG(mu->get_decree(), invalid_decree, "the decree must
have not been assigned");
// Create a new mutation to hold the new idempotent requests. The old
mutation holding the
// original atomic write request will be released automatically.
- mu = new_mutation(invalid_decree, request);
+ mu = new_mutation(invalid_decree, std::move(idem_writer));
for (dsn::message_ex *new_request : new_requests) {
mu->add_client_request(new_request);
}
@@ -471,8 +468,8 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
void replica::reply_with_error(const mutation_ptr &mu, const error_code &err)
{
// Respond to the original atomic request if it is non-null. And it could
never be batched.
- if (mu->original_request != nullptr) {
- response_client_write(mu->original_request, err);
+ if (mu->idem_writer) {
+ response_client_write(mu->idem_writer->request(), err);
return;
}
diff --git a/src/replica/replication_app_base.cpp
b/src/replica/replication_app_base.cpp
index 40f043dd9..fd871fbd6 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -40,6 +40,7 @@
#include "consensus_types.h"
#include "dsn.layer2_types.h"
#include "replica.h"
+#include "replica/idempotent_writer.h"
#include "replica/mutation.h"
#include "replica/replication_app_base.h"
#include "rpc/rpc_message.h"
@@ -266,7 +267,7 @@ int replication_app_base::on_batched_write_requests(int64_t
decree,
uint64_t timestamp,
message_ex **requests,
uint32_t count,
- message_ex
*original_request)
+
pegasus::idempotent_writer_ptr &&idem_writer)
{
int storage_error = rocksdb::Status::kOk;
for (uint32_t i = 0; i < count; ++i) {
@@ -329,7 +330,7 @@ error_code replication_app_base::apply_mutation(const
mutation_ptr &mu)
mu->data.header.timestamp,
batched_requests,
batched_count,
- mu->original_request);
+
std::move(mu->idem_writer));
// release faked requests
for (uint32_t i = 0; i < faked_count; ++i) {
diff --git a/src/replica/replication_app_base.h
b/src/replica/replication_app_base.h
index 329a5e972..5842e66c0 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -37,6 +37,7 @@
#include "common/replication_other_types.h"
#include "metadata_types.h"
#include "mutation.h"
+#include "replica/idempotent_writer.h"
#include "replica/replica_base.h"
#include "replica_admin_types.h"
#include "utils/error_code.h"
@@ -262,27 +263,31 @@ public:
// - request: the original request received from a client.
// - new_requests: the output parameter, holding the resulting idempotent
requests if the
// original request is atomic, otherwise keeping unchanged.
+ // - idem_writer: the output parameter, holding the resulting idempotent
writer.
//
// Return:
// - for an idempotent requess always return rocksdb::Status::kOk .
// - for an atomic request, return rocksdb::Status::kOk if succeed in
making it idempotent;
// otherwise, return error code (rocksdb::Status::Code).
virtual int make_idempotent(dsn::message_ex *request,
- std::vector<dsn::message_ex *> &new_requests)
= 0;
+ std::vector<dsn::message_ex *> &new_requests,
+ pegasus::idempotent_writer_ptr &idem_writer) =
0;
// Apply batched write requests from a mutation. This is a virtual
function, and base class
// provide a naive implementation that just call on_request for each
request. Storage engine
// may override this function to get better performance.
//
// Parameters:
- // - decree: the decree of the mutation which these requests are batched
into.
- // - timestamp: an incremental timestamp generated for this batch of
requests.
- // - requests: the requests to be applied.
- // - count: the number of the requests.
- // - original_request: the original request received from the client.
Must be an atomic
- // request (i.e. incr, check_and_set and check_and_mutate) if non-null,
and another
- // parameter `requests` must hold the idempotent request translated from
it. Used to
- // reply to the client.
+ // - decree: the decree of the mutation which these requests are batched
into.
+ // - timestamp: an incremental timestamp generated for this batch of
requests.
+ // - requests: the requests to be applied.
+ // - count: the number of the requests.
+ // - idem_writer: the original request received from the client. Must be
an atomic
+ // request (i.e. incr, check_and_set and check_and_mutate) if non-null
+ // - idem_writer: if non-null, used to apply the idempotent requests to
the storage engine
+ // and automatically responds to the atomic write request (i.e. incr,
check_and_set or
+ // check_and_mutate), with the parameter `requests` holding the serialized
idempotent
+ // requests and the parameter `count` recording the number.
//
// Return rocksdb::Status::kOk or some error code (rocksdb::Status::Code)
if these requests
// failed to be applied by storage engine.
@@ -290,7 +295,7 @@ public:
uint64_t timestamp,
message_ex **requests,
uint32_t count,
- message_ex *original_request);
+ pegasus::idempotent_writer_ptr
&&idem_writer);
// Query compact state.
[[nodiscard]] virtual std::string query_compact_state() const = 0;
diff --git a/src/replica/storage/simple_kv/simple_kv.server.h
b/src/replica/storage/simple_kv/simple_kv.server.h
index c34aa0e81..8ec386250 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.h
@@ -54,7 +54,8 @@ public:
}
int make_idempotent(dsn::message_ex *request,
- std::vector<dsn::message_ex *> &new_requests) override
+ std::vector<dsn::message_ex *> &new_requests,
+ pegasus::idempotent_writer_ptr &idem_writer) override
{
return rocksdb::Status::kOk;
}
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index ab30d9a5c..3899b5da8 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -79,7 +79,8 @@ public:
}
int on_request(message_ex *request) override WARN_UNUSED_RESULT { return
0; }
int make_idempotent(dsn::message_ex *request,
- std::vector<dsn::message_ex *> &new_requests) override
+ std::vector<dsn::message_ex *> &new_requests,
+ pegasus::idempotent_writer_ptr &idem_writer) override
{
return rocksdb::Status::kOk;
}
diff --git a/src/rpc/rpc_holder.h b/src/rpc/rpc_holder.h
index 0ad39a21a..3dcc569ae 100644
--- a/src/rpc/rpc_holder.h
+++ b/src/rpc/rpc_holder.h
@@ -224,9 +224,11 @@ public:
dsn_rpc_forward(dsn_request(), addr);
}
- inline void set_auto_reply(bool auto_reply) { _i->auto_reply = auto_reply;
}
+ [[nodiscard]] inline bool is_auto_reply() const { return _i->auto_reply; }
- inline void enable_auto_reply() { set_auto_reply(true); }
+ inline void enable_auto_reply() const { set_auto_reply(true); }
+
+ inline void disable_auto_reply() const { set_auto_reply(false); }
// Returns an rpc_holder that will reply the request after its lifetime
ends.
// By default rpc_holder never replies.
@@ -276,6 +278,8 @@ public:
friend bool operator<(const rpc_holder &lhs, const rpc_holder &rhs) {
return lhs._i < rhs._i; }
private:
+ inline void set_auto_reply(bool auto_reply) const { _i->auto_reply =
auto_reply; }
+
friend class rpc_holder_test;
struct internal
@@ -333,6 +337,9 @@ private:
dsn::error_code rpc_error = dsn::ERR_OK;
bool auto_reply;
+
+ DISALLOW_COPY_AND_ASSIGN(internal);
+ DISALLOW_MOVE_AND_ASSIGN(internal);
};
std::shared_ptr<internal> _i;
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 2d13948a9..1ba03fb16 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -61,6 +61,7 @@
#include "pegasus_rpc_types.h"
#include "pegasus_server_write.h"
#include "replica_admin_types.h"
+#include "replica/idempotent_writer.h"
#include "rpc/rpc_message.h"
#include "rrdb/rrdb.code.definition.h"
#include "rrdb/rrdb_types.h"
@@ -346,24 +347,25 @@ void pegasus_server_impl::gc_checkpoints(bool
force_reserve_one)
}
int pegasus_server_impl::make_idempotent(dsn::message_ex *request,
- std::vector<dsn::message_ex *>
&new_requests)
+ std::vector<dsn::message_ex *>
&new_requests,
+ idempotent_writer_ptr &idem_writer)
{
CHECK_TRUE(_is_open);
- return _server_write->make_idempotent(request, new_requests);
+ return _server_write->make_idempotent(request, new_requests, idem_writer);
}
int pegasus_server_impl::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
uint32_t count,
- dsn::message_ex
*original_request)
+ idempotent_writer_ptr
&&idem_writer)
{
CHECK_TRUE(_is_open);
CHECK_NOTNULL(requests, "");
return _server_write->on_batched_write_requests(
- requests, count, decree, timestamp, original_request);
+ requests, count, decree, timestamp, std::move(idem_writer));
}
// Since LOG_ERROR_PREFIX depends on log_prefix(), this method could not be
declared as static or
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 64934ffaa..e64eb1da1 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -45,6 +45,7 @@
#include "pegasus_utils.h"
#include "pegasus_value_schema.h"
#include "range_read_limiter.h"
+#include "replica/idempotent_writer.h"
#include "replica/replication_app_base.h"
#include "task/task.h"
#include "task/task_tracker.h"
@@ -149,7 +150,8 @@ public:
// See replication_app_base::make_idempotent() for details. Only called by
primary replicas.
int make_idempotent(dsn::message_ex *request,
- std::vector<dsn::message_ex *> &new_requests) override;
+ std::vector<dsn::message_ex *> &new_requests,
+ idempotent_writer_ptr &idem_writer) override;
/// Each of the write request (specifically, the rpc that's configured as
write, see
/// option `rpc_request_is_write_operation` in rDSN `task_spec`) will
first be
@@ -162,7 +164,7 @@ public:
uint64_t timestamp,
dsn::message_ex **requests,
uint32_t count,
- dsn::message_ex *original_request) override;
+ idempotent_writer_ptr &&idem_writer)
override;
::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
{
diff --git a/src/server/pegasus_server_write.cpp
b/src/server/pegasus_server_write.cpp
index 810746f4f..b4db97427 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -32,8 +32,8 @@
#include "pegasus_server_impl.h"
#include "pegasus_server_write.h"
#include "pegasus_utils.h"
+#include "replica/idempotent_writer.h"
#include "rpc/rpc_holder.h"
-#include "rpc/serialization.h"
#include "rrdb/rrdb.code.definition.h"
#include "server/pegasus_write_service.h"
#include "utils/autoref_ptr.h"
@@ -58,11 +58,11 @@
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
{
init_make_idempotent_handlers();
init_non_batch_write_handlers();
- init_idempotent_writers();
}
int pegasus_server_write::make_idempotent(dsn::message_ex *request,
- std::vector<dsn::message_ex *>
&new_requests)
+ std::vector<dsn::message_ex *>
&new_requests,
+ idempotent_writer_ptr &idem_writer)
{
const auto make_idempotent_handler =
std::as_const(_make_idempotent_handlers).find(request->rpc_code());
@@ -73,7 +73,7 @@ int pegasus_server_write::make_idempotent(dsn::message_ex
*request,
}
try {
- return make_idempotent_handler->second(request, new_requests);
+ return make_idempotent_handler->second(request, new_requests,
idem_writer);
} catch (TTransportException &ex) {
METRIC_VAR_INCREMENT(corrupt_writes);
LOG_ERROR_PREFIX("make idempotent handler for {} failed: from = {},
exception = {}",
@@ -93,7 +93,7 @@ int
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
uint32_t count,
int64_t decree,
uint64_t timestamp,
- dsn::message_ex
*original_request)
+ idempotent_writer_ptr
&&idem_writer)
{
_write_ctx = db_write_context::create(decree, timestamp);
_decree = decree;
@@ -134,93 +134,18 @@ int
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
}
}
- if (original_request != nullptr) {
- // Once `original_request` is set, `requests` must be idempotent and
translated from
- // an atomic write request.
- return apply_idempotent(requests, count, original_request);
+ if (idem_writer) {
+ // Pass ownership down the call stack layer by layer, and ultimately
transfer it to
+ // the local variable `writer`, so that upon destruction it can
automatically reply
+ // to the client and release the memory occupied by the atomic write
RPC and its
+ // idempotent single-update requests.
+ const idempotent_writer_ptr writer(std::move(idem_writer));
+ return writer->apply();
}
return on_batched_writes(requests, count);
}
-int pegasus_server_write::apply_idempotent(dsn::message_ex **requests,
- uint32_t count,
- dsn::message_ex *original_request)
-{
- CHECK_GT_PREFIX_MSG(count,
- 0,
- "`requests` should consist of at least one request: "
- "original_request = ({}, {}), decree = {}",
- original_request->header->from_address,
- original_request->rpc_code(),
- _decree);
-
- std::vector<dsn::apps::update_request> updates;
- updates.reserve(count);
-
- for (uint32_t i = 0; i < count; ++i) {
- CHECK_EQ_PREFIX_MSG(requests[i]->rpc_code(),
- dsn::apps::RPC_RRDB_RRDB_PUT,
- "now all translated idempotent writes are
RPC_RRDB_RRDB_PUT "
- "requests: original_request = ({}, {}), decree =
{}, "
- "count = {}, index = {}",
- original_request->header->from_address,
- original_request->rpc_code(),
- _decree,
- count,
- i);
-
- // Preinitialize an empty update.
- updates.emplace_back();
-
- // Deserialize the message to create the real idempotent update.
- //
- // There is no need to catch TTransportException, since `requests` are
freshly
- // generated by serializing the idempotent updates within
make_idempotent().
- dsn::unmarshall(requests[i], updates.back());
-
- CHECK_PREFIX_MSG(
- updates.back().__isset.type,
- "update_request::type is not set for {} request: original_request
= ({}, {}) "
- "decree = {}, count = {}, index = {},",
- requests[i]->rpc_code(),
- original_request->header->from_address,
- original_request->rpc_code(),
- _decree,
- count,
- i);
- }
-
- const auto update_type = static_cast<uint32_t>(updates.front().type);
- CHECK_LT_PREFIX_MSG(update_type,
- _idempotent_writers.size(),
- "unsupported update_request::type {} for idempotent
writer: "
- "original_request = ({}, {}), decree = {}, count = {}",
- update_type,
- original_request->header->from_address,
- original_request->rpc_code(),
- _decree,
- count);
-
- // If the corresponding writer is retrieved using
original_request->rpc_code(), a map
- // would be needed. However, by choosing to obtain the writer based on the
type of the
- // first idempotent update, the writers can be stored in an array instead,
which offers
- // better performance than a map.
- const auto writer = _idempotent_writers[update_type];
- CHECK_PREFIX_MSG(writer,
- "idempotent writer for update_request::type {} should not
be empty: "
- "original_request = ({}, {}), decree = {}, count = {}",
- update_type,
- original_request->header->from_address,
- original_request->rpc_code(),
- _decree,
- count);
-
- // There is no need to catch TTransportException, since `original_request`
has just
- // been successfully deserialized in make_idempotent().
- return writer(updates, original_request);
-}
-
void pegasus_server_write::set_default_ttl(uint32_t ttl) {
_write_svc->set_default_ttl(ttl); }
int pegasus_server_write::on_batched_writes(dsn::message_ex **requests,
uint32_t count)
@@ -326,16 +251,22 @@ void pegasus_server_write::init_make_idempotent_handlers()
{
_make_idempotent_handlers = {
{dsn::apps::RPC_RRDB_RRDB_INCR,
- [this](dsn::message_ex *request, std::vector<dsn::message_ex *>
&new_requests) -> int {
- return make_idempotent<incr_rpc>(request, new_requests);
+ [this](dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests,
+ idempotent_writer_ptr &idem_writer) -> int {
+ return make_idempotent<incr_rpc>(request, new_requests,
idem_writer);
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
- [this](dsn::message_ex *request, std::vector<dsn::message_ex *>
&new_requests) -> int {
- return make_idempotent<check_and_set_rpc>(request, new_requests);
+ [this](dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests,
+ idempotent_writer_ptr &idem_writer) -> int {
+ return make_idempotent<check_and_set_rpc>(request, new_requests,
idem_writer);
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
- [this](dsn::message_ex *request, std::vector<dsn::message_ex *>
&new_requests) -> int {
- return make_idempotent<check_and_mutate_rpc>(request,
new_requests);
+ [this](dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests,
+ idempotent_writer_ptr &idem_writer) -> int {
+ return make_idempotent<check_and_mutate_rpc>(request,
new_requests, idem_writer);
}},
};
}
@@ -381,63 +312,4 @@ void pegasus_server_write::init_non_batch_write_handlers()
};
}
-void pegasus_server_write::init_idempotent_writers()
-{
- _idempotent_writers = {
- nullptr, // No writer for UT_PUT.
- [this](const std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request) -> int {
- // The type of the only update in `updates` is UT_INCR, thus choose the writer
- // for incr.
- return put_incr(updates, original_request);
- },
- [this](const std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request) -> int {
- // The type of the only update in `updates` is UT_CHECK_AND_SET, thus choose the
- // writer for check_and_set.
- return put_check_and_set(updates, original_request);
- },
- [this](const std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request) -> int {
- // The type of the first update in batched `updates` is UT_CHECK_AND_MUTATE_PUT,
- // thus choose the writer for check_and_mutate.
- return put_check_and_mutate(updates, original_request);
- },
- [this](const std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request) -> int {
- // The type of the first update in batched `updates` is UT_CHECK_AND_MUTATE_REMOVE,
- // thus choose the writer for check_and_mutate.
- return put_check_and_mutate(updates, original_request);
- },
- };
-
- CHECK_EQ_PREFIX_MSG(dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE + 1,
- _idempotent_writers.size(),
- "_idempotent_writers does not match
dsn::apps::update_type");
-}
-
-int pegasus_server_write::put_incr(const
std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request)
-{
- CHECK_EQ_PREFIX(updates.size(), 1);
- CHECK_EQ_PREFIX(original_request->rpc_code(),
dsn::apps::RPC_RRDB_RRDB_INCR);
- return put<incr_rpc>(updates, original_request);
-}
-
-int pegasus_server_write::put_check_and_set(const
std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request)
-{
- CHECK_EQ_PREFIX(updates.size(), 1);
- CHECK_EQ_PREFIX(original_request->rpc_code(),
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET);
- return put<check_and_set_rpc>(updates, original_request);
-}
-
-int pegasus_server_write::put_check_and_mutate(
- const std::vector<dsn::apps::update_request> &updates, dsn::message_ex
*original_request)
-{
- CHECK_GT_PREFIX(updates.size(), 0);
- CHECK_EQ_PREFIX(original_request->rpc_code(),
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE);
- return put<check_and_mutate_rpc>(updates, original_request);
-}
-
} // namespace pegasus::server
diff --git a/src/server/pegasus_server_write.h
b/src/server/pegasus_server_write.h
index 9680f1a3c..6d578c389 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -21,16 +21,17 @@
#include <rocksdb/status.h>
#include <algorithm>
-#include <array>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
+#include <utility>
#include <vector>
#include "base/pegasus_rpc_types.h"
#include "common/replication_other_types.h"
#include "pegasus_write_service.h"
+#include "replica/idempotent_writer.h"
#include "replica/replica_base.h"
#include "rpc/rpc_message.h"
#include "rrdb/rrdb.code.definition.h"
@@ -55,7 +56,9 @@ public:
explicit pegasus_server_write(pegasus_server_impl *server);
// See replication_app_base::make_idempotent() for details. Only called by primary replicas.
- int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex
*> &new_requests);
+ int make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests,
+ idempotent_writer_ptr &idem_writer);
// See replication_app_base::on_batched_write_requests() for details.
//
@@ -71,7 +74,7 @@ public:
uint32_t count,
int64_t decree,
uint64_t timestamp,
- dsn::message_ex *original_request);
+ idempotent_writer_ptr &&idem_writer);
void set_default_ttl(uint32_t ttl);
@@ -79,7 +82,9 @@ private:
// Used to call make_idempotent() for each type (specified by TRpcHolder) of atomic write.
// Only called by primary replicas.
template <typename TRpcHolder>
- int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex
*> &new_requests)
+ int make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests,
+ idempotent_writer_ptr &idem_writer)
{
auto rpc = TRpcHolder(request);
@@ -109,37 +114,18 @@ private:
request->header->context.u.serialize_format)));
}
- return rocksdb::Status::kOk;
- }
-
- // Apply the idempotent updates `requests` into storage engine and respond to the original
- // atomic write request `original_request`. Both of `requests` and `original_request` should
- // not be null, while `count` should always be > 0.
- int
- apply_idempotent(dsn::message_ex **requests, uint32_t count,
dsn::message_ex *original_request);
+ idem_writer = std::make_unique<pegasus::idempotent_writer>(
+ std::move(rpc),
+ typename pegasus::idempotent_writer::template
apply_func_t<TRpcHolder>(
+ [this](const std::vector<dsn::apps::update_request> &updates,
+ const TRpcHolder &rpc) -> int {
+ return _write_svc->put(_write_ctx, updates, rpc.request(),
rpc.response());
+ }),
+ std::move(updates));
- // Apply the batched (one or multiple) single-update requests into the storage engine.
- // Only called by primary replicas.
- template <typename TRpcHolder>
- inline int put(const std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request)
- {
- // Enable auto reply, since in primary replicas we need to reply to the client with
- // the response to the original atomic write request after the idempotent updates
- // were applied into the storage engine.
- auto rpc = TRpcHolder::auto_reply(original_request);
- return _write_svc->put(_write_ctx, updates, rpc.request(),
rpc.response());
+ return rocksdb::Status::kOk;
}
- // Following functions are the writers that apply the idempotent updates translated from
- // incr, check_and_set and check_and_mutate requests and then respond to them.
- int put_incr(const std::vector<dsn::apps::update_request> &updates,
- dsn::message_ex *original_request);
- int put_check_and_set(const std::vector<dsn::apps::update_request>
&updates,
- dsn::message_ex *original_request);
- int put_check_and_mutate(const std::vector<dsn::apps::update_request>
&updates,
- dsn::message_ex *original_request);
-
// Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn::message_ex **requests, uint32_t count);
@@ -163,7 +149,6 @@ private:
void init_make_idempotent_handlers();
void init_non_batch_write_handlers();
- void init_idempotent_writers();
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
@@ -178,23 +163,16 @@ private:
int64_t _decree{invalid_decree};
// Handlers that translate an atomic write request into one or multiple idempotent updates.
- using make_idempotent_map =
- std::map<dsn::task_code,
- std::function<int(dsn::message_ex *,
std::vector<dsn::message_ex *> &)>>;
+ using make_idempotent_map = std::map<
+ dsn::task_code,
+ std::function<int(
+ dsn::message_ex *, std::vector<dsn::message_ex *> &,
idempotent_writer_ptr &)>>;
make_idempotent_map _make_idempotent_handlers;
// Handlers that process a write request which could not be batched, e.g. multi put/remove.
using non_batch_write_map = std::map<dsn::task_code,
std::function<int(dsn::message_ex *)>>;
non_batch_write_map _non_batch_write_handlers;
- // Writers that apply idempotent updates and respond to the original atomic write request.
- // Each dsn::apps::update_type has its corresponding writer, therefore the number of writers
- // must match the number of dsn::apps::update_type.
- using idempotent_writer_map = std::array<
- std::function<int(const std::vector<dsn::apps::update_request> &,
dsn::message_ex *)>,
- 5>;
- idempotent_writer_map _idempotent_writers{};
-
METRIC_VAR_DECLARE_counter(corrupt_writes);
};
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index e8de292a6..8bdf0d976 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -254,10 +254,10 @@ public:
}
// Shouldn't fail to parse since the value must be a valid int64.
- CHECK(dsn::buf2int64(update.value.to_string_view(), resp.new_value),
- "invalid int64 value for put idempotent incr: key={}, value={}",
- update.key,
- update.value);
+ CHECK_PREFIX_MSG(dsn::buf2int64(update.value.to_string_view(),
resp.new_value),
+ "invalid int64 value for put idempotent incr: key={},
value={}",
+ update.key,
+ update.value);
return rocksdb::Status::kOk;
}
@@ -560,7 +560,7 @@ public:
const std::vector<dsn::apps::update_request> &updates,
TResponse &resp)
{
- CHECK_EQ(updates.size(), 1);
+ CHECK_EQ_PREFIX(updates.size(), 1);
return put(ctx, updates.front(), resp);
}
@@ -678,6 +678,8 @@ public:
const std::vector<dsn::apps::update_request> &updates,
dsn::apps::check_and_mutate_response &resp)
{
+ CHECK_GT_PREFIX(updates.size(), 0);
+
make_basic_response(ctx.decree, resp);
// Copy check_value's fields from the first idempotent request to the check_and_mutate
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]