This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 682d960c6 feat(make_idempotent): support making `check_and_set`
request idempotent in `pegasus_write_service::impl` (#2230)
682d960c6 is described below
commit 682d960c6a2317c57a4fa791c540b5d6407014ff
Author: Dan Wang <[email protected]>
AuthorDate: Tue Apr 22 14:54:02 2025 +0800
feat(make_idempotent): support making `check_and_set` request idempotent in
`pegasus_write_service::impl` (#2230)
https://github.com/apache/incubator-pegasus/issues/2197
Implement two APIs in `pegasus_write_service::impl` to make `check_and_set`
requests idempotent:
- implement `make_idempotent()` function to translate a `check_and_set`
request
into the single-put request which is naturally idempotent, and process the
possible
errors during the translation, e.g. failed to read the check value or did
not pass the
check;
- implement `put()` function to apply the single-put request into the
RocksDB
instance and make response for `check_and_set`.
Also refactor `validate_check()` function to make the check for
`check_and_set`
and `check_and_mutate` conditions more clearer and readable.
---
idl/rrdb.thrift | 25 +-
src/base/pegasus_key_schema.h | 10 +-
src/server/pegasus_write_service_impl.h | 402 +++++++++++++++++++++++++-------
src/server/result_writer.cpp | 1 +
src/server/rocksdb_wrapper.cpp | 9 +
src/server/rocksdb_wrapper.h | 12 +-
src/utils/blob.h | 28 +++
src/utils/test/utils.cpp | 1 -
8 files changed, 388 insertions(+), 100 deletions(-)
diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index 712de1ec7..56f5899fc 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -71,7 +71,8 @@ enum mutate_operation
enum update_type
{
UT_PUT,
- UT_INCR
+ UT_INCR,
+ UT_CHECK_AND_SET
}
// The single-put request, just writes a key/value pair into storage, which is
certainly
@@ -86,8 +87,17 @@ struct update_request
// single-put request from the one translated from a non-idempotent atomic
write request:
// - a general single-put request, if `type` is UT_PUT or not set by
default as it's
// optional, or
- // - a put request translated from an incr request, if `type` is UT_INCR.
+ // - a put request translated from an incr request, if `type` is UT_INCR,
or
+ // - a put request translated from a check_and_set request, if `type` is
UT_CHECK_AND_SET.
4:optional update_type type;
+
+ // Following 3 fields are only available while type = UT_CHECK_AND_SET,
used to build
+ // check_and_set_response to reply to the client, once this put request is
translated
+ // from the non-idempotent check_and_set_request.
+ 5:optional bool check_value_returned;
+ 6:optional bool check_value_exist; // Used only if
check_value_returned is true.
+ 7:optional dsn.blob check_value; // Used only if both check_value_returned
and
+ // check_value_exist are true.
}
struct update_response
@@ -242,12 +252,13 @@ struct check_and_set_request
struct check_and_set_response
{
- 1:i32 error; // return kTryAgain if check not passed.
- // return kInvalidArgument if check type is int
compare and
- // check_operand/check_value is not integer or out
of range.
+ 1:i32 error; // Return kTryAgain if check not passed.
+ // Return kInvalidArgument if check_type is
comparing integers and
+ // check_value/check_operand is not a valid
integer or out of range.
2:bool check_value_returned;
- 3:bool check_value_exist; // used only if check_value_returned
is true
- 4:dsn.blob check_value; // used only if check_value_returned and
check_value_exist is true
+ 3:bool check_value_exist; // Used only if check_value_returned
is true.
+ 4:dsn.blob check_value; // Used only if both check_value_returned and
+ // check_value_exist are true.
5:i32 app_id;
6:i32 partition_index;
7:i64 decree;
diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h
index 4790693ac..c0897c609 100644
--- a/src/base/pegasus_key_schema.h
+++ b/src/base/pegasus_key_schema.h
@@ -34,11 +34,11 @@ namespace pegasus {
//
=====================================================================================
// rocksdb key = [hash_key_len(uint16_t)] [hash_key(bytes)] [sort_key(bytes)]
-// generate rocksdb key.
-// T may be std::string or ::dsn::blob.
-// data is copied into 'key'.
-template <typename T>
-void pegasus_generate_key(::dsn::blob &key, const T &hash_key, const T
&sort_key)
+// Generate rocksdb `key` by `hash_key` and `sort_key`.
+//
+// TKey may be std::string_view, std::string or dsn::blob.
+template <typename TKey>
+void pegasus_generate_key(::dsn::blob &key, const TKey &hash_key, const TKey
&sort_key)
{
CHECK_LT(hash_key.length(), UINT16_MAX);
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index 494a384e0..a51de9338 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -119,14 +119,13 @@ public:
}
auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
- for (auto &kv : update.kvs) {
- resp.error = _rocksdb_wrapper->write_batch_put_ctx(
- ctx,
- composite_raw_key(update.hash_key.to_string_view(),
kv.key.to_string_view())
- .to_string_view(),
- kv.value.to_string_view(),
- static_cast<uint32_t>(update.expire_ts_seconds));
- if (resp.error) {
+ for (const auto &kv : update.kvs) {
+ resp.error =
+ _rocksdb_wrapper->write_batch_put_ctx(ctx,
+
composite_raw_key(update.hash_key, kv.key),
+ kv.value,
+
update.expire_ts_seconds);
+ if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
return resp.error;
}
}
@@ -154,12 +153,10 @@ public:
}
auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
- for (auto &sort_key : update.sort_keys) {
+ for (const auto &sort_key : update.sort_keys) {
resp.error = _rocksdb_wrapper->write_batch_delete(
- decree,
- composite_raw_key(update.hash_key.to_string_view(),
sort_key.to_string_view())
- .to_string_view());
- if (resp.error) {
+ decree, composite_raw_key(update.hash_key,
sort_key).to_string_view());
+ if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
return resp.error;
}
}
@@ -181,6 +178,12 @@ public:
db_get_context get_ctx;
const int err = _rocksdb_wrapper->get(req.key.to_string_view(),
&get_ctx);
if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+ // Failed to read current raw value.
+ LOG_ERROR_PREFIX("failed to get current raw value for incr while
making "
+ "idempotent: rocksdb_status = {}, key = {}",
+ err,
+ utils::c_escape_sensitive_string(req.key));
+
return make_error_response(err, err_resp);
}
@@ -224,28 +227,28 @@ public:
}
}
+ // Generate the idempotent single-put request for incr.
return make_idempotent_request_for_incr(
req.key, new_int, calc_expire_on_existing(req, get_ctx), update);
}
- // Apply single-put request translated from incr request into RocksDB, and
build response
- // for incr. Return current status for RocksDB. Only called by primary
replicas.
+ // Apply the single-put request translated from an incr request into
RocksDB, and build
+ // response for the incr request. Return current status for RocksDB. Only
called by
+ // primary replicas.
int put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::incr_response &resp)
{
- resp.app_id = get_gpid().get_app_id();
- resp.partition_index = get_gpid().get_partition_index();
+ const auto pid = get_gpid();
+ resp.app_id = pid.get_app_id();
+ resp.partition_index = pid.get_partition_index();
resp.decree = ctx.decree;
resp.server = _primary_host_port;
auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
- resp.error =
- _rocksdb_wrapper->write_batch_put_ctx(ctx,
- update.key.to_string_view(),
-
update.value.to_string_view(),
-
static_cast<uint32_t>(update.expire_ts_seconds));
+ resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+ ctx, update.key, update.value, update.expire_ts_seconds);
if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
return resp.error;
}
@@ -347,6 +350,100 @@ public:
return resp.error;
}
+ // Tranlate a check_and_set request into a single-put request which is
certainly idempotent.
+ // Return current status for RocksDB. Only called by primary replicas.
+ int make_idempotent(const dsn::apps::check_and_set_request &req,
+ dsn::apps::check_and_set_response &err_resp,
+ dsn::apps::update_request &update)
+ {
+ if (dsn_unlikely(!is_check_type_supported(req.check_type))) {
+ LOG_ERROR_PREFIX("check type {} is not supported for check_and_set
",
+ "while making idempotent",
+ cas_check_type_to_string(req.check_type));
+
+ return make_error_response(rocksdb::Status::kInvalidArgument,
err_resp);
+ }
+
+ dsn::blob check_key;
+ pegasus_generate_key(check_key, req.hash_key, req.check_sort_key);
+
+ // Get the check value.
+ db_get_context get_ctx;
+ const int err = _rocksdb_wrapper->get(check_key.to_string_view(),
&get_ctx);
+ if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+ // Failed to read the check value.
+ LOG_ERROR_PREFIX("failed to get the check value for check_and_set
while making "
+ "idempotent: rocksdb_status = {}, hash_key = {}, "
+ "check_sort_key = {}",
+ err,
+ utils::c_escape_sensitive_string(req.hash_key),
+
utils::c_escape_sensitive_string(req.check_sort_key));
+
+ return make_error_response(err, err_resp);
+ }
+
+ dsn::blob check_value;
+ const bool value_exist = !get_ctx.expired && get_ctx.found;
+ if (value_exist) {
+ pegasus_extract_user_data(
+ _pegasus_data_version, std::move(get_ctx.raw_value),
check_value);
+ }
+
+ bool invalid_argument = false;
+ const bool passed = validate_check(
+ req.check_type, req.check_operand, value_exist, check_value,
invalid_argument);
+ if (!passed) {
+ make_check_value(req, value_exist, check_value, err_resp);
+ return make_error_response(invalid_argument ?
rocksdb::Status::kInvalidArgument
+ :
rocksdb::Status::kTryAgain,
+ err_resp);
+ }
+
+ // Check passed, generate new RocksDB key.
+ dsn::blob set_key;
+ if (req.set_diff_sort_key) {
+ pegasus_generate_key(set_key, req.hash_key, req.set_sort_key);
+ } else {
+ set_key = check_key;
+ }
+
+ // Generate new RocksDB value.
+ make_check_value(req, value_exist, check_value, update);
+
+ // Generate the idempotent single-put request for check_and_set.
+ return make_idempotent_request_for_check_and_set(
+ set_key, req.set_value, req.set_expire_ts_seconds, update);
+ }
+
+ // Apply the single-put request translated from a check_and_set request
into RocksDB, and
+ // build response for the check_and_set request. Return current status for
RocksDB. Only
+ // called by primary replicas.
+ int put(const db_write_context &ctx,
+ const dsn::apps::update_request &update,
+ dsn::apps::check_and_set_response &resp)
+ {
+ const auto pid = get_gpid();
+ resp.app_id = pid.get_app_id();
+ resp.partition_index = pid.get_partition_index();
+ resp.decree = ctx.decree;
+ resp.server = _primary_host_port;
+
+ // Copy check_value's fields from the single-put request to the
check_and_set
+ // response to reply to the client.
+ copy_check_value(update, resp);
+
+ auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
+
+ resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+ ctx, update.key, update.value, update.expire_ts_seconds);
+ if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+ return resp.error;
+ }
+
+ resp.error = _rocksdb_wrapper->write(ctx.decree);
+ return resp.error;
+ }
+
int check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp)
@@ -370,14 +467,14 @@ public:
db_get_context get_context;
std::string_view check_raw_key = check_key.to_string_view();
- int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
- if (err != rocksdb::Status::kOk) {
+ const int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
+ if (dsn_unlikely(err != rocksdb::Status::kOk)) {
// read check value failed
- LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree:
{}, hash_key: {}, "
- "check_sort_key: {}",
- decree,
-
utils::c_escape_sensitive_string(update.hash_key),
-
utils::c_escape_sensitive_string(update.check_sort_key));
+ LOG_ERROR_PREFIX("Error to GetCheckValue for CheckAndSet decree:
{}, hash_key: {}, "
+ "check_sort_key: {}",
+ decree,
+ utils::c_escape_sensitive_string(update.hash_key),
+
utils::c_escape_sensitive_string(update.check_sort_key));
resp.error = err;
return resp.error;
}
@@ -610,11 +707,8 @@ public:
const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
{
- resp.error =
- _rocksdb_wrapper->write_batch_put_ctx(ctx,
- update.key.to_string_view(),
-
update.value.to_string_view(),
-
static_cast<uint32_t>(update.expire_ts_seconds));
+ resp.error = _rocksdb_wrapper->write_batch_put_ctx(
+ ctx, update.key, update.value, update.expire_ts_seconds);
_update_responses.emplace_back(&resp);
return resp.error;
}
@@ -656,7 +750,11 @@ private:
_rocksdb_wrapper->clear_up_write_batch();
}
- static dsn::blob composite_raw_key(std::string_view hash_key,
std::string_view sort_key)
+ // Convenient encapsulation of pegasus_generate_key().
+ //
+ // TKey may be std::string_view, std::string or dsn::blob.
+ template <typename TKey>
+ static dsn::blob composite_raw_key(const TKey &hash_key, const TKey
&sort_key)
{
dsn::blob raw_key;
pegasus_generate_key(raw_key, hash_key, sort_key);
@@ -690,21 +788,53 @@ private:
return req.expire_ts_seconds;
}
- // Build a single-put request by provided int64 value.
+ // Build a single-put request if only `key` and `type` are provided.
+ static inline void make_idempotent_request(const dsn::blob &key,
+ dsn::apps::update_type::type
type,
+ dsn::apps::update_request
&update)
+ {
+ update.key = key;
+
+ // Must use __set_type() since type is declared as optional.
+ update.__set_type(type);
+ }
+
+ // Build a single-put request while `key`, `type` and `expire_ts_seconds`
are provided.
+ static inline void make_idempotent_request(const dsn::blob &key,
+ int32_t expire_ts_seconds,
+ dsn::apps::update_type::type
type,
+ dsn::apps::update_request
&update)
+ {
+ make_idempotent_request(key, type, update);
+ update.expire_ts_seconds = expire_ts_seconds;
+ }
+
+ // Build a single-put request while `key`, `type`, `expire_ts_seconds` and
int64-typed
+ // `value` are provided.
static inline void make_idempotent_request(const dsn::blob &key,
int64_t value,
int32_t expire_ts_seconds,
dsn::apps::update_type::type
type,
dsn::apps::update_request
&update)
{
- update.key = key;
+ make_idempotent_request(key, expire_ts_seconds, type, update);
update.value = dsn::blob::create_from_numeric(value);
- update.expire_ts_seconds = expire_ts_seconds;
- update.__set_type(type);
}
- // Build corresponding single-put request for an incr request, and return
current status
- // for RocksDB, i.e. kOk.
+ // Build a single-put request while `key`, `type`, `expire_ts_seconds` and
blob-typed
+ // `value` are provided.
+ static inline void make_idempotent_request(const dsn::blob &key,
+ const dsn::blob &value,
+ int32_t expire_ts_seconds,
+ dsn::apps::update_type::type
type,
+ dsn::apps::update_request
&update)
+ {
+ make_idempotent_request(key, expire_ts_seconds, type, update);
+ update.value = value;
+ }
+
+ // Build `update` (i.e. the corresponding single-put request for an incr
request) based on
+ // `key`, `value` and `expire_ts_seconds`. Return current status for
RocksDB (i.e. kOk).
static inline int make_idempotent_request_for_incr(const dsn::blob &key,
int64_t value,
int32_t
expire_ts_seconds,
@@ -715,10 +845,60 @@ private:
return rocksdb::Status::kOk;
}
- // Build incr response only for error, and return the current error status
for RocksDB.
- inline int make_error_response(int err, dsn::apps::incr_response &resp)
+ // Set check_value's fields in `resp` based on `req`, `value_exist` and
`check_value`.
+ //
+ // `req` may be dsn::apps::check_and_set_request and
dsn::apps::check_and_mutate_request.
+ //
+ // `resp` may be dsn::apps::check_and_set_response,
dsn::apps::check_and_mutate_response
+ // and dsn::apps::update_request.
+ template <typename TRequest, typename TResponse>
+ static inline void make_check_value(const TRequest &req,
+ bool value_exist,
+ const dsn::blob &check_value,
+ TResponse &resp)
+ {
+ // Must use __set_*() functions since in dsn::apps::update_request the
fields are
+ // declared as optional.
+ resp.__set_check_value_returned(req.return_check_value);
+ resp.__set_check_value_exist(req.return_check_value && value_exist);
+ if (resp.check_value_exist) {
+ resp.__set_check_value(check_value);
+ }
+ }
+
+ // Copy check_value's fields from `update` (i.e. the single-put request)
to `resp`.
+ //
+ // `resp` may be dsn::apps::check_and_set_response and
dsn::apps::check_and_mutate_response.
+ template <typename TResponse>
+ static inline void copy_check_value(const dsn::apps::update_request
&update, TResponse &resp)
+ {
+ resp.check_value_returned = update.check_value_returned;
+ resp.check_value_exist = update.check_value_exist;
+ resp.check_value = update.check_value;
+ }
+
+ // Build `update` (i.e. the corresponding single-put request for a
check_and_set request)
+ // based on `key`, `value` and `expire_ts_seconds`. Return current status
for RocksDB
+ // (i.e. kOk).
+ static inline int make_idempotent_request_for_check_and_set(const
dsn::blob &key,
+ const
dsn::blob &value,
+ int32_t
expire_ts_seconds,
+
dsn::apps::update_request &update)
{
- CHECK(err != rocksdb::Status::kOk, "this incr response is built only
for error");
+ make_idempotent_request(
+ key, value, expire_ts_seconds,
dsn::apps::update_type::UT_CHECK_AND_SET, update);
+ return rocksdb::Status::kOk;
+ }
+
+ // Build response `resp` based on `err` only for the error case (i.e. the
current status
+ // `err` for RocksDB is not rocksdb::Status::kOk). Return `err`.
+ //
+ // `resp` may be dsn::apps::incr_response,
dsn::apps::check_and_set_response and
+ // dsn::apps::check_and_mutate_response.
+ template <typename TResponse>
+ inline int make_error_response(int err, TResponse &resp)
+ {
+ CHECK(err != rocksdb::Status::kOk, "this response is built only for
error");
resp.error = err;
const auto pid = get_gpid();
@@ -733,7 +913,8 @@ private:
return err;
}
- // Build incr response as above, except that also set new value for
response.
+ // Build the incr response `resp` based on `err` and `new_value` only for
the error case
+ // (i.e. the current status `err` for RocksDB is not
rocksdb::Status::kOk). Return `err`.
inline int make_error_response(int err, int64_t new_value,
dsn::apps::incr_response &resp)
{
resp.new_value = new_value;
@@ -747,108 +928,159 @@ private:
check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER;
}
- // return true if check passed.
- // for int compare, if check operand or value are not valid integer, then
return false,
- // and set out param `invalid_argument' to false.
- bool validate_check(int64_t decree,
+ // Check whether the conditions are met for check_and_set and
check_and_mutate based on
+ // `check_type`, `check_operand`, `value_exist` and `check_value`.
+ //
+ // `decree` is not used to decide. It is only used for logging. Its type
is designed to
+ // be a template parameter, for the reason that this function might be
called before the
+ // decree is assigned for a mutation, for example, while traslating an
atomic write request
+ // to a single-put request. If unassigned, it could be any type as long as
it is printable.
+ //
+ // `invalid_argument` is only used while comparing integers: if
`check_operand` or
+ // `check_value` is not valid integer, `invalid_argument' would be set
true, and the
+ // function would return false.
+ //
+ // Return true if check passed, otherwise false.
+ template <typename TDecree>
+ bool validate_check(TDecree decree,
::dsn::apps::cas_check_type::type check_type,
const ::dsn::blob &check_operand,
bool value_exist,
- const ::dsn::blob &value,
+ const ::dsn::blob &check_value,
bool &invalid_argument)
{
invalid_argument = false;
switch (check_type) {
case ::dsn::apps::cas_check_type::CT_NO_CHECK:
return true;
+
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST:
return !value_exist;
+
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST_OR_EMPTY:
- return !value_exist || value.length() == 0;
+ return !value_exist || check_value.empty();
+
case ::dsn::apps::cas_check_type::CT_VALUE_EXIST:
return value_exist;
+
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EMPTY:
- return value_exist && value.length() != 0;
+ return value_exist && !check_value.empty();
+
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE:
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX:
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX: {
- if (!value_exist)
+ if (!value_exist) {
return false;
- if (check_operand.length() == 0)
+ }
+
+ if (check_operand.empty()) {
return true;
- if (value.length() < check_operand.length())
+ }
+
+ if (check_value.length() < check_operand.length()) {
return false;
+ }
+
if (check_type ==
::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE) {
- return
value.to_string_view().find(check_operand.to_string_view()) !=
- std::string_view::npos;
- } else if (check_type ==
::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX) {
- return dsn::utils::mequals(
- value.data(), check_operand.data(),
check_operand.length());
- } else { // check_type ==
::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX
- return dsn::utils::mequals(value.data() + value.length() -
check_operand.length(),
- check_operand.data(),
- check_operand.length());
+ return check_value.contains(check_operand);
}
+
+ if (check_type ==
::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX) {
+ return check_value.starts_with(check_operand);
+ }
+
+ CHECK(check_type ==
::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX,
+ "check_type({}) should be CT_VALUE_MATCH_POSTFIX",
+ cas_check_type_to_string(check_type));
+
+ return check_value.ends_with(check_operand);
}
+
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER: {
- if (!value_exist)
+ if (!value_exist) {
return false;
- int c =
value.to_string_view().compare(check_operand.to_string_view());
+ }
+
+ const int c = check_value.compare(check_operand);
if (c < 0) {
return check_type <=
::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL;
- } else if (c == 0) {
- return check_type >=
::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL &&
- check_type <=
::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
- } else { // c > 0
+ }
+
+ if (c > 0) {
return check_type >=
::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
}
+
+ // Must be c == 0.
+ return check_type >=
::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL &&
+ check_type <=
::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
}
+
case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER: {
- if (!value_exist)
+ if (!value_exist) {
return false;
- int64_t check_value_int;
- if (!dsn::buf2int64(value.to_string_view(), check_value_int)) {
- // invalid check value
- LOG_ERROR_PREFIX("check failed: decree = {}, error = "
- "check value \"{}\" is not an integer or out
of range",
+ }
+
+ int64_t check_value_int = 0;
+ if (!dsn::buf2int64(check_value.to_string_view(),
check_value_int)) {
+ // `check_value` is not a valid int64.
+ LOG_ERROR_PREFIX("check failed: decree = {}, error = check
value \"{}\" is "
+ "not a valid integer or out of range",
decree,
- utils::c_escape_sensitive_string(value));
+
utils::c_escape_sensitive_string(check_value));
invalid_argument = true;
return false;
}
- int64_t check_operand_int;
+
+ int64_t check_operand_int = 0;
if (!dsn::buf2int64(check_operand.to_string_view(),
check_operand_int)) {
- // invalid check operand
- LOG_ERROR_PREFIX("check failed: decree = {}, error = "
- "check operand \"{}\" is not an integer or
out of range",
+ // `check_operand` is not a valid int64.
+ LOG_ERROR_PREFIX("check failed: decree = {}, error = check
operand \"{}\" is "
+ "not a valid integer or out of range",
decree,
utils::c_escape_sensitive_string(check_operand));
invalid_argument = true;
return false;
}
+
if (check_value_int < check_operand_int) {
return check_type <=
::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL;
- } else if (check_value_int == check_operand_int) {
- return check_type >=
::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL &&
- check_type <=
::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
- } else { // check_value_int > check_operand_int
+ }
+
+ if (check_value_int > check_operand_int) {
return check_type >=
::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
}
+
+ // Must be check_value_int == check_operand_int.
+ return check_type >=
::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL &&
+ check_type <=
::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
}
+
default:
CHECK(false, "unsupported check type: {}", check_type);
}
+
return false;
}
+ // The same as the above function, only used when the decree has not been
assigned.
+ bool validate_check(::dsn::apps::cas_check_type::type check_type,
+ const ::dsn::blob &check_operand,
+ bool value_exist,
+ const ::dsn::blob &check_value,
+ bool &invalid_argument)
+ {
+ return validate_check(
+ "UNASSIGNED", check_type, check_operand, value_exist, check_value,
invalid_argument);
+ }
+
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class PegasusWriteServiceImplTest;
diff --git a/src/server/result_writer.cpp b/src/server/result_writer.cpp
index 4d3a9160e..df78172e9 100644
--- a/src/server/result_writer.cpp
+++ b/src/server/result_writer.cpp
@@ -21,6 +21,7 @@
#include <pegasus/error.h>
#include <chrono>
+#include <type_traits>
#include <utility>
#include "pegasus/client.h"
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 4aa30b228..feba097cd 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -176,6 +176,15 @@ int rocksdb_wrapper::write_batch_put_ctx(const
db_write_context &ctx,
return s.code();
}
+int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
+ const dsn::blob &raw_key,
+ const dsn::blob &value,
+ int32_t expire_sec)
+{
+ return write_batch_put_ctx(
+ ctx, raw_key.to_string_view(), value.to_string_view(),
static_cast<uint32_t>(expire_sec));
+}
+
int rocksdb_wrapper::write(int64_t decree)
{
CHECK_GT(_write_batch->Count(), 0);
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index f905dea93..feaefd70d 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -22,16 +22,20 @@
#include <gtest/gtest_prod.h>
#include <rocksdb/options.h>
#include <rocksdb/write_batch.h>
-#include <stdint.h>
+#include <cstdint>
#include <memory>
#include <string>
+#include <string_view>
#include <vector>
#include "pegasus_value_schema.h"
#include "replica/replica_base.h"
-#include <string_view>
#include "utils/metrics.h"
+namespace dsn {
+class blob;
+} // namespace dsn
+
namespace rocksdb {
class ColumnFamilyHandle;
class DB;
@@ -64,6 +68,10 @@ public:
std::string_view raw_key,
std::string_view value,
uint32_t expire_sec);
+ int write_batch_put_ctx(const db_write_context &ctx,
+ const dsn::blob &raw_key,
+ const dsn::blob &value,
+ int32_t expire_sec);
int write(int64_t decree);
int write_batch_delete(int64_t decree, std::string_view raw_key);
void clear_up_write_batch();
diff --git a/src/utils/blob.h b/src/utils/blob.h
index 5b9d045c4..c1212afb4 100644
--- a/src/utils/blob.h
+++ b/src/utils/blob.h
@@ -26,7 +26,9 @@
#pragma once
+#include <boost/algorithm/string/predicate.hpp>
#include <fmt/core.h>
+#include <algorithm>
#include <cstring>
#include <memory>
#include <string_view>
@@ -37,6 +39,7 @@
#include "utils/fmt_logging.h"
#include "utils/fmt_utils.h"
+#include "utils/strings.h"
#include "utils.h"
namespace dsn {
@@ -224,6 +227,31 @@ public:
[[nodiscard]] std::string_view to_string_view() const { return {_data,
_length}; }
+ // Returns an integer less than, equal to, or greater than zero if _data
is less than,
+ // equal to, or greater than rhs._data.
+ [[nodiscard]] int compare(const blob &rhs) const noexcept
+ {
+ return to_string_view().compare(rhs.to_string_view());
+ }
+
+ // Return true if _data contains rhs._data, otherwise false.
+ [[nodiscard]] bool contains(const blob &rhs) const
+ {
+ return boost::algorithm::contains(to_string_view(),
rhs.to_string_view());
+ }
+
+ // Return true if _data starts with rhs._data, otherwise false.
+ [[nodiscard]] bool starts_with(const blob &rhs) const
+ {
+ return boost::algorithm::starts_with(to_string_view(),
rhs.to_string_view());
+ }
+
+ // Return true if _data ends with rhs._data, otherwise false.
+ [[nodiscard]] bool ends_with(const blob &rhs) const
+ {
+ return boost::algorithm::ends_with(to_string_view(),
rhs.to_string_view());
+ }
+
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
diff --git a/src/utils/test/utils.cpp b/src/utils/test/utils.cpp
index 01c880acd..d1888a7f5 100644
--- a/src/utils/test/utils.cpp
+++ b/src/utils/test/utils.cpp
@@ -30,7 +30,6 @@
#include <set>
#include <string>
#include <tuple>
-#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]