This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 1ee52eb3a chore(log): replace logging calls in storage/* (#2909)
1ee52eb3a is described below
commit 1ee52eb3a00af68b1ec99e139ad350313784936a
Author: Anirudh Lakhanpal <[email protected]>
AuthorDate: Wed May 7 11:11:39 2025 +0530
chore(log): replace logging calls in storage/* (#2909)
Co-authored-by: Twice <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/storage/batch_extractor.cc | 57 ++++++++++++------------
src/storage/compact_filter.cc | 16 +++----
src/storage/compaction_checker.cc | 26 +++++------
src/storage/event_listener.cc | 94 ++++++++++++++++++++-------------------
src/storage/iterator.h | 4 +-
src/storage/scripting.cc | 4 +-
src/storage/storage.cc | 91 ++++++++++++++++++-------------------
7 files changed, 145 insertions(+), 147 deletions(-)
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 45fb70e59..f0b633b3e 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -37,7 +37,7 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob)
{
} else {
// Redis type log data
if (auto s = log_data_.Decode(blob); !s.IsOK()) {
- LOG(WARNING) << "Failed to decode Redis type log: " << s.Msg();
+ warn("Failed to decode Redis type log: {}", s.Msg());
}
}
}
@@ -139,8 +139,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
case kRedisList: {
auto args = log_data_.GetArguments();
if (args->empty()) {
- LOG(ERROR)
- << "Failed to parse write_batch in PutCF. Type=List: no
arguments, at least should contain a command";
+ error("Failed to parse write_batch in PutCF. Type=List: no
arguments, at least should contain a command");
return rocksdb::Status::OK();
}
@@ -154,8 +153,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
switch (cmd) {
case kRedisCmdLSet:
if (args->size() < 2) {
- LOG(ERROR) << "Failed to parse write_batch in PutCF.
Command=LSET: no enough arguments, at least should "
- "contain an index";
+ error(
+ "Failed to parse write_batch in PutCF. Command=LSET: no
enough arguments, at least should contain an "
+ "index");
return rocksdb::Status::OK();
}
@@ -164,8 +164,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
case kRedisCmdLInsert:
if (first_seen_) {
if (args->size() < 4) {
- LOG(ERROR) << "Failed to parse write_batch in PutCF.
Command=LINSERT: no enough arguments, should "
- "contain before pivot value";
+ error(
+ "Failed to parse write_batch in PutCF. Command=LINSERT: no
enough arguments, should contain before "
+ "pivot values");
return rocksdb::Status::OK();
}
@@ -186,8 +187,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
// LMOVE will be parsed in DeleteCF, so ignore it here
break;
default:
- LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=List:
unhandled command with code "
- << *parse_result;
+ error("Failed to parse write_batch in PutCF. Type=List: unhandled
command with code {}", *parse_result);
}
break;
}
@@ -202,8 +202,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
case kRedisBitmap: {
auto args = log_data_.GetArguments();
if (args->empty()) {
- LOG(ERROR)
- << "Failed to parse write_batch in PutCF. Type=Bitmap: no
arguments, at least should contain a command";
+ error("Failed to parse write_batch in PutCF. Type=Bitmap: no
arguments, at least should contain a command");
return rocksdb::Status::OK();
}
@@ -217,8 +216,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
switch (cmd) {
case kRedisCmdSetBit: {
if (args->size() < 2) {
- LOG(ERROR) << "Failed to parse write_batch in PutCF.
Command=SETBIT: no enough arguments, should contain "
- "an offset";
+ error(
+ "Failed to parse write_batch in PutCF. Command=SETBIT: no
enough arguments, should contain an "
+ "offset");
return rocksdb::Status::OK();
}
@@ -234,8 +234,9 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
case kRedisCmdBitOp:
if (first_seen_) {
if (args->size() < 4) {
- LOG(ERROR) << "Failed to parse write_batch in PutCF.
Command=BITOP: no enough arguments, at least "
- "should contain srckey";
+ error(
+ "Failed to parse write_batch in PutCF. Command=BITOP: no
enough arguments, at least should contain "
+ "srckey");
return rocksdb::Status::OK();
}
@@ -249,8 +250,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
command_args.insert(command_args.end(), args->begin() + 1,
args->end());
break;
default:
- LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Bitmap:
unhandled command with code "
- << *parsed_cmd;
+ error("Failed to parse write_batch in PutCF. Type=Bitmap:
unhandled command with code {}", *parsed_cmd);
return rocksdb::Status::OK();
}
break;
@@ -268,7 +268,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
} else if (column_family_id ==
static_cast<uint32_t>(ColumnFamilyID::Stream)) {
auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value,
&command_args);
if (!s.IsOK()) {
- LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Stream: " <<
s.Msg();
+ error("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
return rocksdb::Status::OK();
}
}
@@ -322,8 +322,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
case kRedisList: {
auto args = log_data_.GetArguments();
if (args->empty()) {
- LOG(ERROR)
- << "Failed to parse write_batch in DeleteCF. Type=List: no
arguments, at least should contain a command";
+ error("Failed to parse write_batch in DeleteCF. Type=List: no
arguments, at least should contain a command");
return rocksdb::Status::OK();
}
@@ -338,8 +337,9 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
case kRedisCmdLTrim:
if (first_seen_) {
if (args->size() < 3) {
- LOG(ERROR) << "Failed to parse write_batch in DeleteCF;
Command=LTRIM: no enough arguments, should "
- "contain start and stop";
+ error(
+ "Failed to parse write_batch in DeleteCF; Command=LTRIM:
no enough arguments, should contain start "
+ "and stop");
return rocksdb::Status::OK();
}
@@ -350,8 +350,9 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
case kRedisCmdLRem:
if (first_seen_) {
if (args->size() < 3) {
- LOG(ERROR) << "Failed to parse write_batch in DeleteCF.
Command=LREM: no enough arguments, should "
- "contain count and value";
+ error(
+ "Failed to parse write_batch in DeleteCF. Command=LREM: no
enough arguments, should "
+ "contain count and value");
return rocksdb::Status::OK();
}
@@ -368,8 +369,9 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
case kRedisCmdLMove:
if (first_seen_) {
if (args->size() < 5) {
- LOG(ERROR) << "Failed to parse write_batch in DeleteCF;
Command=LMOVE: no enough arguments, should "
- "contain source, destination and where/from
arguments";
+ error(
+ "Failed to parse write_batch in DeleteCF; Command=LMOVE:
no enough arguments, should "
+ "contain source, destination and where/from arguments");
return rocksdb::Status::OK();
}
command_args = {"LMOVE", (*args)[1], (*args)[2], (*args)[3],
(*args)[4]};
@@ -377,8 +379,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t
column_family_id, const S
}
break;
default:
- LOG(ERROR) << "Failed to parse write_batch in DeleteCF. Type=List:
unhandled command with code "
- << *parse_result;
+ error("Failed to parse write_batch in DeleteCF. Type=List:
unhandled command with code {}", *parse_result);
}
break;
}
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index 809556c81..737dbf114 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -38,13 +38,11 @@ bool MetadataFilter::Filter([[maybe_unused]] int level,
const Slice &key, const
rocksdb::Status s = metadata.Decode(value);
auto [ns, user_key] = ExtractNamespaceKey(key, stor_->IsSlotIdEncoded());
if (!s.ok()) {
- LOG(WARNING) << "[compact_filter/metadata] Failed to decode,"
- << ", namespace: " << ns << ", key: " << user_key << ", err:
" << s.ToString();
+ warn("[compact_filter/metadata] Failed to decode, namespace: {}, key: {},
err: {}", ns, user_key, s.ToString());
return false;
}
- DLOG(INFO) << "[compact_filter/metadata] "
- << "namespace: " << ns << ", key: " << user_key
- << ", result: " << (metadata.Expired() ? "deleted" : "reserved");
+ debug("[compact_filter/metadata] namespace: {}, key: {}, result: {}", ns,
user_key,
+ (metadata.Expired() ? "deleted" : "reserved"));
return metadata.Expired();
}
@@ -104,8 +102,8 @@ rocksdb::CompactionFilter::Decision
SubKeyFilter::FilterBlobByKey([[maybe_unused
return rocksdb::CompactionFilter::Decision::kRemove;
}
if (!s.IsOK()) {
- LOG(ERROR) << "[compact_filter/subkey] Failed to get metadata"
- << ", namespace: " << ikey.GetNamespace() << ", key: " <<
ikey.GetKey() << ", err: " << s.Msg();
+ error("[compact_filter/subkey] Failed to get metadata, namespace: {}, key:
{}, err: {}", ikey.GetNamespace(),
+ ikey.GetKey(), s.Msg());
return rocksdb::CompactionFilter::Decision::kKeep;
}
// bitmap will be checked in Filter
@@ -126,8 +124,8 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const
Slice &key, const Sl
return true;
}
if (!s.IsOK()) {
- LOG(ERROR) << "[compact_filter/subkey] Failed to get metadata"
- << ", namespace: " << ikey.GetNamespace() << ", key: " <<
ikey.GetKey() << ", err: " << s.Msg();
+ error("[compact_filter/subkey] Failed to get metadata, namespace: {}, key:
{}, err: {}", ikey.GetNamespace(),
+ ikey.GetKey(), s.Msg());
return false;
}
diff --git a/src/storage/compaction_checker.cc
b/src/storage/compaction_checker.cc
index 0bf90785a..34c621249 100644
--- a/src/storage/compaction_checker.cc
+++ b/src/storage/compaction_checker.cc
@@ -30,11 +30,10 @@ void CompactionChecker::CompactPropagateAndPubSubFiles() {
compact_opts.change_level = true;
for (const auto &cf :
{engine::ColumnFamilyConfigs::PubSubColumnFamily(),
engine::ColumnFamilyConfigs::PropagateColumnFamily()}) {
- LOG(INFO) << "[compaction checker] Start to compact the column family: "
<< cf.Name();
+ info("[compaction checker] Start to compact the column family: {}",
cf.Name());
auto cf_handle = storage_->GetCFHandle(cf.Id());
auto s = storage_->GetDB()->CompactRange(compact_opts, cf_handle, nullptr,
nullptr);
- LOG(INFO) << "[compaction checker] Compact the column family: " <<
cf.Name()
- << " finished, result: " << s.ToString();
+ info("[compaction checker] Compact the column family: {} finished, result:
{}", cf.Name(), s.ToString());
}
}
@@ -43,7 +42,7 @@ void CompactionChecker::PickCompactionFilesForCf(const
engine::ColumnFamilyConfi
rocksdb::ColumnFamilyHandle *cf =
storage_->GetCFHandle(column_family_config.Id());
auto s = storage_->GetDB()->GetPropertiesOfAllTables(cf, &props);
if (!s.ok()) {
- LOG(WARNING) << "[compaction checker] Failed to get table properties, " <<
s.ToString();
+ warn("[compaction checker] Failed to get table properties, {}",
s.ToString());
return;
}
// The main goal of compaction was reclaimed the disk space and removed
@@ -74,8 +73,7 @@ void CompactionChecker::PickCompactionFilesForCf(const
engine::ColumnFamilyConfi
// file_creation_time is 0 which means the unknown condition in rocksdb
s = rocksdb::Env::Default()->GetFileModificationTime(iter.first,
&file_creation_time);
if (!s.ok()) {
- LOG(INFO) << "[compaction checker] Failed to get the file creation
time: " << iter.first
- << ", err: " << s.ToString();
+ info("[compaction checker] Failed to get the file creation time: {},
err: {}", iter.first, s.ToString());
continue;
}
}
@@ -84,7 +82,7 @@ void CompactionChecker::PickCompactionFilesForCf(const
engine::ColumnFamilyConfi
if (property_iter.first == "total_keys") {
auto parse_result = ParseInt<int>(property_iter.second, 10);
if (!parse_result) {
- LOG(ERROR) << "[compaction checker] Parse total_keys error: " <<
parse_result.Msg();
+ error("[compaction checker] Parse total_keys error: {}",
parse_result.Msg());
continue;
}
total_keys = *parse_result;
@@ -92,7 +90,7 @@ void CompactionChecker::PickCompactionFilesForCf(const
engine::ColumnFamilyConfi
if (property_iter.first == "deleted_keys") {
auto parse_result = ParseInt<int>(property_iter.second, 10);
if (!parse_result) {
- LOG(ERROR) << "[compaction checker] Parse deleted_keys error: " <<
parse_result.Msg();
+ error("[compaction checker] Parse deleted_keys error: {}",
parse_result.Msg());
continue;
}
deleted_keys = *parse_result;
@@ -111,10 +109,10 @@ void CompactionChecker::PickCompactionFilesForCf(const
engine::ColumnFamilyConfi
// pick the file according to force compact policy
if (file_creation_time < static_cast<uint64_t>(now -
force_compact_file_age) &&
delete_ratio >= force_compact_min_ratio) {
- LOG(INFO) << "[compaction checker] Going to compact the key in file
(force compact policy): " << iter.first;
+ info("[compaction checker] Going to compact the key in file (force
compact policy): {}", iter.first);
auto s = storage_->Compact(cf, &start_key, &stop_key);
- LOG(INFO) << "[compaction checker] Compact the key in file (force
compact policy): " << iter.first
- << " finished, result: " << s.ToString();
+ info("[compaction checker] Compact the key in file (force compact
policy): {} finished, result: {}", iter.first,
+ s.ToString());
max_files_to_compact--;
continue;
}
@@ -133,11 +131,11 @@ void CompactionChecker::PickCompactionFilesForCf(const
engine::ColumnFamilyConfi
}
}
if (best_delete_ratio > 0.1 && !best_start_key.empty() &&
!best_stop_key.empty()) {
- LOG(INFO) << "[compaction checker] Going to compact the key in file: " <<
best_filename
- << ", delete ratio: " << best_delete_ratio;
+ info("[compaction checker] Going to compact the key in file: {}, delete
ratio: {}", best_filename,
+ best_delete_ratio);
auto s = storage_->Compact(cf, &best_start_key, &best_stop_key);
if (!s.ok()) {
- LOG(ERROR) << "[compaction checker] Failed to do compaction: " <<
s.ToString();
+ error("[compaction checker] Failed to do compaction: {}", s.ToString());
}
}
}
diff --git a/src/storage/event_listener.cc b/src/storage/event_listener.cc
index e5861d20a..1c8f48f37 100644
--- a/src/storage/event_listener.cc
+++ b/src/storage/event_listener.cc
@@ -80,58 +80,61 @@ bool IsDiskQuotaExceeded(const rocksdb::Status &bg_error) {
}
void EventListener::OnCompactionBegin([[maybe_unused]] rocksdb::DB *db, const
rocksdb::CompactionJobInfo &ci) {
- LOG(INFO) << "[event_listener/compaction_begin] column family: " <<
ci.cf_name << ", job_id: " << ci.job_id
- << ", compaction reason: " <<
rocksdb::GetCompactionReasonString(ci.compaction_reason)
- << ", output compression type: " <<
CompressType2String(ci.compression)
- << ", base input level(files): " << ci.base_input_level << "(" <<
ci.input_files.size() << ")"
- << ", output level(files): " << ci.output_level << "(" <<
ci.output_files.size() << ")"
- << ", input bytes: " << ci.stats.total_input_bytes << ", output
bytes:" << ci.stats.total_output_bytes
- << ", is_manual_compaction:" << (ci.stats.is_manual_compaction ?
"yes" : "no");
+ info(
+ "[event_listener/compaction_begin] column family: {}, job_id: {},
compaction reason: {}, output compression "
+ "type: {}, base input level(files): {}({}), output level(files): {}({}),
input bytes: {}, output bytes: {}, "
+ "is_manual_compaction: {}",
+ ci.cf_name, ci.job_id,
rocksdb::GetCompactionReasonString(ci.compaction_reason),
+ CompressType2String(ci.compression), ci.base_input_level,
ci.input_files.size(), ci.output_level,
+ ci.output_files.size(), ci.stats.total_input_bytes,
ci.stats.total_output_bytes,
+ ci.stats.is_manual_compaction ? "yes" : "no");
}
void EventListener::OnCompactionCompleted([[maybe_unused]] rocksdb::DB *db,
const rocksdb::CompactionJobInfo &ci) {
- LOG(INFO) << "[event_listener/compaction_completed] column family: " <<
ci.cf_name << ", job_id: " << ci.job_id
- << ", compaction reason: " <<
rocksdb::GetCompactionReasonString(ci.compaction_reason)
- << ", output compression type: " <<
CompressType2String(ci.compression)
- << ", base input level(files): " << ci.base_input_level << "(" <<
ci.input_files.size() << ")"
- << ", output level(files): " << ci.output_level << "(" <<
ci.output_files.size() << ")"
- << ", input bytes: " << ci.stats.total_input_bytes << ", output
bytes:" << ci.stats.total_output_bytes
- << ", is_manual_compaction:" << (ci.stats.is_manual_compaction ?
"yes" : "no")
- << ", elapsed(micro): " << ci.stats.elapsed_micros;
+ info(
+ "[event_listener/compaction_completed] column family: {}, job_id: {},
compaction reason: {}, output compression "
+ "type: {}, base input level(files): {}({}), output level(files): {}({}),
input bytes: {}, output bytes: {}, "
+ "is_manual_compaction: {}, elapsed(micro): {}",
+ ci.cf_name, ci.job_id,
rocksdb::GetCompactionReasonString(ci.compaction_reason),
+ CompressType2String(ci.compression), ci.base_input_level,
ci.input_files.size(), ci.output_level,
+ ci.output_files.size(), ci.stats.total_input_bytes,
ci.stats.total_output_bytes,
+ ci.stats.is_manual_compaction ? "yes" : "no", ci.stats.elapsed_micros);
storage_->RecordStat(engine::StatType::CompactionCount, 1);
storage_->CheckDBSizeLimit();
}
void EventListener::OnSubcompactionBegin(const rocksdb::SubcompactionJobInfo
&si) {
- LOG(INFO) << "[event_listener/subcompaction_begin] column family: " <<
si.cf_name << ", job_id: " << si.job_id
- << ", compaction reason: " <<
rocksdb::GetCompactionReasonString(si.compaction_reason)
- << ", output compression type: " <<
CompressType2String(si.compression);
+ info(
+ "[event_listener/subcompaction_begin] column family: {}, job_id: {},
compaction reason: {}, output compression "
+ "type: {}",
+ si.cf_name, si.job_id,
rocksdb::GetCompactionReasonString(si.compaction_reason),
+ CompressType2String(si.compression));
}
void EventListener::OnSubcompactionCompleted(const
rocksdb::SubcompactionJobInfo &si) {
- LOG(INFO) << "[event_listener/subcompaction_completed] column family: " <<
si.cf_name << ", job_id: " << si.job_id
- << ", compaction reason: " <<
rocksdb::GetCompactionReasonString(si.compaction_reason)
- << ", output compression type: " <<
CompressType2String(si.compression)
- << ", base input level(files): " << si.base_input_level << ",
output level(files): " << si.output_level
- << ", input bytes: " << si.stats.total_input_bytes << ", output
bytes:" << si.stats.total_output_bytes
- << ", is_manual_compaction:" << (si.stats.is_manual_compaction ?
"yes" : "no")
- << ", elapsed(micro): " << si.stats.elapsed_micros;
+ info(
+ "[event_listener/subcompaction_completed] column family: {}, job_id: {},
compaction reason: {}, output "
+ "compression type: {}, base input level(files): {}, output level(files):
{}, input bytes: {}, output bytes: {}, "
+ "is_manual_compaction: {}, elapsed(micro): {}",
+ si.cf_name, si.job_id,
rocksdb::GetCompactionReasonString(si.compaction_reason),
+ CompressType2String(si.compression), si.base_input_level,
si.output_level, si.stats.total_input_bytes,
+ si.stats.total_output_bytes, si.stats.is_manual_compaction ? "yes" :
"no", si.stats.elapsed_micros);
}
void EventListener::OnFlushBegin([[maybe_unused]] rocksdb::DB *db, const
rocksdb::FlushJobInfo &fi) {
- LOG(INFO) << "[event_listener/flush_begin] column family: " << fi.cf_name <<
", thread_id: " << fi.thread_id
- << ", job_id: " << fi.job_id << ", reason: " <<
rocksdb::GetFlushReasonString(fi.flush_reason);
+ info("[event_listener/flush_begin] column family: {}, thread_id: {}, job_id:
{}, reason: {}", fi.cf_name,
+ fi.thread_id, fi.job_id,
rocksdb::GetFlushReasonString(fi.flush_reason));
}
void EventListener::OnFlushCompleted([[maybe_unused]] rocksdb::DB *db, const
rocksdb::FlushJobInfo &fi) {
storage_->RecordStat(engine::StatType::FlushCount, 1);
storage_->CheckDBSizeLimit();
- LOG(INFO) << "[event_listener/flush_completed] column family: " <<
fi.cf_name << ", thread_id: " << fi.thread_id
- << ", job_id: " << fi.job_id << ", file: " << fi.file_path
- << ", reason: " << static_cast<int>(fi.flush_reason)
- << ", is_write_slowdown: " << (fi.triggered_writes_slowdown ?
"yes" : "no")
- << ", is_write_stall: " << (fi.triggered_writes_stop ? "yes" :
"no")
- << ", largest seqno: " << fi.largest_seqno << ", smallest seqno: "
<< fi.smallest_seqno;
+ info(
+ "[event_listener/flush_completed] column family: {}, thread_id: {},
job_id: {}, file: {}, reason: {}, "
+ "is_write_slowdown: {}, is_write_stall: {}, largest seqno: {}, smallest
seqno: {}",
+ fi.cf_name, fi.thread_id, fi.job_id, fi.file_path,
static_cast<int>(fi.flush_reason),
+ fi.triggered_writes_slowdown ? "yes" : "no", fi.triggered_writes_stop ?
"yes" : "no", fi.largest_seqno,
+ fi.smallest_seqno);
}
void EventListener::OnBackgroundError(rocksdb::BackgroundErrorReason reason,
rocksdb::Status *bg_error) {
@@ -149,7 +152,7 @@ void
EventListener::OnBackgroundError(rocksdb::BackgroundErrorReason reason, roc
auto s = storage_->GetDB()->GetLiveFiles(live_files, &manifest_size,
false /* flush_memtable */);
if (s.ok() && std::find(live_files.begin(), live_files.end(),
corrupt_sst) == live_files.end()) {
*bg_error = rocksdb::Status::OK();
- LOG(WARNING) << fmt::format(
+ warn(
"[event_listener/background_error] ignore no-fatal background
error about sst file, reason: {}, bg_error: "
"{}",
reason_str, error_str);
@@ -163,22 +166,23 @@ void
EventListener::OnBackgroundError(rocksdb::BackgroundErrorReason reason, roc
storage_->SetDBInRetryableIOError(true);
}
- LOG(ERROR) << fmt::format("[event_listener/background_error] reason: {},
bg_error: {}", reason_str, error_str);
+ error("[event_listener/background_error] reason: {}, bg_error: {}",
reason_str, error_str);
}
-void EventListener::OnTableFileDeleted(const rocksdb::TableFileDeletionInfo
&info) {
- LOG(INFO) << "[event_listener/table_file_deleted] db: " << info.db_name <<
", sst file: " << info.file_path
- << ", status: " << info.status.ToString();
+void EventListener::OnTableFileDeleted(const rocksdb::TableFileDeletionInfo
&table_info) {
+ info("[event_listener/table_file_deleted] db: {}, sst file: {}, status: {}",
table_info.db_name, table_info.file_path,
+ table_info.status.ToString());
}
void EventListener::OnStallConditionsChanged(const rocksdb::WriteStallInfo
&info) {
- LOG(WARNING) << "[event_listener/stall_cond_changed] column family: " <<
info.cf_name
- << " write stall condition was changed, from " <<
StallConditionType2String(info.condition.prev)
- << " to " << StallConditionType2String(info.condition.cur);
+ warn("[event_listener/stall_cond_changed] column family: {} write stall
condition was changed, from {} to {}",
+ info.cf_name, StallConditionType2String(info.condition.prev),
StallConditionType2String(info.condition.cur));
}
-void EventListener::OnTableFileCreated(const rocksdb::TableFileCreationInfo
&info) {
- LOG(INFO) << "[event_listener/table_file_created] column family: " <<
info.cf_name
- << ", file path: " << info.file_path << ", file size: " <<
info.file_size << ", job_id: " << info.job_id
- << ", reason: " << FileCreatedReason2String(info.reason) << ",
status: " << info.status.ToString();
+void EventListener::OnTableFileCreated(const rocksdb::TableFileCreationInfo
&table_info) {
+ info(
+ "[event_listener/table_file_created] column family: {}, file path: {},
file size: {}, job_id: {}, reason: {}, "
+ "status: {}",
+ table_info.cf_name, table_info.file_path, table_info.file_size,
table_info.job_id,
+ FileCreatedReason2String(table_info.reason),
table_info.status.ToString());
}
diff --git a/src/storage/iterator.h b/src/storage/iterator.h
index 4468f7ebd..1ffb52cae 100644
--- a/src/storage/iterator.h
+++ b/src/storage/iterator.h
@@ -142,9 +142,9 @@ class WALBatchExtractor : public
rocksdb::WriteBatch::Handler {
class WALIterator {
public:
explicit WALIterator(engine::Storage *storage, const SlotRange &slot_range)
- : storage_(storage), slot_range_(slot_range), extractor_(slot_range),
next_batch_seq_(0){};
+ : storage_(storage), slot_range_(slot_range), extractor_(slot_range),
next_batch_seq_(0) {}
explicit WALIterator(engine::Storage *storage, int slot = -1)
- : storage_(storage), slot_range_(slot, slot), extractor_(slot),
next_batch_seq_(0){};
+ : storage_(storage), slot_range_(slot, slot), extractor_(slot),
next_batch_seq_(0) {}
~WALIterator() = default;
bool Valid() const;
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index ebd194a23..93a9463d2 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -206,10 +206,10 @@ int RedisLogCommand(lua_State *lua) {
switch (level) {
case LL_VERBOSE: // also regard VERBOSE as INFO here since no VERBOSE
level
case LL_NOTICE:
- LOG(INFO) << "[Lua] " << log_message;
+ info("[Lua] {}", log_message);
break;
case LL_WARNING:
- LOG(WARNING) << "[Lua] " << log_message;
+ warn("[Lua] {}", log_message);
break;
}
return 0;
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index e59a0e602..fd8059192 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -390,16 +390,16 @@ Status Storage::Open(DBOpenMode mode) {
int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end
- start).count();
if (!db_) {
- LOG(INFO) << "[storage] Failed to load the data from disk: " << duration
<< " ms";
+ info("[storage] Failed to load the data from disk: {} ms", duration);
return {Status::DBOpenErr};
}
- LOG(INFO) << "[storage] Success to load the data from disk: " << duration <<
" ms";
+ info("[storage] Success to load the data from disk: {} ms", duration);
return Status::OK();
}
Status Storage::CreateBackup(uint64_t *sequence_number) {
- LOG(INFO) << "[storage] Start to create new backup";
+ info("[storage] Start to create new backup");
std::lock_guard<std::mutex> lg(config_->backup_mu);
std::string task_backup_dir = config_->backup_dir;
@@ -411,28 +411,28 @@ Status Storage::CreateBackup(uint64_t *sequence_number) {
rocksdb::Checkpoint *checkpoint = nullptr;
rocksdb::Status s = rocksdb::Checkpoint::Create(db_.get(), &checkpoint);
if (!s.ok()) {
- LOG(WARNING) << "Failed to create checkpoint object for backup. Error: "
<< s.ToString();
+ warn("Failed to create checkpoint object for backup. Error: {}",
s.ToString());
return {Status::NotOK, s.ToString()};
}
std::unique_ptr<rocksdb::Checkpoint> checkpoint_guard(checkpoint);
s = checkpoint->CreateCheckpoint(tmpdir, config_->rocks_db.write_buffer_size
* MiB, sequence_number);
if (!s.ok()) {
- LOG(WARNING) << "Failed to create checkpoint (snapshot) for backup. Error:
" << s.ToString();
+ warn("Failed to create checkpoint (snapshot) for backup. Error: {}",
s.ToString());
return {Status::DBBackupErr, s.ToString()};
}
// 2) Rename tmp backup to real backup dir
if (s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options()); !s.ok()) {
- LOG(WARNING) << "[storage] Failed to clean old backup. Error: " <<
s.ToString();
+ warn("[storage] Failed to clean old backup. Error: {}", s.ToString());
return {Status::NotOK, s.ToString()};
}
if (s = env_->RenameFile(tmpdir, task_backup_dir); !s.ok()) {
- LOG(WARNING) << "[storage] Failed to rename tmp backup. Error: " <<
s.ToString();
+ warn("[storage] Failed to rename tmp backup. Error: {}", s.ToString());
// Just try best effort
if (s = rocksdb::DestroyDB(tmpdir, rocksdb::Options()); !s.ok()) {
- LOG(WARNING) << "[storage] Failed to clean tmp backup. Error: " <<
s.ToString();
+ warn("[storage] Failed to clean tmp backup. Error: {}", s.ToString());
}
return {Status::NotOK, s.ToString()};
@@ -441,7 +441,7 @@ Status Storage::CreateBackup(uint64_t *sequence_number) {
// 'backup_mu_' can guarantee 'backup_creating_time_secs_' is thread-safe
backup_creating_time_secs_ = util::GetTimeStamp<std::chrono::seconds>();
- LOG(INFO) << "[storage] Success to create new backup";
+ info("[storage] Success to create new backup");
return Status::OK();
}
@@ -464,15 +464,15 @@ Status Storage::RestoreFromBackup() {
auto s = backup_->RestoreDBFromLatestBackup(config_->db_dir,
config_->db_dir);
if (!s.ok()) {
- LOG(ERROR) << "[storage] Failed to restore database from the latest
backup. Error: " << s.ToString();
+ error("[storage] Failed to restore database from the latest backup. Error:
{}", s.ToString());
} else {
- LOG(INFO) << "[storage] Database was restored from the latest backup";
+ info("[storage] Database was restored from the latest backup");
}
// Reopen DB (should always try to reopen db even if restore failed,
replication SST file CRC check may use it)
auto s2 = Open();
if (!s2.IsOK()) {
- LOG(ERROR) << "[storage] Failed to reopen the database. Error: " <<
s2.Msg();
+ error("[storage] Failed to reopen the database. Error: {}", s2.Msg());
return {Status::DBOpenErr, s2.Msg()};
}
@@ -504,7 +504,7 @@ Status Storage::RestoreFromCheckpoint() {
s = env_->RenameFile(config_->db_dir, tmp_dir);
if (!s.ok()) {
if (auto s1 = Open(); !s1.IsOK()) {
- LOG(ERROR) << "[storage] Failed to reopen database. Error: " << s1.Msg();
+ error("[storage] Failed to reopen database. Error: {}", s1.Msg());
}
return {Status::NotOK, fmt::format("Failed to rename database directory
'{}' to '{}'. Error: {}", config_->db_dir,
tmp_dir, s.ToString())};
@@ -514,7 +514,7 @@ Status Storage::RestoreFromCheckpoint() {
if (s = env_->RenameFile(checkpoint_dir, config_->db_dir); !s.ok()) {
env_->RenameFile(tmp_dir, config_->db_dir);
if (auto s1 = Open(); !s1.IsOK()) {
- LOG(ERROR) << "[storage] Failed to reopen database. Error: " << s1.Msg();
+ error("[storage] Failed to reopen database. Error: {}", s1.Msg());
}
return {Status::NotOK, fmt::format("Failed to rename checkpoint directory
'{}' to '{}'. Error: {}", checkpoint_dir,
config_->db_dir, s.ToString())};
@@ -523,18 +523,18 @@ Status Storage::RestoreFromCheckpoint() {
// Open the new database, restore if replica fails to open
auto s2 = Open();
if (!s2.IsOK()) {
- LOG(WARNING) << "[storage] Failed to open master checkpoint. Error: " <<
s2.Msg();
+ warn("[storage] Failed to open master checkpoint. Error: {}", s2.Msg());
rocksdb::DestroyDB(config_->db_dir, rocksdb::Options());
env_->RenameFile(tmp_dir, config_->db_dir);
if (auto s1 = Open(); !s1.IsOK()) {
- LOG(ERROR) << "[storage] Failed to reopen database. Error: " << s1.Msg();
+ error("[storage] Failed to reopen database. Error: {}", s1.Msg());
}
return {Status::DBOpenErr, "Failed to open master checkpoint. Error: " +
s2.Msg()};
}
// Destroy the origin database
if (s = rocksdb::DestroyDB(tmp_dir, rocksdb::Options()); !s.ok()) {
- LOG(WARNING) << "[storage] Failed to destroy the origin database at '" <<
tmp_dir << "'. Error: " << s.ToString();
+ warn("[storage] Failed to destroy the origin database at '{}'. Error: {}",
tmp_dir, s.ToString());
}
return Status::OK();
}
@@ -559,7 +559,7 @@ void Storage::EmptyDB() {
auto s = rocksdb::DestroyDB(config_->db_dir, rocksdb::Options());
if (!s.ok()) {
- LOG(ERROR) << "[storage] Failed to destroy database. Error: " <<
s.ToString();
+ error("[storage] Failed to destroy database. Error: {}", s.ToString());
}
}
@@ -578,10 +578,10 @@ void Storage::PurgeOldBackups(uint32_t
num_backups_to_keep, uint32_t backup_max_
if (num_backups_to_keep == 0 || backup_expired) {
s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options());
if (s.ok()) {
- LOG(INFO) << "[storage] Succeeded cleaning old backup that was created
at " << backup_creating_time_secs_;
+ info("[storage] Succeeded cleaning old backup that was created at {}",
backup_creating_time_secs_);
} else {
- LOG(INFO) << "[storage] Failed cleaning old backup that was created at "
<< backup_creating_time_secs_
- << ". Error: " << s.ToString();
+ info("[storage] Failed cleaning old backup that was created at {}.
Error: {}", backup_creating_time_secs_,
+ s.ToString());
}
}
}
@@ -785,7 +785,7 @@ StatusOr<int> Storage::IngestSST(const std::string
&sst_dir, const rocksdb::Inge
sst_files = std::move(filtered_files);
if (sst_files.empty()) {
- LOG(WARNING) << "No SST files found in " << sst_dir;
+ warn("No SST files found in {}", sst_dir);
return 0;
}
@@ -929,10 +929,9 @@ void Storage::CheckDBSizeLimit() {
db_size_limit_reached_ = limit_reached;
if (db_size_limit_reached_) {
- LOG(WARNING) << "[storage] ENABLE db_size limit " << config_->max_db_size << " GB."
- << "Switch kvrocks to read-only mode.";
+ warn("[storage] ENABLE db_size limit {} GB. Switch kvrocks to read-only mode.", config_->max_db_size);
} else {
- LOG(WARNING) << "[storage] DISABLE db_size limit. Switch kvrocks to read-write mode.";
+ warn("[storage] DISABLE db_size limit. Switch kvrocks to read-write mode.");
}
}
@@ -1012,7 +1011,7 @@ Status Storage::ShiftReplId(engine::Context &ctx) {
rand_str[i] = charset[distrib(gen)];
}
replid_ = std::move(rand_str);
- LOG(INFO) << "[replication] New replication id: " << replid_;
+ info("[replication] New replication id: {}", replid_);
// Write new replication id into db engine
return WriteToPropagateCF(ctx, kReplicationIdKey, replid_);
@@ -1089,7 +1088,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
rocksdb::Checkpoint *checkpoint = nullptr;
rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_.get(), &checkpoint);
if (!s.ok()) {
- LOG(WARNING) << "Failed to create checkpoint object. Error: " << s.ToString();
+ warn("Failed to create checkpoint object. Error: {}", s.ToString());
return {Status::NotOK, s.ToString()};
}
@@ -1104,11 +1103,10 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
storage->checkpoint_info_.access_time_secs = now_secs;
storage->checkpoint_info_.latest_seq = checkpoint_latest_seq;
if (!s.ok()) {
- LOG(WARNING) << "[storage] Failed to create checkpoint (snapshot). Error: " << s.ToString();
+ warn("[storage] Failed to create checkpoint (snapshot). Error: {}", s.ToString());
return {Status::NotOK, s.ToString()};
}
-
- LOG(INFO) << "[storage] Create checkpoint successfully";
+ info("[storage] Create checkpoint successfully");
} else {
// Replicas can share checkpoint to replication if the checkpoint existing time is less than a half of WAL TTL.
int64_t can_shared_time_secs = storage->config_->rocks_db.wal_ttl_seconds / 2;
@@ -1117,7 +1115,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
if (now_secs - storage->GetCheckpointCreateTimeSecs() > can_shared_time_secs) {
- LOG(WARNING) << "[storage] Can't use current checkpoint, waiting next checkpoint";
+ warn("[storage] Can't use current checkpoint, waiting next checkpoint");
return {Status::NotOK, "Can't use current checkpoint, waiting for next checkpoint"};
}
@@ -1125,10 +1123,10 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
// or the slave will fall into the full sync loop since it won't create new checkpoint.
auto s = storage->InWALBoundary(storage->checkpoint_info_.latest_seq);
if (!s.IsOK()) {
- LOG(WARNING) << "[storage] Can't use current checkpoint, error: " << s.Msg();
+ warn("[storage] Can't use current checkpoint, error: {}", s.Msg());
return {Status::NotOK, fmt::format("Can't use current checkpoint, error: {}", s.Msg())};
}
- LOG(INFO) << "[storage] Using current existing checkpoint";
+ info("[storage] Using current existing checkpoint");
}
ulm.unlock();
@@ -1192,9 +1190,9 @@ Status Storage::ReplDataManager::CleanInvalidFiles(Storage *storage, const std::
auto s = storage->env_->DeleteFile(dir + "/" + *it);
if (!s.ok()) {
ret = Status(Status::NotOK, s.ToString());
- LOG(INFO) << "[storage] Failed to delete invalid file " << *it << " of master checkpoint";
+ info("[storage] Failed to delete invalid file {} of master checkpoint", *it);
} else {
- LOG(INFO) << "[storage] Succeed deleting invalid file " << *it << " of master checkpoint";
+ info("[storage] Succeed deleting invalid file {} of master checkpoint", *it);
}
}
return ret;
@@ -1204,14 +1202,14 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage, const std::string &
std::string abs_path = storage->config_->checkpoint_dir + "/" + repl_file;
auto s = storage->env_->FileExists(abs_path);
if (!s.ok()) {
- LOG(ERROR) << "[storage] Data file [" << abs_path << "] not found";
+ error("[storage] Data file [{}] not found", abs_path);
return NullFD;
}
storage->env_->GetFileSize(abs_path, file_size);
auto rv = open(abs_path.c_str(), O_RDONLY);
if (rv < 0) {
- LOG(ERROR) << "[storage] Failed to open file: " << strerror(errno);
+ error("[storage] Failed to open file: {}", strerror(errno));
}
return rv;
@@ -1220,7 +1218,7 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage, const std::string &
Status Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf,
Storage::ReplDataManager::MetaInfo *meta) {
auto meta_file = "meta/" + std::to_string(meta_id);
- DLOG(INFO) << "[meta] id: " << meta_id;
+ debug("[meta] id: {}", meta_id);
// Save the meta to tmp file
auto wf = NewTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
@@ -1230,28 +1228,27 @@ Status Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, rocksdb::Bac
// timestamp;
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_LF);
- DLOG(INFO) << "[meta] timestamp: " << line.get();
+ debug("[meta] timestamp: {}", line.get());
meta->timestamp = std::strtoll(line.get(), nullptr, 10);
// sequence
line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
- DLOG(INFO) << "[meta] seq:" << line.get();
+ debug("[meta] seq: {}", line.get());
meta->seq = std::strtoull(line.get(), nullptr, 10);
// optional metadata
line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
if (strncmp(line.get(), "metadata", 8) == 0) {
- DLOG(INFO) << "[meta] meta: " << line.get();
+ debug("[meta] meta: {}", line.get());
meta->meta_data = std::string(line.get(), line.length);
line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
}
- DLOG(INFO) << "[meta] file count: " << line.get();
+ debug("[meta] file count: {}", line.get());
// file list
while (true) {
line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
if (!line) {
break;
}
-
- DLOG(INFO) << "[meta] file info: " << line.get();
+ debug("[meta] file info: {}", line.get());
auto cptr = line.get();
while (*(cptr++) != ' ') {
}
@@ -1274,7 +1271,7 @@ Status MkdirRecursively(rocksdb::Env *env, const std::string &dir) {
for (auto pos = dir.find('/', 1); pos != std::string::npos; pos = dir.find('/', pos + 1)) {
parent = dir.substr(0, pos);
if (auto s = env->CreateDirIfMissing(parent); !s.ok()) {
- LOG(ERROR) << "[storage] Failed to create directory '" << parent << "' recursively. Error: " << s.ToString();
+ error("[storage] Failed to create directory '{}' recursively. Error: {}", parent, s.ToString());
return {Status::NotOK};
}
}
@@ -1289,7 +1286,7 @@ std::unique_ptr<rocksdb::WritableFile> Storage::ReplDataManager::NewTmpFile(Stor
std::string tmp_file = dir + "/" + repl_file + ".tmp";
auto s = storage->env_->FileExists(tmp_file);
if (s.ok()) {
- LOG(ERROR) << "[storage] Data file exists, override";
+ error("[storage] Data file exists, override");
storage->env_->DeleteFile(tmp_file);
}
@@ -1302,7 +1299,7 @@ std::unique_ptr<rocksdb::WritableFile> Storage::ReplDataManager::NewTmpFile(Stor
std::unique_ptr<rocksdb::WritableFile> wf;
s = storage->env_->NewWritableFile(tmp_file, &wf, rocksdb::EnvOptions());
if (!s.ok()) {
- LOG(ERROR) << "[storage] Failed to create data file '" << tmp_file << "'. Error: " << s.ToString();
+ error("[storage] Failed to create data file '{}'. Error: {}", tmp_file, s.ToString());
return nullptr;
}