This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 154bb16f1 feat(make_idempotent): re-adapt `make_idempotent()` and
`put()` to support batched single-update requests for atomic writes (#2249)
154bb16f1 is described below
commit 154bb16f1637116673acc803861157ac39e5d4c2
Author: Dan Wang <[email protected]>
AuthorDate: Wed May 28 15:26:43 2025 +0800
feat(make_idempotent): re-adapt `make_idempotent()` and `put()` to support
batched single-update requests for atomic writes (#2249)
https://github.com/apache/incubator-pegasus/issues/2197
Because a `check_and_mutate` request may be converted into multiple single-update
requests, the `make_idempotent()` and `put()` interfaces for `incr`, `check_and_set`
and `check_and_mutate` have been re-adapted and are now uniformly implemented through
template functions.
The number of single-operation requests corresponding to a batch of put/remove
requests, or to an atomic write request (incr, check_and_set or check_and_mutate),
is now recorded in an array instead of being stored in a single variable.
In addition, the writers responsible for applying single-update requests to the
storage engine are now stored in an array instead of a map to improve performance.
---
src/replica/replica_2pc.cpp | 57 +++--
src/replica/replication_app_base.cpp | 27 +-
src/replica/replication_app_base.h | 14 +-
src/replica/storage/simple_kv/simple_kv.server.h | 3 +-
src/replica/test/mock_utils.h | 3 +-
src/server/pegasus_server_impl.cpp | 7 +-
src/server/pegasus_server_impl.h | 6 +-
src/server/pegasus_server_write.cpp | 307 +++++++++++++++--------
src/server/pegasus_server_write.h | 129 +++++++---
src/server/pegasus_write_service.cpp | 145 ++++++++---
src/server/pegasus_write_service.h | 84 +++++--
src/server/pegasus_write_service_impl.h | 33 ++-
src/server/test/pegasus_server_write_test.cpp | 16 +-
13 files changed, 575 insertions(+), 256 deletions(-)
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index d8954e939..fefcded06 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -148,20 +148,20 @@ void replica::on_client_write(dsn::message_ex *request,
bool ignore_throttling)
}
if (dsn_unlikely(request->rpc_code() == TASK_CODE_INVALID)) {
- LOG_ERROR("recv message with invalid RPC code {} from {}, trace_id =
{}",
- request->rpc_code(),
- request->header->from_address,
- request->header->trace_id);
+ LOG_ERROR_PREFIX("recv message with invalid RPC code {} from {},
trace_id = {}",
+ request->rpc_code(),
+ request->header->from_address,
+ request->header->trace_id);
response_client_write(request, ERR_INVALID_PARAMETERS);
return;
}
const auto *spec = task_spec::get(request->rpc_code());
if (dsn_unlikely(spec == nullptr)) {
- LOG_ERROR("recv message with unhandled RPC code {} from {}, trace_id =
{}",
- request->rpc_code(),
- request->header->from_address,
- request->header->trace_id);
+ LOG_ERROR_PREFIX("recv message with unhandled RPC code {} from {},
trace_id = {}",
+ request->rpc_code(),
+ request->header->from_address,
+ request->header->trace_id);
response_client_write(request, ERR_HANDLER_NOT_FOUND);
return;
}
@@ -239,10 +239,14 @@ void replica::on_client_write(dsn::message_ex *request,
bool ignore_throttling)
bool replica::need_reject_non_idempotent(const task_spec *spec) const
{
if (!is_duplication_master()) {
+ // This is not the dup master that needs to duplicate writes to
followers, thus
+ // non-idempotent requests are accepted.
return false;
}
if (_app_info.atomic_idempotent) {
+ // Since the table which this replica belongs to has been configured
to make
+ // all atomic write requests idempotent, certainly they are accepted.
return false;
}
@@ -265,37 +269,46 @@ bool replica::need_make_idempotent(message_ex *request)
const
}
if (!_app_info.atomic_idempotent) {
+ // The table which this replica belongs to is not configured to make
all atomic
+ // write requests idempotent.
return false;
}
const auto *spec = task_spec::get(request->rpc_code());
- CHECK_NOTNULL(spec, "RPC code {} not found", request->rpc_code());
+ CHECK_NOTNULL_PREFIX_MSG(spec, "RPC code {} not found",
request->rpc_code());
return !spec->rpc_request_is_write_idempotent;
}
int replica::make_idempotent(mutation_ptr &mu)
{
- CHECK_TRUE(!mu->client_requests.empty());
+ CHECK_PREFIX_MSG(!mu->client_requests.empty(),
+ "the mutation should include at least one request");
message_ex *request = mu->client_requests.front();
if (!need_make_idempotent(request)) {
return rocksdb::Status::kOk;
}
- // The original atomic write request must not be batched.
- CHECK_EQ(mu->client_requests.size(), 1);
+ CHECK_EQ_PREFIX_MSG(
+ mu->client_requests.size(), 1, "the original atomic write request must
not be batched");
- dsn::message_ex *new_request = nullptr;
- const int err = _app->make_idempotent(request, &new_request);
- if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+ std::vector<dsn::message_ex *> new_requests;
+ const int err = _app->make_idempotent(request, new_requests);
+
+ // When the condition checks of `check_and_set` and `check_and_mutate`
fail, make_idempotent()
+ // would return rocksdb::Status::kTryAgain. Therefore, there is still a
certain probability
+ // that a status code other than rocksdb::Status::kOk is returned.
+ if (err != rocksdb::Status::kOk) {
// Once some error occurred, the response with error must have been
returned to the
// client during _app->make_idempotent(). Thus do nothing here.
return err;
}
- CHECK_NOTNULL(new_request,
- "new_request should not be null since its original write
request must be atomic");
+ CHECK_PREFIX_MSG(
+ !new_requests.empty(),
+ "new_requests should not be empty since its original write request
must be atomic "
+ "and translated into at least one idempotent request");
// During make_idempotent(), the request has been deserialized (i.e.
unmarshall() in the
// constructor of `rpc_holder::internal`). Once deserialize it again,
assertion would fail for
@@ -304,13 +317,15 @@ int replica::make_idempotent(mutation_ptr &mu)
// To make it deserializable again to be applied into RocksDB, restore
read for it.
request->restore_read();
- // The decree must have not been assigned.
- CHECK_EQ(mu->get_decree(), invalid_decree);
+ CHECK_EQ_PREFIX_MSG(mu->get_decree(), invalid_decree, "the decree must
have not been assigned");
- // Create a new mutation to hold the new idempotent request. The old
mutation holding the
+ // Create a new mutation to hold the new idempotent requests. The old
mutation holding the
// original atomic write request will be released automatically.
mu = new_mutation(invalid_decree, request);
- mu->add_client_request(new_request);
+ for (dsn::message_ex *new_request : new_requests) {
+ mu->add_client_request(new_request);
+ }
+
return rocksdb::Status::kOk;
}
diff --git a/src/replica/replication_app_base.cpp
b/src/replica/replication_app_base.cpp
index d014c9061..40f043dd9 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -265,17 +265,20 @@ error_code
replication_app_base::apply_checkpoint(chkpt_apply_mode mode, const l
int replication_app_base::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
message_ex **requests,
- int request_length,
+ uint32_t count,
message_ex
*original_request)
{
int storage_error = rocksdb::Status::kOk;
- for (int i = 0; i < request_length; ++i) {
- int e = on_request(requests[i]);
- if (e != rocksdb::Status::kOk) {
- LOG_ERROR_PREFIX("got storage engine error when handler
request({})",
- requests[i]->header->rpc_name);
- storage_error = e;
+ for (uint32_t i = 0; i < count; ++i) {
+ const int err = on_request(requests[i]);
+ if (err == rocksdb::Status::kOk) {
+ continue;
}
+
+ LOG_ERROR_PREFIX("got storage engine error when handler request({}):
error = {}",
+ requests[i]->header->rpc_name,
+ err);
+ storage_error = err;
}
return storage_error;
}
@@ -294,12 +297,12 @@ error_code replication_app_base::apply_mutation(const
mutation_ptr &mu)
}
bool has_ingestion_request = false;
- const int request_count = static_cast<int>(mu->client_requests.size());
+ const auto request_count =
static_cast<uint32_t>(mu->client_requests.size());
auto **batched_requests = ALLOC_STACK(message_ex *, request_count);
auto **faked_requests = ALLOC_STACK(message_ex *, request_count);
- int batched_count = 0; // write-empties are not included.
- int faked_count = 0;
- for (int i = 0; i < request_count; ++i) {
+ uint32_t batched_count = 0; // write-empties are not included.
+ uint32_t faked_count = 0;
+ for (uint32_t i = 0; i < request_count; ++i) {
const mutation_update &update = mu->data.updates[i];
LOG_DEBUG_PREFIX("mutation {} #{}: dispatch rpc call {}", mu->name(),
i, update.code);
if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
@@ -329,7 +332,7 @@ error_code replication_app_base::apply_mutation(const
mutation_ptr &mu)
mu->original_request);
// release faked requests
- for (int i = 0; i < faked_count; ++i) {
+ for (uint32_t i = 0; i < faked_count; ++i) {
faked_requests[i]->release_ref();
}
diff --git a/src/replica/replication_app_base.h
b/src/replica/replication_app_base.h
index c44b5d313..329a5e972 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -30,6 +30,7 @@
#include <cstdint>
#include <map>
#include <string>
+#include <vector>
#include "bulk_load_types.h"
#include "common/json_helper.h"
@@ -254,19 +255,20 @@ public:
// not idempotent. This function is used to translate them into requests
like single put
// which is naturally idempotent.
//
- // For the other requests which must be idempotent such as single
put/remove or non-batch
- // writes, this function would do nothing.
+ // For the other requests such as single put/remove or non-batch writes
all of which must be
+ // idempotent, this function would do nothing.
//
// Parameters:
// - request: the original request received from a client.
- // - new_request: as the output parameter pointing to the resulting
idempotent request if the
+ // - new_requests: the output parameter, holding the resulting idempotent
requests if the
// original request is atomic, otherwise keeping unchanged.
//
// Return:
// - for an idempotent requess always return rocksdb::Status::kOk .
// - for an atomic request, return rocksdb::Status::kOk if succeed in
making it idempotent;
// otherwise, return error code (rocksdb::Status::Code).
- virtual int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) = 0;
+ virtual int make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests)
= 0;
// Apply batched write requests from a mutation. This is a virtual
function, and base class
// provide a naive implementation that just call on_request for each
request. Storage engine
@@ -276,7 +278,7 @@ public:
// - decree: the decree of the mutation which these requests are batched
into.
// - timestamp: an incremental timestamp generated for this batch of
requests.
// - requests: the requests to be applied.
- // - request_length: the number of the requests.
+ // - count: the number of the requests.
// - original_request: the original request received from the client.
Must be an atomic
// request (i.e. incr, check_and_set and check_and_mutate) if non-null,
and another
// parameter `requests` must hold the idempotent request translated from
it. Used to
@@ -287,7 +289,7 @@ public:
virtual int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
message_ex **requests,
- int request_length,
+ uint32_t count,
message_ex *original_request);
// Query compact state.
diff --git a/src/replica/storage/simple_kv/simple_kv.server.h
b/src/replica/storage/simple_kv/simple_kv.server.h
index 1fc723cf2..c34aa0e81 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.h
@@ -53,7 +53,8 @@ public:
return handle_request(request);
}
- int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) override
+ int make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests) override
{
return rocksdb::Status::kOk;
}
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index e8a952810..ab30d9a5c 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -78,7 +78,8 @@ public:
return ERR_OK;
}
int on_request(message_ex *request) override WARN_UNUSED_RESULT { return
0; }
- int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) override
+ int make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests) override
{
return rocksdb::Status::kOk;
}
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index de9abc7b0..2d13948a9 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -345,17 +345,18 @@ void pegasus_server_impl::gc_checkpoints(bool
force_reserve_one)
max_d);
}
-int pegasus_server_impl::make_idempotent(dsn::message_ex *request,
dsn::message_ex **new_request)
+int pegasus_server_impl::make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *>
&new_requests)
{
CHECK_TRUE(_is_open);
- return _server_write->make_idempotent(request, new_request);
+ return _server_write->make_idempotent(request, new_requests);
}
int pegasus_server_impl::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
- int count,
+ uint32_t count,
dsn::message_ex
*original_request)
{
CHECK_TRUE(_is_open);
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index b385137e0..64934ffaa 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -147,7 +147,9 @@ public:
// - ERR_FILE_OPERATION_FAILED
::dsn::error_code stop(bool clear_state) override;
- int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request) override;
+ // See replication_app_base::make_idempotent() for details. Only called by
primary replicas.
+ int make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *> &new_requests) override;
/// Each of the write request (specifically, the rpc that's configured as
write, see
/// option `rpc_request_is_write_operation` in rDSN `task_spec`) will
first be
@@ -159,7 +161,7 @@ public:
int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
- int count,
+ uint32_t count,
dsn::message_ex *original_request) override;
::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
diff --git a/src/server/pegasus_server_write.cpp
b/src/server/pegasus_server_write.cpp
index 583032afa..810746f4f 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -18,7 +18,6 @@
*/
#include <fmt/core.h>
-#include <rocksdb/status.h>
#include <thrift/transport/TTransportException.h>
#include <algorithm>
#include <string_view>
@@ -34,11 +33,9 @@
#include "pegasus_server_write.h"
#include "pegasus_utils.h"
#include "rpc/rpc_holder.h"
-#include "rpc/rpc_message.h"
+#include "rpc/serialization.h"
#include "rrdb/rrdb.code.definition.h"
-#include "runtime/message_utils.h"
#include "server/pegasus_write_service.h"
-#include "task/task_spec.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/flags.h"
@@ -52,33 +49,48 @@ METRIC_DEFINE_counter(replica,
DSN_DECLARE_bool(rocksdb_verbose_log);
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
: replica_base(server),
- _write_svc(new pegasus_write_service(server)),
+ _write_svc(std::make_unique<pegasus_write_service>(server)),
METRIC_VAR_INIT_replica(corrupt_writes)
{
init_make_idempotent_handlers();
init_non_batch_write_handlers();
- init_on_idempotent_handlers();
+ init_idempotent_writers();
}
-int pegasus_server_write::make_idempotent(dsn::message_ex *request,
dsn::message_ex **new_request)
+int pegasus_server_write::make_idempotent(dsn::message_ex *request,
+ std::vector<dsn::message_ex *>
&new_requests)
{
- auto iter =
std::as_const(_make_idempotent_handlers).find(request->rpc_code());
- if (iter != _make_idempotent_handlers.end()) {
- return iter->second(request, new_request);
+ const auto make_idempotent_handler =
+ std::as_const(_make_idempotent_handlers).find(request->rpc_code());
+ if (make_idempotent_handler == _make_idempotent_handlers.end()) {
+ // Those requests not in the handlers are considered as idempotent.
Always be
+ // successful for them.
+ return rocksdb::Status::kOk;
}
- // Those requests not in the handlers are considered as idempotent. Always
be successful
- // for them.
- return rocksdb::Status::kOk;
+ try {
+ return make_idempotent_handler->second(request, new_requests);
+ } catch (TTransportException &ex) {
+ METRIC_VAR_INCREMENT(corrupt_writes);
+ LOG_ERROR_PREFIX("make idempotent handler for {} failed: from = {},
exception = {}",
+ request->rpc_code(),
+ request->header->from_address,
+ ex.what());
+
+ // The corrupt write is likely to be an attack or a scan for security.
It needs to return
+ // `rocksdb::Status::kCorruption` to inform the caller that
`make_idempotent()` has failed
+ // and that the subsequent 2PC should not proceed.
+ // See https://github.com/apache/incubator-pegasus/pull/798.
+ return rocksdb::Status::kCorruption;
+ }
}
int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
- int count,
+ uint32_t count,
int64_t decree,
uint64_t timestamp,
dsn::message_ex
*original_request)
@@ -93,64 +105,136 @@ int
pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return _write_svc->empty_put(_decree);
}
- try {
- auto iter =
std::as_const(_non_batch_write_handlers).find(requests[0]->rpc_code());
- if (iter != _non_batch_write_handlers.end()) {
- CHECK_EQ(count, 1);
- return iter->second(requests[0]);
- }
- } catch (TTransportException &ex) {
- METRIC_VAR_INCREMENT(corrupt_writes);
- LOG_ERROR_PREFIX("pegasus non-batch write handler failed, from = {},
exception = {}",
- requests[0]->header->from_address,
- ex.what());
+ const auto non_batch_write_handler =
+ std::as_const(_non_batch_write_handlers).find(requests[0]->rpc_code());
+ if (non_batch_write_handler != _non_batch_write_handlers.end()) {
+ CHECK_EQ_PREFIX_MSG(count,
+ 1,
+ "there should be only one request for the
non-batch "
+ "write {}: from = {}, decree = {}",
+ requests[0]->rpc_code(),
+ requests[0]->header->from_address,
+ _decree);
- // The corrupt write is likely to be an attack or a scan for security.
Since it has
- // been in plog, just return rocksdb::Status::kOk to ignore it.
- // See https://github.com/apache/incubator-pegasus/pull/798.
- return rocksdb::Status::kOk;
+ try {
+ return non_batch_write_handler->second(requests[0]);
+ } catch (TTransportException &ex) {
+ METRIC_VAR_INCREMENT(corrupt_writes);
+ LOG_ERROR_PREFIX("non-batch write handler for {} failed: from =
{}, "
+ "decree = {}, exception = {}",
+ requests[0]->rpc_code(),
+ requests[0]->header->from_address,
+ _decree,
+ ex.what());
+
+ // The corrupt write is likely to be an attack or a scan for
security. Since it has
+ // been in plog, just return rocksdb::Status::kOk to ignore it.
+ // See https://github.com/apache/incubator-pegasus/pull/798.
+ return rocksdb::Status::kOk;
+ }
}
if (original_request != nullptr) {
- // The request is regarded as idempotent once its original request is
attached.
- CHECK_EQ(count, 1);
- return on_idempotent(requests[0], original_request);
+ // Once `original_request` is set, `requests` must be idempotent and
translated from
+ // an atomic write request.
+ return apply_idempotent(requests, count, original_request);
}
return on_batched_writes(requests, count);
}
-int pegasus_server_write::on_idempotent(dsn::message_ex *request,
dsn::message_ex *original_request)
+int pegasus_server_write::apply_idempotent(dsn::message_ex **requests,
+ uint32_t count,
+ dsn::message_ex *original_request)
{
- try {
- auto iter =
std::as_const(_on_idempotent_handlers).find(request->rpc_code());
- if (iter != _on_idempotent_handlers.end()) {
- return iter->second(request, original_request);
- }
- } catch (TTransportException &ex) {
- METRIC_VAR_INCREMENT(corrupt_writes);
- LOG_ERROR_PREFIX("pegasus on idempotent handler failed, from = {},
exception = {}",
- request->header->from_address,
- ex.what());
- // The corrupt write is likely to be an attack or a scan for security.
Since it has
- // been in plog, just return rocksdb::Status::kOk to ignore it.
- // See https://github.com/apache/incubator-pegasus/pull/798.
- return rocksdb::Status::kOk;
+ CHECK_GT_PREFIX_MSG(count,
+ 0,
+ "`requests` should consist of at least one request: "
+ "original_request = ({}, {}), decree = {}",
+ original_request->header->from_address,
+ original_request->rpc_code(),
+ _decree);
+
+ std::vector<dsn::apps::update_request> updates;
+ updates.reserve(count);
+
+ for (uint32_t i = 0; i < count; ++i) {
+ CHECK_EQ_PREFIX_MSG(requests[i]->rpc_code(),
+ dsn::apps::RPC_RRDB_RRDB_PUT,
+ "now all translated idempotent writes are
RPC_RRDB_RRDB_PUT "
+ "requests: original_request = ({}, {}), decree =
{}, "
+ "count = {}, index = {}",
+ original_request->header->from_address,
+ original_request->rpc_code(),
+ _decree,
+ count,
+ i);
+
+ // Preinitialize an empty update.
+ updates.emplace_back();
+
+ // Deserialize the message to create the real idempotent update.
+ //
+ // There is no need to catch TTransportException, since `requests` are
freshly
+ // generated by serializing the idempotent updates within
make_idempotent().
+ dsn::unmarshall(requests[i], updates.back());
+
+ CHECK_PREFIX_MSG(
+ updates.back().__isset.type,
+ "update_request::type is not set for {} request: original_request
= ({}, {}) "
+ "decree = {}, count = {}, index = {},",
+ requests[i]->rpc_code(),
+ original_request->header->from_address,
+ original_request->rpc_code(),
+ _decree,
+ count,
+ i);
}
- CHECK(false, "unsupported idempotent write request: rpc_code={}",
request->rpc_code());
- return rocksdb::Status::kNotSupported;
+ const auto update_type = static_cast<uint32_t>(updates.front().type);
+ CHECK_LT_PREFIX_MSG(update_type,
+ _idempotent_writers.size(),
+ "unsupported update_request::type {} for idempotent
writer: "
+ "original_request = ({}, {}), decree = {}, count = {}",
+ update_type,
+ original_request->header->from_address,
+ original_request->rpc_code(),
+ _decree,
+ count);
+
+ // If the corresponding writer is retrieved using
original_request->rpc_code(), a map
+ // would be needed. However, by choosing to obtain the writer based on the
type of the
+ // first idempotent update, the writers can be stored in an array instead,
which offers
+ // better performance than a map.
+ const auto writer = _idempotent_writers[update_type];
+ CHECK_PREFIX_MSG(writer,
+ "idempotent writer for update_request::type {} should not
be empty: "
+ "original_request = ({}, {}), decree = {}, count = {}",
+ update_type,
+ original_request->header->from_address,
+ original_request->rpc_code(),
+ _decree,
+ count);
+
+ // There is no need to catch TTransportException, since `original_request`
has just
+ // been successfully deserialized in make_idempotent().
+ return writer(updates, original_request);
}
void pegasus_server_write::set_default_ttl(uint32_t ttl) {
_write_svc->set_default_ttl(ttl); }
-int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int
count)
+int pegasus_server_write::on_batched_writes(dsn::message_ex **requests,
uint32_t count)
{
_write_svc->batch_prepare(_decree);
int err = rocksdb::Status::kOk;
- for (int i = 0; i < count; ++i) {
- CHECK_NOTNULL(requests[i], "request[{}] is null", i);
+ for (uint32_t i = 0; i < count; ++i) {
+ CHECK_NOTNULL_PREFIX_MSG(requests[i],
+ "batched requests[{}] should not be null: "
+ "decree = {}, count = {}",
+ i,
+ _decree,
+ count);
// Make sure all writes are batched even if some of them failed, since
we need to record
// the total QPS and RPC latencies, and respond for all RPCs
regardless of their result.
@@ -169,10 +253,6 @@ int
pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
if (!update.__isset.type || update.type ==
dsn::apps::update_type::UT_PUT) {
// We must reply to the client for the plain single-put
request.
rpc.enable_auto_reply();
- } else if (update.type == dsn::apps::update_type::UT_INCR) {
- // This put request must originate from an incr request
and never be
- // batched in plog.
- CHECK_EQ(count, 1);
}
local_err = on_single_put_in_batch(rpc);
@@ -183,9 +263,9 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex
**requests, int coun
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (_non_batch_write_handlers.find(rpc_code) !=
_non_batch_write_handlers.end()) {
- LOG_FATAL("rpc code not allow batch: {}", rpc_code);
+ LOG_FATAL_PREFIX("rpc code not allow batch: {}", rpc_code);
} else {
- LOG_FATAL("rpc code not handled: {}", rpc_code);
+ LOG_FATAL_PREFIX("rpc code not handled: {}", rpc_code);
}
}
} catch (TTransportException &ex) {
@@ -246,26 +326,16 @@ void pegasus_server_write::init_make_idempotent_handlers()
{
_make_idempotent_handlers = {
{dsn::apps::RPC_RRDB_RRDB_INCR,
- [this](dsn::message_ex *request, dsn::message_ex **new_request) ->
int {
- auto rpc = incr_rpc(request);
- dsn::apps::update_request update;
- // Translate an incr request into a single-put request.
- const auto err = _write_svc->make_idempotent(rpc.request(),
rpc.response(), update);
- if (dsn_likely(err == rocksdb::Status::kOk)) {
- // Build the message based on the resulting put request.
- *new_request = dsn::from_thrift_request_to_received_message(
- update,
- dsn::apps::RPC_RRDB_RRDB_PUT,
- request->header->client.thread_hash,
- request->header->client.partition_hash,
- static_cast<dsn::dsn_msg_serialize_format>(
- request->header->context.u.serialize_format));
- } else {
- // Once it failed, just reply to the client with error
immediately.
- rpc.enable_auto_reply();
- }
-
- return err;
+ [this](dsn::message_ex *request, std::vector<dsn::message_ex *>
&new_requests) -> int {
+ return make_idempotent<incr_rpc>(request, new_requests);
+ }},
+ {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
+ [this](dsn::message_ex *request, std::vector<dsn::message_ex *>
&new_requests) -> int {
+ return make_idempotent<check_and_set_rpc>(request, new_requests);
+ }},
+ {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
+ [this](dsn::message_ex *request, std::vector<dsn::message_ex *>
&new_requests) -> int {
+ return make_idempotent<check_and_mutate_rpc>(request,
new_requests);
}},
};
}
@@ -311,28 +381,63 @@ void pegasus_server_write::init_non_batch_write_handlers()
};
}
-void pegasus_server_write::init_on_idempotent_handlers()
+void pegasus_server_write::init_idempotent_writers()
{
- _on_idempotent_handlers = {
- {dsn::apps::RPC_RRDB_RRDB_PUT,
- [this](dsn::message_ex *request, dsn::message_ex *original_request)
-> int {
- auto put = put_rpc(request);
-
- const auto &update = put.request();
- CHECK(update.__isset.type, "update_request::type is not set for
idempotent write");
-
- if (update.type == dsn::apps::update_type::UT_INCR) {
- auto incr = incr_rpc::auto_reply(original_request);
- return _write_svc->put(_write_ctx, update, incr.response());
- }
-
- CHECK(false,
- "unsupported update_request::type for idempotent write {}",
- static_cast<int>(update.type));
- return rocksdb::Status::kNotSupported;
- }},
+ _idempotent_writers = {
+ nullptr, // No writer for UT_PUT.
+ [this](const std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request) -> int {
+ // The type of the only update in `updates` is UT_INCR, thus
choose the writer
+ // for incr.
+ return put_incr(updates, original_request);
+ },
+ [this](const std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request) -> int {
+ // The type of the only update in `updates` is UT_CHECK_AND_SET,
thus choose the
+ // writer for check_and_set.
+ return put_check_and_set(updates, original_request);
+ },
+ [this](const std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request) -> int {
+ // The type of the first update in batched `updates` is
UT_CHECK_AND_MUTATE_PUT,
+ // thus choose the writer for check_and_mutate.
+ return put_check_and_mutate(updates, original_request);
+ },
+ [this](const std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request) -> int {
+ // The type of the first update in batched `updates` is
UT_CHECK_AND_MUTATE_REMOVE,
+ // thus choose the writer for check_and_mutate.
+ return put_check_and_mutate(updates, original_request);
+ },
};
+
+ CHECK_EQ_PREFIX_MSG(dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE + 1,
+ _idempotent_writers.size(),
+ "_idempotent_writers does not match
dsn::apps::update_type");
+}
+
+int pegasus_server_write::put_incr(const
std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request)
+{
+ CHECK_EQ_PREFIX(updates.size(), 1);
+ CHECK_EQ_PREFIX(original_request->rpc_code(),
dsn::apps::RPC_RRDB_RRDB_INCR);
+ return put<incr_rpc>(updates, original_request);
+}
+
+int pegasus_server_write::put_check_and_set(const
std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request)
+{
+ CHECK_EQ_PREFIX(updates.size(), 1);
+ CHECK_EQ_PREFIX(original_request->rpc_code(),
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET);
+ return put<check_and_set_rpc>(updates, original_request);
+}
+
+int pegasus_server_write::put_check_and_mutate(
+ const std::vector<dsn::apps::update_request> &updates, dsn::message_ex
*original_request)
+{
+ CHECK_GT_PREFIX(updates.size(), 0);
+ CHECK_EQ_PREFIX(original_request->rpc_code(),
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE);
+ return put<check_and_mutate_rpc>(updates, original_request);
}
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_server_write.h
b/src/server/pegasus_server_write.h
index c0481d5f0..9680f1a3c 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -19,26 +19,33 @@
#pragma once
-#include <stdint.h>
+#include <rocksdb/status.h>
+#include <algorithm>
+#include <array>
+#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <vector>
#include "base/pegasus_rpc_types.h"
+#include "common/replication_other_types.h"
#include "pegasus_write_service.h"
#include "replica/replica_base.h"
+#include "rpc/rpc_message.h"
+#include "rrdb/rrdb.code.definition.h"
#include "rrdb/rrdb_types.h"
+#include "runtime/message_utils.h"
#include "task/task_code.h"
+#include "task/task_spec.h"
#include "utils/metrics.h"
namespace dsn {
class blob;
-class message_ex;
} // namespace dsn
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
+
class pegasus_server_impl;
/// This class implements the interface of
`pegasus_sever_impl::on_batched_write_requests`.
@@ -47,21 +54,21 @@ class pegasus_server_write : public
dsn::replication::replica_base
public:
explicit pegasus_server_write(pegasus_server_impl *server);
- // See replication_app_base::make_idempotent() for details.
- int make_idempotent(dsn::message_ex *request, dsn::message_ex
**new_request);
+ // See replication_app_base::make_idempotent() for details. Only called by
primary replicas.
+ int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex
*> &new_requests);
// See replication_app_base::on_batched_write_requests() for details.
//
- /// \return error code returned by rocksdb, i.e rocksdb::Status::code.
- /// **NOTE**
- /// Error returned is regarded as the failure of replica, thus will trigger
- /// cluster membership changes. Make sure no error is returned because of
- /// invalid user argument.
- /// As long as the returned error is rocksdb::Status::kOk, the operation
is guaranteed to be
- /// successfully applied into rocksdb, which means an empty_put will be
called
- /// even if there's no write.
+ // **NOTE**
+ // An error code other than rocksdb::Status::kOk would be regarded as the
failure of the
+ // replica, leading to cluster membership changes. Make sure no errors
occur due to
+ // invalid parameters.
+ //
+ // As long as the returned error is rocksdb::Status::kOk, the write
requests are guaranteed
+ // to be applied into RocksDB successfully, which means empty_put() will
be called even if
+ // there's no write.
int on_batched_write_requests(dsn::message_ex **requests,
- int count,
+ uint32_t count,
int64_t decree,
uint64_t timestamp,
dsn::message_ex *original_request);
@@ -69,11 +76,72 @@ public:
void set_default_ttl(uint32_t ttl);
private:
- // Apply the idempotent request and respond to its original request.
- int on_idempotent(dsn::message_ex *request, dsn::message_ex
*original_request);
+ // Used to call make_idempotent() for each type (specified by TRpcHolder)
of atomic write.
+ // Only called by primary replicas.
+ template <typename TRpcHolder>
+ int make_idempotent(dsn::message_ex *request, std::vector<dsn::message_ex
*> &new_requests)
+ {
+ auto rpc = TRpcHolder(request);
+
+ // Translate an atomic request into one or multiple idempotent
single-update requests.
+ std::vector<dsn::apps::update_request> updates;
+ const int err = _write_svc->make_idempotent(rpc.request(),
rpc.response(), updates);
+
+ // When the condition checks of `check_and_set` or `check_and_mutate` fail,
+ // make_idempotent() returns rocksdb::Status::kTryAgain. Therefore, a status code other
+ // than rocksdb::Status::kOk may still be returned here.
+ if (err != rocksdb::Status::kOk) {
+ // Once it failed, just reply to the client with error immediately.
+ rpc.enable_auto_reply();
+ return err;
+ }
+
+ // Build new messages based on the generated idempotent updates.
+ new_requests.clear();
+ for (const auto &update : updates) {
+
new_requests.push_back(dsn::from_thrift_request_to_received_message(
+ update,
+ dsn::apps::RPC_RRDB_RRDB_PUT,
+ request->header->client.thread_hash,
+ request->header->client.partition_hash,
+ static_cast<dsn::dsn_msg_serialize_format>(
+ request->header->context.u.serialize_format)));
+ }
+
+ return rocksdb::Status::kOk;
+ }
+
+ // Apply the idempotent updates in `requests` to the storage engine and respond to the
+ // original atomic write request `original_request`. Neither `requests` nor
+ // `original_request` may be null, and `count` must always be > 0.
+ int
+ apply_idempotent(dsn::message_ex **requests, uint32_t count,
dsn::message_ex *original_request);
+
+ // Apply the batched (one or multiple) single-update requests into the
storage engine.
+ // Only called by primary replicas.
+ template <typename TRpcHolder>
+ inline int put(const std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request)
+ {
+ // Enable auto reply, since in primary replicas we need to reply to the client with the
+ // response to the original atomic write request after the idempotent updates have been
+ // applied to the storage engine.
+ auto rpc = TRpcHolder::auto_reply(original_request);
+ return _write_svc->put(_write_ctx, updates, rpc.request(),
rpc.response());
+ }
+
+ // The following functions are the writers that apply the idempotent updates translated
+ // from incr, check_and_set and check_and_mutate requests and then respond to them.
+ int put_incr(const std::vector<dsn::apps::update_request> &updates,
+ dsn::message_ex *original_request);
+ int put_check_and_set(const std::vector<dsn::apps::update_request>
&updates,
+ dsn::message_ex *original_request);
+ int put_check_and_mutate(const std::vector<dsn::apps::update_request>
&updates,
+ dsn::message_ex *original_request);
// Delay replying for the batched requests until all of them complete.
- int on_batched_writes(dsn::message_ex **requests, int count);
+ int on_batched_writes(dsn::message_ex **requests, uint32_t count);
int on_single_put_in_batch(put_rpc &rpc)
{
@@ -95,7 +163,7 @@ private:
void init_make_idempotent_handlers();
void init_non_batch_write_handlers();
- void init_on_idempotent_handlers();
+ void init_idempotent_writers();
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
@@ -107,24 +175,27 @@ private:
std::vector<remove_rpc> _remove_rpc_batch;
db_write_context _write_ctx;
- int64_t _decree;
+ int64_t _decree{invalid_decree};
- // Handlers that make an atomic request idempotent.
+ // Handlers that translate an atomic write request into one or multiple
idempotent updates.
using make_idempotent_map =
- std::map<dsn::task_code, std::function<int(dsn::message_ex *,
dsn::message_ex **)>>;
+ std::map<dsn::task_code,
+ std::function<int(dsn::message_ex *,
std::vector<dsn::message_ex *> &)>>;
make_idempotent_map _make_idempotent_handlers;
- // Handlers that process a request could not be batched, e.g. multi
put/remove.
+ // Handlers that process a write request which could not be batched, e.g.
multi put/remove.
using non_batch_write_map = std::map<dsn::task_code,
std::function<int(dsn::message_ex *)>>;
non_batch_write_map _non_batch_write_handlers;
- // Handlers that apply the idempotent request and respond to its original
request.
- using on_idempotent_map =
- std::map<dsn::task_code, std::function<int(dsn::message_ex *,
dsn::message_ex *)>>;
- on_idempotent_map _on_idempotent_handlers;
+ // Writers that apply idempotent updates and respond to the original atomic write request.
+ // Each value of dsn::apps::update_type has its own writer; therefore, the number of
+ // writers must match the number of dsn::apps::update_type values.
+ using idempotent_writer_map = std::array<
+ std::function<int(const std::vector<dsn::apps::update_request> &,
dsn::message_ex *)>,
+ 5>;
+ idempotent_writer_map _idempotent_writers{};
METRIC_VAR_DECLARE_counter(corrupt_writes);
};
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_write_service.cpp
b/src/server/pegasus_write_service.cpp
index dfea32601..df3766170 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -17,12 +17,12 @@
* under the License.
*/
-#include <string_view>
#include <fmt/core.h>
#include <rocksdb/status.h>
-#include <stddef.h>
+#include <algorithm>
#include <functional>
#include <set>
+#include <string_view>
#include <vector>
#include "base/pegasus_rpc_types.h"
@@ -38,13 +38,14 @@
#include "rrdb/rrdb_types.h"
#include "runtime/api_layer1.h"
#include "runtime/message_utils.h"
+#include "server/pegasus_server_impl.h"
#include "task/async_calls.h"
#include "task/task_code.h"
-#include "server/pegasus_server_impl.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
+#include "utils/ports.h"
METRIC_DEFINE_counter(replica,
put_requests,
@@ -99,6 +100,16 @@ METRIC_DEFINE_percentile_int64(replica,
"translating the check_and_set request into the
single-put "
"request. Only used for the primary replicas");
+METRIC_DEFINE_percentile_int64(replica,
+ make_check_and_mutate_idempotent_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The duration that a check_and_mutate request
is made "
+ "idempotent, including reading the check value
from "
+ "storage engine, validating the check
conditions and "
+ "translating the check_and_mutate request into
multiple "
+ "single-put and single-remove requests. Only
used for the "
+ "primary replicas");
+
METRIC_DEFINE_percentile_int64(replica,
put_latency_ns,
dsn::metric_unit::kNanoSeconds,
@@ -160,8 +171,8 @@ namespace dsn {
class blob;
class message_ex;
} // namespace dsn
-namespace pegasus {
-namespace server {
+
+namespace pegasus::server {
DEFINE_TASK_CODE(LPC_INGESTION, TASK_PRIORITY_COMMON, THREAD_POOL_INGESTION)
@@ -180,6 +191,7 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(check_and_mutate_requests),
METRIC_VAR_INIT_replica(make_incr_idempotent_latency_ns),
METRIC_VAR_INIT_replica(make_check_and_set_idempotent_latency_ns),
+ METRIC_VAR_INIT_replica(make_check_and_mutate_idempotent_latency_ns),
METRIC_VAR_INIT_replica(put_latency_ns),
METRIC_VAR_INIT_replica(multi_put_latency_ns),
METRIC_VAR_INIT_replica(remove_latency_ns),
@@ -189,11 +201,7 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
METRIC_VAR_INIT_replica(dup_requests),
METRIC_VAR_INIT_replica(dup_time_lag_ms),
- METRIC_VAR_INIT_replica(dup_lagging_writes),
- _put_batch_size(0),
- _remove_batch_size(0),
- _incr_batch_size(0),
- _check_and_set_batch_size(0)
+ METRIC_VAR_INIT_replica(dup_lagging_writes)
{
}
@@ -235,24 +243,25 @@ int pegasus_write_service::multi_remove(int64_t decree,
int pegasus_write_service::make_idempotent(const dsn::apps::incr_request &req,
dsn::apps::incr_response &err_resp,
- dsn::apps::update_request &update)
+
std::vector<dsn::apps::update_request> &updates)
{
METRIC_VAR_AUTO_LATENCY(make_incr_idempotent_latency_ns);
- return _impl->make_idempotent(req, err_resp, update);
+ return _impl->make_idempotent(req, err_resp, updates);
}
int pegasus_write_service::put(const db_write_context &ctx,
- const dsn::apps::update_request &update,
+ const std::vector<dsn::apps::update_request>
&updates,
+ const dsn::apps::incr_request &req,
dsn::apps::incr_response &resp)
{
METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
METRIC_VAR_INCREMENT(incr_requests);
- const int err = _impl->put(ctx, update, resp);
+ const int err = _impl->put(ctx, updates, resp);
if (_server->is_primary()) {
- _cu_calculator->add_incr_cu(resp.error, update.key);
+ _cu_calculator->add_incr_cu(resp.error, req.key);
}
return err;
@@ -276,22 +285,22 @@ int pegasus_write_service::incr(int64_t decree,
int pegasus_write_service::make_idempotent(const
dsn::apps::check_and_set_request &req,
dsn::apps::check_and_set_response
&err_resp,
- dsn::apps::update_request &update)
+
std::vector<dsn::apps::update_request> &updates)
{
METRIC_VAR_AUTO_LATENCY(make_check_and_set_idempotent_latency_ns);
- return _impl->make_idempotent(req, err_resp, update);
+ return _impl->make_idempotent(req, err_resp, updates);
}
int pegasus_write_service::put(const db_write_context &ctx,
- const dsn::apps::update_request &update,
+ const std::vector<dsn::apps::update_request>
&updates,
const dsn::apps::check_and_set_request &req,
dsn::apps::check_and_set_response &resp)
{
METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
METRIC_VAR_INCREMENT(check_and_set_requests);
- const int err = _impl->put(ctx, update, resp);
+ const int err = _impl->put(ctx, updates, resp);
if (_server->is_primary()) {
_cu_calculator->add_check_and_set_cu(
@@ -321,6 +330,33 @@ int pegasus_write_service::check_and_set(int64_t decree,
return err;
}
+int pegasus_write_service::make_idempotent(const
dsn::apps::check_and_mutate_request &req,
+
dsn::apps::check_and_mutate_response &err_resp,
+
std::vector<dsn::apps::update_request> &updates)
+{
+ METRIC_VAR_AUTO_LATENCY(make_check_and_mutate_idempotent_latency_ns);
+
+ return _impl->make_idempotent(req, err_resp, updates);
+}
+
+int pegasus_write_service::put(const db_write_context &ctx,
+ const std::vector<dsn::apps::update_request>
&updates,
+ const dsn::apps::check_and_mutate_request &req,
+ dsn::apps::check_and_mutate_response &resp)
+{
+ METRIC_VAR_AUTO_LATENCY(check_and_mutate_latency_ns);
+ METRIC_VAR_INCREMENT(check_and_mutate_requests);
+
+ const int err = _impl->put(ctx, updates, resp);
+
+ if (_server->is_primary()) {
+ _cu_calculator->add_check_and_mutate_cu(
+ resp.error, req.hash_key, req.check_sort_key, req.mutate_list);
+ }
+
+ return err;
+}
+
int pegasus_write_service::check_and_mutate(int64_t decree,
const
dsn::apps::check_and_mutate_request &update,
dsn::apps::check_and_mutate_response &resp)
@@ -350,11 +386,25 @@ int pegasus_write_service::batch_put(const
db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
{
+ // `kBatchWriteTypeMap` maps the dsn::apps::update_type field of a single-put request to
+ // a batch_write_type, so that the number of requests in the batch can be counted per
+ // kind of write in `_batch_sizes`.
+ static constexpr std::array kBatchWriteTypeMap = {
+ batch_write_type::put,
+ batch_write_type::incr,
+ batch_write_type::check_and_set,
+ batch_write_type::check_and_mutate,
+ batch_write_type::check_and_mutate,
+ };
+ static_assert(dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE + 1 ==
+ kBatchWriteTypeMap.size(),
+ "kBatchWriteTypeMap does not match dsn::apps::update_type");
+
CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after
batch_prepare");
- if (!update.__isset.type || update.type == dsn::apps::update_type::UT_PUT)
{
+ if (!update.__isset.type) {
// This is a general single-put request.
- ++_put_batch_size;
+ ++put_batch_size();
} else {
// There are only two possible situations for batch_put() where this
put request
// originates from an atomic write request:
@@ -364,10 +414,8 @@ int pegasus_write_service::batch_put(const
db_write_context &ctx,
//
// Though this is a put request, we choose to udapte the metrics of
its original
// request (i.e. the atomic write).
- if (update.type == dsn::apps::update_type::UT_INCR) {
- ++_incr_batch_size;
- } else if (update.type == dsn::apps::update_type::UT_CHECK_AND_SET) {
- ++_check_and_set_batch_size;
+ if (dsn_likely(update.type < kBatchWriteTypeMap.size())) {
+ ++batch_size(kBatchWriteTypeMap.at(update.type));
}
}
@@ -386,7 +434,8 @@ int pegasus_write_service::batch_remove(int64_t decree,
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after
batch_prepare");
- ++_remove_batch_size;
+ ++remove_batch_size();
+
const int err = _impl->batch_remove(decree, key, resp);
if (_server->is_primary()) {
@@ -418,28 +467,43 @@ void pegasus_write_service::set_default_ttl(uint32_t ttl)
{ _impl->set_default_t
void pegasus_write_service::batch_finish()
{
-#define UPDATE_WRITE_BATCH_METRICS(op)
\
+#define UPDATE_BATCH_METRICS(op, nrequests)
\
do {
\
- METRIC_VAR_INCREMENT_BY(op##_requests,
static_cast<int64_t>(_##op##_batch_size)); \
- METRIC_VAR_SET(op##_latency_ns,
static_cast<size_t>(_##op##_batch_size), latency_ns); \
- _##op##_batch_size = 0;
\
+ const auto __nrequests = (nrequests);
\
+ METRIC_VAR_INCREMENT_BY(op##_requests,
static_cast<int64_t>(__nrequests)); \
+ METRIC_VAR_SET(op##_latency_ns, static_cast<size_t>(__nrequests),
latency_ns); \
} while (0)
- auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);
+#define UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(op)
\
+ UPDATE_BATCH_METRICS(op, batch_size(batch_write_type::op))
+
+ const auto latency_ns = static_cast<int64_t>(dsn_now_ns() -
_batch_start_time);
// Take the latency of executing the entire batch as the latency for
processing each
// request within it, since the latency of each request could not be known.
- UPDATE_WRITE_BATCH_METRICS(put);
- UPDATE_WRITE_BATCH_METRICS(remove);
-
- // These put requests are translated from atomic requests. See comments in
batch_put()
- // for the two possible situations where we are now.
- UPDATE_WRITE_BATCH_METRICS(incr);
- UPDATE_WRITE_BATCH_METRICS(check_and_set);
+ UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(put);
+ UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(remove);
+
+ // These idempotent updates are translated from atomic write requests. See the comments
+ // in batch_put() for the two possible situations that lead here.
+ //
+ // A batch contains only one update translated from an incr or check_and_set request.
+ // However, a batch may contain multiple updates translated from a check_and_mutate
+ // request. Therefore, the number of check_and_mutate requests is counted as:
+ // - 1 if there is at least one idempotent update;
+ // - 0 otherwise.
+ UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(incr);
+ UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE(check_and_set);
+ UPDATE_BATCH_METRICS(check_and_mutate,
+ std::min(1U,
batch_size(batch_write_type::check_and_mutate)));
+
+ // Reset _batch_sizes for next calculation.
+ _batch_sizes.fill(0);
_batch_start_time = 0;
-#undef UPDATE_WRITE_BATCH_METRICS
+#undef UPDATE_BATCH_METRICS_FOR_SINGLE_WRITE
+#undef UPDATE_BATCH_METRICS
}
int pegasus_write_service::duplicate(int64_t decree,
@@ -551,5 +615,4 @@ int pegasus_write_service::ingest_files(int64_t decree,
return rocksdb::Status::kOk;
}
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_write_service.h
b/src/server/pegasus_write_service.h
index 3939a6a6e..c1bfd5b11 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -19,8 +19,11 @@
#pragma once
+#include <array>
+#include <cstddef>
#include <cstdint>
#include <memory>
+#include <vector>
#include "replica/replica_base.h"
#include "utils/metrics.h"
@@ -49,8 +52,7 @@ class ingestion_response;
} // namespace replication
} // namespace dsn
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
// The context of an mutation to the database.
struct db_write_context
@@ -140,12 +142,13 @@ public:
// replicas.
int make_idempotent(const dsn::apps::incr_request &req,
dsn::apps::incr_response &err_resp,
- dsn::apps::update_request &update);
+ std::vector<dsn::apps::update_request> &updates);
// Write an idempotent INCR record (i.e. a PUT record) and reply to the
client with INCR
// response. Only called by primary replicas.
int put(const db_write_context &ctx,
- const dsn::apps::update_request &update,
+ const std::vector<dsn::apps::update_request> &updates,
+ const dsn::apps::incr_request &req,
dsn::apps::incr_response &resp);
// Write a non-idempotent INCR record.
@@ -155,12 +158,12 @@ public:
// primary replicas.
int make_idempotent(const dsn::apps::check_and_set_request &req,
dsn::apps::check_and_set_response &err_resp,
- dsn::apps::update_request &update);
+ std::vector<dsn::apps::update_request> &updates);
// Write an idempotent CHECK_AND_SET record (i.e. a PUT record) and reply
to the client
// with CHECK_AND_SET response. Only called by primary replicas.
int put(const db_write_context &ctx,
- const dsn::apps::update_request &update,
+ const std::vector<dsn::apps::update_request> &updates,
const dsn::apps::check_and_set_request &req,
dsn::apps::check_and_set_response &resp);
@@ -169,6 +172,20 @@ public:
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp);
+ // Translate a CHECK_AND_MUTATE request into one or more idempotent PUT requests, each of
+ // which carries either a single-put or a single-remove operation. Only called by primary
+ // replicas.
+ int make_idempotent(const dsn::apps::check_and_mutate_request &req,
+ dsn::apps::check_and_mutate_response &err_resp,
+ std::vector<dsn::apps::update_request> &updates);
+
+ // Write an idempotent CHECK_AND_MUTATE record (i.e. batched PUT records
including both
+ // single-put and single-remove operations) and reply to the client with
CHECK_AND_MUTATE
+ // response. Only called by primary replicas.
+ int put(const db_write_context &ctx,
+ const std::vector<dsn::apps::update_request> &updates,
+ const dsn::apps::check_and_mutate_request &req,
+ dsn::apps::check_and_mutate_response &resp);
+
// Write CHECK_AND_MUTATE record.
int check_and_mutate(int64_t decree,
const dsn::apps::check_and_mutate_request &update,
@@ -217,6 +234,28 @@ private:
// Finish batch write with metrics such as latencies calculated and some
states cleared.
void batch_finish();
+ // Used to index the per-type batch sizes stored in the array `_batch_sizes`; see the
+ // comments on `_batch_sizes` for details.
+ enum class batch_write_type : uint32_t
+ {
+ put = 0,
+ remove,
+ incr,
+ check_and_set,
+ check_and_mutate,
+ COUNT,
+ };
+
+ // Read/change the batch size of the writes with the given type.
+ uint32_t &batch_size(batch_write_type type)
+ {
+ return _batch_sizes.at(static_cast<uint32_t>(type));
+ }
+
+ uint32_t &put_batch_size() { return batch_size(batch_write_type::put); }
+
+ uint32_t &remove_batch_size() { return
batch_size(batch_write_type::remove); }
+
friend class pegasus_write_service_test;
friend class PegasusWriteServiceImplTest;
friend class pegasus_server_write_test;
@@ -232,6 +271,21 @@ private:
capacity_unit_calculator *_cu_calculator;
+ // To calculate metrics such as the number of requests and the latency of the writes that
+ // are allowed to be batched, measure the number of requests in each batch applied to
+ // RocksDB for single put, single remove, incr, check_and_set and check_and_mutate, all of
+ // which are listed in batch_write_type. In fact, incr, check_and_set and check_and_mutate
+ // are not batched. They are listed in batch_write_type because none of them is applied as
+ // an incr, check_and_set or check_and_mutate itself. Instead, each is applied as single
+ // puts that were translated from the original operation to make it idempotent. Therefore,
+ // they all appear as single puts with an extra field indicating the original request.
+ //
+ // Each request of single put, single remove, incr and check_and_set contains only one
+ // write operation, while check_and_mutate may contain multiple single-put and
+ // single-remove operations.
+ std::array<uint32_t, static_cast<size_t>(batch_write_type::COUNT)>
_batch_sizes{};
+
METRIC_VAR_DECLARE_counter(put_requests);
METRIC_VAR_DECLARE_counter(multi_put_requests);
METRIC_VAR_DECLARE_counter(remove_requests);
@@ -242,6 +296,7 @@ private:
METRIC_VAR_DECLARE_percentile_int64(make_incr_idempotent_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(make_check_and_set_idempotent_latency_ns);
+
METRIC_VAR_DECLARE_percentile_int64(make_check_and_mutate_idempotent_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
@@ -255,22 +310,7 @@ private:
METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
METRIC_VAR_DECLARE_counter(dup_lagging_writes);
- // Measure the size of single-put requests in batch applied into RocksDB
for metrics.
- uint32_t _put_batch_size;
-
- // Measure the size of single-remove requests in batch applied into
RocksDB for metrics.
- uint32_t _remove_batch_size;
-
- // Measure the size of incr requests (with each translated into an
idempotent put request)
- // in batch applied into RocksDB for metrics.
- uint32_t _incr_batch_size;
-
- // Measure the size of check_and_set requests (with each translated into
an idempotent put
- // request) in batch applied into RocksDB for metrics.
- uint32_t _check_and_set_batch_size;
-
// TODO(wutao1): add metrics for failed rpc.
};
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index 1c7c859d6..e8de292a6 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -34,8 +34,7 @@
#include "utils/string_conv.h"
#include "utils/strings.h"
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
/// internal error codes used for fail injection
// TODO(yingchun): Use real rocksdb::Status::code.
@@ -260,7 +259,7 @@ public:
update.key,
update.value);
- return resp.error;
+ return rocksdb::Status::kOk;
}
int incr(int64_t decree, const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp)
@@ -532,12 +531,13 @@ public:
return rocksdb::Status::kOk;
}
- // Used to call make_idempotent() for incr and check_and_set to get the
idempotent single-put
- // request which is stored as the unique element of `updates`.
+ // Calling make_idempotent() on either an incr or check_and_set operation
produces an
+ // idempotent single-put request, which is also the only element in the
output array
+ // `updates`.
//
// This interface is provided to ensure consistency between the
make_idempotent() interfaces
- // of incr/check_and_set operations and that of check_and_mutate (both
using std::vector for
- // `updates`), thereby facilitating uniform templated function invocation.
+ // of incr/check_and_set operations and that of check_and_mutate (both
using std::vector
+ // for `updates`), thereby facilitating uniform templated function
invocation.
template <typename TRequest, typename TResponse>
inline int make_idempotent(const TRequest &req,
TResponse &err_resp,
@@ -548,6 +548,22 @@ public:
return make_idempotent(req, err_resp, updates.front());
}
+ // Apply `updates` to the storage engine. `updates` must contain exactly one element --
+ // an idempotent single-put request translated from either an incr or a check_and_set
+ // operation.
+ //
+ // This interface is provided to ensure consistency between the put() interfaces of
+ // incr/check_and_set operations and that of check_and_mutate (both using std::vector
+ // for `updates`), thereby facilitating uniform templated function invocation.
+ template <typename TResponse>
+ int put(const db_write_context &ctx,
+ const std::vector<dsn::apps::update_request> &updates,
+ TResponse &resp)
+ {
+ CHECK_EQ(updates.size(), 1);
+ return put(ctx, updates.front(), resp);
+ }
+
// Tranlate a check_and_mutate request into multiple single-put and
single-remove requests
// which are certainly idempotent. Return current status for RocksDB. Only
called by primary
// replicas.
@@ -1278,5 +1294,4 @@ private:
std::vector<dsn::apps::update_response *> _update_responses;
};
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
diff --git a/src/server/test/pegasus_server_write_test.cpp
b/src/server/test/pegasus_server_write_test.cpp
index f9ab7b414..cfb1a0ebd 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -75,23 +75,23 @@ public:
req.key = key;
req.value.assign("value", 0, 5);
- int put_rpc_cnt = dsn::rand::next_u32(1, 10);
- int remove_rpc_cnt = dsn::rand::next_u32(1, 10);
- int total_rpc_cnt = put_rpc_cnt + remove_rpc_cnt;
+ const uint32_t put_rpc_cnt = dsn::rand::next_u32(1, 10);
+ const uint32_t remove_rpc_cnt = dsn::rand::next_u32(1, 10);
+ const uint32_t total_rpc_cnt = put_rpc_cnt + remove_rpc_cnt;
/**
* writes[0] ~ writes[total_rpc_cnt-1] will be released by
their corresponding
* rpc_holders, which created in on_batched_write_requests. So
we don't need to
* release them here
**/
dsn::message_ex *writes[total_rpc_cnt];
- for (int i = 0; i < put_rpc_cnt; i++) {
+ for (uint32_t i = 0; i < put_rpc_cnt; i++) {
writes[i] = pegasus::create_put_request(req);
}
- for (int i = put_rpc_cnt; i < total_rpc_cnt; i++) {
+ for (uint32_t i = put_rpc_cnt; i < total_rpc_cnt; i++) {
writes[i] = pegasus::create_remove_request(key);
}
- int err = _server_write->on_batched_write_requests(
+ const int err = _server_write->on_batched_write_requests(
writes, total_rpc_cnt, decree, 0, nullptr);
switch (err) {
case FAIL_DB_WRITE_BATCH_PUT:
@@ -106,8 +106,8 @@ public:
// make sure everything is cleanup after batch write.
ASSERT_TRUE(_server_write->_put_rpc_batch.empty());
ASSERT_TRUE(_server_write->_remove_rpc_batch.empty());
- ASSERT_EQ(_server_write->_write_svc->_put_batch_size, 0);
- ASSERT_EQ(_server_write->_write_svc->_remove_batch_size, 0);
+ ASSERT_EQ(_server_write->_write_svc->put_batch_size(), 0);
+ ASSERT_EQ(_server_write->_write_svc->remove_batch_size(), 0);
ASSERT_EQ(_server_write->_write_svc->_batch_start_time, 0);
ASSERT_EQ(_server_write->_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(),
0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]