This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 8f75b5293 feat(make_idempotent): support making `incr` request
idempotent in `pegasus_write_service::impl` (#2185)
8f75b5293 is described below
commit 8f75b5293597d1be1e053e75a828f57d3a6227ea
Author: Dan Wang <[email protected]>
AuthorDate: Wed Feb 5 11:12:29 2025 +0800
feat(make_idempotent): support making `incr` request idempotent in
`pegasus_write_service::impl` (#2185)
Implement two APIs making `incr` request idempotent in
`pegasus_write_service::impl`:
- translate an `incr` request (non-idempotent) into a single-put request
(idempotent),
and process the possible errors during the translation, e.g. failed to read
base value for
increment from the RocksDB instance;
- apply the single-put request into the RocksDB instance and make response
for `incr`.
---
idl/rrdb.thrift | 15 +
src/server/pegasus_server_write.h | 2 +-
src/server/pegasus_write_service.h | 3 +-
src/server/pegasus_write_service_impl.h | 178 ++++++++-
src/server/rocksdb_wrapper.cpp | 13 +-
.../test/pegasus_write_service_impl_test.cpp | 396 +++++++++++++++++----
src/utils/blob.h | 13 +-
7 files changed, 530 insertions(+), 90 deletions(-)
diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index da778f55e..c0534785d 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -68,11 +68,26 @@ enum mutate_operation
MO_DELETE
}
+enum update_type
+{
+ UT_PUT,
+ UT_INCR
+}
+
+// The single-put request, just writes a key/value pair into storage, which is
certainly
+// idempotent.
struct update_request
{
1:dsn.blob key;
2:dsn.blob value;
3:i32 expire_ts_seconds;
+
+ // This field marks the type of a single-put request, mainly used to
differentiate a general
+ // single-put request from the one translated from a non-idempotent atomic
write request:
+ // - a general single-put request, if `type` is UT_PUT or not set by
default as it's
+ // optional, or
+ // - a put request translated from a non-idempotent incr request, if
`type` is UT_INCR.
+ 4:optional update_type type;
}
struct update_response
diff --git a/src/server/pegasus_server_write.h
b/src/server/pegasus_server_write.h
index add75b392..d2be2d5a0 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -89,7 +89,7 @@ private:
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
- friend class pegasus_write_service_impl_test;
+ friend class PegasusWriteServiceImplTest;
friend class rocksdb_wrapper_test;
std::unique_ptr<pegasus_write_service> _write_svc;
diff --git a/src/server/pegasus_write_service.h
b/src/server/pegasus_write_service.h
index f430b48b0..2142792a1 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -189,9 +189,8 @@ public:
private:
void clear_up_batch_states();
-private:
friend class pegasus_write_service_test;
- friend class pegasus_write_service_impl_test;
+ friend class PegasusWriteServiceImplTest;
friend class pegasus_server_write_test;
friend class rocksdb_wrapper_test;
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index 35a9c6399..ff0f2bfac 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -171,6 +171,99 @@ public:
return resp.error;
}
+ // Tranlate an incr request which is certainly non-idempotent into a
single-put request
+ // which is certainly idempotent. Return current status for RocksDB.
+ int make_idempotent(const dsn::apps::incr_request &req,
+ dsn::apps::incr_response &err_resp,
+ dsn::apps::update_request &update)
+ {
+ // Get current raw value for the provided key from the RocksDB
instance.
+ db_get_context get_ctx;
+ const int err = _rocksdb_wrapper->get(req.key.to_string_view(),
&get_ctx);
+ if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+ return make_error_response(err, err_resp);
+ }
+
+ if (!get_ctx.found || get_ctx.expired) {
+ // Once the provided key is not found or has been expired, we
could assume that
+ // its value is 0 before incr; thus the final result for incr
could be set as
+ // the value of the single-put request, i.e. req.increment.
+ return make_idempotent_request_for_incr(
+ req.key, req.increment, calc_expire_on_non_existent(req),
update);
+ }
+
+ // Extract user data from raw value as base for increment.
+ dsn::blob base_value;
+ pegasus_extract_user_data(_pegasus_data_version,
std::move(get_ctx.raw_value), base_value);
+
+ int64_t new_int = 0;
+ if (base_value.empty()) {
+ // Old value is also considered as 0 before incr as above once
it's empty, thus
+ // set req.increment as the value for single put.
+ new_int = req.increment;
+ } else {
+ int64_t base_int = 0;
+ if (dsn_unlikely(!dsn::buf2int64(base_value.to_string_view(),
base_int))) {
+ // Old value is not valid int64.
+ LOG_ERROR_PREFIX("incr failed: error = base value \"{}\" "
+ "is not an integer or out of range",
+ utils::c_escape_sensitive_string(base_value));
+ return make_error_response(rocksdb::Status::kInvalidArgument,
err_resp);
+ }
+
+ new_int = base_int + req.increment;
+ if (dsn_unlikely((req.increment > 0 && new_int < base_int) ||
+ (req.increment < 0 && new_int > base_int))) {
+ // New value overflows, just respond with the base value.
+ LOG_ERROR_PREFIX("incr failed: error = new value is out of
range, "
+ "base_value = {}, increment = {}, new_value =
{}",
+ base_int,
+ req.increment,
+ new_int);
+ return make_error_response(rocksdb::Status::kInvalidArgument,
base_int, err_resp);
+ }
+ }
+
+ return make_idempotent_request_for_incr(
+ req.key, new_int, calc_expire_on_existing(req, get_ctx), update);
+ }
+
+ // Apply single-put request translated from incr request into RocksDB, and
build response
+ // for incr. Return current status for RocksDB.
+ int put(const db_write_context &ctx,
+ const dsn::apps::update_request &update,
+ dsn::apps::incr_response &resp)
+ {
+ resp.app_id = get_gpid().get_app_id();
+ resp.partition_index = get_gpid().get_partition_index();
+ resp.decree = ctx.decree;
+ resp.server = _primary_host_port;
+
+ auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
+
+ resp.error =
+ _rocksdb_wrapper->write_batch_put_ctx(ctx,
+ update.key.to_string_view(),
+
update.value.to_string_view(),
+
static_cast<uint32_t>(update.expire_ts_seconds));
+ if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+ return resp.error;
+ }
+
+ resp.error = _rocksdb_wrapper->write(ctx.decree);
+ if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+ return resp.error;
+ }
+
+ // Shouldn't fail to parse since the value must be a valid int64.
+ CHECK(dsn::buf2int64(update.value.to_string_view(), resp.new_value),
+ "invalid int64 value for put idempotent incr: key={}, value={}",
+ update.key,
+ update.value);
+
+ return resp.error;
+ }
+
int incr(int64_t decree, const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp)
{
resp.app_id = get_gpid().get_app_id();
@@ -242,7 +335,7 @@ public:
auto cleanup = dsn::defer([this]() {
_rocksdb_wrapper->clear_up_write_batch(); });
resp.error = _rocksdb_wrapper->write_batch_put(
decree, update.key.to_string_view(), std::to_string(new_value),
new_expire_ts);
- if (resp.error) {
+ if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
@@ -250,6 +343,7 @@ public:
if (resp.error == rocksdb::Status::kOk) {
resp.new_value = new_value;
}
+
return resp.error;
}
@@ -569,6 +663,83 @@ private:
return raw_key;
}
+ // Calculate expire timestamp in seconds for the keys not contained in the
storage
+ // according to `req`.
+ template <typename TRequest>
+ static inline int32_t calc_expire_on_non_existent(const TRequest &req)
+ {
+ return req.expire_ts_seconds > 0 ? req.expire_ts_seconds : 0;
+ }
+
+ // Calculate new expire timestamp in seconds for the keys contained in the
storage
+ // according to `req` and their current expire timestamp in `get_ctx`.
+ template <typename TRequest>
+ static inline int32_t calc_expire_on_existing(const TRequest &req,
+ const db_get_context
&get_ctx)
+ {
+ if (req.expire_ts_seconds == 0) {
+ // Still use current expire timestamp of the existing key as the
new value.
+ return static_cast<int32_t>(get_ctx.expire_ts);
+ }
+
+ if (req.expire_ts_seconds < 0) {
+ // Reset expire timestamp to 0.
+ return 0;
+ }
+
+ return req.expire_ts_seconds;
+ }
+
+ // Build a single-put request by provided int64 value.
+ static inline void make_idempotent_request(const dsn::blob &key,
+ int64_t value,
+ int32_t expire_ts_seconds,
+ dsn::apps::update_type::type
type,
+ dsn::apps::update_request
&update)
+ {
+ update.key = key;
+ update.value = dsn::blob::create_from_numeric(value);
+ update.expire_ts_seconds = expire_ts_seconds;
+ update.__set_type(type);
+ }
+
+ // Build corresponding single-put request for an incr request, and return
current status
+ // for RocksDB, i.e. kOk.
+ static inline int make_idempotent_request_for_incr(const dsn::blob &key,
+ int64_t value,
+ int32_t
expire_ts_seconds,
+
dsn::apps::update_request &update)
+ {
+ make_idempotent_request(
+ key, value, expire_ts_seconds, dsn::apps::update_type::UT_INCR,
update);
+ return rocksdb::Status::kOk;
+ }
+
+ // Build incr response only for error, and return the current error status
for RocksDB.
+ inline int make_error_response(int err, dsn::apps::incr_response &resp)
+ {
+ CHECK(err != rocksdb::Status::kOk, "this incr response is built only
for error");
+ resp.error = err;
+
+ const auto pid = get_gpid();
+ resp.app_id = pid.get_app_id();
+ resp.partition_index = pid.get_partition_index();
+
+ // Currently the mutation has not been assigned with valid decree,
thus set to -1.
+ resp.decree = -1;
+
+ resp.server = _primary_host_port;
+
+ return err;
+ }
+
+ // Build incr response as above, except that also set new value for
response.
+ inline int make_error_response(int err, int64_t new_value,
dsn::apps::incr_response &resp)
+ {
+ resp.new_value = new_value;
+ return make_error_response(err, resp);
+ }
+
// return true if the check type is supported
static bool is_check_type_supported(::dsn::apps::cas_check_type::type
check_type)
{
@@ -678,13 +849,10 @@ private:
return false;
}
-private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
- friend class pegasus_write_service_impl_test;
+ friend class PegasusWriteServiceImplTest;
friend class rocksdb_wrapper_test;
- FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
- FRIEND_TEST(pegasus_write_service_impl_test,
verify_timetag_compatible_with_version_0);
const std::string _primary_host_port;
const uint32_t _pegasus_data_version;
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 6f5ba8bc3..4aa30b228 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -79,20 +79,25 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/
db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](std::string_view) -> int { return
FAIL_DB_GET; });
- rocksdb::Status s =
+ const rocksdb::Status s =
_db->Get(_rd_opts, _data_cf, utils::to_rocksdb_slice(raw_key),
&ctx->raw_value);
if (dsn_likely(s.ok())) {
- // success
+ // The key is found and its value is read successfully.
ctx->found = true;
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version,
ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
METRIC_VAR_INCREMENT(read_expired_values);
+ } else {
+ ctx->expired = false;
}
return rocksdb::Status::kOk;
- } else if (s.IsNotFound()) {
- // NotFound is an acceptable error
+ }
+
+ if (s.IsNotFound()) {
+ // NotFound is considered normal since the key may not be present in
DB now.
ctx->found = false;
+ ctx->expired = false;
return rocksdb::Status::kOk;
}
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp
b/src/server/test/pegasus_write_service_impl_test.cpp
index 0ac33faa4..053dd6333 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -23,163 +23,407 @@
#include <limits>
#include <memory>
#include <string>
+#include <string_view>
+#include <utility>
#include "gtest/gtest.h"
#include "pegasus_key_schema.h"
#include "pegasus_server_test_base.h"
+#include "pegasus_value_schema.h"
#include "rrdb/rrdb_types.h"
#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service.h"
#include "server/pegasus_write_service_impl.h"
#include "server/rocksdb_wrapper.h"
#include "utils/blob.h"
+#include "utils/defer.h"
+#include "utils/error_code.h"
#include "utils/fail_point.h"
-#include <string_view>
+#include "utils/string_conv.h"
-namespace pegasus {
-namespace server {
+// IWYU pragma: no_forward_declare
pegasus::server::IdempotentIncrTest_FailOnGet_Test
+// IWYU pragma: no_forward_declare
pegasus::server::IdempotentIncrTest_FailOnPut_Test
+// IWYU pragma: no_forward_declare
pegasus::server::IdempotentIncrTest_IncrOnNonNumericRecord_Test
+// IWYU pragma: no_forward_declare
pegasus::server::IdempotentIncrTest_IncrOverflowed_Test
+// IWYU pragma: no_forward_declare
pegasus::server::NonIdempotentIncrTest_FailOnGet_Test
+// IWYU pragma: no_forward_declare
pegasus::server::NonIdempotentIncrTest_FailOnPut_Test
+// IWYU pragma: no_forward_declare
pegasus::server::NonIdempotentIncrTest_IncrOnNonNumericRecord_Test
+// IWYU pragma: no_forward_declare
pegasus::server::NonIdempotentIncrTest_IncrOverflowed_Test
-class pegasus_write_service_impl_test : public pegasus_server_test_base
+namespace pegasus::server {
+
+class PegasusWriteServiceImplTest : public pegasus_server_test_base
{
protected:
std::unique_ptr<pegasus_server_write> _server_write;
pegasus_write_service::impl *_write_impl{nullptr};
rocksdb_wrapper *_rocksdb_wrapper{nullptr};
-public:
void SetUp() override
{
- start();
+ ASSERT_EQ(dsn::ERR_OK, start());
_server_write = std::make_unique<pegasus_server_write>(_server.get());
_write_impl = _server_write->_write_svc->_impl.get();
_rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
}
- int db_get(std::string_view raw_key, db_get_context *get_ctx)
+public:
+ // Given `raw_key`, get its value from DB with the result stored in
`get_ctx`. Should
+ // never fail. There are only 3 possible results:
+ // - the key is found and its value is read successfully;
+ // - the key is found but expired;
+ // - the key is not found.
+ void db_get(std::string_view raw_key, db_get_context *get_ctx)
+ {
+ ASSERT_EQ(rocksdb::Status::kOk, _rocksdb_wrapper->get(raw_key,
get_ctx));
+ }
+
+ void db_get(const dsn::blob &raw_key, db_get_context *get_ctx)
{
- return _rocksdb_wrapper->get(raw_key, get_ctx);
+ db_get(raw_key.to_string_view(), get_ctx);
}
- void single_set(dsn::blob raw_key, dsn::blob user_value)
+ // Apply single put into DB. Should never fail. `raw_key`/`user_value`
would always
+ // be applied successfully.
+ void single_set(const dsn::blob &raw_key, const dsn::blob &user_value)
{
dsn::apps::update_request put;
put.key = raw_key;
put.value = user_value;
+
db_write_context write_ctx;
dsn::apps::update_response put_resp;
- _write_impl->batch_put(write_ctx, put, put_resp);
- ASSERT_EQ(_write_impl->batch_commit(0), 0);
+ ASSERT_EQ(rocksdb::Status::kOk, _write_impl->batch_put(write_ctx, put,
put_resp));
+ ASSERT_EQ(rocksdb::Status::kOk, _write_impl->batch_commit(0));
+ }
+
+ // Extract `user_data` as string from `raw_value` in DB.
+ void extract_user_data(std::string &&raw_value, std::string &user_data)
+ {
+ dsn::blob data;
+ pegasus_extract_user_data(_write_impl->_pegasus_data_version,
std::move(raw_value), data);
+ user_data = data.to_string();
+ }
+
+ // Extract `user_data` as int64 from `raw_value` in DB.
+ void extract_user_data(std::string &&raw_value, int64_t &user_data)
+ {
+ std::string data;
+ extract_user_data(std::move(raw_value), data);
+ ASSERT_TRUE(dsn::buf2int64(data, user_data));
}
};
-class incr_test : public pegasus_write_service_impl_test
+// Initialize a base value with a checker deciding if the base value is still
the same as
+// it is in DB at the end of the scope.
+#define INIT_BASE_VALUE_AND_CHECKER(type, val)
\
+ static const type kBaseValue = (val);
\
+ auto kBaseValueChecker = dsn::defer([this]() {
check_db_record(kBaseValue); })
+
+// Put a string value into DB and check if it is still the same as it is in DB
at the end
+// of the scope.
+#define PUT_BASE_VALUE_STRING(val)
\
+ INIT_BASE_VALUE_AND_CHECKER(std::string, val);
\
+ single_set(req.key, dsn::blob::create_from_bytes(std::string(kBaseValue)))
+
+// Put an int64 value into DB and check if it is still the same as it is in DB
at the end
+// of the scope.
+#define PUT_BASE_VALUE_INT64(val)
\
+ INIT_BASE_VALUE_AND_CHECKER(int64_t, val);
\
+ single_set(req.key, dsn::blob::create_from_numeric(kBaseValue))
+
+class IncrTest : public PegasusWriteServiceImplTest
{
-public:
+protected:
void SetUp() override
{
- pegasus_write_service_impl_test::SetUp();
- pegasus::pegasus_generate_key(
- req.key, std::string_view("hash_key"),
std::string_view("sort_key"));
+ PegasusWriteServiceImplTest::SetUp();
+ generate_key("incr_hash_key", "incr_sort_key");
+ req.expire_ts_seconds = 0;
+ }
+
+public:
+ void generate_key(const std::string &hash_key, const std::string &sort_key)
+ {
+ pegasus_generate_key(req.key, hash_key, sort_key);
+ }
+
+ // Check that the key must exist in DB and its value should be the same as
`expected_value`.
+ template <typename TVal>
+ void check_db_record(const TVal &expected_value)
+ {
+ db_get_context get_ctx;
+ db_get(req.key, &get_ctx);
+ ASSERT_TRUE(get_ctx.found);
+ ASSERT_FALSE(get_ctx.expired);
+
+ TVal actual_value;
+ extract_user_data(std::move(get_ctx.raw_value), actual_value);
+ ASSERT_EQ(expected_value, actual_value);
+ }
+
+ // Check that the key must be found in DB but expired.
+ void check_db_record_expired()
+ {
+ db_get_context get_ctx;
+ db_get(req.key, &get_ctx);
+ ASSERT_TRUE(get_ctx.found);
+ ASSERT_TRUE(get_ctx.expired);
+ }
+
+ // Test if the incr result in response is correct while there is not any
error during incr.
+ virtual void test_incr(int64_t base, int64_t increment) = 0;
+
+ // Test if both the incr result in response and the value in DB are
correct while there is
+ // not any error during incr.
+ void test_incr_and_check_db_record(int64_t base, int64_t increment)
+ {
+ test_incr(base, increment);
+ check_db_record(base + increment);
+ }
+
+ // Test if incr could be executed correctly while the key did not exist in
DB previously.
+ void test_incr_on_absent_record(int64_t increment)
+ {
+ // Ensure that the key is absent.
+ db_get_context get_ctx;
+ db_get(req.key, &get_ctx);
+ ASSERT_FALSE(get_ctx.found);
+ ASSERT_FALSE(get_ctx.expired);
+
+ // The base value should be 0 as the record is absent.
+ test_incr_and_check_db_record(0, increment);
+ }
+
+ // Test if incr could be executed correctly while the key has been present
in DB.
+ void test_incr_on_existing_record(int64_t base, int64_t increment)
+ {
+ // Preload the record into DB as the existing key.
+ single_set(req.key, dsn::blob::create_from_numeric(base));
+
+ test_incr_and_check_db_record(base, increment);
}
dsn::apps::incr_request req;
dsn::apps::incr_response resp;
};
-INSTANTIATE_TEST_SUITE_P(, incr_test, ::testing::Values(false, true));
+class NonIdempotentIncrTest : public IncrTest
+{
+public:
+ // Test `incr` with both returned error and error in response as expected.
+ void test_non_idempotent_incr(int64_t increment, int expected_ret_err, int
expected_resp_err)
+ {
+ req.increment = increment;
+ ASSERT_EQ(expected_ret_err, _write_impl->incr(0, req, resp));
+ ASSERT_EQ(expected_resp_err, resp.error);
+ }
+
+ void test_incr(int64_t base, int64_t increment) override
+ {
+ test_non_idempotent_incr(increment, rocksdb::Status::kOk,
rocksdb::Status::kOk);
+ ASSERT_EQ(base + increment, resp.new_value);
+ }
+};
+
+TEST_P(NonIdempotentIncrTest, IncrOneOnAbsentRecord) {
test_incr_on_absent_record(1); }
+
+TEST_P(NonIdempotentIncrTest, IncrBigOnAbsentRecord) {
test_incr_on_absent_record(1); }
+
+TEST_P(NonIdempotentIncrTest, IncrOneOnExistingRecord) {
test_incr_on_existing_record(10, 1); }
+
+TEST_P(NonIdempotentIncrTest, IncrBigOnExistingRecord) {
test_incr_on_existing_record(10, 100); }
+
+TEST_P(NonIdempotentIncrTest, IncrNegative)
+{
+ test_incr_on_absent_record(-100);
+ test_incr_and_check_db_record(-100, -1);
+}
+
+TEST_P(NonIdempotentIncrTest, IncrZero)
+{
+ test_incr_on_absent_record(0);
+ test_incr_on_existing_record(10, 0);
+ test_incr_on_existing_record(-10, 0);
+}
+
+TEST_P(NonIdempotentIncrTest, IncrOnNonNumericRecord)
+{
+ PUT_BASE_VALUE_STRING("abc");
+
+ test_non_idempotent_incr(1, rocksdb::Status::kOk,
rocksdb::Status::kInvalidArgument);
+}
+
+TEST_P(NonIdempotentIncrTest, IncrOverflowed)
+{
+ PUT_BASE_VALUE_INT64(100);
+
+ test_non_idempotent_incr(std::numeric_limits<int64_t>::max(),
+ rocksdb::Status::kOk,
+ rocksdb::Status::kInvalidArgument);
+ ASSERT_EQ(kBaseValue, resp.new_value);
+}
-TEST_P(incr_test, incr_on_absent_record)
+TEST_P(NonIdempotentIncrTest, FailOnGet)
{
- // ensure key is absent
- db_get_context get_ctx;
- db_get(req.key.to_string_view(), &get_ctx);
- ASSERT_FALSE(get_ctx.found);
+ PUT_BASE_VALUE_INT64(100);
- req.increment = 100;
- _write_impl->incr(0, req, resp);
- ASSERT_EQ(resp.new_value, 100);
+ dsn::fail::setup();
+
+ // `incr` should return an error once failed to get current value from DB.
+ dsn::fail::cfg("db_get", "100%1*return()");
+ test_non_idempotent_incr(10, FAIL_DB_GET, FAIL_DB_GET);
- db_get(req.key.to_string_view(), &get_ctx);
- ASSERT_TRUE(get_ctx.found);
+ dsn::fail::teardown();
}
-TEST_P(incr_test, negative_incr_and_zero_incr)
+TEST_P(NonIdempotentIncrTest, FailOnPut)
{
- req.increment = -100;
- ASSERT_EQ(0, _write_impl->incr(0, req, resp));
- ASSERT_EQ(resp.new_value, -100);
+ PUT_BASE_VALUE_INT64(100);
- req.increment = -1;
- ASSERT_EQ(0, _write_impl->incr(0, req, resp));
- ASSERT_EQ(resp.new_value, -101);
+ dsn::fail::setup();
+
+ // `incr` should return an error once failed to write into batch.
+ dsn::fail::cfg("db_write_batch_put", "100%1*return()");
+ test_non_idempotent_incr(10, FAIL_DB_WRITE_BATCH_PUT,
FAIL_DB_WRITE_BATCH_PUT);
- req.increment = 0;
- ASSERT_EQ(0, _write_impl->incr(0, req, resp));
- ASSERT_EQ(resp.new_value, -101);
+ dsn::fail::teardown();
}
-TEST_P(incr_test, invalid_incr)
+TEST_P(NonIdempotentIncrTest, IncrOnExpireRecord)
{
- single_set(req.key, dsn::blob::create_from_bytes("abc"));
+ // Make the record expired.
+ req.expire_ts_seconds = 1;
+ test_non_idempotent_incr(10, rocksdb::Status::kOk, rocksdb::Status::kOk);
+
+ // Now the record should be expired.
+ check_db_record_expired();
+
+ // Incr the expired key.
+ req.expire_ts_seconds = 0;
+ test_incr_and_check_db_record(0, 100);
+}
- req.increment = 10;
- _write_impl->incr(1, req, resp);
- ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
- ASSERT_EQ(resp.new_value, 0);
+INSTANTIATE_TEST_SUITE_P(PegasusWriteServiceImplTest,
+ NonIdempotentIncrTest,
+ testing::Values(false, true));
- single_set(req.key, dsn::blob::create_from_bytes("100"));
+class IdempotentIncrTest : public IncrTest
+{
+public:
+ // Test make_idempotent for the incr request.
+ void test_make_idempotent(int64_t increment, int expected_err)
+ {
+ req.increment = increment;
+ const int err = _write_impl->make_idempotent(req, err_resp, update);
+ ASSERT_EQ(expected_err, err);
+ if (expected_err == rocksdb::Status::kOk) {
+ return;
+ }
+
+ ASSERT_EQ(expected_err, err_resp.error);
+ }
+
+ // Test idempotent write for the incr request:
+ // - make_idempotent for incr should be successful;
+ // - then, apply the idempotent put request into DB.
+ void test_idempotent_incr(int64_t increment, int expected_err)
+ {
+ test_make_idempotent(increment, rocksdb::Status::kOk);
+
+ db_write_context write_ctx;
+ ASSERT_EQ(expected_err, _write_impl->put(write_ctx, update, resp));
+ ASSERT_EQ(expected_err, resp.error);
+ }
- req.increment = std::numeric_limits<int64_t>::max();
- _write_impl->incr(1, req, resp);
- ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
- ASSERT_EQ(resp.new_value, 100);
+ void test_incr(int64_t base, int64_t increment) override
+ {
+ test_idempotent_incr(increment, rocksdb::Status::kOk);
+ ASSERT_EQ(base + increment, resp.new_value);
+ }
+
+ dsn::apps::incr_response err_resp;
+ dsn::apps::update_request update;
+};
+
+TEST_P(IdempotentIncrTest, IncrOneOnAbsentRecord) {
test_incr_on_absent_record(1); }
+
+TEST_P(IdempotentIncrTest, IncrBigOnAbsentRecord) {
test_incr_on_absent_record(100); }
+
+TEST_P(IdempotentIncrTest, IncrOneOnExistingRecord) {
test_incr_on_existing_record(10, 1); }
+
+TEST_P(IdempotentIncrTest, IncrBigOnExistingRecord) {
test_incr_on_existing_record(10, 100); }
+
+TEST_P(IdempotentIncrTest, IncrNegative)
+{
+ test_incr_on_absent_record(-100);
+ test_incr_and_check_db_record(-100, -1);
+}
+
+TEST_P(IdempotentIncrTest, IncrZero)
+{
+ test_incr_on_absent_record(0);
+ test_incr_on_existing_record(10, 0);
+ test_incr_on_existing_record(-10, 0);
+}
+
+TEST_P(IdempotentIncrTest, IncrOnNonNumericRecord)
+{
+ PUT_BASE_VALUE_STRING("abc");
+
+ test_make_idempotent(1, rocksdb::Status::kInvalidArgument);
+}
+
+TEST_P(IdempotentIncrTest, IncrOverflowed)
+{
+ PUT_BASE_VALUE_INT64(100);
+
+ test_make_idempotent(std::numeric_limits<int64_t>::max(),
rocksdb::Status::kInvalidArgument);
+ ASSERT_EQ(kBaseValue, err_resp.new_value);
}
-TEST_P(incr_test, fail_on_get)
+TEST_P(IdempotentIncrTest, FailOnGet)
{
+ PUT_BASE_VALUE_INT64(100);
+
dsn::fail::setup();
- dsn::fail::cfg("db_get", "100%1*return()");
- // when db_get failed, incr should return an error.
- req.increment = 10;
- _write_impl->incr(1, req, resp);
- ASSERT_EQ(resp.error, FAIL_DB_GET);
+ // `make_idempotent` should return an error once failed to get current
value from DB.
+ dsn::fail::cfg("db_get", "100%1*return()");
+ test_make_idempotent(10, FAIL_DB_GET);
dsn::fail::teardown();
}
-TEST_P(incr_test, fail_on_put)
+TEST_P(IdempotentIncrTest, FailOnPut)
{
+ PUT_BASE_VALUE_INT64(100);
+
dsn::fail::setup();
- dsn::fail::cfg("db_write_batch_put", "100%1*return()");
- // when rocksdb put failed, incr should return an error.
- req.increment = 10;
- _write_impl->incr(1, req, resp);
- ASSERT_EQ(resp.error, FAIL_DB_WRITE_BATCH_PUT);
+ // `put` should return an error once failed to write into batch.
+ dsn::fail::cfg("db_write_batch_put", "100%1*return()");
+ test_idempotent_incr(10, FAIL_DB_WRITE_BATCH_PUT);
dsn::fail::teardown();
}
-TEST_P(incr_test, incr_on_expire_record)
+TEST_P(IdempotentIncrTest, IncrOnExpireRecord)
{
- // make the key expired
+ // Make the record expired.
req.expire_ts_seconds = 1;
- _write_impl->incr(0, req, resp);
+ test_idempotent_incr(10, rocksdb::Status::kOk);
- // check whether the key is expired
- db_get_context get_ctx;
- db_get(req.key.to_string_view(), &get_ctx);
- ASSERT_TRUE(get_ctx.expired);
+ // Now the record should be expired.
+ check_db_record_expired();
- // incr the expired key
- req.increment = 100;
+ // Incr the expired key.
req.expire_ts_seconds = 0;
- _write_impl->incr(0, req, resp);
- ASSERT_EQ(resp.new_value, 100);
-
- db_get(req.key.to_string_view(), &get_ctx);
- ASSERT_TRUE(get_ctx.found);
+ test_incr_and_check_db_record(0, 100);
}
-} // namespace server
-} // namespace pegasus
+
+INSTANTIATE_TEST_SUITE_P(PegasusWriteServiceImplTest,
+ IdempotentIncrTest,
+ testing::Values(false, true));
+
+} // namespace pegasus::server
diff --git a/src/utils/blob.h b/src/utils/blob.h
index 07675e943..5b9d045c4 100644
--- a/src/utils/blob.h
+++ b/src/utils/blob.h
@@ -26,10 +26,12 @@
#pragma once
-#include <memory>
+#include <fmt/core.h>
#include <cstring>
-
+#include <memory>
#include <string_view>
+#include <type_traits>
+
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TProtocol.h>
@@ -126,6 +128,13 @@ public:
return {std::move(buf), static_cast<unsigned int>(s->length())};
}
+ template <typename TNum,
+ typename = typename
std::enable_if<std::is_arithmetic<TNum>::value>::type>
+ [[nodiscard]] static blob create_from_numeric(TNum val)
+ {
+ return create_from_bytes(fmt::format("{}", val));
+ }
+
void assign(const std::shared_ptr<char> &buffer, int offset, unsigned int
length)
{
_holder = buffer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]