This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 49315f56e fix(config): Setting rocksdb.level0_slowdown_writes_trigger
to 0 disables it (#3034)
49315f56e is described below
commit 49315f56e13123f974e10be132c0993a37a3eca1
Author: Ryan Liao <[email protected]>
AuthorDate: Thu Jul 3 23:17:26 2025 -0400
fix(config): Setting rocksdb.level0_slowdown_writes_trigger to 0 disables
it (#3034)
Co-authored-by: Aleks Lozovyuk <[email protected]>
---
kvrocks.conf | 12 +-
src/config/config.cc | 636 ++++++++++++++++++++++---------------------
src/storage/storage.cc | 10 +-
src/storage/storage.h | 1 +
tests/cppunit/config_test.cc | 43 +++
5 files changed, 389 insertions(+), 313 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index 96e9b05bc..b69e2640a 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -890,14 +890,18 @@ rocksdb.delayed_write_rate 0
# Default: no
rocksdb.enable_pipelined_write no
-# Soft limit on number of level-0 files. We start slowing down writes at this
-# point. A value <0 means that no writing slow down will be triggered by
-# number of files in level-0.
+# Soft limit on the number of level-0 files. We slow down writes at this point.
+# A value of 0 means that no write slowdown will be triggered by the number
+# of files in level-0. If this value is smaller than
+# rocksdb.level0_file_num_compaction_trigger, this will be set to
+# rocksdb.level0_file_num_compaction_trigger instead.
#
# Default: 20
rocksdb.level0_slowdown_writes_trigger 20
-# Maximum number of level-0 files. We stop writes at this point.
+# Maximum number of level-0 files. We stop writes at this point. If this value
+# is smaller than rocksdb.level0_slowdown_writes_trigger, this will be set to
+# rocksdb.level0_slowdown_writes_trigger instead.
#
# Default: 40
rocksdb.level0_stop_writes_trigger 40
diff --git a/src/config/config.cc b/src/config/config.cc
index 1e561fe6d..874e36886 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -281,7 +281,7 @@ Config::Config() {
{"rocksdb.compaction_readahead_size", false,
new IntField(&rocks_db.compaction_readahead_size, 2 * MiB, 0, 64 *
MiB)},
{"rocksdb.level0_slowdown_writes_trigger", false,
- new IntField(&rocks_db.level0_slowdown_writes_trigger, 20, 1, 1024)},
+ new IntField(&rocks_db.level0_slowdown_writes_trigger, 20, 0, 1024)},
{"rocksdb.level0_stop_writes_trigger", false, new
IntField(&rocks_db.level0_stop_writes_trigger, 40, 1, 1024)},
{"rocksdb.level0_file_num_compaction_trigger", false,
new IntField(&rocks_db.level0_file_num_compaction_trigger, 4, 1, 1024)},
@@ -468,318 +468,340 @@ void Config::initFieldCallback() {
return Status::OK();
};
- std::map<std::string, CallbackFn> callbacks = {
- {"workers",
- [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]]
const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->AdjustWorkerThreads();
- return Status::OK();
- }},
- {"dir",
- [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string
&k,
- [[maybe_unused]] const std::string &v) -> Status {
- db_dir = dir + "/db";
- if (log_dir.empty()) log_dir = dir + ",stdout";
- checkpoint_dir = dir + "/checkpoint";
- sync_checkpoint_dir = dir + "/sync_checkpoint";
- backup_sync_dir = dir + "/backup_for_sync";
- if (backup_dir == kDefaultBackupDir) backup_dir = dir + "/backup";
- if (pidfile == kDefaultPidfile) pidfile = dir + "/kvrocks.pid";
- return Status::OK();
- }},
- {"backup-dir",
- [this](Server *srv, [[maybe_unused]] const std::string &k, const
std::string &v) -> Status {
- std::string previous_backup;
- {
- // Note: currently, backup_mu_ may block by backing up or purging,
- // the command may wait for seconds.
- std::lock_guard<std::mutex> lg(this->backup_mu);
- previous_backup = std::move(backup_dir);
- backup_dir = v;
- }
- if (!previous_backup.empty() && srv != nullptr && !srv->IsLoading()) {
- // info() should be called after log is initialized and server is
loaded.
- info("change backup dir from {} to {}", previous_backup, v);
- }
- return Status::OK();
- }},
- {"cluster-enabled",
- [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string
&k,
- [[maybe_unused]] const std::string &v) -> Status {
- if (cluster_enabled) slot_id_encoded = true;
- return Status::OK();
- }},
- {"bind",
- [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string
&k, const std::string &v) -> Status {
- std::vector<std::string> args = util::Split(v, " \t");
- binds = std::move(args);
- return Status::OK();
- }},
- {"maxclients",
- [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]]
const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->AdjustOpenFilesLimit();
- return Status::OK();
- }},
- {"slaveof", replicaof_cb},
- {"replicaof", replicaof_cb},
- {"profiling-sample-commands",
- [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string
&k, const std::string &v) -> Status {
- std::vector<std::string> cmds = util::Split(v, ",");
- profiling_sample_all_commands = false;
- profiling_sample_commands.clear();
- for (auto const &cmd : cmds) {
- if (cmd == "*") {
- profiling_sample_all_commands = true;
+ std::map<std::string, CallbackFn> callbacks =
+ {
+ {"workers",
+ [](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ srv->AdjustWorkerThreads();
+ return Status::OK();
+ }},
+ {"dir",
+ [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const
std::string &k,
+ [[maybe_unused]] const std::string &v) -> Status {
+ db_dir = dir + "/db";
+ if (log_dir.empty()) log_dir = dir + ",stdout";
+ checkpoint_dir = dir + "/checkpoint";
+ sync_checkpoint_dir = dir + "/sync_checkpoint";
+ backup_sync_dir = dir + "/backup_for_sync";
+ if (backup_dir == kDefaultBackupDir) backup_dir = dir + "/backup";
+ if (pidfile == kDefaultPidfile) pidfile = dir + "/kvrocks.pid";
+ return Status::OK();
+ }},
+ {"backup-dir",
+ [this](Server *srv, [[maybe_unused]] const std::string &k, const
std::string &v) -> Status {
+ std::string previous_backup;
+ {
+ // Note: currently, backup_mu_ may block by backing up or
purging,
+ // the command may wait for seconds.
+ std::lock_guard<std::mutex> lg(this->backup_mu);
+ previous_backup = std::move(backup_dir);
+ backup_dir = v;
+ }
+ if (!previous_backup.empty() && srv != nullptr &&
!srv->IsLoading()) {
+ // info() should be called after log is initialized and server
is loaded.
+ info("change backup dir from {} to {}", previous_backup, v);
+ }
+ return Status::OK();
+ }},
+ {"cluster-enabled",
+ [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const
std::string &k,
+ [[maybe_unused]] const std::string &v) -> Status {
+ if (cluster_enabled) slot_id_encoded = true;
+ return Status::OK();
+ }},
+ {"bind",
+ [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const
std::string &k, const std::string &v) -> Status {
+ std::vector<std::string> args = util::Split(v, " \t");
+ binds = std::move(args);
+ return Status::OK();
+ }},
+ {"maxclients",
+ [](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ srv->AdjustOpenFilesLimit();
+ return Status::OK();
+ }},
+ {"slaveof", replicaof_cb},
+ {"replicaof", replicaof_cb},
+ {"profiling-sample-commands",
+ [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const
std::string &k, const std::string &v) -> Status {
+ std::vector<std::string> cmds = util::Split(v, ",");
+ profiling_sample_all_commands = false;
profiling_sample_commands.clear();
+ for (auto const &cmd : cmds) {
+ if (cmd == "*") {
+ profiling_sample_all_commands = true;
+ profiling_sample_commands.clear();
+ return Status::OK();
+ }
+ if (!redis::CommandTable::IsExists(cmd)) {
+ return {Status::NotOK, cmd + " is not Kvrocks supported
command"};
+ }
+ // profiling_sample_commands use command's original name,
regardless of rename-command directive
+ profiling_sample_commands.insert(cmd);
+ }
return Status::OK();
- }
- if (!redis::CommandTable::IsExists(cmd)) {
- return {Status::NotOK, cmd + " is not Kvrocks supported command"};
- }
- // profiling_sample_commands use command's original name,
regardless of rename-command directive
- profiling_sample_commands.insert(cmd);
- }
- return Status::OK();
- }},
- {"slowlog-max-len",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->GetSlowLog()->SetMaxEntries(slowlog_max_len);
- return Status::OK();
- }},
- {"slowlog-dump-logfile-level",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->GetSlowLog()->SetDumpToLogfileLevel(slowlog_dump_logfile_level);
- return Status::OK();
- }},
- {"max-db-size",
- [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]]
const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->storage->CheckDBSizeLimit();
- return Status::OK();
- }},
- {"max-io-mb",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->storage->SetIORateLimit(max_io_mb);
- return Status::OK();
- }},
- {"profiling-sample-record-max-len",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->GetPerfLog()->SetMaxEntries(profiling_sample_record_max_len);
- return Status::OK();
- }},
- {"migrate-speed",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- if (cluster_enabled)
srv->slot_migrator->SetMaxMigrationSpeed(migrate_speed);
- return Status::OK();
- }},
- {"migrate-pipeline-size",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- if (cluster_enabled)
srv->slot_migrator->SetMaxPipelineSize(pipeline_size);
- return Status::OK();
- }},
- {"migrate-sequence-gap",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- if (cluster_enabled)
srv->slot_migrator->SetSequenceGapLimit(sequence_gap);
- return Status::OK();
- }},
- {"migrate-batch-rate-limit-mb",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
-
srv->slot_migrator->SetMigrateBatchRateLimit(migrate_batch_rate_limit_mb * MiB);
- return Status::OK();
- }},
- {"migrate-batch-size-kb",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB);
- return Status::OK();
- }},
- {"log-level",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
- spdlog::set_level(log_level);
- return Status::OK();
- }},
- {"persist-cluster-nodes-enabled",
- [this](Server *srv, [[maybe_unused]] const std::string &k, const
std::string &v) -> Status {
- if (!srv || !cluster_enabled) return Status::OK();
- auto nodes_file_path = NodesFilePath();
- if (v == "yes") {
- return srv->cluster->DumpClusterNodes(nodes_file_path);
- }
- // Remove the cluster nodes file to avoid stale cluster nodes info
- remove(nodes_file_path.data());
- return Status::OK();
- }},
- {"repl-namespace-enabled",
- [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]]
const std::string &v) -> Status {
- if (!srv) return Status::OK();
- return srv->GetNamespace()->LoadAndRewrite();
- }},
+ }},
+ {"slowlog-max-len",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ srv->GetSlowLog()->SetMaxEntries(slowlog_max_len);
+ return Status::OK();
+ }},
+ {"slowlog-dump-logfile-level",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+
srv->GetSlowLog()->SetDumpToLogfileLevel(slowlog_dump_logfile_level);
+ return Status::OK();
+ }},
+ {"max-db-size",
+ [](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ srv->storage->CheckDBSizeLimit();
+ return Status::OK();
+ }},
+ {"max-io-mb",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ srv->storage->SetIORateLimit(max_io_mb);
+ return Status::OK();
+ }},
+ {"profiling-sample-record-max-len",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ srv->GetPerfLog()->SetMaxEntries(profiling_sample_record_max_len);
+ return Status::OK();
+ }},
+ {"migrate-speed",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ if (cluster_enabled)
srv->slot_migrator->SetMaxMigrationSpeed(migrate_speed);
+ return Status::OK();
+ }},
+ {"migrate-pipeline-size",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ if (cluster_enabled)
srv->slot_migrator->SetMaxPipelineSize(pipeline_size);
+ return Status::OK();
+ }},
+ {"migrate-sequence-gap",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ if (cluster_enabled)
srv->slot_migrator->SetSequenceGapLimit(sequence_gap);
+ return Status::OK();
+ }},
+ {"migrate-batch-rate-limit-mb",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+
srv->slot_migrator->SetMigrateBatchRateLimit(migrate_batch_rate_limit_mb * MiB);
+ return Status::OK();
+ }},
+ {"migrate-batch-size-kb",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb *
KiB);
+ return Status::OK();
+ }},
+ {"log-level",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ spdlog::set_level(log_level);
+ return Status::OK();
+ }},
+ {"persist-cluster-nodes-enabled",
+ [this](Server *srv, [[maybe_unused]] const std::string &k, const
std::string &v) -> Status {
+ if (!srv || !cluster_enabled) return Status::OK();
+ auto nodes_file_path = NodesFilePath();
+ if (v == "yes") {
+ return srv->cluster->DumpClusterNodes(nodes_file_path);
+ }
+ // Remove the cluster nodes file to avoid stale cluster nodes info
+ remove(nodes_file_path.data());
+ return Status::OK();
+ }},
+ {"repl-namespace-enabled",
+ [](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ return srv->GetNamespace()->LoadAndRewrite();
+ }},
- {"rocksdb.target_file_size_base",
- [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
- if (!srv) return Status::OK();
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-
std::to_string(rocks_db.target_file_size_base * MiB));
- }},
- {"rocksdb.write_buffer_size",
- [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
- if (!srv) return Status::OK();
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-
std::to_string(rocks_db.write_buffer_size * MiB));
- }},
- {"rocksdb.disable_auto_compactions",
- [](Server *srv, const std::string &k, const std::string &v) -> Status {
- if (!srv) return Status::OK();
- std::string disable_auto_compactions = v == "yes" ? "true" : "false";
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
disable_auto_compactions);
- }},
- {"rocksdb.max_total_wal_size",
- [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
- if (!srv) return Status::OK();
- return srv->storage->SetDBOption(TrimRocksDbPrefix(k),
std::to_string(rocks_db.max_total_wal_size * MiB));
- }},
- {"rocksdb.enable_blob_files",
- [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
- if (!srv) return Status::OK();
- std::string enable_blob_files = rocks_db.enable_blob_files ? "true" :
"false";
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
enable_blob_files);
- }},
- {"rocksdb.min_blob_size",
- [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
- if (!srv) return Status::OK();
- if (!rocks_db.enable_blob_files) {
- return {Status::NotOK, errBlobDbNotEnabled};
- }
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
- }},
- {"rocksdb.blob_file_size",
- [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
- if (!srv) return Status::OK();
- if (!rocks_db.enable_blob_files) {
- return {Status::NotOK, errBlobDbNotEnabled};
- }
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-
std::to_string(rocks_db.blob_file_size));
- }},
- {"rocksdb.enable_blob_garbage_collection",
- [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
- if (!srv) return Status::OK();
- if (!rocks_db.enable_blob_files) {
- return {Status::NotOK, errBlobDbNotEnabled};
- }
- std::string enable_blob_garbage_collection = v == "yes" ? "true" :
"false";
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
enable_blob_garbage_collection);
- }},
- {"rocksdb.blob_garbage_collection_age_cutoff",
- [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
- if (!srv) return Status::OK();
- if (!rocks_db.enable_blob_files) {
- return {Status::NotOK, errBlobDbNotEnabled};
- }
- int val = 0;
- auto parse_result = ParseInt<int>(v, 10);
- if (!parse_result) {
- return {Status::NotOK, "Illegal blob_garbage_collection_age_cutoff
value."};
- }
- val = *parse_result;
- if (val < 0 || val > 100) {
- return {Status::NotOK, "blob_garbage_collection_age_cutoff must >=
0 and <= 100."};
- }
+ {"rocksdb.target_file_size_base",
+ [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+
std::to_string(rocks_db.target_file_size_base * MiB));
+ }},
+ {"rocksdb.write_buffer_size",
+ [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+
std::to_string(rocks_db.write_buffer_size * MiB));
+ }},
+ {"rocksdb.disable_auto_compactions",
+ [](Server *srv, const std::string &k, const std::string &v) ->
Status {
+ if (!srv) return Status::OK();
+ std::string disable_auto_compactions = v == "yes" ? "true" :
"false";
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
disable_auto_compactions);
+ }},
+ {"rocksdb.max_total_wal_size",
+ [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ return srv->storage->SetDBOption(TrimRocksDbPrefix(k),
std::to_string(rocks_db.max_total_wal_size * MiB));
+ }},
+ {"rocksdb.enable_blob_files",
+ [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ std::string enable_blob_files = rocks_db.enable_blob_files ?
"true" : "false";
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
enable_blob_files);
+ }},
+ {"rocksdb.min_blob_size",
+ [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
+ if (!srv) return Status::OK();
+ if (!rocks_db.enable_blob_files) {
+ return {Status::NotOK, errBlobDbNotEnabled};
+ }
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
+ }},
+ {"rocksdb.blob_file_size",
+ [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ if (!rocks_db.enable_blob_files) {
+ return {Status::NotOK, errBlobDbNotEnabled};
+ }
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+
std::to_string(rocks_db.blob_file_size));
+ }},
+ {"rocksdb.enable_blob_garbage_collection",
+ [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
+ if (!srv) return Status::OK();
+ if (!rocks_db.enable_blob_files) {
+ return {Status::NotOK, errBlobDbNotEnabled};
+ }
+ std::string enable_blob_garbage_collection = v == "yes" ? "true"
: "false";
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
enable_blob_garbage_collection);
+ }},
+ {"rocksdb.blob_garbage_collection_age_cutoff",
+ [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
+ if (!srv) return Status::OK();
+ if (!rocks_db.enable_blob_files) {
+ return {Status::NotOK, errBlobDbNotEnabled};
+ }
+ int val = 0;
+ auto parse_result = ParseInt<int>(v, 10);
+ if (!parse_result) {
+ return {Status::NotOK, "Illegal
blob_garbage_collection_age_cutoff value."};
+ }
+ val = *parse_result;
+ if (val < 0 || val > 100) {
+ return {Status::NotOK, "blob_garbage_collection_age_cutoff must
>= 0 and <= 100."};
+ }
- double cutoff = val / 100.0;
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
std::to_string(cutoff));
- }},
- {"rocksdb.level_compaction_dynamic_level_bytes",
- [](Server *srv, const std::string &k, const std::string &v) -> Status {
- if (!srv) return Status::OK();
- std::string level_compaction_dynamic_level_bytes = v == "yes" ?
"true" : "false";
- return srv->storage->SetDBOption(TrimRocksDbPrefix(k),
level_compaction_dynamic_level_bytes);
- }},
- {"rocksdb.max_bytes_for_level_base",
- [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
- if (!srv) return Status::OK();
- if (!rocks_db.level_compaction_dynamic_level_bytes) {
- return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet};
- }
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
-
std::to_string(rocks_db.max_bytes_for_level_base));
- }},
- {"rocksdb.max_bytes_for_level_multiplier",
- [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
- if (!srv) return Status::OK();
- if (!rocks_db.level_compaction_dynamic_level_bytes) {
- return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet};
- }
- return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
- }},
- {"rocksdb.sst_file_delete_rate_bytes_per_sec",
- [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
- if (!srv) return Status::OK();
-
srv->storage->SetSstFileDeleteRateBytesPerSecond(rocks_db.sst_file_delete_rate_bytes_per_sec);
- return Status::OK();
- }},
- {"rocksdb.max_open_files", set_db_option_cb},
- {"rocksdb.stats_dump_period_sec", set_db_option_cb},
- {"rocksdb.delayed_write_rate", set_db_option_cb},
- {"rocksdb.max_background_compactions", set_db_option_cb},
- {"rocksdb.max_background_flushes", set_db_option_cb},
- {"rocksdb.max_subcompactions", set_db_option_cb},
- {"rocksdb.compaction_readahead_size", set_db_option_cb},
- {"rocksdb.max_background_jobs", set_db_option_cb},
-
- {"rocksdb.max_compaction_bytes", set_cf_option_cb},
- {"rocksdb.max_write_buffer_number", set_cf_option_cb},
- {"rocksdb.min_write_buffer_number_to_merge", set_cf_option_cb},
- {"rocksdb.level0_slowdown_writes_trigger", set_cf_option_cb},
- {"rocksdb.level0_stop_writes_trigger", set_cf_option_cb},
- {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb},
- {"rocksdb.compression", set_compression_type_cb},
- {"rocksdb.compression_start_level", set_compression_start_level_cb},
+ double cutoff = val / 100.0;
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
std::to_string(cutoff));
+ }},
+ {"rocksdb.level_compaction_dynamic_level_bytes",
+ [](Server *srv, const std::string &k, const std::string &v) ->
Status {
+ if (!srv) return Status::OK();
+ std::string level_compaction_dynamic_level_bytes = v == "yes" ?
"true" : "false";
+ return srv->storage->SetDBOption(TrimRocksDbPrefix(k),
level_compaction_dynamic_level_bytes);
+ }},
+ {"rocksdb.max_bytes_for_level_base",
+ [this](Server *srv, const std::string &k, [[maybe_unused]] const
std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ if (!rocks_db.level_compaction_dynamic_level_bytes) {
+ return {Status::NotOK,
errLevelCompactionDynamicLevelBytesNotSet};
+ }
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
+
std::to_string(rocks_db.max_bytes_for_level_base));
+ }},
+ {"rocksdb.max_bytes_for_level_multiplier",
+ [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
+ if (!srv) return Status::OK();
+ if (!rocks_db.level_compaction_dynamic_level_bytes) {
+ return {Status::NotOK,
errLevelCompactionDynamicLevelBytesNotSet};
+ }
+ return
srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
+ }},
+ {"rocksdb.sst_file_delete_rate_bytes_per_sec",
+ [this](Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+
srv->storage->SetSstFileDeleteRateBytesPerSecond(rocks_db.sst_file_delete_rate_bytes_per_sec);
+ return Status::OK();
+ }},
+ {"rocksdb.level0_slowdown_writes_trigger",
+ // Callback for rocksdb.level0_slowdown_writes_trigger.
+ //
+ // set_cf_option_cb is captured BY VALUE on purpose: this callback is
+ // invoked after initFieldCallback() has returned (e.g. from Config::Set),
+ // so a by-reference capture of that function-local lambda would dangle.
+ [this, set_cf_option_cb](Server *srv, const std::string &k, const std::string &v) -> Status {
+ // A configured value of 0 disables the slowdown: instead of passing 0
+ // through, apply the current stop trigger as the slowdown trigger.
+ if (rocks_db.level0_slowdown_writes_trigger == 0) {
+ return set_cf_option_cb(srv, k, std::to_string(rocks_db.level0_stop_writes_trigger));
+ }
+
+ // Any other value is forwarded unchanged.
+ return set_cf_option_cb(srv, k, v);
+ }},
+ {"rocksdb.level0_stop_writes_trigger",
+ // Callback for rocksdb.level0_stop_writes_trigger: applies the new value
+ // through Storage::SetOptionForAllColumnFamilies. While the slowdown
+ // trigger is configured as 0, the same value is also written to
+ // level0_slowdown_writes_trigger in the same call.
+ [this](Server *srv, const std::string &k, const std::string &v) ->
Status {
+ // Nothing to apply when no server instance is supplied.
+ if (!srv) return Status::OK();
+
+ std::unordered_map<std::string, std::string> options = {
+ {TrimRocksDbPrefix(k), v},
+ };
+
+ // Slowdown configured as 0: keep it equal to the stop trigger.
+ if (rocks_db.level0_slowdown_writes_trigger == 0) {
+ options["level0_slowdown_writes_trigger"] = v;
+ }
+
+ return srv->storage->SetOptionForAllColumnFamilies(options);
+ }},
+ {"rocksdb.max_open_files", set_db_option_cb},
+ {"rocksdb.stats_dump_period_sec", set_db_option_cb},
+ {"rocksdb.delayed_write_rate", set_db_option_cb},
+ {"rocksdb.max_background_compactions", set_db_option_cb},
+ {"rocksdb.max_background_flushes", set_db_option_cb},
+ {"rocksdb.max_subcompactions", set_db_option_cb},
+ {"rocksdb.compaction_readahead_size", set_db_option_cb},
+ {"rocksdb.max_background_jobs", set_db_option_cb},
+
+ {"rocksdb.max_compaction_bytes", set_cf_option_cb},
+ {"rocksdb.max_write_buffer_number", set_cf_option_cb},
+ {"rocksdb.min_write_buffer_number_to_merge", set_cf_option_cb},
+ {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb},
+ {"rocksdb.compression", set_compression_type_cb},
+ {"rocksdb.compression_start_level", set_compression_start_level_cb},
#ifdef ENABLE_OPENSSL
- {"tls-cert-file", set_tls_option},
- {"tls-key-file", set_tls_option},
- {"tls-key-file-pass", set_tls_option},
- {"tls-ca-cert-file", set_tls_option},
- {"tls-ca-cert-dir", set_tls_option},
- {"tls-protocols", set_tls_option},
- {"tls-auth-clients", set_tls_option},
- {"tls-ciphers", set_tls_option},
- {"tls-ciphersuites", set_tls_option},
- {"tls-prefer-server-ciphers", set_tls_option},
- {"tls-session-caching", set_tls_option},
- {"tls-session-cache-size", set_tls_option},
- {"tls-session-cache-timeout", set_tls_option},
+ {"tls-cert-file", set_tls_option},
+ {"tls-key-file", set_tls_option},
+ {"tls-key-file-pass", set_tls_option},
+ {"tls-ca-cert-file", set_tls_option},
+ {"tls-ca-cert-dir", set_tls_option},
+ {"tls-protocols", set_tls_option},
+ {"tls-auth-clients", set_tls_option},
+ {"tls-ciphers", set_tls_option},
+ {"tls-ciphersuites", set_tls_option},
+ {"tls-prefer-server-ciphers", set_tls_option},
+ {"tls-session-caching", set_tls_option},
+ {"tls-session-cache-size", set_tls_option},
+ {"tls-session-cache-timeout", set_tls_option},
#endif
- {"histogram-bucket-boundaries",
- [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string
&k, const std::string &v) -> Status {
- std::vector<std::string> buckets = util::Split(v, ",");
- histogram_bucket_boundaries.clear();
- if (buckets.size() < 1) {
- return Status::OK();
- }
- for (const auto &bucket_val : buckets) {
- auto parse_result = ParseFloat<double>(bucket_val);
- if (!parse_result) {
- return {Status::NotOK, "The values in the bucket list must be
double or integer."};
- }
- histogram_bucket_boundaries.push_back(*parse_result);
- }
- if (!std::is_sorted(histogram_bucket_boundaries.begin(),
histogram_bucket_boundaries.end())) {
- return {Status::NotOK, "The values for the histogram must be
sorted."};
- }
- return Status::OK();
- }},
- };
+ {"histogram-bucket-boundaries",
+ [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const
std::string &k, const std::string &v) -> Status {
+ std::vector<std::string> buckets = util::Split(v, ",");
+ histogram_bucket_boundaries.clear();
+ if (buckets.size() < 1) {
+ return Status::OK();
+ }
+ for (const auto &bucket_val : buckets) {
+ auto parse_result = ParseFloat<double>(bucket_val);
+ if (!parse_result) {
+ return {Status::NotOK, "The values in the bucket list must be
double or integer."};
+ }
+ histogram_bucket_boundaries.push_back(*parse_result);
+ }
+ if (!std::is_sorted(histogram_bucket_boundaries.begin(),
histogram_bucket_boundaries.end())) {
+ return {Status::NotOK, "The values for the histogram must be
sorted."};
+ }
+ return Status::OK();
+ }},
+ };
for (const auto &iter : callbacks) {
auto field_iter = fields_.find(iter.first);
if (field_iter != fields_.end()) {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 96eb931f3..236e91eb0 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -208,7 +208,9 @@ rocksdb::Options Storage::InitRocksDBOptions() {
options.rate_limiter = rate_limiter_;
options.delayed_write_rate =
static_cast<uint64_t>(config_->rocks_db.delayed_write_rate);
options.compaction_readahead_size =
static_cast<size_t>(config_->rocks_db.compaction_readahead_size);
- options.level0_slowdown_writes_trigger =
config_->rocks_db.level0_slowdown_writes_trigger;
+ options.level0_slowdown_writes_trigger =
config_->rocks_db.level0_slowdown_writes_trigger == 0
+ ?
config_->rocks_db.level0_stop_writes_trigger
+ :
config_->rocks_db.level0_slowdown_writes_trigger;
options.level0_stop_writes_trigger =
config_->rocks_db.level0_stop_writes_trigger;
options.level0_file_num_compaction_trigger =
config_->rocks_db.level0_file_num_compaction_trigger;
options.max_bytes_for_level_base =
config_->rocks_db.max_bytes_for_level_base;
@@ -224,8 +226,12 @@ rocksdb::Options Storage::InitRocksDBOptions() {
}
Status Storage::SetOptionForAllColumnFamilies(const std::string &key, const
std::string &value) {
+ return SetOptionForAllColumnFamilies({{key, value}});
+}
+
+Status Storage::SetOptionForAllColumnFamilies(const
std::unordered_map<std::string, std::string> &options_map) {
for (auto &cf_handle : cf_handles_) {
- auto s = db_->SetOptions(cf_handle, {{key, value}});
+ auto s = db_->SetOptions(cf_handle, options_map);
if (!s.ok()) return {Status::NotOK, s.ToString()};
}
return Status::OK();
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 8ef807eca..3bb4fdf09 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -222,6 +222,7 @@ class Storage {
void SetBlobDB(rocksdb::ColumnFamilyOptions *cf_options);
rocksdb::Options InitRocksDBOptions();
Status SetOptionForAllColumnFamilies(const std::string &key, const
std::string &value);
+ Status SetOptionForAllColumnFamilies(const std::unordered_map<std::string,
std::string> &options_map);
Status SetDBOption(const std::string &key, const std::string &value);
Status CreateColumnFamilies(const rocksdb::Options &options);
// The sequence_number will be pointed to the value of the sequence number
in range of DB,
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index 84c4cc385..8bd7bbf6e 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -20,6 +20,7 @@
#include <gtest/gtest.h>
+#include <filesystem>
#include <fstream>
#include <iostream>
#include <map>
@@ -224,3 +225,45 @@ TEST(Config, DumpConfigLine) {
ASSERT_EQ(DumpConfigLine({"a", "xy"}), "a xy");
ASSERT_EQ(DumpConfigLine({"a", "x\n"}), "a \"x\\n\"");
}
+
+// Checks that setting rocksdb.level0_slowdown_writes_trigger to 0 makes the
+// live slowdown trigger follow rocksdb.level0_stop_writes_trigger, and that a
+// non-zero slowdown value is applied independently of the stop trigger.
+TEST(Config, DisableL0Slowdown) {
+ Config config;
+ config.db_dir = "test_l0_slowdown_dir";
+
+ // Remove any directory left at db_dir before opening the storage.
+ std::error_code ec;
+ std::filesystem::remove_all(config.db_dir, ec);
+ ASSERT_TRUE(!ec);
+
+ auto storage = std::make_unique<engine::Storage>(&config);
+ ASSERT_TRUE(storage->Open().IsOK());
+
+ Server server(storage.get(), &config);
+
+ const std::string kL0SlowdownWritesTrigger =
"rocksdb.level0_slowdown_writes_trigger";
+ const std::string kL0StopWritesTrigger =
"rocksdb.level0_stop_writes_trigger";
+
+ // Helpers comparing the options reported by the opened DB with an expected value.
+ const auto slowdown_is = [&storage](int value) {
+ return storage->GetDB()->GetOptions().level0_slowdown_writes_trigger ==
value;
+ };
+
+ const auto stop_is = [&storage](int value) {
+ return storage->GetDB()->GetOptions().level0_stop_writes_trigger == value;
+ };
+
+ // Slowdown set to 0: the effective slowdown trigger equals the stop trigger.
+ ASSERT_TRUE(config.Set(&server, kL0StopWritesTrigger, "20").IsOK());
+ ASSERT_TRUE(config.Set(&server, kL0SlowdownWritesTrigger, "0").IsOK());
+ ASSERT_TRUE(slowdown_is(20));
+ ASSERT_TRUE(stop_is(20));
+
+ // Changing the stop trigger while slowdown is 0 moves both values.
+ ASSERT_TRUE(config.Set(&server, kL0StopWritesTrigger, "50").IsOK());
+ ASSERT_TRUE(slowdown_is(50));
+ ASSERT_TRUE(stop_is(50));
+
+ // A non-zero slowdown is applied as given and leaves the stop trigger alone.
+ ASSERT_TRUE(config.Set(&server, kL0SlowdownWritesTrigger, "20").IsOK());
+ ASSERT_TRUE(slowdown_is(20));
+ ASSERT_TRUE(stop_is(50));
+
+ // Returning to 0 re-aligns the slowdown trigger with the current stop trigger.
+ ASSERT_TRUE(config.Set(&server, kL0SlowdownWritesTrigger, "0").IsOK());
+ ASSERT_TRUE(slowdown_is(50));
+ ASSERT_TRUE(stop_is(50));
+}