This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new f5c322f01 refactor: specify data column family explicitly for RocksDB 
wrapper (#2182)
f5c322f01 is described below

commit f5c322f01e6e20df1425f1e4df70dd2a43d7b77a
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jan 16 12:37:07 2025 +0800

    refactor: specify data column family explicitly for RocksDB wrapper (#2182)
    
    Currently the wrapper still accesses RocksDB data through the default
    column family handle.
    To make both the data and meta column families explicitly specified
    whenever the RocksDB instance is accessed, introduce a data column family
    handle into the wrapper.
---
 src/server/rocksdb_wrapper.cpp | 19 ++++++++++++-------
 src/server/rocksdb_wrapper.h   |  1 +
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 078b2b9d8..6f5ba8bc3 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -61,6 +61,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
     : replica_base(server),
       _db(server->_db),
       _rd_opts(server->_data_cf_rd_opts),
+      _data_cf(server->_data_cf),
       _meta_cf(server->_meta_cf),
       _pegasus_data_version(server->_pegasus_data_version),
       METRIC_VAR_INIT_replica(read_expired_values),
@@ -78,7 +79,8 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ 
db_get_context *ctx)
 {
     FAIL_POINT_INJECT_F("db_get", [](std::string_view) -> int { return 
FAIL_DB_GET; });
 
-    rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), 
&(ctx->raw_value));
+    rocksdb::Status s =
+        _db->Get(_rd_opts, _data_cf, utils::to_rocksdb_slice(raw_key), 
&ctx->raw_value);
     if (dsn_likely(s.ok())) {
         // success
         ctx->found = true;
@@ -94,7 +96,8 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ 
db_get_context *ctx)
         return rocksdb::Status::kOk;
     }
 
-    dsn::blob hash_key, sort_key;
+    dsn::blob hash_key;
+    dsn::blob sort_key;
     pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), 
hash_key, sort_key);
     LOG_ERROR_ROCKSDB("Get",
                       s.ToString(),
@@ -152,9 +155,10 @@ int rocksdb_wrapper::write_batch_put_ctx(const 
db_write_context &ctx,
     rocksdb::SliceParts skey_parts(&skey, 1);
     rocksdb::SliceParts svalue = _value_generator->generate_value(
         _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
-    rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
+    rocksdb::Status s = _write_batch->Put(_data_cf, skey_parts, svalue);
     if (dsn_unlikely(!s.ok())) {
-        ::dsn::blob hash_key, sort_key;
+        dsn::blob hash_key;
+        dsn::blob sort_key;
         pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), 
hash_key, sort_key);
         LOG_ERROR_ROCKSDB("WriteBatchPut",
                           s.ToString(),
@@ -199,9 +203,10 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, 
std::string_view raw_key
     FAIL_POINT_INJECT_F("db_write_batch_delete",
                         [](std::string_view) -> int { return 
FAIL_DB_WRITE_BATCH_DELETE; });
 
-    rocksdb::Status s = _write_batch->Delete(utils::to_rocksdb_slice(raw_key));
+    rocksdb::Status s = _write_batch->Delete(_data_cf, 
utils::to_rocksdb_slice(raw_key));
     if (dsn_unlikely(!s.ok())) {
-        dsn::blob hash_key, sort_key;
+        dsn::blob hash_key;
+        dsn::blob sort_key;
         pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), 
hash_key, sort_key);
         LOG_ERROR_ROCKSDB("write_batch_delete",
                           s.ToString(),
@@ -223,7 +228,7 @@ int rocksdb_wrapper::ingest_files(int64_t decree,
     ifo.move_files = true;
     ifo.ingest_behind = ingest_behind;
     ifo.write_global_seqno = FLAGS_rocksdb_write_global_seqno;
-    rocksdb::Status s = _db->IngestExternalFile(sst_file_list, ifo);
+    rocksdb::Status s = _db->IngestExternalFile(_data_cf, sst_file_list, ifo);
     if (dsn_unlikely(!s.ok())) {
         LOG_ERROR_ROCKSDB("IngestExternalFile",
                           s.ToString(),
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index c73f5cb91..f905dea93 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -81,6 +81,7 @@ private:
     std::unique_ptr<pegasus_value_generator> _value_generator;
     std::unique_ptr<rocksdb::WriteBatch> _write_batch;
     std::unique_ptr<rocksdb::WriteOptions> _wt_opts;
+    rocksdb::ColumnFamilyHandle *_data_cf;
     rocksdb::ColumnFamilyHandle *_meta_cf;
 
     const uint32_t _pegasus_data_version;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to