This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b229e53d feat(make_idempotent): support making `check_and_mutate` 
request idempotent in `pegasus_write_service::impl` (#2246)
5b229e53d is described below

commit 5b229e53d1e2dccb411dc7e5816ccdff6d4cc19f
Author: Dan Wang <[email protected]>
AuthorDate: Mon May 12 12:16:33 2025 +0800

    feat(make_idempotent): support making `check_and_mutate` request idempotent 
in `pegasus_write_service::impl` (#2246)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    Implement two APIs in `pegasus_write_service::impl` to make 
`check_and_mutate`
    requests idempotent.
    
    Unlike `incr` and `check_and_set`, a `check_and_mutate` request may be 
translated
    into multiple `put` and `remove` requests. To conveniently store both in 
the same
    `std::vector`, they are all converted into `update_request` objects, with 
the `type`
    field used to distinguish between them:
    
    - implement `make_idempotent()` function to translate a `check_and_mutate` 
request
    into multiple single-put and single-remove requests which are naturally 
idempotent,
    and process the possible errors during the translation, e.g. failed to read 
the check
    value or did not pass the check;
    - implement `put()` function to apply multiple single-put and single-remove 
requests
    into the RocksDB instance (certainly the atomicity of the batch operations 
must be
    guaranteed) and make response for `check_and_mutate`.
---
 idl/rrdb.thrift                         |  39 ++++--
 src/base/idl_utils.h                    |   1 +
 src/server/pegasus_write_service_impl.h | 211 +++++++++++++++++++++++++++++---
 src/server/rocksdb_wrapper.cpp          |  11 ++
 src/server/rocksdb_wrapper.h            |   3 +
 5 files changed, 238 insertions(+), 27 deletions(-)

diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index 56f5899fc..c318b31f1 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -72,28 +72,40 @@ enum update_type
 {
     UT_PUT,
     UT_INCR,
-    UT_CHECK_AND_SET
+    UT_CHECK_AND_SET,
+    UT_CHECK_AND_MUTATE_PUT,
+    UT_CHECK_AND_MUTATE_REMOVE
 }
 
-// The single-put request, just writes a key/value pair into storage, which is 
certainly
+// The single-update request, just applies a key/value pair into storage, 
which is certainly
 // idempotent.
 struct update_request
 {
+    // Once `type` is UT_CHECK_AND_MUTATE_REMOVE, only `key` would be used (as 
one of the
+    // composite keys from check_and_mutate_request that should be deleted), 
while `value`
+    // and `expire_ts_seconds` would be ignored.
     1:dsn.blob      key;
     2:dsn.blob      value;
     3:i32           expire_ts_seconds;
 
-    // This field marks the type of a single-put request, mainly used to 
differentiate a general
-    // single-put request from the one translated from a non-idempotent atomic 
write request:
+    // This field marks the type of a single-update request, mainly used to 
differentiate a
+    // general single-put request from the one translated from an atomic write 
request which
+    // may be:
     // - a general single-put request, if `type` is UT_PUT or not set by 
default as it's
     // optional, or
     // - a put request translated from an incr request, if `type` is UT_INCR, 
or
-    // - a put request translated from a check_and_set request, if `type` is 
UT_CHECK_AND_SET.
+    // - a put request translated from a check_and_set request, if `type` is 
UT_CHECK_AND_SET,
+    // or
+    // - a put request translated from a mutate of MO_PUT in a 
check_and_mutate request, if
+    // `type` is UT_CHECK_AND_MUTATE_PUT, or
+    // - a remove request translated from a mutate of MO_DELETE in a 
check_and_mutate request,
+    // if `type` is UT_CHECK_AND_MUTATE_REMOVE.
     4:optional update_type type;
 
-    // Following 3 fields are only available while type = UT_CHECK_AND_SET, 
used to build
-    // check_and_set_response to reply to the client, once this put request is 
translated
-    // from the non-idempotent check_and_set_request.
+    // Following 3 fields are only available while `type` is UT_CHECK_AND_SET, 
UT_CHECK_AND_MUTATE_PUT
+    // or UT_CHECK_AND_MUTATE_REMOVE, used to build check_and_set_response or 
check_and_mutate_response
+    // to reply to the client, once this put request is translated from 
check_and_set_request or
+    // check_and_mutate_request.
     5:optional bool     check_value_returned;
     6:optional bool     check_value_exist; // Used only if 
check_value_returned is true.
     7:optional dsn.blob check_value; // Used only if both check_value_returned 
and
@@ -285,12 +297,13 @@ struct check_and_mutate_request
 
 struct check_and_mutate_response
 {
-    1:i32            error; // return kTryAgain if check not passed.
-                            // return kInvalidArgument if check type is int 
compare and
-                            // check_operand/check_value is not integer or out 
of range.
+    1:i32            error; // Return kTryAgain if check not passed.
+                            // Return kInvalidArgument if check_type is 
comparing integers and
+                            // check_value/check_operand is not a valid 
integer or out of range.
     2:bool           check_value_returned;
-    3:bool           check_value_exist; // used only if check_value_returned 
is true
-    4:dsn.blob       check_value; // used only if check_value_returned and 
check_value_exist is true
+    3:bool           check_value_exist; // Used only if check_value_returned 
is true.
+    4:dsn.blob       check_value; // Used only if both check_value_returned and
+                                  // check_value_exist are true.
     5:i32            app_id;
     6:i32            partition_index;
     7:i64            decree;
diff --git a/src/base/idl_utils.h b/src/base/idl_utils.h
index 88324929b..0852c12e8 100644
--- a/src/base/idl_utils.h
+++ b/src/base/idl_utils.h
@@ -48,5 +48,6 @@ namespace apps {
 USER_DEFINED_ENUM_FORMATTER(cas_check_type::type)
 USER_DEFINED_ENUM_FORMATTER(filter_type::type)
 USER_DEFINED_ENUM_FORMATTER(mutate_operation::type)
+USER_DEFINED_ENUM_FORMATTER(update_type::type)
 } // namespace apps
 } // namespace dsn
diff --git a/src/server/pegasus_write_service_impl.h 
b/src/server/pegasus_write_service_impl.h
index a51de9338..1c7c859d6 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -176,7 +176,7 @@ public:
     {
         // Get current raw value for the provided key from the RocksDB 
instance.
         db_get_context get_ctx;
-        const int err = _rocksdb_wrapper->get(req.key.to_string_view(), 
&get_ctx);
+        const int err = _rocksdb_wrapper->get(req.key, &get_ctx);
         if (dsn_unlikely(err != rocksdb::Status::kOk)) {
             // Failed to read current raw value.
             LOG_ERROR_PREFIX("failed to get current raw value for incr while 
making "
@@ -239,13 +239,9 @@ public:
             const dsn::apps::update_request &update,
             dsn::apps::incr_response &resp)
     {
-        const auto pid = get_gpid();
-        resp.app_id = pid.get_app_id();
-        resp.partition_index = pid.get_partition_index();
-        resp.decree = ctx.decree;
-        resp.server = _primary_host_port;
+        make_basic_response(ctx.decree, resp);
 
-        auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
+        const auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
 
         resp.error = _rocksdb_wrapper->write_batch_put_ctx(
             ctx, update.key, update.value, update.expire_ts_seconds);
@@ -369,7 +365,7 @@ public:
 
         // Get the check value.
         db_get_context get_ctx;
-        const int err = _rocksdb_wrapper->get(check_key.to_string_view(), 
&get_ctx);
+        const int err = _rocksdb_wrapper->get(check_key, &get_ctx);
         if (dsn_unlikely(err != rocksdb::Status::kOk)) {
             // Failed to read the check value.
             LOG_ERROR_PREFIX("failed to get the check value for check_and_set 
while making "
@@ -422,17 +418,13 @@ public:
             const dsn::apps::update_request &update,
             dsn::apps::check_and_set_response &resp)
     {
-        const auto pid = get_gpid();
-        resp.app_id = pid.get_app_id();
-        resp.partition_index = pid.get_partition_index();
-        resp.decree = ctx.decree;
-        resp.server = _primary_host_port;
+        make_basic_response(ctx.decree, resp);
 
         // Copy check_value's fields from the single-put request to the 
check_and_set
         // response to reply to the client.
         copy_check_value(update, resp);
 
-        auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
+        const auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
 
         resp.error = _rocksdb_wrapper->write_batch_put_ctx(
             ctx, update.key, update.value, update.expire_ts_seconds);
@@ -540,6 +532,165 @@ public:
         return rocksdb::Status::kOk;
     }
 
+    // Used to call make_idempotent() for incr and check_and_set to get the 
idempotent single-put
+    // request which is stored as the unique element of `updates`.
+    //
+    // This interface is provided to ensure consistency between the 
make_idempotent() interfaces
+    // of incr/check_and_set operations and that of check_and_mutate (both 
using std::vector for
+    // `updates`), thereby facilitating uniform templated function invocation.
+    template <typename TRequest, typename TResponse>
+    inline int make_idempotent(const TRequest &req,
+                               TResponse &err_resp,
+                               std::vector<dsn::apps::update_request> &updates)
+    {
+        updates.clear();
+        updates.emplace_back(); // The only element; it stays in `updates` even if the call below fails.
+        return make_idempotent(req, err_resp, updates.front());
+    }
+
+    // Translate a check_and_mutate request into multiple single-put and 
single-remove requests
+    // which are certainly idempotent. Return current status for RocksDB. Only 
called by primary
+    // replicas. On failure, `err_resp` is built and `updates` is left untouched.
+    int make_idempotent(const dsn::apps::check_and_mutate_request &req,
+                        dsn::apps::check_and_mutate_response &err_resp,
+                        std::vector<dsn::apps::update_request> &updates)
+    {
+        if (dsn_unlikely(req.mutate_list.empty())) {
+            LOG_ERROR_PREFIX("mutate_list is empty for check_and_mutate while 
making idempotent");
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        // Verify operation type for each mutate.
+        for (size_t i = 0; i < req.mutate_list.size(); ++i) {
+            const auto &mu = req.mutate_list[i];
+            if (dsn_likely(mu.operation == dsn::apps::mutate_operation::MO_PUT 
||
+                           mu.operation == 
dsn::apps::mutate_operation::MO_DELETE)) {
+                continue;
+            }
+
+            LOG_ERROR_PREFIX("mutate_list[{}]'s operation {} is invalid for 
check_and_mutate "
+                             "while making idempotent",
+                             i,
+                             mu.operation);
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        if (dsn_unlikely(!is_check_type_supported(req.check_type))) {
+            LOG_ERROR_PREFIX("check type {} is not supported for check_and_mutate "
+                             "while making idempotent",
+                             cas_check_type_to_string(req.check_type));
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        dsn::blob check_key;
+        pegasus_generate_key(check_key, req.hash_key, req.check_sort_key);
+
+        // Get the check value.
+        db_get_context get_ctx;
+        const int err = _rocksdb_wrapper->get(check_key, &get_ctx);
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+            // Failed to read the check value.
+            LOG_ERROR_PREFIX("failed to get the check value for 
check_and_mutate while making "
+                             "idempotent: rocksdb_status = {}, hash_key = {}, "
+                             "check_sort_key = {}",
+                             err,
+                             utils::c_escape_sensitive_string(req.hash_key),
+                             
utils::c_escape_sensitive_string(req.check_sort_key));
+
+            return make_error_response(err, err_resp);
+        }
+
+        dsn::blob check_value;
+        const bool value_exist = !get_ctx.expired && get_ctx.found;
+        if (value_exist) {
+            pegasus_extract_user_data(
+                _pegasus_data_version, std::move(get_ctx.raw_value), 
check_value);
+        }
+
+        bool invalid_argument = false;
+        const bool passed = validate_check(
+            req.check_type, req.check_operand, value_exist, check_value, 
invalid_argument);
+        if (!passed) {
+            make_check_value(req, value_exist, check_value, err_resp);
+            return make_error_response(invalid_argument ? 
rocksdb::Status::kInvalidArgument
+                                                        : 
rocksdb::Status::kTryAgain,
+                                       err_resp);
+        }
+
+        // Check passed: rebuild `updates` with one request per mutate, in the same order.
+        updates.clear();
+        for (const auto &mu : req.mutate_list) {
+            // Generate new RocksDB key.
+            dsn::blob set_key;
+            pegasus_generate_key(set_key, req.hash_key, mu.sort_key);
+
+            // Add a new idempotent request, filled below as a put or a remove.
+            updates.emplace_back();
+
+            if (mu.operation == dsn::apps::mutate_operation::MO_PUT) {
+                make_idempotent_request_for_check_and_mutate_put(
+                    set_key, mu.value, mu.set_expire_ts_seconds, 
updates.back());
+                continue;
+            }
+
+            if (mu.operation == dsn::apps::mutate_operation::MO_DELETE) {
+                make_idempotent_request_for_check_and_mutate_remove(set_key, 
updates.back());
+                continue;
+            }
+
+            // It must have returned and replied to the client once there is 
some invalid
+            // mutate_operation. Here is just a defensive assertion.
+            LOG_FATAL("invalid mutate_operation {} for check_and_mutate while 
making idempotent",
+                      mu.operation);
+            __builtin_unreachable();
+        }
+
+        // Add check value to the first generated idempotent request, for the 
future response to
+        // the client.
+        make_check_value(req, value_exist, check_value, updates.front());
+
+        return rocksdb::Status::kOk;
+    }
+
+    // Apply the single-put and single-remove requests translated from a 
check_and_mutate request
+    // into RocksDB, and build response for the check_and_mutate request. 
Return current status
+    // for RocksDB. Only called by primary replicas.
+    int put(const db_write_context &ctx,
+            const std::vector<dsn::apps::update_request> &updates,
+            dsn::apps::check_and_mutate_response &resp)
+    {
+        make_basic_response(ctx.decree, resp);
+
+        // Copy check_value's fields from the first idempotent request to the 
check_and_mutate
+        // response to reply to the client.
+        copy_check_value(updates.front(), resp); // NOTE(review): assumes `updates` is non-empty -- confirm.
+
+        const auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
+
+        for (const auto &update : updates) {
+            if (update.type == 
dsn::apps::update_type::UT_CHECK_AND_MUTATE_PUT) {
+                resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+                    ctx, update.key, update.value, update.expire_ts_seconds);
+            } else if (update.type == 
dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE) {
+                resp.error = _rocksdb_wrapper->write_batch_delete(ctx.decree, 
update.key);
+            } else {
+                LOG_FATAL("invalid update_type for check_and_mutate {} while 
making idempotent",
+                          update.type); // NOTE(review): text says "making idempotent" but this applies requests.
+                __builtin_unreachable();
+            }
+
+            if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+                return resp.error; // Stop at the first failed batch operation; write() is not called.
+            }
+        }
+
+        resp.error = _rocksdb_wrapper->write(ctx.decree); // Called once, after all requests were batched.
+        return resp.error;
+    }
+
     int check_and_mutate(int64_t decree,
                          const dsn::apps::check_and_mutate_request &update,
                          dsn::apps::check_and_mutate_response &resp)
@@ -890,6 +1041,38 @@ private:
         return rocksdb::Status::kOk;
     }
 
+    // Build a single-put `update` for a mutate of MO_PUT in a 
check_and_mutate request based
+    // on `key`, `value` and `expire_ts_seconds`. Forwards to make_idempotent_request().
+    static inline void
+    make_idempotent_request_for_check_and_mutate_put(const dsn::blob &key,
+                                                     const dsn::blob &value,
+                                                     int32_t expire_ts_seconds,
+                                                     dsn::apps::update_request 
&update)
+    {
+        make_idempotent_request(
+            key, value, expire_ts_seconds, 
dsn::apps::update_type::UT_CHECK_AND_MUTATE_PUT, update);
+    }
+
+    // Build a single-remove `update` for a mutate of MO_DELETE in a 
check_and_mutate request
+    // based on `key`. Only `key` and the update type are passed on; no value or expire time.
+    static inline void
+    make_idempotent_request_for_check_and_mutate_remove(const dsn::blob &key,
+                                                        
dsn::apps::update_request &update)
+    {
+        make_idempotent_request(key, 
dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE, update);
+    }
+
+    // Build response `resp` with basic info: app id, partition index, decree and server.
+    template <typename TResponse>
+    inline void make_basic_response(int64_t decree, TResponse &resp)
+    {
+        const auto pid = get_gpid();
+        resp.app_id = pid.get_app_id();
+        resp.partition_index = pid.get_partition_index();
+        resp.decree = decree;
+        resp.server = _primary_host_port;
+    }
+
     // Build response `resp` based on `err` only for the error case (i.e. the 
current status
     // `err` for RocksDB is not rocksdb::Status::kOk). Return `err`.
     //
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index feba097cd..0206c2236 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -112,6 +112,12 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ 
db_get_context *ctx)
     return s.code();
 }
 
+int rocksdb_wrapper::get(const dsn::blob &raw_key,
+                         /*out*/ db_get_context *ctx)
+{
+    return get(raw_key.to_string_view(), ctx); // Forward to the std::string_view overload.
+}
+
 int rocksdb_wrapper::write_batch_put(int64_t decree,
                                      std::string_view raw_key,
                                      std::string_view value,
@@ -232,6 +238,11 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, 
std::string_view raw_key
     return s.code();
 }
 
+int rocksdb_wrapper::write_batch_delete(int64_t decree, const dsn::blob 
&raw_key)
+{
+    return write_batch_delete(decree, raw_key.to_string_view()); // Forward to the std::string_view overload.
+}
+
 void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); }
 
 int rocksdb_wrapper::ingest_files(int64_t decree,
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index feaefd70d..75bb9f92c 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -59,6 +59,8 @@ public:
     /// \result ctx.expired=true if record expired. Still rocksdb::Status::kOk 
is returned.
     /// \result ctx.found=false if record is not found. Still 
rocksdb::Status::kOk is returned.
     int get(std::string_view raw_key, /*out*/ db_get_context *ctx);
+    int get(const dsn::blob &raw_key,
+            /*out*/ db_get_context *ctx);
 
     int write_batch_put(int64_t decree,
                         std::string_view raw_key,
@@ -74,6 +76,7 @@ public:
                             int32_t expire_sec);
     int write(int64_t decree);
     int write_batch_delete(int64_t decree, std::string_view raw_key);
+    int write_batch_delete(int64_t decree, const dsn::blob &raw_key);
     void clear_up_write_batch();
     int ingest_files(int64_t decree,
                      const std::vector<std::string> &sst_file_list,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to