This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 1ee52eb3a chore(log): replace logging calls in storage/* (#2909)
1ee52eb3a is described below

commit 1ee52eb3a00af68b1ec99e139ad350313784936a
Author: Anirudh Lakhanpal <[email protected]>
AuthorDate: Wed May 7 11:11:39 2025 +0530

    chore(log): replace logging calls in storage/* (#2909)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/storage/batch_extractor.cc    | 57 ++++++++++++------------
 src/storage/compact_filter.cc     | 16 +++----
 src/storage/compaction_checker.cc | 26 +++++------
 src/storage/event_listener.cc     | 94 ++++++++++++++++++++-------------------
 src/storage/iterator.h            |  4 +-
 src/storage/scripting.cc          |  4 +-
 src/storage/storage.cc            | 91 ++++++++++++++++++-------------------
 7 files changed, 145 insertions(+), 147 deletions(-)

diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 45fb70e59..f0b633b3e 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -37,7 +37,7 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) 
{
   } else {
     // Redis type log data
     if (auto s = log_data_.Decode(blob); !s.IsOK()) {
-      LOG(WARNING) << "Failed to decode Redis type log: " << s.Msg();
+      warn("Failed to decode Redis type log: {}", s.Msg());
     }
   }
 }
@@ -139,8 +139,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
       case kRedisList: {
         auto args = log_data_.GetArguments();
         if (args->empty()) {
-          LOG(ERROR)
-              << "Failed to parse write_batch in PutCF. Type=List: no 
arguments, at least should contain a command";
+          error("Failed to parse write_batch in PutCF. Type=List: no 
arguments, at least should contain a command");
           return rocksdb::Status::OK();
         }
 
@@ -154,8 +153,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
         switch (cmd) {
           case kRedisCmdLSet:
             if (args->size() < 2) {
-              LOG(ERROR) << "Failed to parse write_batch in PutCF. 
Command=LSET: no enough arguments, at least should "
-                            "contain an index";
+              error(
+                  "Failed to parse write_batch in PutCF. Command=LSET: no 
enough arguments, at least should contain an "
+                  "index");
               return rocksdb::Status::OK();
             }
 
@@ -164,8 +164,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
           case kRedisCmdLInsert:
             if (first_seen_) {
               if (args->size() < 4) {
-                LOG(ERROR) << "Failed to parse write_batch in PutCF. 
Command=LINSERT: no enough arguments, should "
-                              "contain before pivot value";
+                error(
+                    "Failed to parse write_batch in PutCF. Command=LINSERT: no 
enough arguments, should contain before "
+                    "pivot values");
                 return rocksdb::Status::OK();
               }
 
@@ -186,8 +187,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
             // LMOVE will be parsed in DeleteCF, so ignore it here
             break;
           default:
-            LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=List: 
unhandled command with code "
-                       << *parse_result;
+            error("Failed to parse write_batch in PutCF. Type=List: unhandled 
command with code {}", *parse_result);
         }
         break;
       }
@@ -202,8 +202,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
       case kRedisBitmap: {
         auto args = log_data_.GetArguments();
         if (args->empty()) {
-          LOG(ERROR)
-              << "Failed to parse write_batch in PutCF. Type=Bitmap: no 
arguments, at least should contain a command";
+          error("Failed to parse write_batch in PutCF. Type=Bitmap: no 
arguments, at least should contain a command");
           return rocksdb::Status::OK();
         }
 
@@ -217,8 +216,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
         switch (cmd) {
           case kRedisCmdSetBit: {
             if (args->size() < 2) {
-              LOG(ERROR) << "Failed to parse write_batch in PutCF. 
Command=SETBIT: no enough arguments, should contain "
-                            "an offset";
+              error(
+                  "Failed to parse write_batch in PutCF. Command=SETBIT: no 
enough arguments, should contain an "
+                  "offset");
               return rocksdb::Status::OK();
             }
 
@@ -234,8 +234,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
           case kRedisCmdBitOp:
             if (first_seen_) {
               if (args->size() < 4) {
-                LOG(ERROR) << "Failed to parse write_batch in PutCF. 
Command=BITOP: no enough arguments, at least "
-                              "should contain srckey";
+                error(
+                    "Failed to parse write_batch in PutCF. Command=BITOP: no 
enough arguments, at least should contain "
+                    "srckey");
                 return rocksdb::Status::OK();
               }
 
@@ -249,8 +250,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
             command_args.insert(command_args.end(), args->begin() + 1, 
args->end());
             break;
           default:
-            LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Bitmap: 
unhandled command with code "
-                       << *parsed_cmd;
+            error("Failed to parse write_batch in PutCF. Type=Bitmap: 
unhandled command with code {}", *parsed_cmd);
             return rocksdb::Status::OK();
         }
         break;
@@ -268,7 +268,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
   } else if (column_family_id == 
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
     auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, 
&command_args);
     if (!s.IsOK()) {
-      LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Stream: " << 
s.Msg();
+      error("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
       return rocksdb::Status::OK();
     }
   }
@@ -322,8 +322,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
       case kRedisList: {
         auto args = log_data_.GetArguments();
         if (args->empty()) {
-          LOG(ERROR)
-              << "Failed to parse write_batch in DeleteCF. Type=List: no 
arguments, at least should contain a command";
+          error("Failed to parse write_batch in DeleteCF. Type=List: no 
arguments, at least should contain a command");
           return rocksdb::Status::OK();
         }
 
@@ -338,8 +337,9 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
           case kRedisCmdLTrim:
             if (first_seen_) {
               if (args->size() < 3) {
-                LOG(ERROR) << "Failed to parse write_batch in DeleteCF; 
Command=LTRIM: no enough arguments, should "
-                              "contain start and stop";
+                error(
+                    "Failed to parse write_batch in DeleteCF; Command=LTRIM: 
no enough arguments, should contain start "
+                    "and stop");
                 return rocksdb::Status::OK();
               }
 
@@ -350,8 +350,9 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
           case kRedisCmdLRem:
             if (first_seen_) {
               if (args->size() < 3) {
-                LOG(ERROR) << "Failed to parse write_batch in DeleteCF. 
Command=LREM: no enough arguments, should "
-                              "contain count and value";
+                error(
+                    "Failed to parse write_batch in DeleteCF. Command=LREM: no 
enough arguments, should "
+                    "contain count and value");
                 return rocksdb::Status::OK();
               }
 
@@ -368,8 +369,9 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
           case kRedisCmdLMove:
             if (first_seen_) {
               if (args->size() < 5) {
-                LOG(ERROR) << "Failed to parse write_batch in DeleteCF; 
Command=LMOVE: no enough arguments, should "
-                              "contain source, destination and where/from 
arguments";
+                error(
+                    "Failed to parse write_batch in DeleteCF; Command=LMOVE: 
no enough arguments, should "
+                    "contain source, destination and where/from arguments");
                 return rocksdb::Status::OK();
               }
               command_args = {"LMOVE", (*args)[1], (*args)[2], (*args)[3], 
(*args)[4]};
@@ -377,8 +379,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t 
column_family_id, const S
             }
             break;
           default:
-            LOG(ERROR) << "Failed to parse write_batch in DeleteCF. Type=List: 
unhandled command with code "
-                       << *parse_result;
+            error("Failed to parse write_batch in DeleteCF. Type=List: 
unhandled command with code {}", *parse_result);
         }
         break;
       }
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index 809556c81..737dbf114 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -38,13 +38,11 @@ bool MetadataFilter::Filter([[maybe_unused]] int level, 
const Slice &key, const
   rocksdb::Status s = metadata.Decode(value);
   auto [ns, user_key] = ExtractNamespaceKey(key, stor_->IsSlotIdEncoded());
   if (!s.ok()) {
-    LOG(WARNING) << "[compact_filter/metadata] Failed to decode,"
-                 << ", namespace: " << ns << ", key: " << user_key << ", err: 
" << s.ToString();
+    warn("[compact_filter/metadata] Failed to decode, namespace: {}, key: {}, 
err: {}", ns, user_key, s.ToString());
     return false;
   }
-  DLOG(INFO) << "[compact_filter/metadata] "
-             << "namespace: " << ns << ", key: " << user_key
-             << ", result: " << (metadata.Expired() ? "deleted" : "reserved");
+  debug("[compact_filter/metadata] namespace: {}, key: {}, result: {}", ns, 
user_key,
+        (metadata.Expired() ? "deleted" : "reserved"));
   return metadata.Expired();
 }
 
@@ -104,8 +102,8 @@ rocksdb::CompactionFilter::Decision 
SubKeyFilter::FilterBlobByKey([[maybe_unused
     return rocksdb::CompactionFilter::Decision::kRemove;
   }
   if (!s.IsOK()) {
-    LOG(ERROR) << "[compact_filter/subkey] Failed to get metadata"
-               << ", namespace: " << ikey.GetNamespace() << ", key: " << 
ikey.GetKey() << ", err: " << s.Msg();
+    error("[compact_filter/subkey] Failed to get metadata, namespace: {}, key: 
{}, err: {}", ikey.GetNamespace(),
+          ikey.GetKey(), s.Msg());
     return rocksdb::CompactionFilter::Decision::kKeep;
   }
   // bitmap will be checked in Filter
@@ -126,8 +124,8 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const 
Slice &key, const Sl
     return true;
   }
   if (!s.IsOK()) {
-    LOG(ERROR) << "[compact_filter/subkey] Failed to get metadata"
-               << ", namespace: " << ikey.GetNamespace() << ", key: " << 
ikey.GetKey() << ", err: " << s.Msg();
+    error("[compact_filter/subkey] Failed to get metadata, namespace: {}, key: 
{}, err: {}", ikey.GetNamespace(),
+          ikey.GetKey(), s.Msg());
     return false;
   }
 
diff --git a/src/storage/compaction_checker.cc b/src/storage/compaction_checker.cc
index 0bf90785a..34c621249 100644
--- a/src/storage/compaction_checker.cc
+++ b/src/storage/compaction_checker.cc
@@ -30,11 +30,10 @@ void CompactionChecker::CompactPropagateAndPubSubFiles() {
   compact_opts.change_level = true;
   for (const auto &cf :
        {engine::ColumnFamilyConfigs::PubSubColumnFamily(), 
engine::ColumnFamilyConfigs::PropagateColumnFamily()}) {
-    LOG(INFO) << "[compaction checker] Start to compact the column family: " 
<< cf.Name();
+    info("[compaction checker] Start to compact the column family: {}", 
cf.Name());
     auto cf_handle = storage_->GetCFHandle(cf.Id());
     auto s = storage_->GetDB()->CompactRange(compact_opts, cf_handle, nullptr, 
nullptr);
-    LOG(INFO) << "[compaction checker] Compact the column family: " << 
cf.Name()
-              << " finished, result: " << s.ToString();
+    info("[compaction checker] Compact the column family: {} finished, result: 
{}", cf.Name(), s.ToString());
   }
 }
 
@@ -43,7 +42,7 @@ void CompactionChecker::PickCompactionFilesForCf(const 
engine::ColumnFamilyConfi
   rocksdb::ColumnFamilyHandle *cf = 
storage_->GetCFHandle(column_family_config.Id());
   auto s = storage_->GetDB()->GetPropertiesOfAllTables(cf, &props);
   if (!s.ok()) {
-    LOG(WARNING) << "[compaction checker] Failed to get table properties, " << 
s.ToString();
+    warn("[compaction checker] Failed to get table properties, {}", 
s.ToString());
     return;
   }
   // The main goal of compaction was reclaimed the disk space and removed
@@ -74,8 +73,7 @@ void CompactionChecker::PickCompactionFilesForCf(const 
engine::ColumnFamilyConfi
       // file_creation_time is 0 which means the unknown condition in rocksdb
       s = rocksdb::Env::Default()->GetFileModificationTime(iter.first, 
&file_creation_time);
       if (!s.ok()) {
-        LOG(INFO) << "[compaction checker] Failed to get the file creation 
time: " << iter.first
-                  << ", err: " << s.ToString();
+        info("[compaction checker] Failed to get the file creation time: {}, 
err: {}", iter.first, s.ToString());
         continue;
       }
     }
@@ -84,7 +82,7 @@ void CompactionChecker::PickCompactionFilesForCf(const 
engine::ColumnFamilyConfi
       if (property_iter.first == "total_keys") {
         auto parse_result = ParseInt<int>(property_iter.second, 10);
         if (!parse_result) {
-          LOG(ERROR) << "[compaction checker] Parse total_keys error: " << 
parse_result.Msg();
+          error("[compaction checker] Parse total_keys error: {}", 
parse_result.Msg());
           continue;
         }
         total_keys = *parse_result;
@@ -92,7 +90,7 @@ void CompactionChecker::PickCompactionFilesForCf(const 
engine::ColumnFamilyConfi
       if (property_iter.first == "deleted_keys") {
         auto parse_result = ParseInt<int>(property_iter.second, 10);
         if (!parse_result) {
-          LOG(ERROR) << "[compaction checker] Parse deleted_keys error: " << 
parse_result.Msg();
+          error("[compaction checker] Parse deleted_keys error: {}", 
parse_result.Msg());
           continue;
         }
         deleted_keys = *parse_result;
@@ -111,10 +109,10 @@ void CompactionChecker::PickCompactionFilesForCf(const 
engine::ColumnFamilyConfi
     // pick the file according to force compact policy
     if (file_creation_time < static_cast<uint64_t>(now - 
force_compact_file_age) &&
         delete_ratio >= force_compact_min_ratio) {
-      LOG(INFO) << "[compaction checker] Going to compact the key in file 
(force compact policy): " << iter.first;
+      info("[compaction checker] Going to compact the key in file (force 
compact policy): {}", iter.first);
       auto s = storage_->Compact(cf, &start_key, &stop_key);
-      LOG(INFO) << "[compaction checker] Compact the key in file (force 
compact policy): " << iter.first
-                << " finished, result: " << s.ToString();
+      info("[compaction checker] Compact the key in file (force compact 
policy): {} finished, result: {}", iter.first,
+           s.ToString());
       max_files_to_compact--;
       continue;
     }
@@ -133,11 +131,11 @@ void CompactionChecker::PickCompactionFilesForCf(const 
engine::ColumnFamilyConfi
     }
   }
   if (best_delete_ratio > 0.1 && !best_start_key.empty() && 
!best_stop_key.empty()) {
-    LOG(INFO) << "[compaction checker] Going to compact the key in file: " << 
best_filename
-              << ", delete ratio: " << best_delete_ratio;
+    info("[compaction checker] Going to compact the key in file: {}, delete 
ratio: {}", best_filename,
+         best_delete_ratio);
     auto s = storage_->Compact(cf, &best_start_key, &best_stop_key);
     if (!s.ok()) {
-      LOG(ERROR) << "[compaction checker] Failed to do compaction: " << 
s.ToString();
+      error("[compaction checker] Failed to do compaction: {}", s.ToString());
     }
   }
 }
diff --git a/src/storage/event_listener.cc b/src/storage/event_listener.cc
index e5861d20a..1c8f48f37 100644
--- a/src/storage/event_listener.cc
+++ b/src/storage/event_listener.cc
@@ -80,58 +80,61 @@ bool IsDiskQuotaExceeded(const rocksdb::Status &bg_error) {
 }
 
 void EventListener::OnCompactionBegin([[maybe_unused]] rocksdb::DB *db, const 
rocksdb::CompactionJobInfo &ci) {
-  LOG(INFO) << "[event_listener/compaction_begin] column family: " << 
ci.cf_name << ", job_id: " << ci.job_id
-            << ", compaction reason: " << 
rocksdb::GetCompactionReasonString(ci.compaction_reason)
-            << ", output compression type: " << 
CompressType2String(ci.compression)
-            << ", base input level(files): " << ci.base_input_level << "(" << 
ci.input_files.size() << ")"
-            << ", output level(files): " << ci.output_level << "(" << 
ci.output_files.size() << ")"
-            << ", input bytes: " << ci.stats.total_input_bytes << ", output 
bytes:" << ci.stats.total_output_bytes
-            << ", is_manual_compaction:" << (ci.stats.is_manual_compaction ? 
"yes" : "no");
+  info(
+      "[event_listener/compaction_begin] column family: {}, job_id: {}, 
compaction reason: {}, output compression "
+      "type: {}, base input level(files): {}({}), output level(files): {}({}), 
input bytes: {}, output bytes: {}, "
+      "is_manual_compaction: {}",
+      ci.cf_name, ci.job_id, 
rocksdb::GetCompactionReasonString(ci.compaction_reason),
+      CompressType2String(ci.compression), ci.base_input_level, 
ci.input_files.size(), ci.output_level,
+      ci.output_files.size(), ci.stats.total_input_bytes, 
ci.stats.total_output_bytes,
+      ci.stats.is_manual_compaction ? "yes" : "no");
 }
 
 void EventListener::OnCompactionCompleted([[maybe_unused]] rocksdb::DB *db, 
const rocksdb::CompactionJobInfo &ci) {
-  LOG(INFO) << "[event_listener/compaction_completed] column family: " << 
ci.cf_name << ", job_id: " << ci.job_id
-            << ", compaction reason: " << 
rocksdb::GetCompactionReasonString(ci.compaction_reason)
-            << ", output compression type: " << 
CompressType2String(ci.compression)
-            << ", base input level(files): " << ci.base_input_level << "(" << 
ci.input_files.size() << ")"
-            << ", output level(files): " << ci.output_level << "(" << 
ci.output_files.size() << ")"
-            << ", input bytes: " << ci.stats.total_input_bytes << ", output 
bytes:" << ci.stats.total_output_bytes
-            << ", is_manual_compaction:" << (ci.stats.is_manual_compaction ? 
"yes" : "no")
-            << ", elapsed(micro): " << ci.stats.elapsed_micros;
+  info(
+      "[event_listener/compaction_completed] column family: {}, job_id: {}, 
compaction reason: {}, output compression "
+      "type: {}, base input level(files): {}({}), output level(files): {}({}), 
input bytes: {}, output bytes: {}, "
+      "is_manual_compaction: {}, elapsed(micro): {}",
+      ci.cf_name, ci.job_id, 
rocksdb::GetCompactionReasonString(ci.compaction_reason),
+      CompressType2String(ci.compression), ci.base_input_level, 
ci.input_files.size(), ci.output_level,
+      ci.output_files.size(), ci.stats.total_input_bytes, 
ci.stats.total_output_bytes,
+      ci.stats.is_manual_compaction ? "yes" : "no", ci.stats.elapsed_micros);
   storage_->RecordStat(engine::StatType::CompactionCount, 1);
   storage_->CheckDBSizeLimit();
 }
 
 void EventListener::OnSubcompactionBegin(const rocksdb::SubcompactionJobInfo 
&si) {
-  LOG(INFO) << "[event_listener/subcompaction_begin] column family: " << 
si.cf_name << ", job_id: " << si.job_id
-            << ", compaction reason: " << 
rocksdb::GetCompactionReasonString(si.compaction_reason)
-            << ", output compression type: " << 
CompressType2String(si.compression);
+  info(
+      "[event_listener/subcompaction_begin] column family: {}, job_id: {}, 
compaction reason: {}, output compression "
+      "type: {}",
+      si.cf_name, si.job_id, 
rocksdb::GetCompactionReasonString(si.compaction_reason),
+      CompressType2String(si.compression));
 }
 
 void EventListener::OnSubcompactionCompleted(const 
rocksdb::SubcompactionJobInfo &si) {
-  LOG(INFO) << "[event_listener/subcompaction_completed] column family: " << 
si.cf_name << ", job_id: " << si.job_id
-            << ", compaction reason: " << 
rocksdb::GetCompactionReasonString(si.compaction_reason)
-            << ", output compression type: " << 
CompressType2String(si.compression)
-            << ", base input level(files): " << si.base_input_level << ", 
output level(files): " << si.output_level
-            << ", input bytes: " << si.stats.total_input_bytes << ", output 
bytes:" << si.stats.total_output_bytes
-            << ", is_manual_compaction:" << (si.stats.is_manual_compaction ? 
"yes" : "no")
-            << ", elapsed(micro): " << si.stats.elapsed_micros;
+  info(
+      "[event_listener/subcompaction_completed] column family: {}, job_id: {}, 
compaction reason: {}, output "
+      "compression type: {}, base input level(files): {}, output level(files): 
{}, input bytes: {}, output bytes: {}, "
+      "is_manual_compaction: {}, elapsed(micro): {}",
+      si.cf_name, si.job_id, 
rocksdb::GetCompactionReasonString(si.compaction_reason),
+      CompressType2String(si.compression), si.base_input_level, 
si.output_level, si.stats.total_input_bytes,
+      si.stats.total_output_bytes, si.stats.is_manual_compaction ? "yes" : 
"no", si.stats.elapsed_micros);
 }
 
 void EventListener::OnFlushBegin([[maybe_unused]] rocksdb::DB *db, const 
rocksdb::FlushJobInfo &fi) {
-  LOG(INFO) << "[event_listener/flush_begin] column family: " << fi.cf_name << 
", thread_id: " << fi.thread_id
-            << ", job_id: " << fi.job_id << ", reason: " << 
rocksdb::GetFlushReasonString(fi.flush_reason);
+  info("[event_listener/flush_begin] column family: {}, thread_id: {}, job_id: 
{}, reason: {}", fi.cf_name,
+       fi.thread_id, fi.job_id, 
rocksdb::GetFlushReasonString(fi.flush_reason));
 }
 
 void EventListener::OnFlushCompleted([[maybe_unused]] rocksdb::DB *db, const 
rocksdb::FlushJobInfo &fi) {
   storage_->RecordStat(engine::StatType::FlushCount, 1);
   storage_->CheckDBSizeLimit();
-  LOG(INFO) << "[event_listener/flush_completed] column family: " << 
fi.cf_name << ", thread_id: " << fi.thread_id
-            << ", job_id: " << fi.job_id << ", file: " << fi.file_path
-            << ", reason: " << static_cast<int>(fi.flush_reason)
-            << ", is_write_slowdown: " << (fi.triggered_writes_slowdown ? 
"yes" : "no")
-            << ", is_write_stall: " << (fi.triggered_writes_stop ? "yes" : 
"no")
-            << ", largest seqno: " << fi.largest_seqno << ", smallest seqno: " 
<< fi.smallest_seqno;
+  info(
+      "[event_listener/flush_completed] column family: {}, thread_id: {}, 
job_id: {}, file: {}, reason: {}, "
+      "is_write_slowdown: {}, is_write_stall: {}, largest seqno: {}, smallest 
seqno: {}",
+      fi.cf_name, fi.thread_id, fi.job_id, fi.file_path, 
static_cast<int>(fi.flush_reason),
+      fi.triggered_writes_slowdown ? "yes" : "no", fi.triggered_writes_stop ? 
"yes" : "no", fi.largest_seqno,
+      fi.smallest_seqno);
 }
 
 void EventListener::OnBackgroundError(rocksdb::BackgroundErrorReason reason, 
rocksdb::Status *bg_error) {
@@ -149,7 +152,7 @@ void 
EventListener::OnBackgroundError(rocksdb::BackgroundErrorReason reason, roc
       auto s = storage_->GetDB()->GetLiveFiles(live_files, &manifest_size, 
false /* flush_memtable */);
       if (s.ok() && std::find(live_files.begin(), live_files.end(), 
corrupt_sst) == live_files.end()) {
         *bg_error = rocksdb::Status::OK();
-        LOG(WARNING) << fmt::format(
+        warn(
             "[event_listener/background_error] ignore no-fatal background 
error about sst file, reason: {}, bg_error: "
             "{}",
             reason_str, error_str);
@@ -163,22 +166,23 @@ void 
EventListener::OnBackgroundError(rocksdb::BackgroundErrorReason reason, roc
     storage_->SetDBInRetryableIOError(true);
   }
 
-  LOG(ERROR) << fmt::format("[event_listener/background_error] reason: {}, 
bg_error: {}", reason_str, error_str);
+  error("[event_listener/background_error] reason: {}, bg_error: {}", 
reason_str, error_str);
 }
 
-void EventListener::OnTableFileDeleted(const rocksdb::TableFileDeletionInfo 
&info) {
-  LOG(INFO) << "[event_listener/table_file_deleted] db: " << info.db_name << 
", sst file: " << info.file_path
-            << ", status: " << info.status.ToString();
+void EventListener::OnTableFileDeleted(const rocksdb::TableFileDeletionInfo 
&table_info) {
+  info("[event_listener/table_file_deleted] db: {}, sst file: {}, status: {}", 
table_info.db_name, table_info.file_path,
+       table_info.status.ToString());
 }
 
 void EventListener::OnStallConditionsChanged(const rocksdb::WriteStallInfo 
&info) {
-  LOG(WARNING) << "[event_listener/stall_cond_changed] column family: " << 
info.cf_name
-               << " write stall condition was changed, from " << 
StallConditionType2String(info.condition.prev)
-               << " to " << StallConditionType2String(info.condition.cur);
+  warn("[event_listener/stall_cond_changed] column family: {} write stall 
condition was changed, from {} to {}",
+       info.cf_name, StallConditionType2String(info.condition.prev), 
StallConditionType2String(info.condition.cur));
 }
 
-void EventListener::OnTableFileCreated(const rocksdb::TableFileCreationInfo 
&info) {
-  LOG(INFO) << "[event_listener/table_file_created] column family: " << 
info.cf_name
-            << ", file path: " << info.file_path << ", file size: " << 
info.file_size << ", job_id: " << info.job_id
-            << ", reason: " << FileCreatedReason2String(info.reason) << ", 
status: " << info.status.ToString();
+void EventListener::OnTableFileCreated(const rocksdb::TableFileCreationInfo 
&table_info) {
+  info(
+      "[event_listener/table_file_created] column family: {}, file path: {}, 
file size: {}, job_id: {}, reason: {}, "
+      "status: {}",
+      table_info.cf_name, table_info.file_path, table_info.file_size, 
table_info.job_id,
+      FileCreatedReason2String(table_info.reason), 
table_info.status.ToString());
 }
diff --git a/src/storage/iterator.h b/src/storage/iterator.h
index 4468f7ebd..1ffb52cae 100644
--- a/src/storage/iterator.h
+++ b/src/storage/iterator.h
@@ -142,9 +142,9 @@ class WALBatchExtractor : public 
rocksdb::WriteBatch::Handler {
 class WALIterator {
  public:
   explicit WALIterator(engine::Storage *storage, const SlotRange &slot_range)
-      : storage_(storage), slot_range_(slot_range), extractor_(slot_range), 
next_batch_seq_(0){};
+      : storage_(storage), slot_range_(slot_range), extractor_(slot_range), 
next_batch_seq_(0) {}
   explicit WALIterator(engine::Storage *storage, int slot = -1)
-      : storage_(storage), slot_range_(slot, slot), extractor_(slot), 
next_batch_seq_(0){};
+      : storage_(storage), slot_range_(slot, slot), extractor_(slot), 
next_batch_seq_(0) {}
   ~WALIterator() = default;
 
   bool Valid() const;
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index ebd194a23..93a9463d2 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -206,10 +206,10 @@ int RedisLogCommand(lua_State *lua) {
   switch (level) {
     case LL_VERBOSE:  // also regard VERBOSE as INFO here since no VERBOSE 
level
     case LL_NOTICE:
-      LOG(INFO) << "[Lua] " << log_message;
+      info("[Lua] {}", log_message);
       break;
     case LL_WARNING:
-      LOG(WARNING) << "[Lua] " << log_message;
+      warn("[Lua] {}", log_message);
       break;
   }
   return 0;
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index e59a0e602..fd8059192 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -390,16 +390,16 @@ Status Storage::Open(DBOpenMode mode) {
   int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end 
- start).count();
 
   if (!db_) {
-    LOG(INFO) << "[storage] Failed to load the data from disk: " << duration 
<< " ms";
+    info("[storage] Failed to load the data from disk: {} ms", duration);
     return {Status::DBOpenErr};
   }
-  LOG(INFO) << "[storage] Success to load the data from disk: " << duration << 
" ms";
+  info("[storage] Success to load the data from disk: {} ms", duration);
 
   return Status::OK();
 }
 
 Status Storage::CreateBackup(uint64_t *sequence_number) {
-  LOG(INFO) << "[storage] Start to create new backup";
+  info("[storage] Start to create new backup");
   std::lock_guard<std::mutex> lg(config_->backup_mu);
   std::string task_backup_dir = config_->backup_dir;
 
@@ -411,28 +411,28 @@ Status Storage::CreateBackup(uint64_t *sequence_number) {
   rocksdb::Checkpoint *checkpoint = nullptr;
   rocksdb::Status s = rocksdb::Checkpoint::Create(db_.get(), &checkpoint);
   if (!s.ok()) {
-    LOG(WARNING) << "Failed to create checkpoint object for backup. Error: " 
<< s.ToString();
+    warn("Failed to create checkpoint object for backup. Error: {}", 
s.ToString());
     return {Status::NotOK, s.ToString()};
   }
 
   std::unique_ptr<rocksdb::Checkpoint> checkpoint_guard(checkpoint);
   s = checkpoint->CreateCheckpoint(tmpdir, config_->rocks_db.write_buffer_size 
* MiB, sequence_number);
   if (!s.ok()) {
-    LOG(WARNING) << "Failed to create checkpoint (snapshot) for backup. Error: 
" << s.ToString();
+    warn("Failed to create checkpoint (snapshot) for backup. Error: {}", 
s.ToString());
     return {Status::DBBackupErr, s.ToString()};
   }
 
   // 2) Rename tmp backup to real backup dir
   if (s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options()); !s.ok()) {
-    LOG(WARNING) << "[storage] Failed to clean old backup. Error: " << 
s.ToString();
+    warn("[storage] Failed to clean old backup. Error: {}", s.ToString());
     return {Status::NotOK, s.ToString()};
   }
 
   if (s = env_->RenameFile(tmpdir, task_backup_dir); !s.ok()) {
-    LOG(WARNING) << "[storage] Failed to rename tmp backup. Error: " << 
s.ToString();
+    warn("[storage] Failed to rename tmp backup. Error: {}", s.ToString());
     // Just try best effort
     if (s = rocksdb::DestroyDB(tmpdir, rocksdb::Options()); !s.ok()) {
-      LOG(WARNING) << "[storage] Failed to clean tmp backup. Error: " << 
s.ToString();
+      warn("[storage] Failed to clean tmp backup. Error: {}", s.ToString());
     }
 
     return {Status::NotOK, s.ToString()};
@@ -441,7 +441,7 @@ Status Storage::CreateBackup(uint64_t *sequence_number) {
   // 'backup_mu_' can guarantee 'backup_creating_time_secs_' is thread-safe
   backup_creating_time_secs_ = util::GetTimeStamp<std::chrono::seconds>();
 
-  LOG(INFO) << "[storage] Success to create new backup";
+  info("[storage] Success to create new backup");
   return Status::OK();
 }
 
@@ -464,15 +464,15 @@ Status Storage::RestoreFromBackup() {
 
   auto s = backup_->RestoreDBFromLatestBackup(config_->db_dir, 
config_->db_dir);
   if (!s.ok()) {
-    LOG(ERROR) << "[storage] Failed to restore database from the latest 
backup. Error: " << s.ToString();
+    error("[storage] Failed to restore database from the latest backup. Error: 
{}", s.ToString());
   } else {
-    LOG(INFO) << "[storage] Database was restored from the latest backup";
+    info("[storage] Database was restored from the latest backup");
   }
 
   // Reopen DB (should always try to reopen db even if restore failed, 
replication SST file CRC check may use it)
   auto s2 = Open();
   if (!s2.IsOK()) {
-    LOG(ERROR) << "[storage] Failed to reopen the database. Error: " << 
s2.Msg();
+    error("[storage] Failed to reopen the database. Error: {}", s2.Msg());
     return {Status::DBOpenErr, s2.Msg()};
   }
 
@@ -504,7 +504,7 @@ Status Storage::RestoreFromCheckpoint() {
   s = env_->RenameFile(config_->db_dir, tmp_dir);
   if (!s.ok()) {
     if (auto s1 = Open(); !s1.IsOK()) {
-      LOG(ERROR) << "[storage] Failed to reopen database. Error: " << s1.Msg();
+      error("[storage] Failed to reopen database. Error: {}", s1.Msg());
     }
     return {Status::NotOK, fmt::format("Failed to rename database directory 
'{}' to '{}'. Error: {}", config_->db_dir,
                                        tmp_dir, s.ToString())};
@@ -514,7 +514,7 @@ Status Storage::RestoreFromCheckpoint() {
   if (s = env_->RenameFile(checkpoint_dir, config_->db_dir); !s.ok()) {
     env_->RenameFile(tmp_dir, config_->db_dir);
     if (auto s1 = Open(); !s1.IsOK()) {
-      LOG(ERROR) << "[storage] Failed to reopen database. Error: " << s1.Msg();
+      error("[storage] Failed to reopen database. Error: {}", s1.Msg());
     }
     return {Status::NotOK, fmt::format("Failed to rename checkpoint directory 
'{}' to '{}'. Error: {}", checkpoint_dir,
                                        config_->db_dir, s.ToString())};
@@ -523,18 +523,18 @@ Status Storage::RestoreFromCheckpoint() {
   // Open the new database, restore if replica fails to open
   auto s2 = Open();
   if (!s2.IsOK()) {
-    LOG(WARNING) << "[storage] Failed to open master checkpoint. Error: " << 
s2.Msg();
+    warn("[storage] Failed to open master checkpoint. Error: {}", s2.Msg());
     rocksdb::DestroyDB(config_->db_dir, rocksdb::Options());
     env_->RenameFile(tmp_dir, config_->db_dir);
     if (auto s1 = Open(); !s1.IsOK()) {
-      LOG(ERROR) << "[storage] Failed to reopen database. Error: " << s1.Msg();
+      error("[storage] Failed to reopen database. Error: {}", s1.Msg());
     }
     return {Status::DBOpenErr, "Failed to open master checkpoint. Error: " + 
s2.Msg()};
   }
 
   // Destroy the origin database
   if (s = rocksdb::DestroyDB(tmp_dir, rocksdb::Options()); !s.ok()) {
-    LOG(WARNING) << "[storage] Failed to destroy the origin database at '" << 
tmp_dir << "'. Error: " << s.ToString();
+    warn("[storage] Failed to destroy the origin database at '{}'. Error: {}", 
tmp_dir, s.ToString());
   }
   return Status::OK();
 }
@@ -559,7 +559,7 @@ void Storage::EmptyDB() {
 
   auto s = rocksdb::DestroyDB(config_->db_dir, rocksdb::Options());
   if (!s.ok()) {
-    LOG(ERROR) << "[storage] Failed to destroy database. Error: " << 
s.ToString();
+    error("[storage] Failed to destroy database. Error: {}", s.ToString());
   }
 }
 
@@ -578,10 +578,10 @@ void Storage::PurgeOldBackups(uint32_t 
num_backups_to_keep, uint32_t backup_max_
   if (num_backups_to_keep == 0 || backup_expired) {
     s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options());
     if (s.ok()) {
-      LOG(INFO) << "[storage] Succeeded cleaning old backup that was created 
at " << backup_creating_time_secs_;
+      info("[storage] Succeeded cleaning old backup that was created at {}", 
backup_creating_time_secs_);
     } else {
-      LOG(INFO) << "[storage] Failed cleaning old backup that was created at " 
<< backup_creating_time_secs_
-                << ". Error: " << s.ToString();
+      info("[storage] Failed cleaning old backup that was created at {}. 
Error: {}", backup_creating_time_secs_,
+           s.ToString());
     }
   }
 }
@@ -785,7 +785,7 @@ StatusOr<int> Storage::IngestSST(const std::string 
&sst_dir, const rocksdb::Inge
 
   sst_files = std::move(filtered_files);
   if (sst_files.empty()) {
-    LOG(WARNING) << "No SST files found in " << sst_dir;
+    warn("No SST files found in {}", sst_dir);
     return 0;
   }
 
@@ -929,10 +929,9 @@ void Storage::CheckDBSizeLimit() {
 
   db_size_limit_reached_ = limit_reached;
   if (db_size_limit_reached_) {
-    LOG(WARNING) << "[storage] ENABLE db_size limit " << config_->max_db_size 
<< " GB."
-                 << "Switch kvrocks to read-only mode.";
+    warn("[storage] ENABLE db_size limit {} GB. Switch kvrocks to read-only 
mode.", config_->max_db_size);
   } else {
-    LOG(WARNING) << "[storage] DISABLE db_size limit. Switch kvrocks to 
read-write mode.";
+    warn("[storage] DISABLE db_size limit. Switch kvrocks to read-write 
mode.");
   }
 }
 
@@ -1012,7 +1011,7 @@ Status Storage::ShiftReplId(engine::Context &ctx) {
     rand_str[i] = charset[distrib(gen)];
   }
   replid_ = std::move(rand_str);
-  LOG(INFO) << "[replication] New replication id: " << replid_;
+  info("[replication] New replication id: {}", replid_);
 
   // Write new replication id into db engine
   return WriteToPropagateCF(ctx, kReplicationIdKey, replid_);
@@ -1089,7 +1088,7 @@ Status 
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
     rocksdb::Checkpoint *checkpoint = nullptr;
     rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_.get(), 
&checkpoint);
     if (!s.ok()) {
-      LOG(WARNING) << "Failed to create checkpoint object. Error: " << 
s.ToString();
+      warn("Failed to create checkpoint object. Error: {}", s.ToString());
       return {Status::NotOK, s.ToString()};
     }
 
@@ -1104,11 +1103,10 @@ Status 
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
     storage->checkpoint_info_.access_time_secs = now_secs;
     storage->checkpoint_info_.latest_seq = checkpoint_latest_seq;
     if (!s.ok()) {
-      LOG(WARNING) << "[storage] Failed to create checkpoint (snapshot). 
Error: " << s.ToString();
+      warn("[storage] Failed to create checkpoint (snapshot). Error: {}", 
s.ToString());
       return {Status::NotOK, s.ToString()};
     }
-
-    LOG(INFO) << "[storage] Create checkpoint successfully";
+    info("[storage] Create checkpoint successfully");
   } else {
     // Replicas can share checkpoint to replication if the checkpoint existing 
time is less than a half of WAL TTL.
     int64_t can_shared_time_secs = storage->config_->rocks_db.wal_ttl_seconds 
/ 2;
@@ -1117,7 +1115,7 @@ Status 
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
 
     auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
     if (now_secs - storage->GetCheckpointCreateTimeSecs() > 
can_shared_time_secs) {
-      LOG(WARNING) << "[storage] Can't use current checkpoint, waiting next 
checkpoint";
+      warn("[storage] Can't use current checkpoint, waiting next checkpoint");
       return {Status::NotOK, "Can't use current checkpoint, waiting for next 
checkpoint"};
     }
 
@@ -1125,10 +1123,10 @@ Status 
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
     // or the slave will fall into the full sync loop since it won't create 
new checkpoint.
     auto s = storage->InWALBoundary(storage->checkpoint_info_.latest_seq);
     if (!s.IsOK()) {
-      LOG(WARNING) << "[storage] Can't use current checkpoint, error: " << 
s.Msg();
+      warn("[storage] Can't use current checkpoint, error: {}", s.Msg());
       return {Status::NotOK, fmt::format("Can't use current checkpoint, error: 
{}", s.Msg())};
     }
-    LOG(INFO) << "[storage] Using current existing checkpoint";
+    info("[storage] Using current existing checkpoint");
   }
 
   ulm.unlock();
@@ -1192,9 +1190,9 @@ Status 
Storage::ReplDataManager::CleanInvalidFiles(Storage *storage, const std::
     auto s = storage->env_->DeleteFile(dir + "/" + *it);
     if (!s.ok()) {
       ret = Status(Status::NotOK, s.ToString());
-      LOG(INFO) << "[storage] Failed to delete invalid file " << *it << " of 
master checkpoint";
+      info("[storage] Failed to delete invalid file {} of master checkpoint", 
*it);
     } else {
-      LOG(INFO) << "[storage] Succeed deleting invalid file " << *it << " of 
master checkpoint";
+      info("[storage] Succeed deleting invalid file {} of master checkpoint", 
*it);
     }
   }
   return ret;
@@ -1204,14 +1202,14 @@ int Storage::ReplDataManager::OpenDataFile(Storage 
*storage, const std::string &
   std::string abs_path = storage->config_->checkpoint_dir + "/" + repl_file;
   auto s = storage->env_->FileExists(abs_path);
   if (!s.ok()) {
-    LOG(ERROR) << "[storage] Data file [" << abs_path << "] not found";
+    error("[storage] Data file [{}] not found", abs_path);
     return NullFD;
   }
 
   storage->env_->GetFileSize(abs_path, file_size);
   auto rv = open(abs_path.c_str(), O_RDONLY);
   if (rv < 0) {
-    LOG(ERROR) << "[storage] Failed to open file: " << strerror(errno);
+    error("[storage] Failed to open file: {}", strerror(errno));
   }
 
   return rv;
@@ -1220,7 +1218,7 @@ int Storage::ReplDataManager::OpenDataFile(Storage 
*storage, const std::string &
 Status Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, 
rocksdb::BackupID meta_id, evbuffer *evbuf,
                                                   
Storage::ReplDataManager::MetaInfo *meta) {
   auto meta_file = "meta/" + std::to_string(meta_id);
-  DLOG(INFO) << "[meta] id: " << meta_id;
+  debug("[meta] id: {}", meta_id);
 
   // Save the meta to tmp file
   auto wf = NewTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
@@ -1230,28 +1228,27 @@ Status 
Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, rocksdb::Bac
 
   // timestamp;
   UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_LF);
-  DLOG(INFO) << "[meta] timestamp: " << line.get();
+  debug("[meta] timestamp: {}", line.get());
   meta->timestamp = std::strtoll(line.get(), nullptr, 10);
   // sequence
   line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
-  DLOG(INFO) << "[meta] seq:" << line.get();
+  debug("[meta] seq: {}", line.get());
   meta->seq = std::strtoull(line.get(), nullptr, 10);
   // optional metadata
   line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
   if (strncmp(line.get(), "metadata", 8) == 0) {
-    DLOG(INFO) << "[meta] meta: " << line.get();
+    debug("[meta] meta: {}", line.get());
     meta->meta_data = std::string(line.get(), line.length);
     line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
   }
-  DLOG(INFO) << "[meta] file count: " << line.get();
+  debug("[meta] file count: {}", line.get());
   // file list
   while (true) {
     line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
     if (!line) {
       break;
     }
-
-    DLOG(INFO) << "[meta] file info: " << line.get();
+    debug("[meta] file info: {}", line.get());
     auto cptr = line.get();
     while (*(cptr++) != ' ') {
     }
@@ -1274,7 +1271,7 @@ Status MkdirRecursively(rocksdb::Env *env, const 
std::string &dir) {
   for (auto pos = dir.find('/', 1); pos != std::string::npos; pos = 
dir.find('/', pos + 1)) {
     parent = dir.substr(0, pos);
     if (auto s = env->CreateDirIfMissing(parent); !s.ok()) {
-      LOG(ERROR) << "[storage] Failed to create directory '" << parent << "' 
recursively. Error: " << s.ToString();
+      error("[storage] Failed to create directory '{}' recursively. Error: 
{}", parent, s.ToString());
       return {Status::NotOK};
     }
   }
@@ -1289,7 +1286,7 @@ std::unique_ptr<rocksdb::WritableFile> 
Storage::ReplDataManager::NewTmpFile(Stor
   std::string tmp_file = dir + "/" + repl_file + ".tmp";
   auto s = storage->env_->FileExists(tmp_file);
   if (s.ok()) {
-    LOG(ERROR) << "[storage] Data file exists, override";
+    error("[storage] Data file exists, override");
     storage->env_->DeleteFile(tmp_file);
   }
 
@@ -1302,7 +1299,7 @@ std::unique_ptr<rocksdb::WritableFile> 
Storage::ReplDataManager::NewTmpFile(Stor
   std::unique_ptr<rocksdb::WritableFile> wf;
   s = storage->env_->NewWritableFile(tmp_file, &wf, rocksdb::EnvOptions());
   if (!s.ok()) {
-    LOG(ERROR) << "[storage] Failed to create data file '" << tmp_file << "'. 
Error: " << s.ToString();
+    error("[storage] Failed to create data file '{}'. Error: {}", tmp_file, 
s.ToString());
     return nullptr;
   }
 


Reply via email to