This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f75b5293 feat(make_idempotent): support making `incr` request 
idempotent in `pegasus_write_service::impl` (#2185)
8f75b5293 is described below

commit 8f75b5293597d1be1e053e75a828f57d3a6227ea
Author: Dan Wang <[email protected]>
AuthorDate: Wed Feb 5 11:12:29 2025 +0800

    feat(make_idempotent): support making `incr` request idempotent in 
`pegasus_write_service::impl` (#2185)
    
    Implement two APIs making `incr` request idempotent in 
`pegasus_write_service::impl`:
    
    - translate an `incr` request (non-idempotent) into a single-put request 
(idempotent),
    and process the possible errors during the translation, e.g. failed to read 
base value for
    increment from the RocksDB instance;
    - apply the single-put request into the RocksDB instance and make response 
for `incr`.
---
 idl/rrdb.thrift                                    |  15 +
 src/server/pegasus_server_write.h                  |   2 +-
 src/server/pegasus_write_service.h                 |   3 +-
 src/server/pegasus_write_service_impl.h            | 178 ++++++++-
 src/server/rocksdb_wrapper.cpp                     |  13 +-
 .../test/pegasus_write_service_impl_test.cpp       | 396 +++++++++++++++++----
 src/utils/blob.h                                   |  13 +-
 7 files changed, 530 insertions(+), 90 deletions(-)

diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift
index da778f55e..c0534785d 100644
--- a/idl/rrdb.thrift
+++ b/idl/rrdb.thrift
@@ -68,11 +68,26 @@ enum mutate_operation
     MO_DELETE
 }
 
+enum update_type
+{
+    UT_PUT,
+    UT_INCR
+}
+
+// The single-put request, just writes a key/value pair into storage, which is 
certainly
+// idempotent.
 struct update_request
 {
     1:dsn.blob      key;
     2:dsn.blob      value;
     3:i32           expire_ts_seconds;
+
+    // This field marks the type of a single-put request, mainly used to 
differentiate a general
+    // single-put request from the one translated from a non-idempotent atomic 
write request:
+    // - a general single-put request, if `type` is UT_PUT or is left unset 
(the field
+    // is optional), or
+    // - a put request translated from a non-idempotent incr request, if 
`type` is UT_INCR.
+    4:optional update_type type;
 }
 
 struct update_response
diff --git a/src/server/pegasus_server_write.h 
b/src/server/pegasus_server_write.h
index add75b392..d2be2d5a0 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -89,7 +89,7 @@ private:
 
     friend class pegasus_server_write_test;
     friend class pegasus_write_service_test;
-    friend class pegasus_write_service_impl_test;
+    friend class PegasusWriteServiceImplTest;
     friend class rocksdb_wrapper_test;
 
     std::unique_ptr<pegasus_write_service> _write_svc;
diff --git a/src/server/pegasus_write_service.h 
b/src/server/pegasus_write_service.h
index f430b48b0..2142792a1 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -189,9 +189,8 @@ public:
 private:
     void clear_up_batch_states();
 
-private:
     friend class pegasus_write_service_test;
-    friend class pegasus_write_service_impl_test;
+    friend class PegasusWriteServiceImplTest;
     friend class pegasus_server_write_test;
     friend class rocksdb_wrapper_test;
 
diff --git a/src/server/pegasus_write_service_impl.h 
b/src/server/pegasus_write_service_impl.h
index 35a9c6399..ff0f2bfac 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -171,6 +171,99 @@ public:
         return resp.error;
     }
 
+    // Translate an incr request which is certainly non-idempotent into a 
single-put request
+    // which is certainly idempotent. Return current status for RocksDB.
+    int make_idempotent(const dsn::apps::incr_request &req,
+                        dsn::apps::incr_response &err_resp,
+                        dsn::apps::update_request &update)
+    {
+        // Get current raw value for the provided key from the RocksDB 
instance.
+        db_get_context get_ctx;
+        const int err = _rocksdb_wrapper->get(req.key.to_string_view(), 
&get_ctx);
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+            return make_error_response(err, err_resp);
+        }
+
+        if (!get_ctx.found || get_ctx.expired) {
+            // Once the provided key is not found or has been expired, we 
could assume that
+            // its value is 0 before incr; thus the final result for incr 
could be set as
+            // the value of the single-put request, i.e. req.increment.
+            return make_idempotent_request_for_incr(
+                req.key, req.increment, calc_expire_on_non_existent(req), 
update);
+        }
+
+        // Extract user data from raw value as base for increment.
+        dsn::blob base_value;
+        pegasus_extract_user_data(_pegasus_data_version, 
std::move(get_ctx.raw_value), base_value);
+
+        int64_t new_int = 0;
+        if (base_value.empty()) {
+            // As above, an empty old value is also treated as 0 before incr; 
thus
+            // use req.increment as the value for the single put.
+            new_int = req.increment;
+        } else {
+            int64_t base_int = 0;
+            if (dsn_unlikely(!dsn::buf2int64(base_value.to_string_view(), 
base_int))) {
+                // Old value is not valid int64.
+                LOG_ERROR_PREFIX("incr failed: error = base value \"{}\" "
+                                 "is not an integer or out of range",
+                                 utils::c_escape_sensitive_string(base_value));
+                return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+            }
+
+            new_int = base_int + req.increment;
+            if (dsn_unlikely((req.increment > 0 && new_int < base_int) ||
+                             (req.increment < 0 && new_int > base_int))) {
+                // New value overflows, just respond with the base value.
+                LOG_ERROR_PREFIX("incr failed: error = new value is out of 
range, "
+                                 "base_value = {}, increment = {}, new_value = 
{}",
+                                 base_int,
+                                 req.increment,
+                                 new_int);
+                return make_error_response(rocksdb::Status::kInvalidArgument, 
base_int, err_resp);
+            }
+        }
+
+        return make_idempotent_request_for_incr(
+            req.key, new_int, calc_expire_on_existing(req, get_ctx), update);
+    }
+
+    // Apply single-put request translated from incr request into RocksDB, and 
build response
+    // for incr. Return current status for RocksDB.
+    int put(const db_write_context &ctx,
+            const dsn::apps::update_request &update,
+            dsn::apps::incr_response &resp)
+    {
+        resp.app_id = get_gpid().get_app_id();
+        resp.partition_index = get_gpid().get_partition_index();
+        resp.decree = ctx.decree;
+        resp.server = _primary_host_port;
+
+        auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
+
+        resp.error =
+            _rocksdb_wrapper->write_batch_put_ctx(ctx,
+                                                  update.key.to_string_view(),
+                                                  
update.value.to_string_view(),
+                                                  
static_cast<uint32_t>(update.expire_ts_seconds));
+        if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+            return resp.error;
+        }
+
+        resp.error = _rocksdb_wrapper->write(ctx.decree);
+        if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
+            return resp.error;
+        }
+
+        // Shouldn't fail to parse since the value must be a valid int64.
+        CHECK(dsn::buf2int64(update.value.to_string_view(), resp.new_value),
+              "invalid int64 value for put idempotent incr: key={}, value={}",
+              update.key,
+              update.value);
+
+        return resp.error;
+    }
+
     int incr(int64_t decree, const dsn::apps::incr_request &update, 
dsn::apps::incr_response &resp)
     {
         resp.app_id = get_gpid().get_app_id();
@@ -242,7 +335,7 @@ public:
         auto cleanup = dsn::defer([this]() { 
_rocksdb_wrapper->clear_up_write_batch(); });
         resp.error = _rocksdb_wrapper->write_batch_put(
             decree, update.key.to_string_view(), std::to_string(new_value), 
new_expire_ts);
-        if (resp.error) {
+        if (resp.error != rocksdb::Status::kOk) {
             return resp.error;
         }
 
@@ -250,6 +343,7 @@ public:
         if (resp.error == rocksdb::Status::kOk) {
             resp.new_value = new_value;
         }
+
         return resp.error;
     }
 
@@ -569,6 +663,83 @@ private:
         return raw_key;
     }
 
+    // Calculate expire timestamp in seconds for the keys not contained in the 
storage
+    // according to `req`.
+    template <typename TRequest>
+    static inline int32_t calc_expire_on_non_existent(const TRequest &req)
+    {
+        return req.expire_ts_seconds > 0 ? req.expire_ts_seconds : 0;
+    }
+
+    // Calculate new expire timestamp in seconds for the keys contained in the 
storage
+    // according to `req` and their current expire timestamp in `get_ctx`.
+    template <typename TRequest>
+    static inline int32_t calc_expire_on_existing(const TRequest &req,
+                                                  const db_get_context 
&get_ctx)
+    {
+        if (req.expire_ts_seconds == 0) {
+            // Still use current expire timestamp of the existing key as the 
new value.
+            return static_cast<int32_t>(get_ctx.expire_ts);
+        }
+
+        if (req.expire_ts_seconds < 0) {
+            // Reset expire timestamp to 0.
+            return 0;
+        }
+
+        return req.expire_ts_seconds;
+    }
+
+    // Build a single-put request by provided int64 value.
+    static inline void make_idempotent_request(const dsn::blob &key,
+                                               int64_t value,
+                                               int32_t expire_ts_seconds,
+                                               dsn::apps::update_type::type 
type,
+                                               dsn::apps::update_request 
&update)
+    {
+        update.key = key;
+        update.value = dsn::blob::create_from_numeric(value);
+        update.expire_ts_seconds = expire_ts_seconds;
+        update.__set_type(type);
+    }
+
+    // Build corresponding single-put request for an incr request, and return 
current status
+    // for RocksDB, i.e. kOk.
+    static inline int make_idempotent_request_for_incr(const dsn::blob &key,
+                                                       int64_t value,
+                                                       int32_t 
expire_ts_seconds,
+                                                       
dsn::apps::update_request &update)
+    {
+        make_idempotent_request(
+            key, value, expire_ts_seconds, dsn::apps::update_type::UT_INCR, 
update);
+        return rocksdb::Status::kOk;
+    }
+
+    // Build incr response only for error, and return the current error status 
for RocksDB.
+    inline int make_error_response(int err, dsn::apps::incr_response &resp)
+    {
+        CHECK(err != rocksdb::Status::kOk, "this incr response is built only 
for error");
+        resp.error = err;
+
+        const auto pid = get_gpid();
+        resp.app_id = pid.get_app_id();
+        resp.partition_index = pid.get_partition_index();
+
+        // The mutation has not been assigned a valid decree yet, thus set 
it to -1.
+        resp.decree = -1;
+
+        resp.server = _primary_host_port;
+
+        return err;
+    }
+
+    // Build incr response as above, except that also set new value for 
response.
+    inline int make_error_response(int err, int64_t new_value, 
dsn::apps::incr_response &resp)
+    {
+        resp.new_value = new_value;
+        return make_error_response(err, resp);
+    }
+
     // return true if the check type is supported
     static bool is_check_type_supported(::dsn::apps::cas_check_type::type 
check_type)
     {
@@ -678,13 +849,10 @@ private:
         return false;
     }
 
-private:
     friend class pegasus_write_service_test;
     friend class pegasus_server_write_test;
-    friend class pegasus_write_service_impl_test;
+    friend class PegasusWriteServiceImplTest;
     friend class rocksdb_wrapper_test;
-    FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
-    FRIEND_TEST(pegasus_write_service_impl_test, 
verify_timetag_compatible_with_version_0);
 
     const std::string _primary_host_port;
     const uint32_t _pegasus_data_version;
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 6f5ba8bc3..4aa30b228 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -79,20 +79,25 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ 
db_get_context *ctx)
 {
     FAIL_POINT_INJECT_F("db_get", [](std::string_view) -> int { return 
FAIL_DB_GET; });
 
-    rocksdb::Status s =
+    const rocksdb::Status s =
         _db->Get(_rd_opts, _data_cf, utils::to_rocksdb_slice(raw_key), 
&ctx->raw_value);
     if (dsn_likely(s.ok())) {
-        // success
+        // The key is found and its value is read successfully.
         ctx->found = true;
         ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, 
ctx->raw_value);
         if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
             ctx->expired = true;
             METRIC_VAR_INCREMENT(read_expired_values);
+        } else {
+            ctx->expired = false;
         }
         return rocksdb::Status::kOk;
-    } else if (s.IsNotFound()) {
-        // NotFound is an acceptable error
+    }
+
+    if (s.IsNotFound()) {
+        // NotFound is considered normal since the key may not be present in 
DB now.
         ctx->found = false;
+        ctx->expired = false;
         return rocksdb::Status::kOk;
     }
 
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp 
b/src/server/test/pegasus_write_service_impl_test.cpp
index 0ac33faa4..053dd6333 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -23,163 +23,407 @@
 #include <limits>
 #include <memory>
 #include <string>
+#include <string_view>
+#include <utility>
 
 #include "gtest/gtest.h"
 #include "pegasus_key_schema.h"
 #include "pegasus_server_test_base.h"
+#include "pegasus_value_schema.h"
 #include "rrdb/rrdb_types.h"
 #include "server/pegasus_server_write.h"
 #include "server/pegasus_write_service.h"
 #include "server/pegasus_write_service_impl.h"
 #include "server/rocksdb_wrapper.h"
 #include "utils/blob.h"
+#include "utils/defer.h"
+#include "utils/error_code.h"
 #include "utils/fail_point.h"
-#include <string_view>
+#include "utils/string_conv.h"
 
-namespace pegasus {
-namespace server {
+// IWYU pragma: no_forward_declare 
pegasus::server::IdempotentIncrTest_FailOnGet_Test
+// IWYU pragma: no_forward_declare 
pegasus::server::IdempotentIncrTest_FailOnPut_Test
+// IWYU pragma: no_forward_declare 
pegasus::server::IdempotentIncrTest_IncrOnNonNumericRecord_Test
+// IWYU pragma: no_forward_declare 
pegasus::server::IdempotentIncrTest_IncrOverflowed_Test
+// IWYU pragma: no_forward_declare 
pegasus::server::NonIdempotentIncrTest_FailOnGet_Test
+// IWYU pragma: no_forward_declare 
pegasus::server::NonIdempotentIncrTest_FailOnPut_Test
+// IWYU pragma: no_forward_declare 
pegasus::server::NonIdempotentIncrTest_IncrOnNonNumericRecord_Test
+// IWYU pragma: no_forward_declare 
pegasus::server::NonIdempotentIncrTest_IncrOverflowed_Test
 
-class pegasus_write_service_impl_test : public pegasus_server_test_base
+namespace pegasus::server {
+
+class PegasusWriteServiceImplTest : public pegasus_server_test_base
 {
 protected:
     std::unique_ptr<pegasus_server_write> _server_write;
     pegasus_write_service::impl *_write_impl{nullptr};
     rocksdb_wrapper *_rocksdb_wrapper{nullptr};
 
-public:
     void SetUp() override
     {
-        start();
+        ASSERT_EQ(dsn::ERR_OK, start());
         _server_write = std::make_unique<pegasus_server_write>(_server.get());
         _write_impl = _server_write->_write_svc->_impl.get();
         _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
     }
 
-    int db_get(std::string_view raw_key, db_get_context *get_ctx)
+public:
+    // Given `raw_key`, get its value from DB with the result stored in 
`get_ctx`. Should
+    // never fail. There are only 3 possible results:
+    // - the key is found and its value is read successfully;
+    // - the key is found but expired;
+    // - the key is not found.
+    void db_get(std::string_view raw_key, db_get_context *get_ctx)
+    {
+        ASSERT_EQ(rocksdb::Status::kOk, _rocksdb_wrapper->get(raw_key, 
get_ctx));
+    }
+
+    void db_get(const dsn::blob &raw_key, db_get_context *get_ctx)
     {
-        return _rocksdb_wrapper->get(raw_key, get_ctx);
+        db_get(raw_key.to_string_view(), get_ctx);
     }
 
-    void single_set(dsn::blob raw_key, dsn::blob user_value)
+    // Apply single put into DB. Should never fail. `raw_key`/`user_value` 
would always
+    // be applied successfully.
+    void single_set(const dsn::blob &raw_key, const dsn::blob &user_value)
     {
         dsn::apps::update_request put;
         put.key = raw_key;
         put.value = user_value;
+
         db_write_context write_ctx;
         dsn::apps::update_response put_resp;
-        _write_impl->batch_put(write_ctx, put, put_resp);
-        ASSERT_EQ(_write_impl->batch_commit(0), 0);
+        ASSERT_EQ(rocksdb::Status::kOk, _write_impl->batch_put(write_ctx, put, 
put_resp));
+        ASSERT_EQ(rocksdb::Status::kOk, _write_impl->batch_commit(0));
+    }
+
+    // Extract `user_data` as string from `raw_value` in DB.
+    void extract_user_data(std::string &&raw_value, std::string &user_data)
+    {
+        dsn::blob data;
+        pegasus_extract_user_data(_write_impl->_pegasus_data_version, 
std::move(raw_value), data);
+        user_data = data.to_string();
+    }
+
+    // Extract `user_data` as int64 from `raw_value` in DB.
+    void extract_user_data(std::string &&raw_value, int64_t &user_data)
+    {
+        std::string data;
+        extract_user_data(std::move(raw_value), data);
+        ASSERT_TRUE(dsn::buf2int64(data, user_data));
     }
 };
 
-class incr_test : public pegasus_write_service_impl_test
+// Initialize a base value with a checker deciding if the base value is still 
the same as
+// it is in DB at the end of the scope.
+#define INIT_BASE_VALUE_AND_CHECKER(type, val)                                 
                    \
+    static const type kBaseValue = (val);                                      
                    \
+    auto kBaseValueChecker = dsn::defer([this]() { 
check_db_record(kBaseValue); })
+
+// Put a string value into DB and check if it is still the same as it is in DB 
at the end
+// of the scope.
+#define PUT_BASE_VALUE_STRING(val)                                             
                    \
+    INIT_BASE_VALUE_AND_CHECKER(std::string, val);                             
                    \
+    single_set(req.key, dsn::blob::create_from_bytes(std::string(kBaseValue)))
+
+// Put an int64 value into DB and check if it is still the same as it is in DB 
at the end
+// of the scope.
+#define PUT_BASE_VALUE_INT64(val)                                              
                    \
+    INIT_BASE_VALUE_AND_CHECKER(int64_t, val);                                 
                    \
+    single_set(req.key, dsn::blob::create_from_numeric(kBaseValue))
+
+class IncrTest : public PegasusWriteServiceImplTest
 {
-public:
+protected:
     void SetUp() override
     {
-        pegasus_write_service_impl_test::SetUp();
-        pegasus::pegasus_generate_key(
-            req.key, std::string_view("hash_key"), 
std::string_view("sort_key"));
+        PegasusWriteServiceImplTest::SetUp();
+        generate_key("incr_hash_key", "incr_sort_key");
+        req.expire_ts_seconds = 0;
+    }
+
+public:
+    void generate_key(const std::string &hash_key, const std::string &sort_key)
+    {
+        pegasus_generate_key(req.key, hash_key, sort_key);
+    }
+
+    // Check that the key must exist in DB and its value should be the same as 
`expected_value`.
+    template <typename TVal>
+    void check_db_record(const TVal &expected_value)
+    {
+        db_get_context get_ctx;
+        db_get(req.key, &get_ctx);
+        ASSERT_TRUE(get_ctx.found);
+        ASSERT_FALSE(get_ctx.expired);
+
+        TVal actual_value;
+        extract_user_data(std::move(get_ctx.raw_value), actual_value);
+        ASSERT_EQ(expected_value, actual_value);
+    }
+
+    // Check that the key must be found in DB but expired.
+    void check_db_record_expired()
+    {
+        db_get_context get_ctx;
+        db_get(req.key, &get_ctx);
+        ASSERT_TRUE(get_ctx.found);
+        ASSERT_TRUE(get_ctx.expired);
+    }
+
+    // Test if the incr result in response is correct while there is not any 
error during incr.
+    virtual void test_incr(int64_t base, int64_t increment) = 0;
+
+    // Test if both the incr result in response and the value in DB are 
correct while there is
+    // not any error during incr.
+    void test_incr_and_check_db_record(int64_t base, int64_t increment)
+    {
+        test_incr(base, increment);
+        check_db_record(base + increment);
+    }
+
+    // Test if incr could be executed correctly while the key did not exist in 
DB previously.
+    void test_incr_on_absent_record(int64_t increment)
+    {
+        // Ensure that the key is absent.
+        db_get_context get_ctx;
+        db_get(req.key, &get_ctx);
+        ASSERT_FALSE(get_ctx.found);
+        ASSERT_FALSE(get_ctx.expired);
+
+        // The base value should be 0 as the record is absent.
+        test_incr_and_check_db_record(0, increment);
+    }
+
+    // Test if incr could be executed correctly while the key has been present 
in DB.
+    void test_incr_on_existing_record(int64_t base, int64_t increment)
+    {
+        // Preload the record into DB as the existing key.
+        single_set(req.key, dsn::blob::create_from_numeric(base));
+
+        test_incr_and_check_db_record(base, increment);
     }
 
     dsn::apps::incr_request req;
     dsn::apps::incr_response resp;
 };
 
-INSTANTIATE_TEST_SUITE_P(, incr_test, ::testing::Values(false, true));
+class NonIdempotentIncrTest : public IncrTest
+{
+public:
+    // Test `incr` with both returned error and error in response as expected.
+    void test_non_idempotent_incr(int64_t increment, int expected_ret_err, int 
expected_resp_err)
+    {
+        req.increment = increment;
+        ASSERT_EQ(expected_ret_err, _write_impl->incr(0, req, resp));
+        ASSERT_EQ(expected_resp_err, resp.error);
+    }
+
+    void test_incr(int64_t base, int64_t increment) override
+    {
+        test_non_idempotent_incr(increment, rocksdb::Status::kOk, 
rocksdb::Status::kOk);
+        ASSERT_EQ(base + increment, resp.new_value);
+    }
+};
+
+TEST_P(NonIdempotentIncrTest, IncrOneOnAbsentRecord) { 
test_incr_on_absent_record(1); }
+
+TEST_P(NonIdempotentIncrTest, IncrBigOnAbsentRecord) { 
test_incr_on_absent_record(100); }
+
+TEST_P(NonIdempotentIncrTest, IncrOneOnExistingRecord) { 
test_incr_on_existing_record(10, 1); }
+
+TEST_P(NonIdempotentIncrTest, IncrBigOnExistingRecord) { 
test_incr_on_existing_record(10, 100); }
+
+TEST_P(NonIdempotentIncrTest, IncrNegative)
+{
+    test_incr_on_absent_record(-100);
+    test_incr_and_check_db_record(-100, -1);
+}
+
+TEST_P(NonIdempotentIncrTest, IncrZero)
+{
+    test_incr_on_absent_record(0);
+    test_incr_on_existing_record(10, 0);
+    test_incr_on_existing_record(-10, 0);
+}
+
+TEST_P(NonIdempotentIncrTest, IncrOnNonNumericRecord)
+{
+    PUT_BASE_VALUE_STRING("abc");
+
+    test_non_idempotent_incr(1, rocksdb::Status::kOk, 
rocksdb::Status::kInvalidArgument);
+}
+
+TEST_P(NonIdempotentIncrTest, IncrOverflowed)
+{
+    PUT_BASE_VALUE_INT64(100);
+
+    test_non_idempotent_incr(std::numeric_limits<int64_t>::max(),
+                             rocksdb::Status::kOk,
+                             rocksdb::Status::kInvalidArgument);
+    ASSERT_EQ(kBaseValue, resp.new_value);
+}
 
-TEST_P(incr_test, incr_on_absent_record)
+TEST_P(NonIdempotentIncrTest, FailOnGet)
 {
-    // ensure key is absent
-    db_get_context get_ctx;
-    db_get(req.key.to_string_view(), &get_ctx);
-    ASSERT_FALSE(get_ctx.found);
+    PUT_BASE_VALUE_INT64(100);
 
-    req.increment = 100;
-    _write_impl->incr(0, req, resp);
-    ASSERT_EQ(resp.new_value, 100);
+    dsn::fail::setup();
+
+    // `incr` should return an error once failed to get current value from DB.
+    dsn::fail::cfg("db_get", "100%1*return()");
+    test_non_idempotent_incr(10, FAIL_DB_GET, FAIL_DB_GET);
 
-    db_get(req.key.to_string_view(), &get_ctx);
-    ASSERT_TRUE(get_ctx.found);
+    dsn::fail::teardown();
 }
 
-TEST_P(incr_test, negative_incr_and_zero_incr)
+TEST_P(NonIdempotentIncrTest, FailOnPut)
 {
-    req.increment = -100;
-    ASSERT_EQ(0, _write_impl->incr(0, req, resp));
-    ASSERT_EQ(resp.new_value, -100);
+    PUT_BASE_VALUE_INT64(100);
 
-    req.increment = -1;
-    ASSERT_EQ(0, _write_impl->incr(0, req, resp));
-    ASSERT_EQ(resp.new_value, -101);
+    dsn::fail::setup();
+
+    // `incr` should return an error once failed to write into batch.
+    dsn::fail::cfg("db_write_batch_put", "100%1*return()");
+    test_non_idempotent_incr(10, FAIL_DB_WRITE_BATCH_PUT, 
FAIL_DB_WRITE_BATCH_PUT);
 
-    req.increment = 0;
-    ASSERT_EQ(0, _write_impl->incr(0, req, resp));
-    ASSERT_EQ(resp.new_value, -101);
+    dsn::fail::teardown();
 }
 
-TEST_P(incr_test, invalid_incr)
+TEST_P(NonIdempotentIncrTest, IncrOnExpireRecord)
 {
-    single_set(req.key, dsn::blob::create_from_bytes("abc"));
+    // Make the record expired.
+    req.expire_ts_seconds = 1;
+    test_non_idempotent_incr(10, rocksdb::Status::kOk, rocksdb::Status::kOk);
+
+    // Now the record should be expired.
+    check_db_record_expired();
+
+    // Incr the expired key.
+    req.expire_ts_seconds = 0;
+    test_incr_and_check_db_record(0, 100);
+}
 
-    req.increment = 10;
-    _write_impl->incr(1, req, resp);
-    ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
-    ASSERT_EQ(resp.new_value, 0);
+INSTANTIATE_TEST_SUITE_P(PegasusWriteServiceImplTest,
+                         NonIdempotentIncrTest,
+                         testing::Values(false, true));
 
-    single_set(req.key, dsn::blob::create_from_bytes("100"));
+class IdempotentIncrTest : public IncrTest
+{
+public:
+    // Test make_idempotent for the incr request.
+    void test_make_idempotent(int64_t increment, int expected_err)
+    {
+        req.increment = increment;
+        const int err = _write_impl->make_idempotent(req, err_resp, update);
+        ASSERT_EQ(expected_err, err);
+        if (expected_err == rocksdb::Status::kOk) {
+            return;
+        }
+
+        ASSERT_EQ(expected_err, err_resp.error);
+    }
+
+    // Test idempotent write for the incr request:
+    // - make_idempotent for incr should be successful;
+    // - then, apply the idempotent put request into DB.
+    void test_idempotent_incr(int64_t increment, int expected_err)
+    {
+        test_make_idempotent(increment, rocksdb::Status::kOk);
+
+        db_write_context write_ctx;
+        ASSERT_EQ(expected_err, _write_impl->put(write_ctx, update, resp));
+        ASSERT_EQ(expected_err, resp.error);
+    }
 
-    req.increment = std::numeric_limits<int64_t>::max();
-    _write_impl->incr(1, req, resp);
-    ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
-    ASSERT_EQ(resp.new_value, 100);
+    void test_incr(int64_t base, int64_t increment) override
+    {
+        test_idempotent_incr(increment, rocksdb::Status::kOk);
+        ASSERT_EQ(base + increment, resp.new_value);
+    }
+
+    dsn::apps::incr_response err_resp;
+    dsn::apps::update_request update;
+};
+
+TEST_P(IdempotentIncrTest, IncrOneOnAbsentRecord) { 
test_incr_on_absent_record(1); }
+
+TEST_P(IdempotentIncrTest, IncrBigOnAbsentRecord) { 
test_incr_on_absent_record(100); }
+
+TEST_P(IdempotentIncrTest, IncrOneOnExistingRecord) { 
test_incr_on_existing_record(10, 1); }
+
+TEST_P(IdempotentIncrTest, IncrBigOnExistingRecord) { 
test_incr_on_existing_record(10, 100); }
+
+TEST_P(IdempotentIncrTest, IncrNegative)
+{
+    test_incr_on_absent_record(-100);
+    test_incr_and_check_db_record(-100, -1);
+}
+
+TEST_P(IdempotentIncrTest, IncrZero)
+{
+    test_incr_on_absent_record(0);
+    test_incr_on_existing_record(10, 0);
+    test_incr_on_existing_record(-10, 0);
+}
+
+TEST_P(IdempotentIncrTest, IncrOnNonNumericRecord)
+{
+    PUT_BASE_VALUE_STRING("abc");
+
+    test_make_idempotent(1, rocksdb::Status::kInvalidArgument);
+}
+
+TEST_P(IdempotentIncrTest, IncrOverflowed)
+{
+    PUT_BASE_VALUE_INT64(100);
+
+    test_make_idempotent(std::numeric_limits<int64_t>::max(), 
rocksdb::Status::kInvalidArgument);
+    ASSERT_EQ(kBaseValue, err_resp.new_value);
 }
 
-TEST_P(incr_test, fail_on_get)
+TEST_P(IdempotentIncrTest, FailOnGet)
 {
+    PUT_BASE_VALUE_INT64(100);
+
     dsn::fail::setup();
-    dsn::fail::cfg("db_get", "100%1*return()");
-    // when db_get failed, incr should return an error.
 
-    req.increment = 10;
-    _write_impl->incr(1, req, resp);
-    ASSERT_EQ(resp.error, FAIL_DB_GET);
+    // `make_idempotent` should return an error once failed to get current 
value from DB.
+    dsn::fail::cfg("db_get", "100%1*return()");
+    test_make_idempotent(10, FAIL_DB_GET);
 
     dsn::fail::teardown();
 }
 
-TEST_P(incr_test, fail_on_put)
+TEST_P(IdempotentIncrTest, FailOnPut)
 {
+    PUT_BASE_VALUE_INT64(100);
+
     dsn::fail::setup();
-    dsn::fail::cfg("db_write_batch_put", "100%1*return()");
-    // when rocksdb put failed, incr should return an error.
 
-    req.increment = 10;
-    _write_impl->incr(1, req, resp);
-    ASSERT_EQ(resp.error, FAIL_DB_WRITE_BATCH_PUT);
+    // `put` should return an error once failed to write into batch.
+    dsn::fail::cfg("db_write_batch_put", "100%1*return()");
+    test_idempotent_incr(10, FAIL_DB_WRITE_BATCH_PUT);
 
     dsn::fail::teardown();
 }
 
-TEST_P(incr_test, incr_on_expire_record)
+TEST_P(IdempotentIncrTest, IncrOnExpireRecord)
 {
-    // make the key expired
+    // Make the record expired.
     req.expire_ts_seconds = 1;
-    _write_impl->incr(0, req, resp);
+    test_idempotent_incr(10, rocksdb::Status::kOk);
 
-    // check whether the key is expired
-    db_get_context get_ctx;
-    db_get(req.key.to_string_view(), &get_ctx);
-    ASSERT_TRUE(get_ctx.expired);
+    // Now the record should be expired.
+    check_db_record_expired();
 
-    // incr the expired key
-    req.increment = 100;
+    // Incr the expired key.
     req.expire_ts_seconds = 0;
-    _write_impl->incr(0, req, resp);
-    ASSERT_EQ(resp.new_value, 100);
-
-    db_get(req.key.to_string_view(), &get_ctx);
-    ASSERT_TRUE(get_ctx.found);
+    test_incr_and_check_db_record(0, 100);
 }
-} // namespace server
-} // namespace pegasus
+
+INSTANTIATE_TEST_SUITE_P(PegasusWriteServiceImplTest,
+                         IdempotentIncrTest,
+                         testing::Values(false, true));
+
+} // namespace pegasus::server
diff --git a/src/utils/blob.h b/src/utils/blob.h
index 07675e943..5b9d045c4 100644
--- a/src/utils/blob.h
+++ b/src/utils/blob.h
@@ -26,10 +26,12 @@
 
 #pragma once
 
-#include <memory>
+#include <fmt/core.h>
 #include <cstring>
-
+#include <memory>
 #include <string_view>
+#include <type_traits>
+
 #include <thrift/protocol/TBinaryProtocol.h>
 #include <thrift/protocol/TProtocol.h>
 
@@ -126,6 +128,13 @@ public:
         return {std::move(buf), static_cast<unsigned int>(s->length())};
     }
 
+    template <typename TNum,
+              typename = typename 
std::enable_if<std::is_arithmetic<TNum>::value>::type>
+    [[nodiscard]] static blob create_from_numeric(TNum val)
+    {
+        return create_from_bytes(fmt::format("{}", val));
+    }
+
     void assign(const std::shared_ptr<char> &buffer, int offset, unsigned int 
length)
     {
         _holder = buffer;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to