This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 5b229e53d feat(make_idempotent): support making `check_and_mutate`
request idempotent in `pegasus_write_service::impl` (#2246)
5b229e53d is described below
commit 5b229e53d1e2dccb411dc7e5816ccdff6d4cc19f
Author: Dan Wang <[email protected]>
AuthorDate: Mon May 12 12:16:33 2025 +0800
feat(make_idempotent): support making `check_and_mutate` request idempotent
in `pegasus_write_service::impl` (#2246)
https://github.com/apache/incubator-pegasus/issues/2197
Implement two APIs in `pegasus_write_service::impl` to make
`check_and_mutate`
requests idempotent.
Unlike `incr` and `check_and_set`, a `check_and_mutate` request may be
translated
into multiple `put` and `remove` requests. To conveniently store both in
the same
`std::vector`, they are all converted into `update_request` objects, with
the `type`
field used to distinguish between them:
- implement the `make_idempotent()` function to translate a `check_and_mutate`
request into multiple single-put and single-remove requests, which are naturally
idempotent, and to handle possible errors during the translation, e.g. failing
to read the check value or failing the check;
- implement the `put()` function to apply multiple single-put and single-remove
requests to the RocksDB instance (the atomicity of the batch operations must, of
course, be guaranteed) and to build the response for `check_and_mutate`.
---
idl/rrdb.thrift | 39 ++++--
src/base/idl_utils.h | 1 +
src/server/pegasus_write_service_impl.h | 211 +++++++++++++++++++++++++++++---
src/server/rocksdb_wrapper.cpp | 11 ++
src/server/rocksdb_wrapper.h | 3 +
5 files changed, 238 insertions(+), 27 deletions(-)
diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index 56f5899fc..c318b31f1 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -72,28 +72,40 @@ enum update_type
{
UT_PUT,
UT_INCR,
- UT_CHECK_AND_SET
+ UT_CHECK_AND_SET,
+ UT_CHECK_AND_MUTATE_PUT,
+ UT_CHECK_AND_MUTATE_REMOVE
}
-// The single-put request, just writes a key/value pair into storage, which is
certainly
+// The single-update request, just applies a key/value pair into storage,
which is certainly
// idempotent.
struct update_request
{
+ // Once `type` is UT_CHECK_AND_MUTATE_REMOVE, only `key` would be used (as
one of the
+ // composite keys from check_and_mutate_request that should be deleted),
while `value`
+ // and `expire_ts_seconds` would be ignored.
1:dsn.blob key;
2:dsn.blob value;
3:i32 expire_ts_seconds;
- // This field marks the type of a single-put request, mainly used to
differentiate a general
- // single-put request from the one translated from a non-idempotent atomic
write request:
+ // This field marks the type of a single-update request, mainly used to
differentiate a
+ // general single-put request from the one translated from an atomic write
request which
+ // may be:
// - a general single-put request, if `type` is UT_PUT or not set by
default as it's
// optional, or
// - a put request translated from an incr request, if `type` is UT_INCR,
or
- // - a put request translated from a check_and_set request, if `type` is
UT_CHECK_AND_SET.
+ // - a put request translated from a check_and_set request, if `type` is
UT_CHECK_AND_SET,
+ // or
+ // - a put request translated from a mutate of MO_PUT in a
check_and_mutate request, if
+ // `type` is UT_CHECK_AND_MUTATE_PUT, or
+ // - a remove request translated from a mutate of MO_DELETE in a
check_and_mutate request,
+ // if `type` is UT_CHECK_AND_MUTATE_REMOVE.
4:optional update_type type;
- // Following 3 fields are only available while type = UT_CHECK_AND_SET,
used to build
- // check_and_set_response to reply to the client, once this put request is
translated
- // from the non-idempotent check_and_set_request.
+ // Following 3 fields are only available while `type` is UT_CHECK_AND_SET,
UT_CHECK_AND_MUTATE_PUT
+ // or UT_CHECK_AND_MUTATE_REMOVE, used to build check_and_set_response or
check_and_mutate_response
+ // to reply to the client, once this put request is translated from
check_and_set_request or
+ // check_and_mutate_request.
5:optional bool check_value_returned;
6:optional bool check_value_exist; // Used only if
check_value_returned is true.
7:optional dsn.blob check_value; // Used only if both check_value_returned
and
@@ -285,12 +297,13 @@ struct check_and_mutate_request
struct check_and_mutate_response
{
- 1:i32 error; // return kTryAgain if check not passed.
- // return kInvalidArgument if check type is int
compare and
- // check_operand/check_value is not integer or out
of range.
+ 1:i32 error; // Return kTryAgain if check not passed.
+ // Return kInvalidArgument if check_type is
comparing integers and
+ // check_value/check_operand is not a valid
integer or out of range.
2:bool check_value_returned;
- 3:bool check_value_exist; // used only if check_value_returned
is true
- 4:dsn.blob check_value; // used only if check_value_returned and
check_value_exist is true
+ 3:bool check_value_exist; // Used only if check_value_returned
is true.
+ 4:dsn.blob check_value; // Used only if both check_value_returned and
+ // check_value_exist are true.
5:i32 app_id;
6:i32 partition_index;
7:i64 decree;
diff --git a/src/base/idl_utils.h b/src/base/idl_utils.h
index 88324929b..0852c12e8 100644
--- a/src/base/idl_utils.h
+++ b/src/base/idl_utils.h
@@ -48,5 +48,6 @@ namespace apps {
USER_DEFINED_ENUM_FORMATTER(cas_check_type::type)
USER_DEFINED_ENUM_FORMATTER(filter_type::type)
USER_DEFINED_ENUM_FORMATTER(mutate_operation::type)
+USER_DEFINED_ENUM_FORMATTER(update_type::type)
} // namespace apps
} // namespace dsn
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index a51de9338..1c7c859d6 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -176,7 +176,7 @@ public:
{
// Get current raw value for the provided key from the RocksDB
instance.
db_get_context get_ctx;
- const int err = _rocksdb_wrapper->get(req.key.to_string_view(),
&get_ctx);
+ const int err = _rocksdb_wrapper->get(req.key, &get_ctx);
if (dsn_unlikely(err != rocksdb::Status::kOk)) {
// Failed to read current raw value.
LOG_ERROR_PREFIX("failed to get current raw value for incr while
making "
@@ -239,13 +239,9 @@ public:
const dsn::apps::update_request &update,
dsn::apps::incr_response &resp)
{
- const auto pid = get_gpid();
- resp.app_id = pid.get_app_id();
- resp.partition_index = pid.get_partition_index();
- resp.decree = ctx.decree;
- resp.server = _primary_host_port;
+ make_basic_response(ctx.decree, resp);
- auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
+ const auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
resp.error = _rocksdb_wrapper->write_batch_put_ctx(
ctx, update.key, update.value, update.expire_ts_seconds);
@@ -369,7 +365,7 @@ public:
// Get the check value.
db_get_context get_ctx;
- const int err = _rocksdb_wrapper->get(check_key.to_string_view(),
&get_ctx);
+ const int err = _rocksdb_wrapper->get(check_key, &get_ctx);
if (dsn_unlikely(err != rocksdb::Status::kOk)) {
// Failed to read the check value.
LOG_ERROR_PREFIX("failed to get the check value for check_and_set
while making "
@@ -422,17 +418,13 @@ public:
const dsn::apps::update_request &update,
dsn::apps::check_and_set_response &resp)
{
- const auto pid = get_gpid();
- resp.app_id = pid.get_app_id();
- resp.partition_index = pid.get_partition_index();
- resp.decree = ctx.decree;
- resp.server = _primary_host_port;
+ make_basic_response(ctx.decree, resp);
// Copy check_value's fields from the single-put request to the
check_and_set
// response to reply to the client.
copy_check_value(update, resp);
- auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
+ const auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
resp.error = _rocksdb_wrapper->write_batch_put_ctx(
ctx, update.key, update.value, update.expire_ts_seconds);
@@ -540,6 +532,165 @@ public:
return rocksdb::Status::kOk;
}
+    // Adapter so that incr and check_and_set expose the same vector-based make_idempotent()
+    // signature as check_and_mutate, which lets all three be invoked uniformly from templated
+    // code. `updates` is reset to hold exactly one default-constructed request, which is then
+    // filled in by the single-request overload of make_idempotent().
+    template <typename TRequest, typename TResponse>
+    inline int make_idempotent(const TRequest &req,
+                               TResponse &err_resp,
+                               std::vector<dsn::apps::update_request> &updates)
+    {
+        updates.clear();
+        updates.resize(1);
+
+        auto &only_update = updates.back();
+        return make_idempotent(req, err_resp, only_update);
+    }
+
+    // Translate a check_and_mutate request into multiple single-put and single-remove requests
+    // which are naturally idempotent, appending them to `updates` in the same order as
+    // `req.mutate_list`. Return current status for RocksDB; on any error `err_resp` is filled
+    // to reply to the client and `updates` is left untouched. Only called by primary replicas.
+    int make_idempotent(const dsn::apps::check_and_mutate_request &req,
+                        dsn::apps::check_and_mutate_response &err_resp,
+                        std::vector<dsn::apps::update_request> &updates)
+    {
+        if (dsn_unlikely(req.mutate_list.empty())) {
+            LOG_ERROR_PREFIX("mutate_list is empty for check_and_mutate while making idempotent");
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, err_resp);
+        }
+
+        // Verify operation type for each mutate.
+        for (size_t i = 0; i < req.mutate_list.size(); ++i) {
+            const auto &mu = req.mutate_list[i];
+            if (dsn_likely(mu.operation == dsn::apps::mutate_operation::MO_PUT ||
+                           mu.operation == dsn::apps::mutate_operation::MO_DELETE)) {
+                continue;
+            }
+
+            LOG_ERROR_PREFIX("mutate_list[{}]'s operation {} is invalid for check_and_mutate "
+                             "while making idempotent",
+                             i,
+                             mu.operation);
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, err_resp);
+        }
+
+        if (dsn_unlikely(!is_check_type_supported(req.check_type))) {
+            // The two literals are adjacent (no comma) so that they form one format string whose
+            // only `{}` receives the check type.
+            LOG_ERROR_PREFIX("check type {} is not supported for check_and_mutate "
+                             "while making idempotent",
+                             cas_check_type_to_string(req.check_type));
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, err_resp);
+        }
+
+        dsn::blob check_key;
+        pegasus_generate_key(check_key, req.hash_key, req.check_sort_key);
+
+        // Get the check value.
+        db_get_context get_ctx;
+        const int err = _rocksdb_wrapper->get(check_key, &get_ctx);
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+            // Failed to read the check value.
+            LOG_ERROR_PREFIX("failed to get the check value for check_and_mutate while making "
+                             "idempotent: rocksdb_status = {}, hash_key = {}, "
+                             "check_sort_key = {}",
+                             err,
+                             utils::c_escape_sensitive_string(req.hash_key),
+                             utils::c_escape_sensitive_string(req.check_sort_key));
+
+            return make_error_response(err, err_resp);
+        }
+
+        // An expired record is treated the same as a missing one.
+        dsn::blob check_value;
+        const bool value_exist = !get_ctx.expired && get_ctx.found;
+        if (value_exist) {
+            pegasus_extract_user_data(
+                _pegasus_data_version, std::move(get_ctx.raw_value), check_value);
+        }
+
+        bool invalid_argument = false;
+        const bool passed = validate_check(
+            req.check_type, req.check_operand, value_exist, check_value, invalid_argument);
+        if (!passed) {
+            // Fill the check value into the error response before replying to the client.
+            make_check_value(req, value_exist, check_value, err_resp);
+            return make_error_response(invalid_argument ? rocksdb::Status::kInvalidArgument
+                                                        : rocksdb::Status::kTryAgain,
+                                       err_resp);
+        }
+
+        // Check passed: translate every mutate into one idempotent request, preserving the
+        // order of `req.mutate_list`.
+        updates.clear();
+        updates.reserve(req.mutate_list.size());
+        for (const auto &mu : req.mutate_list) {
+            // Generate new RocksDB key.
+            dsn::blob set_key;
+            pegasus_generate_key(set_key, req.hash_key, mu.sort_key);
+
+            // Add a new single-put or single-remove request, filled in below.
+            updates.emplace_back();
+
+            if (mu.operation == dsn::apps::mutate_operation::MO_PUT) {
+                make_idempotent_request_for_check_and_mutate_put(
+                    set_key, mu.value, mu.set_expire_ts_seconds, updates.back());
+                continue;
+            }
+
+            if (mu.operation == dsn::apps::mutate_operation::MO_DELETE) {
+                make_idempotent_request_for_check_and_mutate_remove(set_key, updates.back());
+                continue;
+            }
+
+            // It must have returned and replied to the client once there is some invalid
+            // mutate_operation. Here is just a defensive assertion.
+            LOG_FATAL("invalid mutate_operation {} for check_and_mutate while making idempotent",
+                      mu.operation);
+            __builtin_unreachable();
+        }
+
+        // Add check value to the first generated idempotent request, for the future response to
+        // the client. `updates` is non-empty here since `req.mutate_list` was checked above.
+        make_check_value(req, value_exist, check_value, updates.front());
+
+        return rocksdb::Status::kOk;
+    }
+
+    // Apply the single-put and single-remove requests translated from a check_and_mutate request
+    // into RocksDB through one write batch, and build the response for the check_and_mutate
+    // request. Return current status for RocksDB. Only called by primary replicas.
+    int put(const db_write_context &ctx,
+            const std::vector<dsn::apps::update_request> &updates,
+            dsn::apps::check_and_mutate_response &resp)
+    {
+        make_basic_response(ctx.decree, resp);
+
+        // make_idempotent() always produces at least one request for check_and_mutate; guard
+        // anyway, since updates.front() below would be undefined behavior on an empty vector.
+        if (dsn_unlikely(updates.empty())) {
+            LOG_ERROR_PREFIX("updates is empty for check_and_mutate while applying idempotent "
+                             "requests");
+            return make_error_response(rocksdb::Status::kInvalidArgument, resp);
+        }
+
+        // Copy check_value's fields from the first idempotent request to the check_and_mutate
+        // response to reply to the client.
+        copy_check_value(updates.front(), resp);
+
+        // Clear the write batch on every exit path, including the early returns on error below.
+        const auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
+
+        for (const auto &update : updates) {
+            if (update.type == dsn::apps::update_type::UT_CHECK_AND_MUTATE_PUT) {
+                resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+                    ctx, update.key, update.value, update.expire_ts_seconds);
+            } else if (update.type == dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE) {
+                resp.error = _rocksdb_wrapper->write_batch_delete(ctx.decree, update.key);
+            } else {
+                LOG_FATAL("invalid update_type {} for check_and_mutate while applying "
+                          "idempotent requests",
+                          update.type);
+                __builtin_unreachable();
+            }
+
+            if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+                return resp.error;
+            }
+        }
+
+        // Commit the whole batch with a single write, so all mutations are applied together.
+        resp.error = _rocksdb_wrapper->write(ctx.decree);
+        return resp.error;
+    }
+
int check_and_mutate(int64_t decree,
const dsn::apps::check_and_mutate_request &update,
dsn::apps::check_and_mutate_response &resp)
@@ -890,6 +1041,38 @@ private:
return rocksdb::Status::kOk;
}
+    // Fill `update` through make_idempotent_request() from `key`, `value` and
+    // `expire_ts_seconds`, tagged as UT_CHECK_AND_MUTATE_PUT. Used for each MO_PUT mutate of a
+    // check_and_mutate request.
+    static inline void
+    make_idempotent_request_for_check_and_mutate_put(const dsn::blob &key,
+                                                     const dsn::blob &value,
+                                                     int32_t expire_ts_seconds,
+                                                     dsn::apps::update_request &update)
+    {
+        const auto put_type = dsn::apps::update_type::UT_CHECK_AND_MUTATE_PUT;
+        make_idempotent_request(key, value, expire_ts_seconds, put_type, update);
+    }
+
+    // Fill `update` through make_idempotent_request() from `key` alone, tagged as
+    // UT_CHECK_AND_MUTATE_REMOVE. Used for each MO_DELETE mutate of a check_and_mutate request.
+    static inline void
+    make_idempotent_request_for_check_and_mutate_remove(const dsn::blob &key,
+                                                        dsn::apps::update_request &update)
+    {
+        const auto remove_type = dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE;
+        make_idempotent_request(key, remove_type, update);
+    }
+
+    // Fill the fields shared by the incr/check_and_set/check_and_mutate responses: the `decree`
+    // of this write, `_primary_host_port` as the server, and the app id and partition index
+    // taken from get_gpid().
+    template <typename TResponse>
+    inline void make_basic_response(int64_t decree, TResponse &resp)
+    {
+        resp.decree = decree;
+        resp.server = _primary_host_port;
+
+        const auto partition_id = get_gpid();
+        resp.app_id = partition_id.get_app_id();
+        resp.partition_index = partition_id.get_partition_index();
+    }
+
// Build response `resp` based on `err` only for the error case (i.e. the
current status
// `err` for RocksDB is not rocksdb::Status::kOk). Return `err`.
//
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index feba097cd..0206c2236 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -112,6 +112,12 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/
db_get_context *ctx)
return s.code();
}
+// Overload of get() for a key held in a dsn::blob: takes the blob's string view and
+// delegates to the string_view overload, returning its status unchanged.
+int rocksdb_wrapper::get(const dsn::blob &raw_key, /*out*/ db_get_context *ctx)
+{
+    const auto key_view = raw_key.to_string_view();
+    return get(key_view, ctx);
+}
+
int rocksdb_wrapper::write_batch_put(int64_t decree,
std::string_view raw_key,
std::string_view value,
@@ -232,6 +238,11 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree,
std::string_view raw_key
return s.code();
}
+// Overload of write_batch_delete() for a key held in a dsn::blob: takes the blob's string
+// view and delegates to the string_view overload, returning its status unchanged.
+int rocksdb_wrapper::write_batch_delete(int64_t decree, const dsn::blob &raw_key)
+{
+    const auto key_view = raw_key.to_string_view();
+    return write_batch_delete(decree, key_view);
+}
+
void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); }
int rocksdb_wrapper::ingest_files(int64_t decree,
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index feaefd70d..75bb9f92c 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -59,6 +59,8 @@ public:
/// \result ctx.expired=true if record expired. Still rocksdb::Status::kOk
is returned.
/// \result ctx.found=false if record is not found. Still
rocksdb::Status::kOk is returned.
int get(std::string_view raw_key, /*out*/ db_get_context *ctx);
+ int get(const dsn::blob &raw_key,
+ /*out*/ db_get_context *ctx);
int write_batch_put(int64_t decree,
std::string_view raw_key,
@@ -74,6 +76,7 @@ public:
int32_t expire_sec);
int write(int64_t decree);
int write_batch_delete(int64_t decree, std::string_view raw_key);
+ int write_batch_delete(int64_t decree, const dsn::blob &raw_key);
void clear_up_write_batch();
int ingest_files(int64_t decree,
const std::vector<std::string> &sst_file_list,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]