This is an automated email from the ASF dual-hosted git repository. hulk pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kvrocks.git
commit dd5365829c162054b97ee5e7a63f4592bebb0465 Author: xiaobiaozhao <[email protected]> AuthorDate: Sun Jul 16 18:28:58 2023 +0800 Fix config set compression type didn't take effect (#1576) Co-authored-by: git-hulk <[email protected]> Co-authored-by: Twice <[email protected]> --- src/config/config.cc | 76 +++++++++++++++++++-------------- src/config/config_type.h | 32 ++++++++------ src/storage/event_listener.cc | 20 +++------ src/storage/storage.h | 14 ++++++ tests/gocase/unit/config/config_test.go | 21 +++++++++ 5 files changed, 104 insertions(+), 59 deletions(-) diff --git a/src/config/config.cc b/src/config/config.cc index 599bfa49..20728957 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -45,44 +45,25 @@ constexpr const char *errBlobDbNotEnabled = "Must set rocksdb.enable_blob_files constexpr const char *errLevelCompactionDynamicLevelBytesNotSet = "Must set rocksdb.level_compaction_dynamic_level_bytes yes first."; -ConfigEnum compression_types[] = { - {"no", rocksdb::CompressionType::kNoCompression}, {"snappy", rocksdb::CompressionType::kSnappyCompression}, - {"lz4", rocksdb::CompressionType::kLZ4Compression}, {"zstd", rocksdb::CompressionType::kZSTD}, - {"zlib", rocksdb::CompressionType::kZlibCompression}, {nullptr, 0}}; - -ConfigEnum supervised_modes[] = {{"no", kSupervisedNone}, - {"auto", kSupervisedAutoDetect}, - {"upstart", kSupervisedUpStart}, - {"systemd", kSupervisedSystemd}, - {nullptr, 0}}; - -ConfigEnum log_levels[] = {{"info", google::INFO}, - {"warning", google::WARNING}, - {"error", google::ERROR}, - {"fatal", google::FATAL}, - {nullptr, 0}}; +std::vector<ConfigEnum> supervised_modes{ + {"no", kSupervisedNone}, + {"auto", kSupervisedAutoDetect}, + {"upstart", kSupervisedUpStart}, + {"systemd", kSupervisedSystemd}, +}; + +std::vector<ConfigEnum> log_levels{ + {"info", google::INFO}, + {"warning", google::WARNING}, + {"error", google::ERROR}, + {"fatal", google::FATAL}, +}; std::string TrimRocksDbPrefix(std::string s) { if (strncasecmp(s.data(), "rocksdb.", 8) != 0) return s; return s.substr(8, s.size() - 8); } -int ConfigEnumGetValue(ConfigEnum *ce, const char *name) { - while (ce->name != nullptr) { - if (strcasecmp(ce->name, name) == 0) return ce->val; - ce++; - } - return INT_MIN; -} - -const char *ConfigEnumGetName(ConfigEnum *ce, int val) { - while (ce->name != nullptr) { - if (ce->val == val) return ce->name; - ce++; - } - return nullptr; -} - Config::Config() { struct FieldWrapper { std::string name; @@ -92,6 +73,12 @@ Config::Config() { FieldWrapper(std::string name, bool readonly, ConfigField *field) : name(std::move(name)), readonly(readonly), field(field) {} }; + + std::vector<ConfigEnum> compression_types; + compression_types.reserve(engine::CompressionOptions.size()); + for (const auto &e : engine::CompressionOptions) { + compression_types.push_back({e.name, e.type}); + } FieldWrapper fields[] = { {"daemonize", true, new YesNoField(&daemonize, false)}, {"bind", true, new StringField(&binds_str_, "")}, @@ -327,6 +314,30 @@ void Config::initFieldCallback() { if (!srv) return Status::OK(); // srv is nullptr when load config from file return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v); }; + auto set_compression_type_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status { + if (!srv) return Status::OK(); + + std::string compression_option; + for (auto &option : engine::CompressionOptions) { + if (option.name == v) { + compression_option = option.val; + break; + } + } + if (compression_option.empty()) { + return {Status::NotOK, "Invalid compression type"}; + } + + // For the first two levels, it may contain the frequently accessed data, + // so it'd be better to use uncompressed data to save the CPU. + std::string compression_levels = "kNoCompression:kNoCompression"; + auto db = srv->storage->GetDB(); + for (size_t i = 2; i < db->GetOptions().compression_per_level.size(); i++) { + compression_levels += ":"; + compression_levels += compression_option; + } + return srv->storage->SetOptionForAllColumnFamilies("compression_per_level", compression_levels); + }; #ifdef ENABLE_OPENSSL auto set_tls_option = [](Server *srv, const std::string &k, const std::string &v) { if (!srv) return Status::OK(); // srv is nullptr when load config from file @@ -600,6 +611,7 @@ void Config::initFieldCallback() { {"rocksdb.level0_slowdown_writes_trigger", set_cf_option_cb}, {"rocksdb.level0_stop_writes_trigger", set_cf_option_cb}, {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb}, + {"rocksdb.compression", set_compression_type_cb}, #ifdef ENABLE_OPENSSL {"tls-cert-file", set_tls_option}, {"tls-key-file", set_tls_option}, diff --git a/src/config/config_type.h b/src/config/config_type.h index 12e78a6d..5c11af65 100644 --- a/src/config/config_type.h +++ b/src/config/config_type.h @@ -47,15 +47,12 @@ using UInt32Field = IntegerField<uint32_t>; using Int64Field = IntegerField<int64_t>; struct ConfigEnum { - const char *name; + const std::string name; const int val; }; enum ConfigType { SingleConfig, MultiConfig }; -int ConfigEnumGetValue(ConfigEnum *ce, const char *name); -const char *ConfigEnumGetName(ConfigEnum *ce, int val); - class ConfigField { public: ConfigField() = default; @@ -186,23 +183,34 @@ class YesNoField : public ConfigField { class EnumField : public ConfigField { public: - EnumField(int *receiver, ConfigEnum *enums, int e) : receiver_(receiver), enums_(enums) { *receiver_ = e; } + EnumField(int *receiver, std::vector<ConfigEnum> enums, int e) : receiver_(receiver), enums_(std::move(enums)) { + *receiver_ = e; + } ~EnumField() override = default; - std::string ToString() override { return ConfigEnumGetName(enums_, *receiver_); } + + std::string ToString() override { + for (const auto &e : enums_) { + if (e.val == *receiver_) return e.name; + } + return {}; + } + Status ToNumber(int64_t *n) override { *n = *receiver_; return Status::OK(); } + Status Set(const std::string &v) override { - int e = ConfigEnumGetValue(enums_, v.c_str()); - if (e == INT_MIN) { - return {Status::NotOK, "invalid enum option"}; + for (const auto &e : enums_) { + if (strcasecmp(e.name.c_str(), v.c_str()) == 0) { + *receiver_ = e.val; + return Status::OK(); + } } - *receiver_ = e; - return Status::OK(); + return {Status::NotOK, "invalid enum option"}; } private: int *receiver_; - ConfigEnum *enums_ = nullptr; + std::vector<ConfigEnum> enums_; }; diff --git a/src/storage/event_listener.cc b/src/storage/event_listener.cc index fa9b7c0a..46f399fc 100644 --- a/src/storage/event_listener.cc +++ b/src/storage/event_listener.cc @@ -41,22 +41,12 @@ std::string StallConditionType2String(const rocksdb::WriteStallCondition type) { } std::string CompressType2String(const rocksdb::CompressionType type) { - std::map<rocksdb::CompressionType, std::string> compression_type_string_map = { - {rocksdb::kNoCompression, "no"}, - {rocksdb::kSnappyCompression, "snappy"}, - {rocksdb::kZlibCompression, "zlib"}, - {rocksdb::kBZip2Compression, "zip2"}, - {rocksdb::kLZ4Compression, "lz4"}, - {rocksdb::kLZ4HCCompression, "lz4hc"}, - {rocksdb::kXpressCompression, "xpress"}, - {rocksdb::kZSTD, "zstd"}, - {rocksdb::kZSTDNotFinalCompression, "zstd_not_final"}, - {rocksdb::kDisableCompressionOption, "disable"}}; - auto iter = compression_type_string_map.find(type); - if (iter == compression_type_string_map.end()) { - return "unknown"; + for (const auto &option : engine::CompressionOptions) { + if (option.type == type) { + return option.name; + } } - return iter->second; + return "unknown"; } bool IsDiskQuotaExceeded(const rocksdb::Status &bg_error) { diff --git a/src/storage/storage.h b/src/storage/storage.h index ae4dc577..a6ae4259 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -64,6 +64,20 @@ constexpr const char *kPropagateScriptCommand = "script"; constexpr const char *kLuaFunctionPrefix = "lua_f_"; +struct CompressionOption { + rocksdb::CompressionType type; + const std::string name; + const std::string val; +}; + +inline const std::vector<CompressionOption> CompressionOptions = { + {rocksdb::kNoCompression, "no", "kNoCompression"}, + {rocksdb::kSnappyCompression, "snappy", "kSnappyCompression"}, + {rocksdb::kZlibCompression, "zlib", "kZlibCompression"}, + {rocksdb::kLZ4Compression, "lz4", "kLZ4Compression"}, + {rocksdb::kZSTD, "zstd", "kZSTD"}, +}; + class Storage { public: explicit Storage(Config *config); diff --git a/tests/gocase/unit/config/config_test.go b/tests/gocase/unit/config/config_test.go index 1642b763..2bca988a 100644 --- a/tests/gocase/unit/config/config_test.go +++ b/tests/gocase/unit/config/config_test.go @@ -108,3 +108,24 @@ func TestSetConfigBackupDir(t *testing.T) { require.True(t, hasCompactionFiles(newBackupDir)) require.True(t, hasCompactionFiles(originBackupDir)) } + +func TestConfigSetCompression(t *testing.T) { + configs := map[string]string{} + srv := util.StartServer(t, configs) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + require.NoError(t, rdb.Do(ctx, "SET", "foo", "bar").Err()) + + configKey := "rocksdb.compression" + supportedCompressions := []string{"no", "snappy", "zlib", "lz4", "zstd"} + for _, compression := range supportedCompressions { + require.NoError(t, rdb.ConfigSet(ctx, configKey, compression).Err()) + vals, err := rdb.ConfigGet(ctx, configKey).Result() + require.NoError(t, err) + require.EqualValues(t, compression, vals[configKey]) + } + require.ErrorContains(t, rdb.ConfigSet(ctx, configKey, "unsupported").Err(), "invalid enum option") +}
