This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 682d960c6 feat(make_idempotent): support making `check_and_set` 
request idempotent in `pegasus_write_service::impl` (#2230)
682d960c6 is described below

commit 682d960c6a2317c57a4fa791c540b5d6407014ff
Author: Dan Wang <[email protected]>
AuthorDate: Tue Apr 22 14:54:02 2025 +0800

    feat(make_idempotent): support making `check_and_set` request idempotent in 
`pegasus_write_service::impl` (#2230)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    Implement two APIs in `pegasus_write_service::impl` to make `check_and_set`
    requests idempotent:
    
    - implement `make_idempotent()` function to translate a `check_and_set` 
request
    into the single-put request which is naturally idempotent, and process the 
possible
    errors during the translation, e.g. failed to read the check value or did 
not pass the
    check;
    - implement `put()` function to apply the single-put request into the 
RocksDB
    instance and make response for `check_and_set`.
    
    Also refactor `validate_check()` function to make the check for 
`check_and_set`
    and `check_and_mutate` conditions more clearer and readable.
---
 idl/rrdb.thrift                         |  25 +-
 src/base/pegasus_key_schema.h           |  10 +-
 src/server/pegasus_write_service_impl.h | 402 +++++++++++++++++++++++++-------
 src/server/result_writer.cpp            |   1 +
 src/server/rocksdb_wrapper.cpp          |   9 +
 src/server/rocksdb_wrapper.h            |  12 +-
 src/utils/blob.h                        |  28 +++
 src/utils/test/utils.cpp                |   1 -
 8 files changed, 388 insertions(+), 100 deletions(-)

diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index 712de1ec7..56f5899fc 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -71,7 +71,8 @@ enum mutate_operation
 enum update_type
 {
     UT_PUT,
-    UT_INCR
+    UT_INCR,
+    UT_CHECK_AND_SET
 }
 
 // The single-put request, just writes a key/value pair into storage, which is 
certainly
@@ -86,8 +87,17 @@ struct update_request
     // single-put request from the one translated from a non-idempotent atomic 
write request:
     // - a general single-put request, if `type` is UT_PUT or not set by 
default as it's
     // optional, or
-    // - a put request translated from an incr request, if `type` is UT_INCR.
+    // - a put request translated from an incr request, if `type` is UT_INCR, 
or
+    // - a put request translated from a check_and_set request, if `type` is 
UT_CHECK_AND_SET.
     4:optional update_type type;
+
+    // Following 3 fields are only available while type = UT_CHECK_AND_SET, 
used to build
+    // check_and_set_response to reply to the client, once this put request is 
translated
+    // from the non-idempotent check_and_set_request.
+    5:optional bool     check_value_returned;
+    6:optional bool     check_value_exist; // Used only if 
check_value_returned is true.
+    7:optional dsn.blob check_value; // Used only if both check_value_returned 
and
+                                     // check_value_exist are true.
 }
 
 struct update_response
@@ -242,12 +252,13 @@ struct check_and_set_request
 
 struct check_and_set_response
 {
-    1:i32            error; // return kTryAgain if check not passed.
-                            // return kInvalidArgument if check type is int 
compare and
-                            // check_operand/check_value is not integer or out 
of range.
+    1:i32            error; // Return kTryAgain if check not passed.
+                            // Return kInvalidArgument if check_type is 
comparing integers and
+                            // check_value/check_operand is not a valid 
integer or out of range.
     2:bool           check_value_returned;
-    3:bool           check_value_exist; // used only if check_value_returned 
is true
-    4:dsn.blob       check_value; // used only if check_value_returned and 
check_value_exist is true
+    3:bool           check_value_exist; // Used only if check_value_returned 
is true.
+    4:dsn.blob       check_value; // Used only if both check_value_returned and
+                                  // check_value_exist are true.
     5:i32            app_id;
     6:i32            partition_index;
     7:i64            decree;
diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h
index 4790693ac..c0897c609 100644
--- a/src/base/pegasus_key_schema.h
+++ b/src/base/pegasus_key_schema.h
@@ -34,11 +34,11 @@ namespace pegasus {
 // 
=====================================================================================
 // rocksdb key = [hash_key_len(uint16_t)] [hash_key(bytes)] [sort_key(bytes)]
 
-// generate rocksdb key.
-// T may be std::string or ::dsn::blob.
-// data is copied into 'key'.
-template <typename T>
-void pegasus_generate_key(::dsn::blob &key, const T &hash_key, const T 
&sort_key)
+// Generate rocksdb `key` by `hash_key` and `sort_key`.
+//
+// TKey may be std::string_view, std::string or dsn::blob.
+template <typename TKey>
+void pegasus_generate_key(::dsn::blob &key, const TKey &hash_key, const TKey 
&sort_key)
 {
     CHECK_LT(hash_key.length(), UINT16_MAX);
 
diff --git a/src/server/pegasus_write_service_impl.h 
b/src/server/pegasus_write_service_impl.h
index 494a384e0..a51de9338 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -119,14 +119,13 @@ public:
         }
 
         auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
-        for (auto &kv : update.kvs) {
-            resp.error = _rocksdb_wrapper->write_batch_put_ctx(
-                ctx,
-                composite_raw_key(update.hash_key.to_string_view(), 
kv.key.to_string_view())
-                    .to_string_view(),
-                kv.value.to_string_view(),
-                static_cast<uint32_t>(update.expire_ts_seconds));
-            if (resp.error) {
+        for (const auto &kv : update.kvs) {
+            resp.error =
+                _rocksdb_wrapper->write_batch_put_ctx(ctx,
+                                                      
composite_raw_key(update.hash_key, kv.key),
+                                                      kv.value,
+                                                      
update.expire_ts_seconds);
+            if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
                 return resp.error;
             }
         }
@@ -154,12 +153,10 @@ public:
         }
 
         auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
-        for (auto &sort_key : update.sort_keys) {
+        for (const auto &sort_key : update.sort_keys) {
             resp.error = _rocksdb_wrapper->write_batch_delete(
-                decree,
-                composite_raw_key(update.hash_key.to_string_view(), 
sort_key.to_string_view())
-                    .to_string_view());
-            if (resp.error) {
+                decree, composite_raw_key(update.hash_key, 
sort_key).to_string_view());
+            if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
                 return resp.error;
             }
         }
@@ -181,6 +178,12 @@ public:
         db_get_context get_ctx;
         const int err = _rocksdb_wrapper->get(req.key.to_string_view(), 
&get_ctx);
         if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+            // Failed to read current raw value.
+            LOG_ERROR_PREFIX("failed to get current raw value for incr while 
making "
+                             "idempotent: rocksdb_status = {}, key = {}",
+                             err,
+                             utils::c_escape_sensitive_string(req.key));
+
             return make_error_response(err, err_resp);
         }
 
@@ -224,28 +227,28 @@ public:
             }
         }
 
+        // Generate the idempotent single-put request for incr.
         return make_idempotent_request_for_incr(
             req.key, new_int, calc_expire_on_existing(req, get_ctx), update);
     }
 
-    // Apply single-put request translated from incr request into RocksDB, and 
build response
-    // for incr. Return current status for RocksDB. Only called by primary 
replicas.
+    // Apply the single-put request translated from an incr request into 
RocksDB, and build
+    // response for the incr request. Return current status for RocksDB. Only 
called by
+    // primary replicas.
     int put(const db_write_context &ctx,
             const dsn::apps::update_request &update,
             dsn::apps::incr_response &resp)
     {
-        resp.app_id = get_gpid().get_app_id();
-        resp.partition_index = get_gpid().get_partition_index();
+        const auto pid = get_gpid();
+        resp.app_id = pid.get_app_id();
+        resp.partition_index = pid.get_partition_index();
         resp.decree = ctx.decree;
         resp.server = _primary_host_port;
 
         auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
 
-        resp.error =
-            _rocksdb_wrapper->write_batch_put_ctx(ctx,
-                                                  update.key.to_string_view(),
-                                                  
update.value.to_string_view(),
-                                                  
static_cast<uint32_t>(update.expire_ts_seconds));
+        resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+            ctx, update.key, update.value, update.expire_ts_seconds);
         if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
             return resp.error;
         }
@@ -347,6 +350,100 @@ public:
         return resp.error;
     }
 
+    // Tranlate a check_and_set request into a single-put request which is 
certainly idempotent.
+    // Return current status for RocksDB. Only called by primary replicas.
+    int make_idempotent(const dsn::apps::check_and_set_request &req,
+                        dsn::apps::check_and_set_response &err_resp,
+                        dsn::apps::update_request &update)
+    {
+        if (dsn_unlikely(!is_check_type_supported(req.check_type))) {
+            LOG_ERROR_PREFIX("check type {} is not supported for check_and_set 
",
+                             "while making idempotent",
+                             cas_check_type_to_string(req.check_type));
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        dsn::blob check_key;
+        pegasus_generate_key(check_key, req.hash_key, req.check_sort_key);
+
+        // Get the check value.
+        db_get_context get_ctx;
+        const int err = _rocksdb_wrapper->get(check_key.to_string_view(), 
&get_ctx);
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+            // Failed to read the check value.
+            LOG_ERROR_PREFIX("failed to get the check value for check_and_set 
while making "
+                             "idempotent: rocksdb_status = {}, hash_key = {}, "
+                             "check_sort_key = {}",
+                             err,
+                             utils::c_escape_sensitive_string(req.hash_key),
+                             
utils::c_escape_sensitive_string(req.check_sort_key));
+
+            return make_error_response(err, err_resp);
+        }
+
+        dsn::blob check_value;
+        const bool value_exist = !get_ctx.expired && get_ctx.found;
+        if (value_exist) {
+            pegasus_extract_user_data(
+                _pegasus_data_version, std::move(get_ctx.raw_value), 
check_value);
+        }
+
+        bool invalid_argument = false;
+        const bool passed = validate_check(
+            req.check_type, req.check_operand, value_exist, check_value, 
invalid_argument);
+        if (!passed) {
+            make_check_value(req, value_exist, check_value, err_resp);
+            return make_error_response(invalid_argument ? 
rocksdb::Status::kInvalidArgument
+                                                        : 
rocksdb::Status::kTryAgain,
+                                       err_resp);
+        }
+
+        // Check passed, generate new RocksDB key.
+        dsn::blob set_key;
+        if (req.set_diff_sort_key) {
+            pegasus_generate_key(set_key, req.hash_key, req.set_sort_key);
+        } else {
+            set_key = check_key;
+        }
+
+        // Generate new RocksDB value.
+        make_check_value(req, value_exist, check_value, update);
+
+        // Generate the idempotent single-put request for check_and_set.
+        return make_idempotent_request_for_check_and_set(
+            set_key, req.set_value, req.set_expire_ts_seconds, update);
+    }
+
+    // Apply the single-put request translated from a check_and_set request 
into RocksDB, and
+    // build response for the check_and_set request. Return current status for 
RocksDB. Only
+    // called by primary replicas.
+    int put(const db_write_context &ctx,
+            const dsn::apps::update_request &update,
+            dsn::apps::check_and_set_response &resp)
+    {
+        const auto pid = get_gpid();
+        resp.app_id = pid.get_app_id();
+        resp.partition_index = pid.get_partition_index();
+        resp.decree = ctx.decree;
+        resp.server = _primary_host_port;
+
+        // Copy check_value's fields from the single-put request to the 
check_and_set
+        // response to reply to the client.
+        copy_check_value(update, resp);
+
+        auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
+
+        resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+            ctx, update.key, update.value, update.expire_ts_seconds);
+        if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+            return resp.error;
+        }
+
+        resp.error = _rocksdb_wrapper->write(ctx.decree);
+        return resp.error;
+    }
+
     int check_and_set(int64_t decree,
                       const dsn::apps::check_and_set_request &update,
                       dsn::apps::check_and_set_response &resp)
@@ -370,14 +467,14 @@ public:
 
         db_get_context get_context;
         std::string_view check_raw_key = check_key.to_string_view();
-        int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
-        if (err != rocksdb::Status::kOk) {
+        const int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
             // read check value failed
-            LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree: 
{}, hash_key: {}, "
-                              "check_sort_key: {}",
-                              decree,
-                              
utils::c_escape_sensitive_string(update.hash_key),
-                              
utils::c_escape_sensitive_string(update.check_sort_key));
+            LOG_ERROR_PREFIX("Error to GetCheckValue for CheckAndSet decree: 
{}, hash_key: {}, "
+                             "check_sort_key: {}",
+                             decree,
+                             utils::c_escape_sensitive_string(update.hash_key),
+                             
utils::c_escape_sensitive_string(update.check_sort_key));
             resp.error = err;
             return resp.error;
         }
@@ -610,11 +707,8 @@ public:
                   const dsn::apps::update_request &update,
                   dsn::apps::update_response &resp)
     {
-        resp.error =
-            _rocksdb_wrapper->write_batch_put_ctx(ctx,
-                                                  update.key.to_string_view(),
-                                                  
update.value.to_string_view(),
-                                                  
static_cast<uint32_t>(update.expire_ts_seconds));
+        resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+            ctx, update.key, update.value, update.expire_ts_seconds);
         _update_responses.emplace_back(&resp);
         return resp.error;
     }
@@ -656,7 +750,11 @@ private:
         _rocksdb_wrapper->clear_up_write_batch();
     }
 
-    static dsn::blob composite_raw_key(std::string_view hash_key, 
std::string_view sort_key)
+    // Convenient encapsulation of pegasus_generate_key().
+    //
+    // TKey may be std::string_view, std::string or dsn::blob.
+    template <typename TKey>
+    static dsn::blob composite_raw_key(const TKey &hash_key, const TKey 
&sort_key)
     {
         dsn::blob raw_key;
         pegasus_generate_key(raw_key, hash_key, sort_key);
@@ -690,21 +788,53 @@ private:
         return req.expire_ts_seconds;
     }
 
-    // Build a single-put request by provided int64 value.
+    // Build a single-put request if only `key` and `type` are provided.
+    static inline void make_idempotent_request(const dsn::blob &key,
+                                               dsn::apps::update_type::type 
type,
+                                               dsn::apps::update_request 
&update)
+    {
+        update.key = key;
+
+        // Must use __set_type() since type is declared as optional.
+        update.__set_type(type);
+    }
+
+    // Build a single-put request while `key`, `type` and `expire_ts_seconds` 
are provided.
+    static inline void make_idempotent_request(const dsn::blob &key,
+                                               int32_t expire_ts_seconds,
+                                               dsn::apps::update_type::type 
type,
+                                               dsn::apps::update_request 
&update)
+    {
+        make_idempotent_request(key, type, update);
+        update.expire_ts_seconds = expire_ts_seconds;
+    }
+
+    // Build a single-put request while `key`, `type`, `expire_ts_seconds` and 
int64-typed
+    // `value` are provided.
     static inline void make_idempotent_request(const dsn::blob &key,
                                                int64_t value,
                                                int32_t expire_ts_seconds,
                                                dsn::apps::update_type::type 
type,
                                                dsn::apps::update_request 
&update)
     {
-        update.key = key;
+        make_idempotent_request(key, expire_ts_seconds, type, update);
         update.value = dsn::blob::create_from_numeric(value);
-        update.expire_ts_seconds = expire_ts_seconds;
-        update.__set_type(type);
     }
 
-    // Build corresponding single-put request for an incr request, and return 
current status
-    // for RocksDB, i.e. kOk.
+    // Build a single-put request while `key`, `type`, `expire_ts_seconds` and 
blob-typed
+    // `value` are provided.
+    static inline void make_idempotent_request(const dsn::blob &key,
+                                               const dsn::blob &value,
+                                               int32_t expire_ts_seconds,
+                                               dsn::apps::update_type::type 
type,
+                                               dsn::apps::update_request 
&update)
+    {
+        make_idempotent_request(key, expire_ts_seconds, type, update);
+        update.value = value;
+    }
+
+    // Build `update` (i.e. the corresponding single-put request for an incr 
request) based on
+    // `key`, `value` and `expire_ts_seconds`. Return current status for 
RocksDB (i.e. kOk).
     static inline int make_idempotent_request_for_incr(const dsn::blob &key,
                                                        int64_t value,
                                                        int32_t 
expire_ts_seconds,
@@ -715,10 +845,60 @@ private:
         return rocksdb::Status::kOk;
     }
 
-    // Build incr response only for error, and return the current error status 
for RocksDB.
-    inline int make_error_response(int err, dsn::apps::incr_response &resp)
+    // Set check_value's fields in `resp` based on `req`, `value_exist` and 
`check_value`.
+    //
+    // `req` may be dsn::apps::check_and_set_request and 
dsn::apps::check_and_mutate_request.
+    //
+    // `resp` may be dsn::apps::check_and_set_response, 
dsn::apps::check_and_mutate_response
+    // and dsn::apps::update_request.
+    template <typename TRequest, typename TResponse>
+    static inline void make_check_value(const TRequest &req,
+                                        bool value_exist,
+                                        const dsn::blob &check_value,
+                                        TResponse &resp)
+    {
+        // Must use __set_*() functions since in dsn::apps::update_request the 
fields are
+        // declared as optional.
+        resp.__set_check_value_returned(req.return_check_value);
+        resp.__set_check_value_exist(req.return_check_value && value_exist);
+        if (resp.check_value_exist) {
+            resp.__set_check_value(check_value);
+        }
+    }
+
+    // Copy check_value's fields from `update` (i.e. the single-put request) 
to `resp`.
+    //
+    // `resp` may be dsn::apps::check_and_set_response and 
dsn::apps::check_and_mutate_response.
+    template <typename TResponse>
+    static inline void copy_check_value(const dsn::apps::update_request 
&update, TResponse &resp)
+    {
+        resp.check_value_returned = update.check_value_returned;
+        resp.check_value_exist = update.check_value_exist;
+        resp.check_value = update.check_value;
+    }
+
+    // Build `update` (i.e. the corresponding single-put request for a 
check_and_set request)
+    // based on `key`, `value` and `expire_ts_seconds`. Return current status 
for RocksDB
+    // (i.e. kOk).
+    static inline int make_idempotent_request_for_check_and_set(const 
dsn::blob &key,
+                                                                const 
dsn::blob &value,
+                                                                int32_t 
expire_ts_seconds,
+                                                                
dsn::apps::update_request &update)
     {
-        CHECK(err != rocksdb::Status::kOk, "this incr response is built only 
for error");
+        make_idempotent_request(
+            key, value, expire_ts_seconds, 
dsn::apps::update_type::UT_CHECK_AND_SET, update);
+        return rocksdb::Status::kOk;
+    }
+
+    // Build response `resp` based on `err` only for the error case (i.e. the 
current status
+    // `err` for RocksDB is not rocksdb::Status::kOk). Return `err`.
+    //
+    // `resp` may be dsn::apps::incr_response,  
dsn::apps::check_and_set_response and
+    // dsn::apps::check_and_mutate_response.
+    template <typename TResponse>
+    inline int make_error_response(int err, TResponse &resp)
+    {
+        CHECK(err != rocksdb::Status::kOk, "this response is built only for 
error");
         resp.error = err;
 
         const auto pid = get_gpid();
@@ -733,7 +913,8 @@ private:
         return err;
     }
 
-    // Build incr response as above, except that also set new value for 
response.
+    // Build the incr response `resp` based on `err` and `new_value` only for 
the error case
+    // (i.e. the current status `err` for RocksDB is not 
rocksdb::Status::kOk). Return `err`.
     inline int make_error_response(int err, int64_t new_value, 
dsn::apps::incr_response &resp)
     {
         resp.new_value = new_value;
@@ -747,108 +928,159 @@ private:
                check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER;
     }
 
-    // return true if check passed.
-    // for int compare, if check operand or value are not valid integer, then 
return false,
-    // and set out param `invalid_argument' to false.
-    bool validate_check(int64_t decree,
+    // Check whether the conditions are met for check_and_set and 
check_and_mutate based on
+    // `check_type`, `check_operand`, `value_exist` and `check_value`.
+    //
+    // `decree` is not used to decide. It is only used for logging. Its type 
is designed to
+    // be a template parameter, for the reason that this function might be 
called before the
+    // decree is assigned for a mutation, for example, while traslating an 
atomic write request
+    // to a single-put request. If unassigned, it could be any type as long as 
it is printable.
+    //
+    // `invalid_argument` is only used while comparing integers: if 
`check_operand` or
+    // `check_value` is not valid integer, `invalid_argument' would be set 
true, and the
+    // function would return false.
+    //
+    // Return true if check passed, otherwise false.
+    template <typename TDecree>
+    bool validate_check(TDecree decree,
                         ::dsn::apps::cas_check_type::type check_type,
                         const ::dsn::blob &check_operand,
                         bool value_exist,
-                        const ::dsn::blob &value,
+                        const ::dsn::blob &check_value,
                         bool &invalid_argument)
     {
         invalid_argument = false;
         switch (check_type) {
         case ::dsn::apps::cas_check_type::CT_NO_CHECK:
             return true;
+
         case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST:
             return !value_exist;
+
         case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST_OR_EMPTY:
-            return !value_exist || value.length() == 0;
+            return !value_exist || check_value.empty();
+
         case ::dsn::apps::cas_check_type::CT_VALUE_EXIST:
             return value_exist;
+
         case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EMPTY:
-            return value_exist && value.length() != 0;
+            return value_exist && !check_value.empty();
+
         case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE:
         case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX:
         case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX: {
-            if (!value_exist)
+            if (!value_exist) {
                 return false;
-            if (check_operand.length() == 0)
+            }
+
+            if (check_operand.empty()) {
                 return true;
-            if (value.length() < check_operand.length())
+            }
+
+            if (check_value.length() < check_operand.length()) {
                 return false;
+            }
+
             if (check_type == 
::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE) {
-                return 
value.to_string_view().find(check_operand.to_string_view()) !=
-                       std::string_view::npos;
-            } else if (check_type == 
::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX) {
-                return dsn::utils::mequals(
-                    value.data(), check_operand.data(), 
check_operand.length());
-            } else { // check_type == 
::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX
-                return dsn::utils::mequals(value.data() + value.length() - 
check_operand.length(),
-                                           check_operand.data(),
-                                           check_operand.length());
+                return check_value.contains(check_operand);
             }
+
+            if (check_type == 
::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX) {
+                return check_value.starts_with(check_operand);
+            }
+
+            CHECK(check_type == 
::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX,
+                  "check_type({}) should be CT_VALUE_MATCH_POSTFIX",
+                  cas_check_type_to_string(check_type));
+
+            return check_value.ends_with(check_operand);
         }
+
         case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS:
         case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL:
         case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_EQUAL:
         case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL:
         case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER: {
-            if (!value_exist)
+            if (!value_exist) {
                 return false;
-            int c = 
value.to_string_view().compare(check_operand.to_string_view());
+            }
+
+            const int c = check_value.compare(check_operand);
             if (c < 0) {
                 return check_type <= 
::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL;
-            } else if (c == 0) {
-                return check_type >= 
::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL &&
-                       check_type <= 
::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
-            } else { // c > 0
+            }
+
+            if (c > 0) {
                 return check_type >= 
::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
             }
+
+            // Must be c == 0.
+            return check_type >= 
::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL &&
+                   check_type <= 
::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
         }
+
         case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS:
         case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL:
         case ::dsn::apps::cas_check_type::CT_VALUE_INT_EQUAL:
         case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL:
         case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER: {
-            if (!value_exist)
+            if (!value_exist) {
                 return false;
-            int64_t check_value_int;
-            if (!dsn::buf2int64(value.to_string_view(), check_value_int)) {
-                // invalid check value
-                LOG_ERROR_PREFIX("check failed: decree = {}, error = "
-                                 "check value \"{}\" is not an integer or out 
of range",
+            }
+
+            int64_t check_value_int = 0;
+            if (!dsn::buf2int64(check_value.to_string_view(), 
check_value_int)) {
+                // `check_value` is not a valid int64.
+                LOG_ERROR_PREFIX("check failed: decree = {}, error = check 
value \"{}\" is "
+                                 "not a valid integer or out of range",
                                  decree,
-                                 utils::c_escape_sensitive_string(value));
+                                 
utils::c_escape_sensitive_string(check_value));
                 invalid_argument = true;
                 return false;
             }
-            int64_t check_operand_int;
+
+            int64_t check_operand_int = 0;
             if (!dsn::buf2int64(check_operand.to_string_view(), 
check_operand_int)) {
-                // invalid check operand
-                LOG_ERROR_PREFIX("check failed: decree = {}, error = "
-                                 "check operand \"{}\" is not an integer or 
out of range",
+                // `check_operand` is not a valid int64.
+                LOG_ERROR_PREFIX("check failed: decree = {}, error = check 
operand \"{}\" is "
+                                 "not a valid integer or out of range",
                                  decree,
                                  
utils::c_escape_sensitive_string(check_operand));
                 invalid_argument = true;
                 return false;
             }
+
             if (check_value_int < check_operand_int) {
                 return check_type <= 
::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL;
-            } else if (check_value_int == check_operand_int) {
-                return check_type >= 
::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL &&
-                       check_type <= 
::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
-            } else { // check_value_int > check_operand_int
+            }
+
+            if (check_value_int > check_operand_int) {
                 return check_type >= 
::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
             }
+
+            // Must be check_value_int == check_operand_int.
+            return check_type >= 
::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL &&
+                   check_type <= 
::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
         }
+
         default:
             CHECK(false, "unsupported check type: {}", check_type);
         }
+
         return false;
     }
 
+    // The same as the above function, only used when the decree has not been 
assigned.
+    bool validate_check(::dsn::apps::cas_check_type::type check_type,
+                        const ::dsn::blob &check_operand,
+                        bool value_exist,
+                        const ::dsn::blob &check_value,
+                        bool &invalid_argument)
+    {
+        return validate_check(
+            "UNASSIGNED", check_type, check_operand, value_exist, check_value, 
invalid_argument);
+    }
+
     friend class pegasus_write_service_test;
     friend class pegasus_server_write_test;
     friend class PegasusWriteServiceImplTest;
diff --git a/src/server/result_writer.cpp b/src/server/result_writer.cpp
index 4d3a9160e..df78172e9 100644
--- a/src/server/result_writer.cpp
+++ b/src/server/result_writer.cpp
@@ -21,6 +21,7 @@
 
 #include <pegasus/error.h>
 #include <chrono>
+#include <type_traits>
 #include <utility>
 
 #include "pegasus/client.h"
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 4aa30b228..feba097cd 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -176,6 +176,15 @@ int rocksdb_wrapper::write_batch_put_ctx(const 
db_write_context &ctx,
     return s.code();
 }
 
+int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
+                                         const dsn::blob &raw_key,
+                                         const dsn::blob &value,
+                                         int32_t expire_sec)
+{
+    return write_batch_put_ctx(
+        ctx, raw_key.to_string_view(), value.to_string_view(), 
static_cast<uint32_t>(expire_sec));
+}
+
 int rocksdb_wrapper::write(int64_t decree)
 {
     CHECK_GT(_write_batch->Count(), 0);
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index f905dea93..feaefd70d 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -22,16 +22,20 @@
 #include <gtest/gtest_prod.h>
 #include <rocksdb/options.h>
 #include <rocksdb/write_batch.h>
-#include <stdint.h>
+#include <cstdint>
 #include <memory>
 #include <string>
+#include <string_view>
 #include <vector>
 
 #include "pegasus_value_schema.h"
 #include "replica/replica_base.h"
-#include <string_view>
 #include "utils/metrics.h"
 
+namespace dsn {
+class blob;
+} // namespace dsn
+
 namespace rocksdb {
 class ColumnFamilyHandle;
 class DB;
@@ -64,6 +68,10 @@ public:
                             std::string_view raw_key,
                             std::string_view value,
                             uint32_t expire_sec);
+    int write_batch_put_ctx(const db_write_context &ctx,
+                            const dsn::blob &raw_key,
+                            const dsn::blob &value,
+                            int32_t expire_sec);
     int write(int64_t decree);
     int write_batch_delete(int64_t decree, std::string_view raw_key);
     void clear_up_write_batch();
diff --git a/src/utils/blob.h b/src/utils/blob.h
index 5b9d045c4..c1212afb4 100644
--- a/src/utils/blob.h
+++ b/src/utils/blob.h
@@ -26,7 +26,9 @@
 
 #pragma once
 
+#include <boost/algorithm/string/predicate.hpp>
 #include <fmt/core.h>
+#include <algorithm>
 #include <cstring>
 #include <memory>
 #include <string_view>
@@ -37,6 +39,7 @@
 
 #include "utils/fmt_logging.h"
 #include "utils/fmt_utils.h"
+#include "utils/strings.h"
 #include "utils.h"
 
 namespace dsn {
@@ -224,6 +227,31 @@ public:
 
     [[nodiscard]] std::string_view to_string_view() const { return {_data, 
_length}; }
 
+    // Returns an integer less than, equal to, or greater than zero if _data 
is less than,
+    // equal to, or greater than rhs._data.
+    [[nodiscard]] int compare(const blob &rhs) const noexcept
+    {
+        return to_string_view().compare(rhs.to_string_view());
+    }
+
+    // Return true if _data contains rhs._data, otherwise false.
+    [[nodiscard]] bool contains(const blob &rhs) const
+    {
+        return boost::algorithm::contains(to_string_view(), 
rhs.to_string_view());
+    }
+
+    // Return true if _data starts with rhs._data, otherwise false.
+    [[nodiscard]] bool starts_with(const blob &rhs) const
+    {
+        return boost::algorithm::starts_with(to_string_view(), 
rhs.to_string_view());
+    }
+
+    // Return true if _data ends with rhs._data, otherwise false.
+    [[nodiscard]] bool ends_with(const blob &rhs) const
+    {
+        return boost::algorithm::ends_with(to_string_view(), 
rhs.to_string_view());
+    }
+
     uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
     uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
 
diff --git a/src/utils/test/utils.cpp b/src/utils/test/utils.cpp
index 01c880acd..d1888a7f5 100644
--- a/src/utils/test/utils.cpp
+++ b/src/utils/test/utils.cpp
@@ -30,7 +30,6 @@
 #include <set>
 #include <string>
 #include <tuple>
-#include <type_traits>
 #include <unordered_set>
 #include <utility>
 #include <vector>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to