empiredan commented on code in PR #2246:
URL: 
https://github.com/apache/incubator-pegasus/pull/2246#discussion_r2083749576


##########
src/server/pegasus_write_service_impl.h:
##########
@@ -540,6 +532,165 @@ class pegasus_write_service::impl : public 
dsn::replication::replica_base
         return rocksdb::Status::kOk;
     }
 
+    // Used to call make_idempotent() for incr and check_and_set to get the 
idempotent single-put
+    // request which is stored as the unique element of `updates`.
+    //
+    // This interface is provided to ensure consistency between the 
make_idempotent() interfaces
+    // of incr/check_and_set operations and that of check_and_mutate (both 
using std::vector for
+    // `updates`), thereby facilitating uniform templated function invocation.
+    template <typename TRequest, typename TResponse>
+    inline int make_idempotent(const TRequest &req,
+                               TResponse &err_resp,
+                               std::vector<dsn::apps::update_request> &updates)
+    {
+        updates.clear();
+        updates.emplace_back();
+        return make_idempotent(req, err_resp, updates.front());
+    }
+
+    // Translate a check_and_mutate request into multiple single-put and 
single-remove requests
+    // which are certainly idempotent. Return current status for RocksDB. Only 
called by primary
+    // replicas.
+    int make_idempotent(const dsn::apps::check_and_mutate_request &req,
+                        dsn::apps::check_and_mutate_response &err_resp,
+                        std::vector<dsn::apps::update_request> &updates)
+    {
+        if (dsn_unlikely(req.mutate_list.empty())) {
+            LOG_ERROR_PREFIX("mutate_list is empty for check_and_mutate while 
making idempotent");
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        // Verify operation type for each mutate.
+        for (size_t i = 0; i < req.mutate_list.size(); ++i) {
+            const auto &mu = req.mutate_list[i];
+            if (dsn_likely(mu.operation == dsn::apps::mutate_operation::MO_PUT 
||
+                           mu.operation == 
dsn::apps::mutate_operation::MO_DELETE)) {
+                continue;
+            }
+
+            LOG_ERROR_PREFIX("mutate_list[{}]'s operation {} is invalid for 
check_and_mutate "
+                             "while making idempotent",
+                             i,
+                             mu.operation);
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        if (dsn_unlikely(!is_check_type_supported(req.check_type))) {
+            LOG_ERROR_PREFIX("check type {} is not supported for 
check_and_mutate ",
+                             "while making idempotent",
+                             cas_check_type_to_string(req.check_type));
+
+            return make_error_response(rocksdb::Status::kInvalidArgument, 
err_resp);
+        }
+
+        dsn::blob check_key;
+        pegasus_generate_key(check_key, req.hash_key, req.check_sort_key);
+
+        // Get the check value.
+        db_get_context get_ctx;
+        const int err = _rocksdb_wrapper->get(check_key, &get_ctx);
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+            // Failed to read the check value.
+            LOG_ERROR_PREFIX("failed to get the check value for 
check_and_mutate while making "
+                             "idempotent: rocksdb_status = {}, hash_key = {}, "
+                             "check_sort_key = {}",
+                             err,
+                             utils::c_escape_sensitive_string(req.hash_key),
+                             
utils::c_escape_sensitive_string(req.check_sort_key));
+
+            return make_error_response(err, err_resp);
+        }
+
+        dsn::blob check_value;
+        const bool value_exist = !get_ctx.expired && get_ctx.found;
+        if (value_exist) {
+            pegasus_extract_user_data(
+                _pegasus_data_version, std::move(get_ctx.raw_value), 
check_value);
+        }
+
+        bool invalid_argument = false;
+        const bool passed = validate_check(
+            req.check_type, req.check_operand, value_exist, check_value, 
invalid_argument);
+        if (!passed) {
+            make_check_value(req, value_exist, check_value, err_resp);
+            return make_error_response(invalid_argument ? 
rocksdb::Status::kInvalidArgument
+                                                        : 
rocksdb::Status::kTryAgain,

Review Comment:
   Great question! The reason for returning `kTryAgain` here is that the 
condition check in `check_and_mutate` failed, which signals the caller not to 
proceed with the subsequent steps (including 2PC and applying to the storage 
engine). At the same time, it responds to the client indicating that no other 
error occurred — it's simply that the condition check didn’t pass. If 
necessary, the client can still choose to resubmit the request to the server. 
`kTryAgain` is chosen as the return code also to stay consistent with the 
previous non-idempotent version of `check_and_mutate()`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to