acelyc111 commented on a change in pull request #656:
URL: https://github.com/apache/incubator-pegasus/pull/656#discussion_r543483206



##########
File path: src/server/rocksdb_wrapper.cpp
##########
@@ -66,5 +75,109 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ 
db_get_context *ctx)
     return s.code();
 }
 
+int rocksdb_wrapper::write_batch_put(int64_t decree,
+                                     dsn::string_view raw_key,
+                                     dsn::string_view value,
+                                     uint32_t expire_sec)
+{
+    return write_batch_put_ctx(db_write_context::empty(decree), raw_key, 
value, expire_sec);
+}
+
+int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
+                                         dsn::string_view raw_key,
+                                         dsn::string_view value,
+                                         uint32_t expire_sec)
+{
+    FAIL_POINT_INJECT_F("db_write_batch_put",
+                        [](dsn::string_view) -> int { return 
FAIL_DB_WRITE_BATCH_PUT; });
+
+    uint64_t new_timetag = ctx.remote_timetag;
+    if (!ctx.is_duplicated_write()) { // local write
+        new_timetag = generate_timetag(ctx.timestamp, 
get_cluster_id_if_exists(), false);
+    }
+
+    if (ctx.verify_timetag &&         // needs read-before-write
+        _pegasus_data_version >= 1 && // data version 0 doesn't support 
timetag.
+        !raw_key.empty()) {           // not an empty write
+
+        db_get_context get_ctx;
+        int err = get(raw_key, &get_ctx);
+        if (dsn_unlikely(err != 0)) {
+            return err;
+        }
+        // if record exists and is not expired.
+        if (get_ctx.found && !get_ctx.expired) {
+            uint64_t local_timetag =
+                pegasus_extract_timetag(_pegasus_data_version, 
get_ctx.raw_value);
+
+            if (local_timetag >= new_timetag) {
+                // ignore this stale update with lower timetag,
+                // and write an empty record instead
+                raw_key = value = dsn::string_view();
+            }
+        }
+    }
+
+    rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
+    rocksdb::SliceParts skey_parts(&skey, 1);
+    rocksdb::SliceParts svalue = _value_generator->generate_value(
+        _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
+    rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
+    if (dsn_unlikely(!s.ok())) {
+        ::dsn::blob hash_key, sort_key;
+        pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), 
hash_key, sort_key);
+        derror_rocksdb("WriteBatchPut",
+                       s.ToString(),
+                       "decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}",
+                       ctx.decree,
+                       utils::c_escape_string(hash_key),
+                       utils::c_escape_string(sort_key),
+                       expire_sec);
+    }
+    return s.code();
+}
+
+int rocksdb_wrapper::write(int64_t decree)
+{
+    dassert(_write_batch->Count() != 0, "");

Review comment:
       Please fill in the error message for this `dassert`; it is currently an empty string.

##########
File path: src/server/pegasus_write_service_impl.h
##########
@@ -338,22 +338,24 @@ class pegasus_write_service::impl : public 
dsn::replication::replica_base
             } else {
                 set_key = check_key;
             }
-            resp.error = db_write_batch_put(decree,
-                                            set_key,
-                                            update.set_value,
-                                            
static_cast<uint32_t>(update.set_expire_ts_seconds));
+            resp.error = _rocksdb_wrapper->write_batch_put(
+                decree,
+                set_key,
+                update.set_value,
+                static_cast<uint32_t>(update.set_expire_ts_seconds));
         } else {
             // check not passed, write empty record to update rocksdb's last 
flushed decree
-            resp.error = db_write_batch_put(decree, dsn::string_view(), 
dsn::string_view(), 0);
+            resp.error = _rocksdb_wrapper->write_batch_put(
+                decree, dsn::string_view(), dsn::string_view(), 0);
         }
         if (resp.error) {
-            clear_up_batch_states(decree, resp.error);
+            _rocksdb_wrapper->clear_up_write_batch();

Review comment:
       Consider using an RAII approach here so that `clear_up_write_batch` does not have to be called manually.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to