acelyc111 commented on code in PR #2246:
URL: 
https://github.com/apache/incubator-pegasus/pull/2246#discussion_r2083220647


##########
src/server/pegasus_write_service_impl.h:
##########
@@ -540,6 +532,165 @@ class pegasus_write_service::impl : public 
dsn::replication::replica_base
         return rocksdb::Status::kOk;
     }
 
+    // Used to call make_idempotent() for incr and check_and_set to get the 
idempotent single-put
+    // request which is stored as the unique element of `updates`.
+    //
+    // This interface is provided to ensure consistency between the 
make_idempotent() interfaces
+    // of incr/check_and_set operations and that of check_and_mutate (both 
using std::vector for
+    // `updates`), thereby facilitating uniform templated function invocation.
+    template <typename TRequest, typename TResponse>
+    inline int make_idempotent(const TRequest &req,
+                               TResponse &err_resp,
+                               std::vector<dsn::apps::update_request> &updates)
+    {
+        updates.clear();
+        updates.emplace_back();
+        return make_idempotent(req, err_resp, updates.front());
+    }
+
+    // Translate a check_and_mutate request into multiple single-put and 
single-remove requests
+    // which are certainly idempotent. Return current status for RocksDB. Only 
called by primary
+    // replicas.
+    int make_idempotent(const dsn::apps::check_and_mutate_request &req,
+                        dsn::apps::check_and_mutate_response &err_resp,
+                        std::vector<dsn::apps::update_request> &updates)
+    {
+        if (dsn_unlikely(req.mutate_list.empty())) {
+            LOG_ERROR_PREFIX("mutate_list is empty for check_and_mutate while 
making idempotent");
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        // Verify operation type for each mutate.
+        for (size_t i = 0; i < req.mutate_list.size(); ++i) {
+            const auto &mu = req.mutate_list[i];
+            if (dsn_likely(mu.operation == dsn::apps::mutate_operation::MO_PUT 
||
+                           mu.operation == 
dsn::apps::mutate_operation::MO_DELETE)) {
+                continue;
+            }
+
+            LOG_ERROR_PREFIX("mutate_list[{}]'s operation {} is invalid for 
check_and_mutate "
+                             "while making idempotent",
+                             i,
+                             mu.operation);
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        if (dsn_unlikely(!is_check_type_supported(req.check_type))) {
+            LOG_ERROR_PREFIX("check type {} is not supported for 
check_and_mutate ",
+                             "while making idempotent",
+                             cas_check_type_to_string(req.check_type));
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        dsn::blob check_key;
+        pegasus_generate_key(check_key, req.hash_key, req.check_sort_key);
+
+        // Get the check value.
+        db_get_context get_ctx;
+        const int err = _rocksdb_wrapper->get(check_key, &get_ctx);
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+            // Failed to read the check value.
+            LOG_ERROR_PREFIX("failed to get the check value for 
check_and_mutate while making "
+                             "idempotent: rocksdb_status = {}, hash_key = {}, "
+                             "check_sort_key = {}",
+                             err,
+                             utils::c_escape_sensitive_string(req.hash_key),
+                             
utils::c_escape_sensitive_string(req.check_sort_key));
+
+            return make_error_response(err, err_resp);
+        }
+
+        dsn::blob check_value;
+        const bool value_exist = !get_ctx.expired && get_ctx.found;
+        if (value_exist) {
+            pegasus_extract_user_data(
+                _pegasus_data_version, std::move(get_ctx.raw_value), 
check_value);
+        }
+
+        bool invalid_argument = false;
+        const bool passed = validate_check(
+            req.check_type, req.check_operand, value_exist, check_value, 
invalid_argument);
+        if (!passed) {
+            make_check_value(req, value_exist, check_value, err_resp);
+            return make_error_response(invalid_argument ? 
rocksdb::Status::kInvalidArgument
+                                                        : 
rocksdb::Status::kTryAgain,

Review Comment:
   Why return the error code kTryAgain? Is it very likely to succeed if the 
caller tries again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to