empiredan commented on code in PR #1511:
URL:
https://github.com/apache/incubator-pegasus/pull/1511#discussion_r1275842107
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2650,11 +2720,43 @@ void
pegasus_server_impl::update_app_envs_before_open_db(
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
+ set_rocksdb_options_before_creating(envs);
}
void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string,
std::string> &envs)
{
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = _usage_scenario;
+ // write_buffer_size involves random values (refer to
pegasus_server_impl::set_usage_scenario),
+ // so it can only be taken from _data_cf_opts
+ envs[ROCKSDB_WRITE_BUFFER_SIZE] = fmt::format("{}",
_data_cf_opts.write_buffer_size);
Review Comment:
```suggestion
envs[ROCKSDB_WRITE_BUFFER_SIZE] =
std::to_string(_data_cf_opts.write_buffer_size);
```
##########
src/base/pegasus_const.cpp:
##########
@@ -101,4 +108,47 @@ const std::string
USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");
const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
+
+const std::string ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");
+
+const std::string ROCKSDB_NUM_LEVELS("rocksdb.num_levels");
+
+const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS = {
+ ROCKSDB_WRITE_BUFFER_SIZE,
+};
+const std::set<std::string> ROCKSDB_STATIC_OPTIONS = {
+ ROCKSDB_NUM_LEVELS,
+};
+
+const std::unordered_map<std::string, cf_opts_setter> cf_opts_setters = {
+ {ROCKSDB_WRITE_BUFFER_SIZE,
+ [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool {
+ uint64_t val = 0;
+ if (!dsn::buf2uint64(str, val)) {
+ return false;
+ }
+ option.write_buffer_size = static_cast<size_t>(val);
+ return true;
+ }},
+ {ROCKSDB_NUM_LEVELS,
+ [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool {
+ int32_t val = 0;
+ if (!dsn::buf2int32(str, val)) {
+ return false;
+ }
+ option.num_levels = val;
+ return true;
+ }},
+};
+
+const std::unordered_map<std::string, cf_opts_getter> cf_opts_getters = {
+ {ROCKSDB_WRITE_BUFFER_SIZE,
+ [](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) {
+ str = fmt::format("{}", option.write_buffer_size);
Review Comment:
```suggestion
str = std::to_string(option.write_buffer_size);
```
##########
src/base/pegasus_const.cpp:
##########
@@ -101,4 +108,47 @@ const std::string
USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");
const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
+
+const std::string ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");
+
+const std::string ROCKSDB_NUM_LEVELS("rocksdb.num_levels");
+
+const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS = {
+ ROCKSDB_WRITE_BUFFER_SIZE,
+};
+const std::set<std::string> ROCKSDB_STATIC_OPTIONS = {
+ ROCKSDB_NUM_LEVELS,
+};
+
+const std::unordered_map<std::string, cf_opts_setter> cf_opts_setters = {
+ {ROCKSDB_WRITE_BUFFER_SIZE,
+ [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool {
+ uint64_t val = 0;
+ if (!dsn::buf2uint64(str, val)) {
+ return false;
+ }
+ option.write_buffer_size = static_cast<size_t>(val);
+ return true;
+ }},
+ {ROCKSDB_NUM_LEVELS,
+ [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool {
+ int32_t val = 0;
+ if (!dsn::buf2int32(str, val)) {
+ return false;
+ }
+ option.num_levels = val;
+ return true;
+ }},
+};
+
+const std::unordered_map<std::string, cf_opts_getter> cf_opts_getters = {
+ {ROCKSDB_WRITE_BUFFER_SIZE,
+ [](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) {
+ str = fmt::format("{}", option.write_buffer_size);
+ }},
+ {ROCKSDB_NUM_LEVELS,
+ [](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) {
+ str = fmt::format("{}", option.num_levels);
Review Comment:
```suggestion
str = std::to_string(option.num_levels);
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2625,6 +2627,73 @@ pegasus_server_impl::get_restore_dir_from_env(const
std::map<std::string, std::s
return res;
}
+void pegasus_server_impl::update_rocksdb_dynamic_options(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ std::unordered_map<std::string, std::string> new_options;
+ for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ std::vector<std::string> args;
+ // split_args example: Parse "write_buffer_size" from
"rocksdb.write_buffer_size"
+ dsn::utils::split_args(option.c_str(), args, '.');
+ CHECK_EQ(args.size(), 2);
+ new_options[args[1]] = find->second;
+ }
+
+ // doing set option
+ if (!new_options.empty() && set_options(new_options)) {
+ LOG_INFO("Set rocksdb dynamic options success");
+ }
+}
+
+void pegasus_server_impl::set_rocksdb_options_before_creating(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ auto setter = cf_opts_setters.find(option);
+ if (setter == cf_opts_setters.end()) {
+ LOG_WARNING("cannot find {} setter function, and set this option
fail.", option);
+ continue;
+ }
Review Comment:
I think we'd better use an assertion here, since it should never fail?
```suggestion
CHECK_TRUE(setter != cf_opts_setters.end());
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2650,11 +2720,43 @@ void
pegasus_server_impl::update_app_envs_before_open_db(
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
+ set_rocksdb_options_before_creating(envs);
}
void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string,
std::string> &envs)
{
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = _usage_scenario;
+ // write_buffer_size involves random values (refer to
pegasus_server_impl::set_usage_scenario),
+ // so it can only be taken from _data_cf_opts
+ envs[ROCKSDB_WRITE_BUFFER_SIZE] = fmt::format("{}",
_data_cf_opts.write_buffer_size);
+
+ // Get Data ColumnFamilyOptions directly from _data_cf
+ rocksdb::ColumnFamilyDescriptor desc;
+ auto s = _data_cf->GetDescriptor(&desc);
+ CHECK_TRUE(s.ok());
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto getter = cf_opts_getters.find(option);
+ if (getter == cf_opts_getters.end()) {
+ LOG_WARNING("cannot find {} getter function, and get this option
fail.", option);
+ continue;
+ }
Review Comment:
I think we'd better use an assertion here, since it should never fail?
```suggestion
CHECK_TRUE(getter != cf_opts_getters.end());
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2650,11 +2720,43 @@ void
pegasus_server_impl::update_app_envs_before_open_db(
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
+ set_rocksdb_options_before_creating(envs);
}
void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string,
std::string> &envs)
{
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = _usage_scenario;
+ // write_buffer_size involves random values (refer to
pegasus_server_impl::set_usage_scenario),
+ // so it can only be taken from _data_cf_opts
+ envs[ROCKSDB_WRITE_BUFFER_SIZE] = fmt::format("{}",
_data_cf_opts.write_buffer_size);
+
+ // Get Data ColumnFamilyOptions directly from _data_cf
+ rocksdb::ColumnFamilyDescriptor desc;
+ auto s = _data_cf->GetDescriptor(&desc);
+ CHECK_TRUE(s.ok());
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto getter = cf_opts_getters.find(option);
+ if (getter == cf_opts_getters.end()) {
+ LOG_WARNING("cannot find {} getter function, and get this option
fail.", option);
+ continue;
+ }
+ std::string option_val;
+ getter->second(desc.options, option_val);
+ envs[option] = option_val;
+ }
+ for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
+ if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0) {
+ continue;
+ }
+ auto getter = cf_opts_getters.find(option);
+ if (getter == cf_opts_getters.end()) {
+ LOG_WARNING("cannot find {} getter function, and get this option
fail.", option);
+ continue;
+ }
Review Comment:
```suggestion
CHECK_TRUE(getter != cf_opts_getters.end());
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2625,6 +2627,73 @@ pegasus_server_impl::get_restore_dir_from_env(const
std::map<std::string, std::s
return res;
}
+void pegasus_server_impl::update_rocksdb_dynamic_options(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ std::unordered_map<std::string, std::string> new_options;
+ for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ std::vector<std::string> args;
+ // split_args example: Parse "write_buffer_size" from
"rocksdb.write_buffer_size"
+ dsn::utils::split_args(option.c_str(), args, '.');
+ CHECK_EQ(args.size(), 2);
+ new_options[args[1]] = find->second;
+ }
+
+ // doing set option
+ if (!new_options.empty() && set_options(new_options)) {
+ LOG_INFO("Set rocksdb dynamic options success");
+ }
+}
+
+void pegasus_server_impl::set_rocksdb_options_before_creating(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ auto setter = cf_opts_setters.find(option);
Review Comment:
```suggestion
const auto &setter = cf_opts_setters.find(option);
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2625,6 +2627,73 @@ pegasus_server_impl::get_restore_dir_from_env(const
std::map<std::string, std::s
return res;
}
+void pegasus_server_impl::update_rocksdb_dynamic_options(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ std::unordered_map<std::string, std::string> new_options;
+ for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ std::vector<std::string> args;
+ // split_args example: Parse "write_buffer_size" from
"rocksdb.write_buffer_size"
+ dsn::utils::split_args(option.c_str(), args, '.');
+ CHECK_EQ(args.size(), 2);
+ new_options[args[1]] = find->second;
+ }
+
+ // doing set option
+ if (!new_options.empty() && set_options(new_options)) {
+ LOG_INFO("Set rocksdb dynamic options success");
+ }
+}
+
+void pegasus_server_impl::set_rocksdb_options_before_creating(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto find = envs.find(option);
Review Comment:
```suggestion
const auto &find = envs.find(option);
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2625,6 +2627,73 @@ pegasus_server_impl::get_restore_dir_from_env(const
std::map<std::string, std::s
return res;
}
+void pegasus_server_impl::update_rocksdb_dynamic_options(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ std::unordered_map<std::string, std::string> new_options;
+ for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ std::vector<std::string> args;
+ // split_args example: Parse "write_buffer_size" from
"rocksdb.write_buffer_size"
+ dsn::utils::split_args(option.c_str(), args, '.');
+ CHECK_EQ(args.size(), 2);
+ new_options[args[1]] = find->second;
+ }
+
+ // doing set option
+ if (!new_options.empty() && set_options(new_options)) {
+ LOG_INFO("Set rocksdb dynamic options success");
+ }
+}
+
+void pegasus_server_impl::set_rocksdb_options_before_creating(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ auto setter = cf_opts_setters.find(option);
+ if (setter == cf_opts_setters.end()) {
+ LOG_WARNING("cannot find {} setter function, and set this option
fail.", option);
+ continue;
+ }
+ if (setter->second(find->second, _data_cf_opts)) {
+ LOG_INFO_PREFIX("Set {} \"{}\" succeed", find->first,
find->second);
+ }
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ auto setter = cf_opts_setters.find(option);
+ if (setter == cf_opts_setters.end()) {
+ LOG_WARNING("cannot find {} setter function, and set this option
fail.", option);
+ continue;
+ }
Review Comment:
```suggestion
CHECK_TRUE(setter != cf_opts_setters.end());
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2625,6 +2627,73 @@ pegasus_server_impl::get_restore_dir_from_env(const
std::map<std::string, std::s
return res;
}
+void pegasus_server_impl::update_rocksdb_dynamic_options(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ std::unordered_map<std::string, std::string> new_options;
+ for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ std::vector<std::string> args;
+ // split_args example: Parse "write_buffer_size" from
"rocksdb.write_buffer_size"
+ dsn::utils::split_args(option.c_str(), args, '.');
+ CHECK_EQ(args.size(), 2);
+ new_options[args[1]] = find->second;
+ }
+
+ // doing set option
+ if (!new_options.empty() && set_options(new_options)) {
+ LOG_INFO("Set rocksdb dynamic options success");
+ }
+}
+
+void pegasus_server_impl::set_rocksdb_options_before_creating(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ auto setter = cf_opts_setters.find(option);
+ if (setter == cf_opts_setters.end()) {
+ LOG_WARNING("cannot find {} setter function, and set this option
fail.", option);
+ continue;
+ }
+ if (setter->second(find->second, _data_cf_opts)) {
+ LOG_INFO_PREFIX("Set {} \"{}\" succeed", find->first,
find->second);
+ }
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
Review Comment:
```suggestion
const auto &find = envs.find(option);
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2625,6 +2627,73 @@ pegasus_server_impl::get_restore_dir_from_env(const
std::map<std::string, std::s
return res;
}
+void pegasus_server_impl::update_rocksdb_dynamic_options(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ std::unordered_map<std::string, std::string> new_options;
+ for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ std::vector<std::string> args;
+ // split_args example: Parse "write_buffer_size" from
"rocksdb.write_buffer_size"
+ dsn::utils::split_args(option.c_str(), args, '.');
+ CHECK_EQ(args.size(), 2);
+ new_options[args[1]] = find->second;
+ }
+
+ // doing set option
+ if (!new_options.empty() && set_options(new_options)) {
+ LOG_INFO("Set rocksdb dynamic options success");
+ }
+}
+
+void pegasus_server_impl::set_rocksdb_options_before_creating(
+ const std::map<std::string, std::string> &envs)
+{
+ if (envs.empty()) {
+ return;
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ auto setter = cf_opts_setters.find(option);
+ if (setter == cf_opts_setters.end()) {
+ LOG_WARNING("cannot find {} setter function, and set this option
fail.", option);
+ continue;
+ }
+ if (setter->second(find->second, _data_cf_opts)) {
+ LOG_INFO_PREFIX("Set {} \"{}\" succeed", find->first,
find->second);
+ }
+ }
+
+ for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
+ auto find = envs.find(option);
+ if (find == envs.end()) {
+ continue;
+ }
+
+ auto setter = cf_opts_setters.find(option);
Review Comment:
```suggestion
const auto &setter = cf_opts_setters.find(option);
```
##########
src/server/pegasus_server_impl.cpp:
##########
@@ -2650,11 +2720,43 @@ void
pegasus_server_impl::update_app_envs_before_open_db(
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
+ set_rocksdb_options_before_creating(envs);
}
void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string,
std::string> &envs)
{
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = _usage_scenario;
+ // write_buffer_size involves random values (refer to
pegasus_server_impl::set_usage_scenario),
+ // so it can only be taken from _data_cf_opts
+ envs[ROCKSDB_WRITE_BUFFER_SIZE] = fmt::format("{}",
_data_cf_opts.write_buffer_size);
+
+ // Get Data ColumnFamilyOptions directly from _data_cf
+ rocksdb::ColumnFamilyDescriptor desc;
+ auto s = _data_cf->GetDescriptor(&desc);
+ CHECK_TRUE(s.ok());
Review Comment:
```suggestion
CHECK_TRUE(_data_cf->GetDescriptor(&desc).ok());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]