This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new bacc12a8b feat(table_env): Support to list all available table envs 
(#1915)
bacc12a8b is described below

commit bacc12a8bc37609f71c2a2ca3af836ef6c376fa7
Author: Yingchun Lai <[email protected]>
AuthorDate: Fri Apr 19 15:34:08 2024 +0800

    feat(table_env): Support to list all available table envs (#1915)
    
    All available table environments can now be queried via the `/envs/list`
    path of the meta server, e.g. `curl 127.0.0.1:34601/envs/list`. The output
    looks like:
    
    ```
    $ curl 127.0.0.1:34601/envs/list
    {
      "business.info": {
        "limitation": "",
        "sample": "",
        "type": "string"
      },
      "default_ttl": {
        "limitation": ">= 0",
        "sample": "86400",
        "type": "unsigned int32"
      },
      "manual_compact.disabled": {
        "limitation": "true | false",
        "sample": "true",
        "type": "bool"
      },
      ...
    }
    ```
---
 src/base/meta_store.cpp                            |   5 +-
 src/base/meta_store.h                              |   3 -
 src/common/replica_envs.cpp                        |   5 +
 src/common/replica_envs.h                          |  11 +-
 src/http/http_call_registry.h                      |   5 +
 src/http/http_server.cpp                           |   5 +
 src/http/http_server.h                             |   2 +
 src/meta/app_env_validator.cpp                     | 432 +++++++++++++--------
 src/meta/app_env_validator.h                       |  77 +++-
 src/meta/meta_http_service.cpp                     |   7 +-
 src/meta/server_state.cpp                          |  18 +-
 src/meta/server_state.h                            |   3 +
 src/meta/test/meta_app_envs_test.cpp               |  46 +--
 src/meta/test/meta_http_service_test.cpp           |   8 +-
 src/meta/test/meta_mauanl_compaction_test.cpp      |  64 ++-
 src/meta/test/meta_test_base.cpp                   |   2 +-
 src/meta/test/server_state_test.cpp                |  49 ++-
 src/replica/replica.h                              |  12 +-
 src/replica/replica_throttle.cpp                   |  14 +-
 src/replica/test/throttling_controller_test.cpp    |  18 +-
 src/server/pegasus_manual_compact_service.cpp      |  11 +-
 src/server/pegasus_manual_compact_service.h        |   3 -
 src/server/pegasus_server_impl.cpp                 |  36 +-
 src/server/test/manual_compact_service_test.cpp    |   4 +-
 src/server/test/pegasus_server_impl_test.cpp       |  14 +-
 .../token_bucket_throttling_controller_test.cpp    | 183 ++++++---
 src/utils/throttling_controller.cpp                | 199 +++++++---
 src/utils/throttling_controller.h                  |  30 +-
 src/utils/token_bucket_throttling_controller.cpp   |  88 ++---
 src/utils/token_bucket_throttling_controller.h     |   4 +-
 30 files changed, 874 insertions(+), 484 deletions(-)

diff --git a/src/base/meta_store.cpp b/src/base/meta_store.cpp
index 94aaf7db7..f9ca38ab7 100644
--- a/src/base/meta_store.cpp
+++ b/src/base/meta_store.cpp
@@ -34,9 +34,6 @@ const std::string meta_store::DATA_VERSION = 
"pegasus_data_version";
 const std::string meta_store::LAST_FLUSHED_DECREE = 
"pegasus_last_flushed_decree";
 const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME =
     "pegasus_last_manual_compact_finish_time";
-const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal";
-const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = 
"prefer_write";
-const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = 
"bulk_load";
 
 meta_store::meta_store(const char *log_prefix,
                        rocksdb::DB *db,
@@ -85,7 +82,7 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB 
*db,
 std::string meta_store::get_usage_scenario() const
 {
     // If couldn't find rocksdb usage scenario in meta column family, return 
normal in default.
-    std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
+    std::string usage_scenario = 
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
     auto ec = get_string_value_from_meta_cf(
         false, dsn::replica_envs::ROCKSDB_USAGE_SCENARIO, &usage_scenario);
     CHECK_PREFIX_MSG(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND,
diff --git a/src/base/meta_store.h b/src/base/meta_store.h
index 060692d5c..a6e8e953b 100644
--- a/src/base/meta_store.h
+++ b/src/base/meta_store.h
@@ -92,9 +92,6 @@ private:
     static const std::string DATA_VERSION;
     static const std::string LAST_FLUSHED_DECREE;
     static const std::string LAST_MANUAL_COMPACT_FINISH_TIME;
-    static const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
-    static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
-    static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
 
     const std::string _log_prefix;
     rocksdb::DB *_db;
diff --git a/src/common/replica_envs.cpp b/src/common/replica_envs.cpp
index f1b85b0f2..2094f2e16 100644
--- a/src/common/replica_envs.cpp
+++ b/src/common/replica_envs.cpp
@@ -21,6 +21,11 @@
 
 namespace dsn {
 const uint64_t replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS = 20;
+const std::string 
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force");
+const std::string 
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");
+const std::string replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL("normal");
+const std::string 
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE("prefer_write");
+const std::string 
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD("bulk_load");
 const std::string 
replica_envs::DENY_CLIENT_REQUEST("replica.deny_client_request");
 const std::string 
replica_envs::WRITE_QPS_THROTTLING("replica.write_throttling");
 const std::string 
replica_envs::WRITE_SIZE_THROTTLING("replica.write_throttling_by_size");
diff --git a/src/common/replica_envs.h b/src/common/replica_envs.h
index e80836397..d1a562cb1 100644
--- a/src/common/replica_envs.h
+++ b/src/common/replica_envs.h
@@ -35,8 +35,7 @@ namespace dsn {
 class replica_envs
 {
 public:
-    static const uint64_t MIN_SLOW_QUERY_THRESHOLD_MS;
-
+    // Environment variable keys.
     static const std::string DENY_CLIENT_REQUEST;
     static const std::string WRITE_QPS_THROTTLING;
     static const std::string WRITE_SIZE_THROTTLING;
@@ -74,6 +73,14 @@ public:
 
     static const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;
     static const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
+
+    // Environment variable values.
+    static const uint64_t MIN_SLOW_QUERY_THRESHOLD_MS;
+    static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
+    static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
+    static const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
+    static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
+    static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
 };
 
 } // namespace dsn
diff --git a/src/http/http_call_registry.h b/src/http/http_call_registry.h
index 2e944463c..69eb80fc1 100644
--- a/src/http/http_call_registry.h
+++ b/src/http/http_call_registry.h
@@ -51,7 +51,12 @@ public:
     void add(const std::shared_ptr<http_call> &call)
     {
         std::lock_guard<std::mutex> guard(_mu);
+// Some tests (e.g. policy_context_test) create multiple objects which
+// register duplicate http paths, so disable checking the path when
+// MOCK_TEST enabled.
+#ifndef MOCK_TEST
         CHECK_EQ_MSG(_call_map.count(call->path), 0, "{} has been added", 
call->path);
+#endif
         _call_map[call->path] = call;
     }
 
diff --git a/src/http/http_server.cpp b/src/http/http_server.cpp
index cbd214bba..ce2178bfd 100644
--- a/src/http/http_server.cpp
+++ b/src/http/http_server.cpp
@@ -109,6 +109,11 @@ void http_service::register_handler(std::string sub_path,
     http_call_registry::instance().add(std::move(call));
 }
 
+void http_service::deregister_handler(std::string sub_path) const
+{
+    http_call_registry::instance().remove(get_rel_path(sub_path));
+}
+
 void http_server_base::update_config_handler(const http_request &req, 
http_response &resp)
 {
     auto res = dsn::update_config(req);
diff --git a/src/http/http_server.h b/src/http/http_server.h
index a9cf668b2..f2c393085 100644
--- a/src/http/http_server.h
+++ b/src/http/http_server.h
@@ -103,6 +103,8 @@ public:
                           std::string parameters,
                           std::string help) const;
 
+    void deregister_handler(std::string sub_path) const;
+
 private:
     // If sub_path is 'app/duplication', the built path would be 
'<root_path>/app/duplication',
     // where path() would be called as root_path.
diff --git a/src/meta/app_env_validator.cpp b/src/meta/app_env_validator.cpp
index e068022a0..229389b3b 100644
--- a/src/meta/app_env_validator.cpp
+++ b/src/meta/app_env_validator.cpp
@@ -19,60 +19,54 @@
 
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <fmt/core.h>
+#include <fmt/format.h>
+#include <nlohmann/json.hpp>
 #include <stdint.h>
-#include <memory>
 #include <set>
 #include <utility>
 #include <vector>
 
 #include "common/replica_envs.h"
+#include "http/http_status_code.h"
 #include "utils/fmt_logging.h"
 #include "utils/string_conv.h"
 #include "utils/strings.h"
+#include "utils/throttling_controller.h"
 #include "utils/token_bucket_throttling_controller.h"
 
 namespace dsn {
 namespace replication {
-bool validate_app_envs(const std::map<std::string, std::string> &envs)
+app_env_validator::app_env_validator()
 {
-    // only check rocksdb app envs currently
+    register_all_validators();
+    register_handler(
+        "list",
+        std::bind(
+            &app_env_validator::list_all_envs, this, std::placeholders::_1, 
std::placeholders::_2),
+        "List all available table environments.");
+}
+
+app_env_validator::~app_env_validator() { deregister_handler("list"); }
 
-    for (const auto &it : envs) {
-        if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(it.first) ==
+bool app_env_validator::validate_app_envs(const std::map<std::string, 
std::string> &envs)
+{
+    // only check rocksdb app envs currently
+    for (const auto & [ key, value ] : envs) {
+        if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(key) ==
                 replica_envs::ROCKSDB_STATIC_OPTIONS.end() &&
-            replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(it.first) ==
+            replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(key) ==
                 replica_envs::ROCKSDB_DYNAMIC_OPTIONS.end()) {
             continue;
         }
         std::string hint_message;
-        if (!validate_app_env(it.first, it.second, hint_message)) {
-            LOG_WARNING(
-                "app env {}={} is invaild, hint_message:{}", it.first, 
it.second, hint_message);
+        if (!validate_app_env(key, value, hint_message)) {
+            LOG_WARNING("app env '{}={}' is invalid, hint_message: {}", key, 
value, hint_message);
             return false;
         }
     }
     return true;
 }
 
-bool validate_app_env(const std::string &env_name,
-                      const std::string &env_value,
-                      std::string &hint_message)
-{
-    return app_env_validator::instance().validate_app_env(env_name, env_value, 
hint_message);
-}
-
-bool check_slow_query(const std::string &env_value, std::string &hint_message)
-{
-    uint64_t threshold = 0;
-    if (!dsn::buf2uint64(env_value, threshold) ||
-        threshold < replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS) {
-        hint_message = fmt::format("Slow query threshold must be >= {}ms",
-                                   replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS);
-        return false;
-    }
-    return true;
-}
-
 bool check_deny_client(const std::string &env_value, std::string &hint_message)
 {
     std::vector<std::string> sub_sargs;
@@ -93,184 +87,290 @@ bool check_deny_client(const std::string &env_value, 
std::string &hint_message)
     return true;
 }
 
-bool check_rocksdb_iteration(const std::string &env_value, std::string 
&hint_message)
+bool check_throttling(const std::string &env_value, std::string &hint_message)
 {
-    uint64_t threshold = 0;
-    if (!dsn::buf2uint64(env_value, threshold) || threshold < 0) {
-        hint_message = "Rocksdb iteration threshold must be greater than zero";
-        return false;
-    }
-    return true;
+    uint64_t delay_units = 0;
+    uint64_t delay_ms = 0;
+    uint64_t reject_units = 0;
+    uint64_t reject_delay_ms = 0;
+    return utils::throttling_controller::parse_from_env(
+        env_value, delay_units, delay_ms, reject_units, reject_delay_ms, 
hint_message);
 }
 
-bool check_throttling(const std::string &env_value, std::string &hint_message)
+void app_env_validator::EnvInfo::init()
 {
-    std::vector<std::string> sargs;
-    utils::split_args(env_value.c_str(), sargs, ',');
-    if (sargs.empty()) {
-        hint_message = "The value shouldn't be empty";
-        return false;
-    }
-
-    // example for sarg: 100K*delay*100 / 100M*reject*100
-    bool reject_parsed = false;
-    bool delay_parsed = false;
-    for (std::string &sarg : sargs) {
-        std::vector<std::string> sub_sargs;
-        utils::split_args(sarg.c_str(), sub_sargs, '*', true);
-        if (sub_sargs.size() != 3) {
-            hint_message = fmt::format("The field count of {} should be 3", 
sarg);
-            return false;
-        }
-
-        // check the first part, which is must be a positive number followed 
with 'K' or 'M'
-        int64_t units = 0;
-        if (!sub_sargs[0].empty() &&
-            ('M' == *sub_sargs[0].rbegin() || 'K' == *sub_sargs[0].rbegin())) {
-            sub_sargs[0].pop_back();
-        }
-        if (!buf2int64(sub_sargs[0], units) || units < 0) {
-            hint_message = fmt::format("{} should be non-negative int", 
sub_sargs[0]);
-            return false;
-        }
-
-        // check the second part, which is must be "delay" or "reject"
-        if (sub_sargs[1] == "delay") {
-            if (delay_parsed) {
-                hint_message = "duplicate delay config";
-                return false;
-            }
-            delay_parsed = true;
-        } else if (sub_sargs[1] == "reject") {
-            if (reject_parsed) {
-                hint_message = "duplicate reject config";
-                return false;
-            }
-            reject_parsed = true;
-        } else {
-            hint_message = fmt::format("{} should be \"delay\" or \"reject\"", 
sub_sargs[1]);
-            return false;
+    // Set default limitation description.
+    if (limit_desc.empty()) {
+        switch (type) {
+        case ValueType::kBool:
+            limit_desc = "true | false";
+            break;
+        case ValueType::kString:
+            break;
+        default:
+            CHECK_TRUE(false);
+            __builtin_unreachable();
         }
+    }
 
-        // check the third part, which is must be a positive number or 0
-        int64_t delay_ms = 0;
-        if (!buf2int64(sub_sargs[2], delay_ms) || delay_ms < 0) {
-            hint_message = fmt::format("{} should be non-negative int", 
sub_sargs[2]);
-            return false;
+    // Set default sample.
+    if (sample.empty()) {
+        switch (type) {
+        case ValueType::kBool:
+            sample = "true";
+            break;
+        case ValueType::kString:
+            break;
+        default:
+            CHECK_TRUE(false);
+            __builtin_unreachable();
         }
     }
-
-    return true;
 }
 
-bool check_bool_value(const std::string &env_value, std::string &hint_message)
-{
-    bool result = false;
-    if (!dsn::buf2bool(env_value, result)) {
-        hint_message = fmt::format("invalid string {}, should be \"true\" or 
\"false\"", env_value);
-        return false;
-    }
-    return true;
-}
+app_env_validator::EnvInfo::EnvInfo(ValueType t) : type(t) { init(); }
 
-bool check_rocksdb_write_buffer_size(const std::string &env_value, std::string 
&hint_message)
+app_env_validator::EnvInfo::EnvInfo(ValueType t,
+                                    std::string ld,
+                                    std::string s,
+                                    string_validator_func v)
+    : type(t), limit_desc(std::move(ld)), sample(std::move(s)), 
string_validator(std::move(v))
 {
-    uint64_t val = 0;
-
-    if (!dsn::buf2uint64(env_value, val)) {
-        hint_message = fmt::format("rocksdb.write_buffer_size cannot set this 
val: {}", env_value);
-        return false;
-    }
-    if (val < (16 << 20) || val > (512 << 20)) {
-        hint_message =
-            fmt::format("rocksdb.write_buffer_size suggest set val in range 
[16777216, 536870912]");
-        return false;
-    }
-    return true;
+    CHECK_TRUE(type == ValueType::kString);
+    init();
 }
-bool check_rocksdb_num_levels(const std::string &env_value, std::string 
&hint_message)
-{
-    int32_t val = 0;
 
-    if (!dsn::buf2int32(env_value, val)) {
-        hint_message = fmt::format("rocksdb.num_levels cannot set this val: 
{}", env_value);
-        return false;
-    }
-    if (val < 1 || val > 10) {
-        hint_message = fmt::format("rocksdb.num_levels suggest set val in 
range [1 , 10]");
-        return false;
-    }
-    return true;
+app_env_validator::EnvInfo::EnvInfo(ValueType t,
+                                    std::string ld,
+                                    std::string s,
+                                    int_validator_func v)
+    : type(t), limit_desc(std::move(ld)), sample(std::move(s)), 
int_validator(std::move(v))
+{
+    CHECK_TRUE(type == ValueType::kInt64 || type == ValueType::kInt32);
+    init();
 }
 
 bool app_env_validator::validate_app_env(const std::string &env_name,
                                          const std::string &env_value,
                                          std::string &hint_message)
 {
-    auto func_iter = _validator_funcs.find(env_name);
-    if (func_iter != _validator_funcs.end()) {
-        // check function == nullptr means no check
-        if (nullptr != func_iter->second && !func_iter->second(env_value, 
hint_message)) {
-            LOG_WARNING("{}={} is invalid.", env_name, env_value);
+    // Check if the env is supported.
+    const auto func_iter = _validator_funcs.find(env_name);
+    if (func_iter == _validator_funcs.end()) {
+        hint_message = fmt::format("app_env '{}' is not supported", env_name);
+        return false;
+    }
+
+    // 'int_result' will be used if the env variable is integer type.
+    int64_t int_result = 0;
+    switch (func_iter->second.type) {
+    case ValueType::kBool: {
+        // Check by the default boolean validator.
+        bool result = false;
+        if (!dsn::buf2bool(env_value, result)) {
+            hint_message = fmt::format("invalid value '{}', should be a 
boolean", env_value);
+            return false;
+        }
+        break;
+    }
+    case ValueType::kInt32: {
+        // Check by the default int32 validator.
+        int32_t result = 0;
+        if (!dsn::buf2int32(env_value, result)) {
+            hint_message =
+                fmt::format("invalid value '{}', should be an 32 bits 
integer", env_value);
+            return false;
+        }
+        int_result = result;
+        break;
+    }
+    case ValueType::kInt64: {
+        // Check by the default int64 validator.
+        int64_t result = 0;
+        if (!dsn::buf2int64(env_value, result)) {
+            hint_message =
+                fmt::format("invalid value '{}', should be an 64 bits 
integer", env_value);
+            return false;
+        }
+        int_result = result;
+        break;
+    }
+    case ValueType::kString: {
+        // Check by the self defined validator.
+        if (nullptr != func_iter->second.string_validator &&
+            !func_iter->second.string_validator(env_value, hint_message)) {
             return false;
         }
+        break;
+    }
+    default:
+        CHECK_TRUE(false);
+        __builtin_unreachable();
+    }
 
-        return true;
+    if (func_iter->second.type == ValueType::kInt32 ||
+        func_iter->second.type == ValueType::kInt64) {
+        // Check by the self defined validator.
+        if (nullptr != func_iter->second.int_validator &&
+            !func_iter->second.int_validator(int_result)) {
+            hint_message = fmt::format(
+                "invalid value '{}', should be '{}'", env_value, 
func_iter->second.limit_desc);
+            return false;
+        }
     }
 
-    hint_message = fmt::format("app_env \"{}\" is not supported", env_name);
-    return false;
+    return true;
 }
 
 void app_env_validator::register_all_validators()
 {
+    static const auto kMinWriteBufferSize = 16 << 20;
+    static const auto kMaxWriteBufferSize = 512 << 20;
+    static const auto kMinLevel = 1;
+    static const auto kMaxLevel = 10;
+    static const std::string check_throttling_limit = 
"<size[K|M]>*<delay|reject>*<milliseconds>";
+    static const std::string check_throttling_sample = 
"10000*delay*100,20000*reject*100";
+
+    // TODO(yingchun): Use a macro to simplify the following 2 code blocks.
+    // EnvInfo for MANUAL_COMPACT_*_BOTTOMMOST_LEVEL_COMPACTION.
+    const std::set<std::string> valid_mcblcs(
+        {replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
+         replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP});
+    const std::string mcblc_sample(fmt::format("{}", fmt::join(valid_mcblcs, " 
| ")));
+    const app_env_validator::EnvInfo mcblc(
+        app_env_validator::ValueType::kString,
+        mcblc_sample,
+        replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
+        [=](const std::string &new_value, std::string &hint_message) {
+            if (valid_mcblcs.count(new_value) == 0) {
+                hint_message = mcblc_sample;
+                return false;
+            }
+            return true;
+        });
+
+    // EnvInfo for ROCKSDB_USAGE_SCENARIO.
+    const std::set<std::string> 
valid_russ({replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+                                            
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE,
+                                            
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD});
+    const std::string rus_sample(fmt::format("{}", fmt::join(valid_russ, " | 
")));
+    const app_env_validator::EnvInfo rus(
+        app_env_validator::ValueType::kString,
+        rus_sample,
+        replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+        [=](const std::string &new_value, std::string &hint_message) {
+            if (valid_russ.count(new_value) == 0) {
+                hint_message = rus_sample;
+                return false;
+            }
+            return true;
+        });
+
     _validator_funcs = {
         {replica_envs::SLOW_QUERY_THRESHOLD,
-         std::bind(&check_slow_query, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kInt64,
+          fmt::format(">= {}", replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS),
+          "1000",
+          [](int64_t new_value) {
+              return replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS <= new_value;
+          }}},
         {replica_envs::WRITE_QPS_THROTTLING,
-         std::bind(&check_throttling, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kString, check_throttling_limit, check_throttling_sample, 
&check_throttling}},
         {replica_envs::WRITE_SIZE_THROTTLING,
-         std::bind(&check_throttling, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kString, check_throttling_limit, check_throttling_sample, 
&check_throttling}},
         {replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS,
-         std::bind(&check_rocksdb_iteration, std::placeholders::_1, 
std::placeholders::_2)},
-        {replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED,
-         std::bind(&check_bool_value, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kInt64, ">= 0", "1000", [](int64_t new_value) { return 
new_value >= 0; }}},
+        {replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED, {ValueType::kBool}},
         {replica_envs::READ_QPS_THROTTLING,
-         std::bind(&check_throttling, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kString, check_throttling_limit, check_throttling_sample, 
&check_throttling}},
         {replica_envs::READ_SIZE_THROTTLING,
-         std::bind(&utils::token_bucket_throttling_controller::validate,
-                   std::placeholders::_1,
-                   std::placeholders::_2)},
-        {replica_envs::SPLIT_VALIDATE_PARTITION_HASH,
-         std::bind(&check_bool_value, std::placeholders::_1, 
std::placeholders::_2)},
-        {replica_envs::USER_SPECIFIED_COMPACTION, nullptr},
+         {ValueType::kString,
+          "",
+          "20000*delay*100,20000*reject*100",
+          &utils::token_bucket_throttling_controller::validate}},
+        {replica_envs::SPLIT_VALIDATE_PARTITION_HASH, {ValueType::kBool}},
+        {replica_envs::USER_SPECIFIED_COMPACTION, {ValueType::kString}},
         {replica_envs::BACKUP_REQUEST_QPS_THROTTLING,
-         std::bind(&check_throttling, std::placeholders::_1, 
std::placeholders::_2)},
-        {replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND,
-         std::bind(&check_bool_value, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kString, check_throttling_limit, check_throttling_sample, 
&check_throttling}},
+        {replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, {ValueType::kBool}},
         {replica_envs::DENY_CLIENT_REQUEST,
-         std::bind(&check_deny_client, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kString,
+          "timeout*all | timeout*write | timeout*read | reconfig*all | 
reconfig*write | "
+          "reconfig*read",
+          "timeout*all",
+          &check_deny_client}},
         {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
-         std::bind(&check_rocksdb_write_buffer_size, std::placeholders::_1, 
std::placeholders::_2)},
+         {ValueType::kInt64,
+          fmt::format("In range [{}, {}]", kMinWriteBufferSize, 
kMaxWriteBufferSize),
+          fmt::format("{}", kMinWriteBufferSize),
+          [](int64_t new_value) {
+              return kMinWriteBufferSize <= new_value && new_value <= 
kMaxWriteBufferSize;
+          }}},
         {replica_envs::ROCKSDB_NUM_LEVELS,
-         std::bind(&check_rocksdb_num_levels, std::placeholders::_1, 
std::placeholders::_2)},
-        // TODO(zhaoliwei): not implemented
-        {replica_envs::BUSINESS_INFO, nullptr},
-        {replica_envs::TABLE_LEVEL_DEFAULT_TTL, nullptr},
-        {replica_envs::ROCKSDB_USAGE_SCENARIO, nullptr},
-        {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT, nullptr},
-        {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS, nullptr},
-        {replica_envs::MANUAL_COMPACT_DISABLED, nullptr},
-        {replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT, nullptr},
-        {replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME, nullptr},
-        {replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL, nullptr},
-        {replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION, 
nullptr},
-        {replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, nullptr},
-        {replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
-        {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, 
nullptr},
-        {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
-        {replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr},
-    };
+         {ValueType::kInt64,
+          fmt::format("In range [{}, {}]", kMinLevel, kMaxLevel),
+          "6",
+          [](int64_t new_value) { return kMinLevel <= new_value && new_value 
<= kMaxLevel; }}},
+        {replica_envs::BUSINESS_INFO, {ValueType::kString}},
+        {replica_envs::TABLE_LEVEL_DEFAULT_TTL,
+         {ValueType::kInt32, ">= 0", "86400", [](int64_t new_value) { return 
new_value >= 0; }}},
+        {replica_envs::ROCKSDB_USAGE_SCENARIO, rus},
+        {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
+         {ValueType::kInt32, "> 0", "2", [](int64_t new_value) { return 
new_value > 0; }}},
+        {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS,
+         {ValueType::kInt32, ">= 0", "3600", [](int64_t new_value) { return 
new_value >= 0; }}},
+        {replica_envs::MANUAL_COMPACT_DISABLED, {ValueType::kBool}},
+        {replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT,
+         {ValueType::kInt32, ">= 0", "8", [](int64_t new_value) { return 
new_value >= 0; }}},
+        {replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+         {ValueType::kInt64,
+          "> 0, timestamp (in seconds) to trigger the once manual compaction",
+          "1700000000",
+          [](int64_t new_value) { return new_value >= 0; }}},
+        {replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
+         {ValueType::kInt64,
+          "-1 or >= 1",
+          "6",
+          [](int64_t new_value) { return new_value == -1 || new_value >= 1; 
}}},
+        {replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION, mcblc},
+        // TODO(yingchun): enable the validator by refactoring
+        // pegasus_manual_compact_service::check_periodic_compact
+        {replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, 
{ValueType::kString}},
+        {replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
+         {ValueType::kInt64,
+          "-1 or >= 1",
+          "6",
+          [](int64_t new_value) { return new_value == -1 || new_value >= 1; 
}}},
+        {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, 
mcblc},
+        {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, 
{ValueType::kString}},
+        {replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, 
{ValueType::kString}}};
+}
+
+const std::unordered_map<app_env_validator::ValueType, std::string>
+    app_env_validator::EnvInfo::ValueType2String{{ValueType::kBool, "bool"},
+                                                 {ValueType::kInt32, "unsigned 
int32"},
+                                                 {ValueType::kInt64, "unsigned 
int64"},
+                                                 {ValueType::kString, 
"string"}};
+
+nlohmann::json app_env_validator::EnvInfo::to_json() const
+{
+    const auto &type_str = ValueType2String.find(type);
+    CHECK_TRUE(type_str != ValueType2String.end());
+    nlohmann::json info;
+    info["type"] = type_str->second;
+    info["limitation"] = limit_desc;
+    info["sample"] = sample;
+    return info;
+}
+
+void app_env_validator::list_all_envs(const http_request &req, http_response 
&resp) const
+{
+    nlohmann::json envs;
+    for (const auto &validator_func : _validator_funcs) {
+        envs[validator_func.first] = validator_func.second.to_json();
+    }
+    resp.body = envs.dump(2);
+    resp.status_code = http_status_code::kOk;
 }
 
 } // namespace replication
diff --git a/src/meta/app_env_validator.h b/src/meta/app_env_validator.h
index d963df051..7a0ca1175 100644
--- a/src/meta/app_env_validator.h
+++ b/src/meta/app_env_validator.h
@@ -17,37 +17,86 @@
 
 #pragma once
 
+#include <nlohmann/json.hpp> // IWYU pragma: keep
+#include <nlohmann/json_fwd.hpp>
+#include <stdint.h>
 #include <functional>
 #include <map>
 #include <string>
-#include "utils/singleton.h"
+#include <unordered_map>
+
+#include "http/http_server.h"
 
 namespace dsn {
 namespace replication {
 
-bool validate_app_envs(const std::map<std::string, std::string> &envs);
-
-bool validate_app_env(const std::string &env_name,
-                      const std::string &env_value,
-                      std::string &hint_message);
-
-class app_env_validator : public utils::singleton<app_env_validator>
+class app_env_validator final : public http_service
 {
 public:
+    app_env_validator();
+
+    ~app_env_validator() final;
+
+    std::string path() const override { return "envs"; }
+
     bool validate_app_env(const std::string &env_name,
                           const std::string &env_value,
                           std::string &hint_message);
 
-private:
-    app_env_validator() { register_all_validators(); }
-    ~app_env_validator() = default;
+    bool validate_app_envs(const std::map<std::string, std::string> &envs);
 
+private:
     void register_all_validators();
 
-    using validator_func = std::function<bool(const std::string &, std::string &)>;
-    std::map<std::string, validator_func> _validator_funcs;
+    void list_all_envs(const http_request &req, http_response &resp) const;
+
+public:
+    // The type of table env.
+    enum class ValueType : uint32_t
+    {
+        kBool,
+        kInt32,
+        kInt64,
+        kString
+    };
+
+    // The type of table env and its limit description.
+    struct EnvInfo
+    {
+        using string_validator_func = std::function<bool(const std::string &, std::string &)>;
+        using int_validator_func = std::function<bool(int64_t)>;
+
+        ValueType type;
+        std::string limit_desc;
+        std::string sample;
+        string_validator_func string_validator;
+        int_validator_func int_validator;
+
+        // Construct an object.
+        EnvInfo(ValueType t);
+
+        // Construct an object with kString type.
+        EnvInfo(ValueType t,
+                std::string ld,
+                std::string s,
+                string_validator_func v = string_validator_func());
+
+        // Construct an object with kInt64 type.
+        EnvInfo(ValueType t,
+                std::string ld,
+                std::string s,
+                int_validator_func v = int_validator_func());
+
+        nlohmann::json to_json() const;
+
+        static const std::unordered_map<app_env_validator::ValueType, std::string> ValueType2String;
+
+    private:
+        void init();
+    };
 
-    friend class utils::singleton<app_env_validator>;
+    // The table envs and their limit descriptions, all available envs must be registered here.
+    std::map<std::string, EnvInfo> _validator_funcs;
 };
 
 } // namespace replication
diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp
index 0a9e1bbf9..876409644 100644
--- a/src/meta/meta_http_service.cpp
+++ b/src/meta/meta_http_service.cpp
@@ -761,8 +761,11 @@ void meta_http_service::start_compaction_handler(const http_request &req, http_r
         resp.status_code = http_status_code::kBadRequest;
         return;
     }
-    if (info.bottommost_level_compaction.empty() || (info.bottommost_level_compaction != "skip" &&
-                                                     info.bottommost_level_compaction != "force")) {
+    if (info.bottommost_level_compaction.empty() ||
+        (info.bottommost_level_compaction !=
+             replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP &&
+         info.bottommost_level_compaction !=
+             replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE)) {
         resp.body = "bottommost_level_compaction should ony be 'skip' or 
'force'";
         resp.status_code = http_status_code::kBadRequest;
         return;
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index f563b34f2..fd2701a9c 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -41,7 +41,6 @@
 // IWYU pragma: no_include <type_traits>
 #include <unordered_map>
 
-#include "app_env_validator.h"
 #include "common/duplication_common.h"
 #include "common/json_helper.h"
 #include "common/replica_envs.h"
@@ -50,6 +49,7 @@
 #include "common/replication_enums.h"
 #include "common/replication_other_types.h"
 #include "dump_file.h"
+#include "meta/app_env_validator.h"
 #include "meta/meta_data.h"
 #include "meta/meta_service.h"
 #include "meta/meta_state_service.h"
@@ -1121,7 +1121,7 @@ void server_state::create_app(dsn::message_ex *msg)
                
!validate_target_max_replica_count(request.options.replica_count)) {
         response.err = ERR_INVALID_PARAMETERS;
         will_create_app = false;
-    } else if (!validate_app_envs(request.options.envs)) {
+    } else if (!_app_env_validator.validate_app_envs(request.options.envs)) {
         response.err = ERR_INVALID_PARAMETERS;
         will_create_app = false;
     } else {
@@ -2802,10 +2802,16 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)
 
     std::ostringstream os;
     for (int i = 0; i < keys.size(); i++) {
-        if (i != 0)
+        if (i != 0) {
             os << ", ";
+        }
 
-        if (!validate_app_env(keys[i], values[i], env_rpc.response().hint_message)) {
+        if (!_app_env_validator.validate_app_env(
+                keys[i], values[i], env_rpc.response().hint_message)) {
+            LOG_WARNING("app env '{}={}' is invalid, hint_message: {}",
+                        keys[i],
+                        values[i],
+                        env_rpc.response().hint_message);
             env_rpc.response().err = ERR_INVALID_PARAMETERS;
             return;
         }
@@ -3143,9 +3149,9 @@ bool server_state::parse_compaction_envs(start_manual_compact_rpc rpc,
         }
     }
 
-    std::string bottommost = "skip";
+    std::string bottommost = replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
     if (request.__isset.bottommost && request.bottommost) {
-        bottommost = "force";
+        bottommost = replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
     }
     
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION);
     values.emplace_back(bottommost);
diff --git a/src/meta/server_state.h b/src/meta/server_state.h
index aad53ec5c..ad3664cf1 100644
--- a/src/meta/server_state.h
+++ b/src/meta/server_state.h
@@ -38,6 +38,7 @@
 #include <utility>
 #include <vector>
 
+#include "app_env_validator.h"
 #include "common/gpid.h"
 #include "common/manual_compact.h"
 #include "dsn.layer2_types.h"
@@ -429,6 +430,8 @@ private:
     int32_t _add_secondary_max_count_for_one_node;
     std::vector<std::unique_ptr<command_deregister>> _cmds;
 
+    app_env_validator _app_env_validator;
+
     table_metric_entities _table_metric_entities;
 };
 
diff --git a/src/meta/test/meta_app_envs_test.cpp b/src/meta/test/meta_app_envs_test.cpp
index f3422f7c7..58bdf83ef 100644
--- a/src/meta/test/meta_app_envs_test.cpp
+++ b/src/meta/test/meta_app_envs_test.cpp
@@ -28,6 +28,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <utility>
 
 #include "common/replica_envs.h"
 #include "gtest/gtest.h"
@@ -70,12 +71,12 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
         {replica_envs::WRITE_QPS_THROTTLING,
          "20A*delay*100",
          ERR_INVALID_PARAMETERS,
-         "20A should be non-negative int",
+         "'20A' should be an unsigned integer",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "-20*delay*100",
          ERR_INVALID_PARAMETERS,
-         "-20 should be non-negative int",
+         "'-20' should be an unsigned integer",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "",
@@ -85,37 +86,37 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
         {replica_envs::WRITE_QPS_THROTTLING,
          "20A*delay",
          ERR_INVALID_PARAMETERS,
-         "The field count of 20A*delay should be 3",
+         "The field count of '20A*delay' separated by '*' must be 3",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "20K*pass*100",
          ERR_INVALID_PARAMETERS,
-         "pass should be \"delay\" or \"reject\"",
+         "'pass' should be 'delay' or 'reject'",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "20K*delay*-100",
          ERR_INVALID_PARAMETERS,
-         "-100 should be non-negative int",
+         "'-100' should be an unsigned integer",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "2K**delay*100",
          ERR_INVALID_PARAMETERS,
-         "The field count of 2K**delay*100 should be 3",
+         "The field count of '2K**delay*100' separated by '*' must be 3",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "2K*delay**100",
          ERR_INVALID_PARAMETERS,
-         "The field count of 2K*delay**100 should be 3",
+         "The field count of '2K*delay**100' separated by '*' must be 3",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "2K*delay*100,3K*delay*100",
          ERR_INVALID_PARAMETERS,
-         "duplicate delay config",
+         "duplicate 'delay' config",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING,
          "2K*reject*100,3K*reject*100",
          ERR_INVALID_PARAMETERS,
-         "duplicate reject config",
+         "duplicate 'reject' config",
          "20M*delay*100"},
         {replica_envs::WRITE_QPS_THROTTLING, "20M*reject*100", ERR_OK, "", 
"20M*reject*100"},
         {replica_envs::WRITE_SIZE_THROTTLING, "300*delay*100", ERR_OK, "", 
"300*delay*100"},
@@ -124,18 +125,18 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
         {replica_envs::SLOW_QUERY_THRESHOLD,
          "19",
          ERR_INVALID_PARAMETERS,
-         "Slow query threshold must be >= 20ms",
+         "invalid value '19', should be '>= 20'",
          "20"},
         {replica_envs::SLOW_QUERY_THRESHOLD,
          "0",
          ERR_INVALID_PARAMETERS,
-         "Slow query threshold must be >= 20ms",
+         "invalid value '0', should be '>= 20'",
          "20"},
         {replica_envs::TABLE_LEVEL_DEFAULT_TTL, "10", ERR_OK, "", "10"},
-        {replica_envs::ROCKSDB_USAGE_SCENARIO, "20", ERR_OK, "", "20"},
+        {replica_envs::ROCKSDB_USAGE_SCENARIO, "bulk_load", ERR_OK, "", "bulk_load"},
         {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT, "30", ERR_OK, "", 
"30"},
         {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS, "40", ERR_OK, 
"", "40"},
-        {replica_envs::MANUAL_COMPACT_DISABLED, "50", ERR_OK, "", "50"},
+        {replica_envs::MANUAL_COMPACT_DISABLED, "true", ERR_OK, "", "true"},
         {replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT, "60", 
ERR_OK, "", "60"},
         {replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME, "70", ERR_OK, "", 
"70"},
         {replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL, "80", ERR_OK, "", 
"80"},
@@ -144,19 +145,19 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
         {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
          "100",
          ERR_INVALID_PARAMETERS,
-         "rocksdb.write_buffer_size suggest set val in range [16777216, 
536870912]",
+         "invalid value '100', should be 'In range [16777216, 536870912]'",
          "67108864"},
         {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
          "636870912",
          ERR_INVALID_PARAMETERS,
-         "rocksdb.write_buffer_size suggest set val in range [16777216, 
536870912]",
+         "invalid value '636870912', should be 'In range [16777216, 
536870912]'",
          "536870912"},
         {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, "67108864", ERR_OK, "", 
"67108864"},
         {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
-         "200",
+         replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
          ERR_OK,
          "",
-         "200"},
+         replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
         {replica_envs::BUSINESS_INFO, "300", ERR_OK, "", "300"},
         {replica_envs::DENY_CLIENT_REQUEST,
          "400",
@@ -188,7 +189,7 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
         {"not_exist_env",
          "500",
          ERR_INVALID_PARAMETERS,
-         "app_env \"not_exist_env\" is not supported",
+         "app_env 'not_exist_env' is not supported",
          ""}};
 
     auto app = find_app(app_name);
@@ -196,10 +197,11 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
         configuration_update_app_env_response response =
             update_app_envs(app_name, {test.env_key}, {test.env_value});
 
-        ASSERT_EQ(response.err, test.err);
-        ASSERT_EQ(response.hint_message, test.hint);
-        if (app->envs.find(test.env_key) != app->envs.end()) {
-            ASSERT_EQ(app->envs.at(test.env_key), test.expect_value);
+        ASSERT_EQ(test.err, response.err) << test.env_key << " : " << test.env_value;
+        ASSERT_EQ(test.hint, response.hint_message);
+        const auto it = app->envs.find(test.env_key);
+        if (it != app->envs.end()) {
+            ASSERT_EQ(test.expect_value, it->second);
         }
     }
 
diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp
index ab0f3cf13..cadcdffc6 100644
--- a/src/meta/test/meta_http_service_test.cpp
+++ b/src/meta/test/meta_http_service_test.cpp
@@ -95,9 +95,9 @@ public:
         // env (value_version, 1) was set by create_app
         std::string fake_json = R"({")" + env_key + R"(":)" + R"(")" + 
env_value + R"(",)" +
                                 R"("value_version":"1"})" + "\n";
-        ASSERT_EQ(fake_resp.status_code, http_status_code::kOk)
+        ASSERT_EQ(http_status_code::kOk, fake_resp.status_code)
             << get_http_status_message(fake_resp.status_code);
-        ASSERT_EQ(fake_resp.body, fake_json);
+        ASSERT_EQ(fake_json, fake_resp.body);
     }
 
     std::unique_ptr<meta_http_service> _mhs;
@@ -381,12 +381,12 @@ TEST_F(meta_bulk_load_http_test, start_compaction_test)
 
     for (const auto &test : tests) {
         http_response resp = test_start_compaction(test.request_json);
-        ASSERT_EQ(resp.status_code, test.expected_code);
+        ASSERT_EQ(test.expected_code, resp.status_code);
         std::string expected_json = test.expected_response_json;
         if (test.expected_code == http_status_code::kOk) {
             expected_json += "\n";
         }
-        ASSERT_EQ(resp.body, expected_json);
+        ASSERT_EQ(expected_json, resp.body);
     }
 }
 
diff --git a/src/meta/test/meta_mauanl_compaction_test.cpp b/src/meta/test/meta_mauanl_compaction_test.cpp
index 3a0184ff7..6853d34e4 100644
--- a/src/meta/test/meta_mauanl_compaction_test.cpp
+++ b/src/meta/test/meta_mauanl_compaction_test.cpp
@@ -165,14 +165,62 @@ TEST_F(meta_app_compaction_test, test_start_compaction)
         int32_t running_count;
         error_code expected_err;
         std::string expected_bottommost;
-    } tests[] = {{"app_not_exist", "false", false, -1, 0, ERR_APP_NOT_EXIST, 
"skip"},
-                 {APP_NAME, "true", false, -1, 0, ERR_OPERATION_DISABLED, 
"skip"},
-                 {APP_NAME, "false", false, -5, 0, ERR_INVALID_PARAMETERS, 
"skip"},
-                 {APP_NAME, "false", false, -1, -1, ERR_INVALID_PARAMETERS, 
"skip"},
-                 {APP_NAME, "false", false, -1, 0, ERR_OK, "skip"},
-                 {APP_NAME, "false", true, -1, 0, ERR_OK, "force"},
-                 {APP_NAME, "false", false, 1, 0, ERR_OK, "skip"},
-                 {APP_NAME, "false", true, -1, 1, ERR_OK, "force"}};
+    } tests[] = {{"app_not_exist",
+                  "false",
+                  false,
+                  -1,
+                  0,
+                  ERR_APP_NOT_EXIST,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+                 {APP_NAME,
+                  "true",
+                  false,
+                  -1,
+                  0,
+                  ERR_OPERATION_DISABLED,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+                 {APP_NAME,
+                  "false",
+                  false,
+                  -5,
+                  0,
+                  ERR_INVALID_PARAMETERS,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+                 {APP_NAME,
+                  "false",
+                  false,
+                  -1,
+                  -1,
+                  ERR_INVALID_PARAMETERS,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+                 {APP_NAME,
+                  "false",
+                  false,
+                  -1,
+                  0,
+                  ERR_OK,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+                 {APP_NAME,
+                  "false",
+                  true,
+                  -1,
+                  0,
+                  ERR_OK,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE},
+                 {APP_NAME,
+                  "false",
+                  false,
+                  1,
+                  0,
+                  ERR_OK,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+                 {APP_NAME,
+                  "false",
+                  true,
+                  -1,
+                  1,
+                  ERR_OK,
+                  
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE}};
 
     for (const auto &test : tests) {
         auto err = start_manual_compaction(test.app_name,
diff --git a/src/meta/test/meta_test_base.cpp b/src/meta/test/meta_test_base.cpp
index 4fb188370..965939d56 100644
--- a/src/meta/test/meta_test_base.cpp
+++ b/src/meta/test/meta_test_base.cpp
@@ -199,7 +199,7 @@ void meta_test_base::create_app(const std::string &name, 
uint32_t partition_coun
 
     auto result = fake_create_app(_ss.get(), req);
     fake_wait_rpc(result, resp);
-    ASSERT_EQ(resp.err, ERR_OK) << resp.err << " " << name;
+    ASSERT_EQ(ERR_OK, resp.err) << name;
 
     // wait for the table to create
     ASSERT_TRUE(_ss->spin_wait_staging(30));
diff --git a/src/meta/test/server_state_test.cpp b/src/meta/test/server_state_test.cpp
index 0c6295b95..7fdee4588 100644
--- a/src/meta/test/server_state_test.cpp
+++ b/src/meta/test/server_state_test.cpp
@@ -32,6 +32,7 @@
 #include <utility>
 #include <vector>
 
+#include "common/replica_envs.h"
 #include "common/replication.codes.h"
 #include "dsn.layer2_types.h"
 #include "gtest/gtest.h"
@@ -52,24 +53,35 @@ DSN_DECLARE_string(meta_state_service_type);
 namespace dsn {
 namespace replication {
 
-static const std::vector<std::string> keys = 
{"manual_compact.once.trigger_time",
-                                              
"manual_compact.once.target_level",
-                                              
"manual_compact.once.bottommost_level_compaction",
-                                              
"manual_compact.periodic.trigger_time",
-                                              
"manual_compact.periodic.target_level",
-                                              
"manual_compact.periodic.bottommost_level_compaction",
-                                              "rocksdb.usage_scenario",
-                                              
"rocksdb.checkpoint.reserve_min_count",
-                                              
"rocksdb.checkpoint.reserve_time_seconds"};
+static const std::vector<std::string> keys = {
+    dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+    dsn::replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
+    dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION,
+    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
+    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
+    dsn::replica_envs::ROCKSDB_USAGE_SCENARIO,
+    dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
+    dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS};
 static const std::vector<std::string> values = {
-    "p1v1", "p1v2", "p1v3", "p2v1", "p2v2", "p2v3", "p3v1", "p3v2", "p3v3"};
-
-static const std::vector<std::string> del_keys = 
{"manual_compact.once.trigger_time",
-                                                  
"manual_compact.periodic.trigger_time",
-                                                  "rocksdb.usage_scenario"};
-static const std::set<std::string> del_keys_set = 
{"manual_compact.once.trigger_time",
-                                                   
"manual_compact.periodic.trigger_time",
-                                                   "rocksdb.usage_scenario"};
+    "1712846598",
+    "6",
+    dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
+    "1712846598",
+    "-1",
+    dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
+    dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+    "1",
+    "0"};
+
+static const std::vector<std::string> del_keys = {
+    dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+    dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
+static const std::set<std::string> del_keys_set = {
+    dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+    dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
 
 static const std::string clear_prefix = "rocksdb";
 
@@ -112,8 +124,7 @@ void meta_service_test_app::app_envs_basic_test()
     ss->initialize(svc, apps_root);
 
     ss->_all_apps.emplace(std::make_pair(fake_app->app_id, fake_app));
-    dsn::error_code ec = ss->sync_apps_to_remote_storage();
-    ASSERT_EQ(ec, dsn::ERR_OK);
+    ASSERT_EQ(dsn::ERR_OK, ss->sync_apps_to_remote_storage());
 
     std::cout << "test server_state::set_app_envs()..." << std::endl;
     {
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 3d8b64119..cf95eafeb 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -499,7 +499,7 @@ private:
     void update_throttle_envs(const std::map<std::string, std::string> &envs);
     void update_throttle_env_internal(const std::map<std::string, std::string> 
&envs,
                                       const std::string &key,
-                                      throttling_controller &cntl);
+                                      utils::throttling_controller &cntl);
 
     // update allowed users for access controller
     void update_ac_allowed_users(const std::map<std::string, std::string> 
&envs);
@@ -620,10 +620,12 @@ private:
     bool _inactive_is_transient; // upgrade to P/S is allowed only iff true
     bool _is_initializing;       // when initializing, switching to primary 
need to update ballot
     deny_client _deny_client;    // if deny requests
-    throttling_controller _write_qps_throttling_controller;  // throttling by 
requests-per-second
-    throttling_controller _write_size_throttling_controller; // throttling by 
bytes-per-second
-    throttling_controller _read_qps_throttling_controller;
-    throttling_controller _backup_request_qps_throttling_controller;
+    utils::throttling_controller
+        _write_qps_throttling_controller; // throttling by requests-per-second
+    utils::throttling_controller
+        _write_size_throttling_controller; // throttling by bytes-per-second
+    utils::throttling_controller _read_qps_throttling_controller;
+    utils::throttling_controller _backup_request_qps_throttling_controller;
 
     // duplication
     std::shared_ptr<replica_duplicator_manager> _duplication_mgr;
diff --git a/src/replica/replica_throttle.cpp b/src/replica/replica_throttle.cpp
index ed6bd1cce..1d5ee36ef 100644
--- a/src/replica/replica_throttle.cpp
+++ b/src/replica/replica_throttle.cpp
@@ -42,8 +42,8 @@ namespace replication {
         int64_t delay_ms = 0;                                                  
                    \
         auto type = 
_##op_type##_##throttling_type##_throttling_controller.control(                \
             request->header->client.timeout_ms, request_units, delay_ms);      
                    \
-        if (type != throttling_controller::PASS) {                             
                    \
-            if (type == throttling_controller::DELAY) {                        
                    \
+        if (type != utils::throttling_controller::PASS) {                      
                    \
+            if (type == utils::throttling_controller::DELAY) {                 
                    \
                 tasking::enqueue(                                              
                    \
                     LPC_##op_type##_THROTTLING_DELAY,                          
                    \
                     &_tracker,                                                 
                    \
@@ -51,7 +51,7 @@ namespace replication {
                     get_gpid().thread_hash(),                                  
                    \
                     std::chrono::milliseconds(delay_ms));                      
                    \
                 METRIC_VAR_INCREMENT(throttling_delayed_##op_type##_requests); 
                    \
-            } else { /** type == throttling_controller::REJECT **/             
                    \
+            } else { /** type == utils::throttling_controller::REJECT **/      
                    \
                 if (delay_ms > 0) {                                            
                    \
                     tasking::enqueue(LPC_##op_type##_THROTTLING_DELAY,         
                    \
                                      &_tracker,                                
                    \
@@ -87,15 +87,15 @@ bool replica::throttle_backup_request(message_ex *request)
     int64_t delay_ms = 0;
     auto type = _backup_request_qps_throttling_controller.control(
         request->header->client.timeout_ms, 1, delay_ms);
-    if (type != throttling_controller::PASS) {
-        if (type == throttling_controller::DELAY) {
+    if (type != utils::throttling_controller::PASS) {
+        if (type == utils::throttling_controller::DELAY) {
             tasking::enqueue(LPC_read_THROTTLING_DELAY,
                              &_tracker,
                              [ this, req = message_ptr(request) ]() { 
on_client_read(req, true); },
                              get_gpid().thread_hash(),
                              std::chrono::milliseconds(delay_ms));
             METRIC_VAR_INCREMENT(throttling_delayed_backup_requests);
-        } else { /** type == throttling_controller::REJECT **/
+        } else { /** type == utils::throttling_controller::REJECT **/
             METRIC_VAR_INCREMENT(throttling_rejected_backup_requests);
         }
         return true;
@@ -118,7 +118,7 @@ void replica::update_throttle_envs(const 
std::map<std::string, std::string> &env
 
 void replica::update_throttle_env_internal(const std::map<std::string, 
std::string> &envs,
                                            const std::string &key,
-                                           throttling_controller &cntl)
+                                           utils::throttling_controller &cntl)
 {
     bool throttling_changed = false;
     std::string old_throttling;
diff --git a/src/replica/test/throttling_controller_test.cpp b/src/replica/test/throttling_controller_test.cpp
index 1a17df866..120ad0b4f 100644
--- a/src/replica/test/throttling_controller_test.cpp
+++ b/src/replica/test/throttling_controller_test.cpp
@@ -20,14 +20,14 @@
 #include "gtest/gtest.h"
 
 namespace dsn {
-namespace replication {
+namespace utils {
 
 class throttling_controller_test : public ::testing::Test
 {
 public:
     void test_parse_env_basic()
     {
-        throttling_controller cntl;
+        utils::throttling_controller cntl;
         std::string parse_err;
         bool env_changed = false;
         std::string old_value;
@@ -77,7 +77,7 @@ public:
 
     void test_parse_env_multiplier()
     {
-        throttling_controller cntl;
+        utils::throttling_controller cntl;
         std::string parse_err;
         bool env_changed = false;
         std::string old_value;
@@ -85,18 +85,16 @@ public:
         struct test_case_1
         {
             std::string env;
-
             int64_t delay_units;
             int64_t delay_ms;
             int64_t reject_units;
             int64_t reject_ms;
         } test_cases_1[] = {
-            {"20K*delay*100", 5000 + 1, 100, 0, 0},
-            {"20M*delay*100", 5000 * 1000 + 1, 100, 0, 0},
-            {"20M*delay*100,20M*reject*100", 5000 * 1000 + 1, 100, 5000 * 1000 
+ 1, 100},
-
+            {"20K*delay*100", (5 << 10) + 1, 100, 0, 0},
+            {"20M*delay*100", (5 << 20) + 1, 100, 0, 0},
+            {"20M*delay*100,20M*reject*100", (5 << 20) + 1, 100, (5 << 20) + 
1, 100},
             // throttling size exceeds int32_t max value
-            {"80000M*delay*100", int64_t(20) * 1000 * 1000 * 1000 + 1, 100, 0, 
0},
+            {"80000M*delay*100", (20000ULL << 20) + 1, 100, 0, 0},
         };
         for (const auto &tc : test_cases_1) {
             ASSERT_TRUE(cntl.parse_from_env(tc.env, 4, parse_err, env_changed, 
old_value));
@@ -127,5 +125,5 @@ TEST_F(throttling_controller_test, parse_env_basic) { 
test_parse_env_basic(); }
 
 TEST_F(throttling_controller_test, parse_env_multiplier) { 
test_parse_env_multiplier(); }
 
-} // namespace replication
+} // namespace utils
 } // namespace dsn
diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp
index 617a7b452..f4d143690 100644
--- a/src/server/pegasus_manual_compact_service.cpp
+++ b/src/server/pegasus_manual_compact_service.cpp
@@ -61,11 +61,6 @@ namespace server {
 
 DEFINE_TASK_CODE(LPC_MANUAL_COMPACT, TASK_PRIORITY_COMMON, THREAD_POOL_COMPACT)
 
-const std::string
-    pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force");
-const std::string
-    pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");
-
 
pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_impl
 *app)
     : replica_base(*app),
       _app(app),
@@ -260,9 +255,9 @@ void 
pegasus_manual_compact_service::extract_manual_compact_opts(
     find = envs.find(key_prefix + 
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION);
     if (find != envs.end()) {
         const std::string &argv = find->second;
-        if (argv == MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE) {
+        if (argv == 
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE) {
             options.bottommost_level_compaction = 
rocksdb::BottommostLevelCompaction::kForce;
-        } else if (argv == MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP) {
+        } else if (argv == 
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP) {
             options.bottommost_level_compaction = 
rocksdb::BottommostLevelCompaction::kSkip;
         } else {
             LOG_WARNING_PREFIX(
@@ -270,7 +265,7 @@ void 
pegasus_manual_compact_service::extract_manual_compact_opts(
                 find->first,
                 find->second,
                 // NOTICE associate with options.bottommost_level_compaction's 
default value above
-                MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP);
+                
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP);
         }
     }
 }
diff --git a/src/server/pegasus_manual_compact_service.h b/src/server/pegasus_manual_compact_service.h
index 329b10b7b..0f3a8163f 100644
--- a/src/server/pegasus_manual_compact_service.h
+++ b/src/server/pegasus_manual_compact_service.h
@@ -87,9 +87,6 @@ private:
 private:
     FRIEND_TEST(manual_compact_service_test, extract_manual_compact_opts);
 
-    static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
-    static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
-
     pegasus_server_impl *_app;
 #ifdef PEGASUS_UNIT_TEST
     uint64_t _mock_now_timestamp = 0;
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index a12ec788a..35419d8ef 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -148,7 +148,6 @@ const std::chrono::seconds 
pegasus_server_impl::kServerStatUpdateTimeSec = std::
 const std::string ROCKSDB_ENV_RESTORE_FORCE_RESTORE("restore.force_restore");
 const std::string ROCKSDB_ENV_RESTORE_POLICY_NAME("restore.policy_name");
 const std::string ROCKSDB_ENV_RESTORE_BACKUP_ID("restore.backup_id");
-const std::string 
ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS("rocksdb.checkpoint.reserve_time_seconds");
 
 using cf_opts_setter = std::function<bool(const std::string &, 
rocksdb::ColumnFamilyOptions &)>;
 const std::unordered_map<std::string, cf_opts_setter> cf_opts_setters = {
@@ -2754,7 +2753,7 @@ void pegasus_server_impl::update_usage_scenario(const 
std::map<std::string, std:
     // if not specified, default is normal
     auto find = envs.find(dsn::replica_envs::ROCKSDB_USAGE_SCENARIO);
     std::string new_usage_scenario =
-        (find != envs.end() ? find->second : 
meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
+        (find != envs.end() ? find->second : 
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
     if (new_usage_scenario != _usage_scenario) {
         std::string old_usage_scenario = _usage_scenario;
         if (set_usage_scenario(new_usage_scenario)) {
@@ -2802,7 +2801,7 @@ void pegasus_server_impl::update_checkpoint_reserve(const 
std::map<std::string,
             return;
         }
     }
-    find = envs.find(ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS);
+    find = 
envs.find(dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS);
     if (find != envs.end()) {
         if (!dsn::buf2int32(find->second, time) || time < 0) {
             LOG_ERROR_PREFIX("{}={} is invalid.", find->first, find->second);
@@ -2819,7 +2818,7 @@ void pegasus_server_impl::update_checkpoint_reserve(const 
std::map<std::string,
     }
     if (time != _checkpoint_reserve_time_seconds) {
         LOG_INFO_PREFIX("update app env[{}] from \"{}\" to \"{}\" succeed",
-                        ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS,
+                        
dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS,
                         _checkpoint_reserve_time_seconds,
                         time);
         _checkpoint_reserve_time_seconds = time;
@@ -3064,9 +3063,9 @@ bool pegasus_server_impl::set_usage_scenario(const 
std::string &usage_scenario)
         return false;
     std::string old_usage_scenario = _usage_scenario;
     std::unordered_map<std::string, std::string> new_options;
-    if (usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL ||
-        usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE) 
{
-        if (_usage_scenario == 
meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
+    if (usage_scenario == dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL 
||
+        usage_scenario == 
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE) {
+        if (_usage_scenario == 
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
             // old usage scenario is bulk load, reset first
             new_options["level0_file_num_compaction_trigger"] =
                 
std::to_string(_data_cf_opts.level0_file_num_compaction_trigger);
@@ -3086,12 +3085,12 @@ bool pegasus_server_impl::set_usage_scenario(const 
std::string &usage_scenario)
                 std::to_string(_data_cf_opts.max_write_buffer_number);
         }
 
-        if (usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) {
+        if (usage_scenario == 
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) {
             new_options["write_buffer_size"] =
                 
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size));
             new_options["level0_file_num_compaction_trigger"] =
                 
std::to_string(_data_cf_opts.level0_file_num_compaction_trigger);
-        } else { // meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE
+        } else { // dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE
             uint64_t buffer_size = 
dsn::rand::next_u64(_data_cf_opts.write_buffer_size,
                                                        
_data_cf_opts.write_buffer_size * 2);
             new_options["write_buffer_size"] = std::to_string(buffer_size);
@@ -3099,7 +3098,7 @@ bool pegasus_server_impl::set_usage_scenario(const 
std::string &usage_scenario)
             new_options["level0_file_num_compaction_trigger"] =
                 std::to_string(std::max<uint64_t>(4UL, max_size / 
buffer_size));
         }
-    } else if (usage_scenario == 
meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
+    } else if (usage_scenario == 
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
         // refer to Options::PrepareForBulkLoad()
         new_options["level0_file_num_compaction_trigger"] = "1000000000";
         new_options["level0_slowdown_writes_trigger"] = "1000000000";
@@ -3219,9 +3218,9 @@ void pegasus_server_impl::recalculate_data_cf_options(
         return;
     }
     std::unordered_map<std::string, std::string> new_options;
-    if (meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario ||
-        meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == 
_usage_scenario) {
-        if (meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) {
+    if (dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == 
_usage_scenario ||
+        dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == 
_usage_scenario) {
+        if (dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == 
_usage_scenario) {
             UPDATE_OPTION_IF_NOT_NEARBY(write_buffer_size, 
_data_cf_opts.write_buffer_size);
             UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger);
         } else {
@@ -3343,11 +3342,12 @@ uint64_t pegasus_server_impl::do_manual_compact(const 
rocksdb::CompactRangeOptio
                     dsn_now_ms() - start_time);
 
     // do compact
-    LOG_INFO_PREFIX(
-        "start CompactRange, target_level = {}, bottommost_level_compaction = 
{}",
-        options.target_level,
-        options.bottommost_level_compaction == 
rocksdb::BottommostLevelCompaction::kForce ? "force"
-                                                                               
           : "skip");
+    LOG_INFO_PREFIX("start CompactRange, target_level = {}, 
bottommost_level_compaction = {}",
+                    options.target_level,
+                    options.bottommost_level_compaction ==
+                            rocksdb::BottommostLevelCompaction::kForce
+                        ? 
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE
+                        : 
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP);
     start_time = dsn_now_ms();
     auto status = _db->CompactRange(options, _data_cf, nullptr, nullptr);
     auto end_time = dsn_now_ms();
diff --git a/src/server/test/manual_compact_service_test.cpp 
b/src/server/test/manual_compact_service_test.cpp
index e6972e784..8ccd761b5 100644
--- a/src/server/test/manual_compact_service_test.cpp
+++ b/src/server/test/manual_compact_service_test.cpp
@@ -260,7 +260,7 @@ TEST_P(manual_compact_service_test, 
extract_manual_compact_opts)
     envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX +
          dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "2";
     envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION] =
-        
pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
+        dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
     extract_manual_compact_opts(envs, 
dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out);
     ASSERT_EQ(out.target_level, 2);
     ASSERT_EQ(out.bottommost_level_compaction, 
rocksdb::BottommostLevelCompaction::kForce);
@@ -268,7 +268,7 @@ TEST_P(manual_compact_service_test, 
extract_manual_compact_opts)
     envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX +
          dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "-1";
     envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION] =
-        
pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
+        dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
     extract_manual_compact_opts(envs, 
dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out);
     ASSERT_EQ(out.target_level, -1);
     ASSERT_EQ(out.bottommost_level_compaction, 
rocksdb::BottommostLevelCompaction::kSkip);
diff --git a/src/server/test/pegasus_server_impl_test.cpp 
b/src/server/test/pegasus_server_impl_test.cpp
index 229927d35..1c57922c4 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -28,7 +28,6 @@
 #include <string>
 #include <utility>
 
-#include "base/meta_store.h"
 #include "common/replica_envs.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
@@ -160,17 +159,18 @@ TEST_P(pegasus_server_impl_test, 
test_open_db_with_latest_options)
 {
     // open a new db with no app env.
     ASSERT_EQ(dsn::ERR_OK, start());
-    ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, 
_server->_usage_scenario);
+    ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, 
_server->_usage_scenario);
     // set bulk_load scenario for the db.
-    
ASSERT_TRUE(_server->set_usage_scenario(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
-    ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, 
_server->_usage_scenario);
+    ASSERT_TRUE(
+        
_server->set_usage_scenario(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
+    ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, 
_server->_usage_scenario);
     rocksdb::Options opts = _server->_db->GetOptions();
     ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
     ASSERT_EQ(true, opts.disable_auto_compactions);
     // reopen the db.
     ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
     ASSERT_EQ(dsn::ERR_OK, start());
-    ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, 
_server->_usage_scenario);
+    ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, 
_server->_usage_scenario);
     ASSERT_EQ(opts.level0_file_num_compaction_trigger,
               _server->_db->GetOptions().level0_file_num_compaction_trigger);
     ASSERT_EQ(opts.disable_auto_compactions, 
_server->_db->GetOptions().disable_auto_compactions);
@@ -180,9 +180,9 @@ TEST_P(pegasus_server_impl_test, test_open_db_with_app_envs)
 {
     std::map<std::string, std::string> envs;
     envs[dsn::replica_envs::ROCKSDB_USAGE_SCENARIO] =
-        meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
+        dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
     ASSERT_EQ(dsn::ERR_OK, start(envs));
-    ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, 
_server->_usage_scenario);
+    ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, 
_server->_usage_scenario);
 }
 
 TEST_P(pegasus_server_impl_test, test_open_db_with_rocksdb_envs)
diff --git a/src/utils/test/token_bucket_throttling_controller_test.cpp 
b/src/utils/test/token_bucket_throttling_controller_test.cpp
index d3812d484..cecb3b8b9 100644
--- a/src/utils/test/token_bucket_throttling_controller_test.cpp
+++ b/src/utils/test/token_bucket_throttling_controller_test.cpp
@@ -15,85 +15,95 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "utils/token_bucket_throttling_controller.h"
-
+#include <fmt/core.h>
+#include <stdint.h>
 #include <unistd.h>
+#include <limits>
+#include <memory>
+#include <string>
 
+#include "gmock/gmock.h"
 #include "gtest/gtest.h"
+#include "utils/TokenBucket.h"
+#include "utils/test_macros.h"
+#include "utils/throttling_controller.h"
+#include "utils/token_bucket_throttling_controller.h"
 
 namespace dsn {
 namespace utils {
 
-#define INVALIDATE_SITUATION_CHECK(env)                                        
                    \
-    do {                                                                       
                    \
-        std::string old_value, parse_err;                                      
                    \
-        bool env_changed_result = false;                                       
                    \
-        ASSERT_FALSE(cntl.parse_from_env(env, 4, parse_err, 
env_changed_result, old_value));       \
-        ASSERT_EQ(env_changed_result, false);                                  
                    \
-        ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K");   
                    \
-        ASSERT_EQ(cntl._enabled, true);                                        
                    \
-        ASSERT_EQ(old_value, old_env);                                         
                    \
-        ASSERT_EQ(cntl._env_value, old_env);                                   
                    \
-    } while (0)
-
-#define VALIDATE_SITUATION_CHECK(                                              
                    \
-    env, partition_count, throttle_size, enabled, env_changed, old_env)        
                    \
-    do {                                                                       
                    \
-        bool env_changed_result = false;                                       
                    \
-        std::string old_value, parse_err;                                      
                    \
-        int32_t partitioned_throttle_size = throttle_size / partition_count;   
                    \
-        ASSERT_TRUE(                                                           
                    \
-            cntl.parse_from_env(env, partition_count, parse_err, 
env_changed_result, old_value));  \
-        ASSERT_EQ(cntl._env_value, env);                                       
                    \
-        ASSERT_EQ(cntl._partition_count, partition_count);                     
                    \
-        ASSERT_EQ(cntl._burstsize, partitioned_throttle_size);                 
                    \
-        ASSERT_EQ(cntl._rate, partitioned_throttle_size);                      
                    \
-        ASSERT_EQ(cntl._enabled, enabled);                                     
                    \
-        ASSERT_EQ(env_changed_result, env_changed);                            
                    \
-        ASSERT_EQ(old_value, old_env);                                         
                    \
-        ASSERT_EQ(parse_err, "");                                              
                    \
-    } while (0)
-
 class token_bucket_throttling_controller_test : public ::testing::Test
 {
 public:
-    void test_parse_env_basic_token_bucket_throttling()
+    void INVALIDATE_SITUATION_CHECK(const std::string &env)
     {
-        token_bucket_throttling_controller cntl;
+        std::string old_value, parse_err;
+        bool env_changed_result = false;
+        ASSERT_FALSE(cntl.parse_from_env(env, 4, parse_err, 
env_changed_result, old_value));
+        ASSERT_EQ(env_changed_result, false);
+        ASSERT_NE(parse_err, "");
+        ASSERT_EQ(cntl._enabled, true);
+        ASSERT_EQ(old_value, old_env);
+        ASSERT_EQ(cntl._env_value, old_env);
+    }
 
+    void VALIDATE_SITUATION_CHECK(const std::string &env,
+                                  int partition_count,
+                                  uint64_t throttle_size,
+                                  bool enabled,
+                                  bool env_changed,
+                                  const std::string &old_env)
+    {
+        bool env_changed_result = false;
+        std::string old_value, parse_err;
+        int32_t partitioned_throttle_size = throttle_size / partition_count;
+        ASSERT_TRUE(
+            cntl.parse_from_env(env, partition_count, parse_err, 
env_changed_result, old_value));
+        ASSERT_EQ(cntl._env_value, env);
+        ASSERT_EQ(cntl._partition_count, partition_count);
+        ASSERT_EQ(cntl._burstsize, partitioned_throttle_size);
+        ASSERT_EQ(cntl._rate, partitioned_throttle_size);
+        ASSERT_EQ(cntl._enabled, enabled);
+        ASSERT_EQ(env_changed_result, env_changed);
+        ASSERT_EQ(old_value, old_env);
+        ASSERT_EQ(parse_err, "");
+    }
+
+    void test_parse_env_basic_token_bucket_throttling()
+    {
         // token_bucket_throttling_controller doesn't support delay only
-        VALIDATE_SITUATION_CHECK("20000*delay*100", 4, 0, false, true, "");
-        VALIDATE_SITUATION_CHECK("200K", 4, 200000, true, true, 
"20000*delay*100");
-        VALIDATE_SITUATION_CHECK("20000*delay*100,20000*reject*100", 4, 20000, 
true, true, "200K");
-        VALIDATE_SITUATION_CHECK("20K*delay*100,20K*reject*100",
-                                 4,
-                                 20000,
-                                 true,
-                                 true,
-                                 "20000*delay*100,20000*reject*100");
-        VALIDATE_SITUATION_CHECK(
-            "20000*reject*100", 4, 20000, true, true, 
"20K*delay*100,20K*reject*100");
+        NO_FATALS(VALIDATE_SITUATION_CHECK("20000*delay*100", 4, 0, false, 
true, ""));
+        NO_FATALS(VALIDATE_SITUATION_CHECK("200K", 4, 200 << 10, true, true, 
"20000*delay*100"));
+        NO_FATALS(VALIDATE_SITUATION_CHECK(
+            "20000*delay*100,20000*reject*100", 4, 20000, true, true, "200K"));
+        NO_FATALS(VALIDATE_SITUATION_CHECK("20K*delay*100,20K*reject*100",
+                                           4,
+                                           20 << 10,
+                                           true,
+                                           true,
+                                           
"20000*delay*100,20000*reject*100"));
+        NO_FATALS(VALIDATE_SITUATION_CHECK(
+            "20000*reject*100", 4, 20000, true, true, 
"20K*delay*100,20K*reject*100"));
 
         // invalid argument]
-        std::string old_env = "20000*reject*100";
-        INVALIDATE_SITUATION_CHECK("0");
-        INVALIDATE_SITUATION_CHECK("*deldday*100");
-        INVALIDATE_SITUATION_CHECK("");
-        INVALIDATE_SITUATION_CHECK("*reject");
-        INVALIDATE_SITUATION_CHECK("*reject*");
-        INVALIDATE_SITUATION_CHECK("reject*");
-        INVALIDATE_SITUATION_CHECK("reject");
-        INVALIDATE_SITUATION_CHECK("200g");
-        INVALIDATE_SITUATION_CHECK("200G");
-        INVALIDATE_SITUATION_CHECK("M");
-        INVALIDATE_SITUATION_CHECK("K");
-        INVALIDATE_SITUATION_CHECK("-1K");
-        INVALIDATE_SITUATION_CHECK("1aK");
-        INVALIDATE_SITUATION_CHECK("pegNo1");
-        INVALIDATE_SITUATION_CHECK("-20");
-        INVALIDATE_SITUATION_CHECK("12KM");
-        INVALIDATE_SITUATION_CHECK("1K2M");
-        INVALIDATE_SITUATION_CHECK("2000K0*reject*100");
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("0"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("*deldday*100"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK(""));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("*reject"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("*reject*"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("reject*"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("reject"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("200g"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("200G"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("M"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("K"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("-1K"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("1aK"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("pegNo1"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("-20"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("12KM"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("1K2M"));
+        NO_FATALS(INVALIDATE_SITUATION_CHECK("2000K0*reject*100"));
     }
 
     void throttle_test()
@@ -155,13 +165,58 @@ public:
         ASSERT_GT(fail_count1, 10000);
         ASSERT_LE(fail_count, fail_count1 * 1.2);
     }
+
+private:
+    token_bucket_throttling_controller cntl;
+
+    static const std::string old_env;
 };
 
+const std::string token_bucket_throttling_controller_test::old_env = 
"20000*reject*100";
+
 TEST_F(token_bucket_throttling_controller_test, 
test_parse_env_basic_token_bucket_throttling)
 {
     test_parse_env_basic_token_bucket_throttling();
 }
 
 TEST_F(token_bucket_throttling_controller_test, throttle_test) { 
throttle_test(); }
+
+// Table-driven check of throttling_controller::parse_unit(): malformed inputs must fail with a
+// hint that mentions the reason, and well-formed inputs must yield the scaled number of units.
+TEST_F(token_bucket_throttling_controller_test, parse_unit_test)
+{
+    // Large enough that scaling it by 'K' or 'M' overflows uint64_t.
+    std::string max_minus_1 = std::to_string(std::numeric_limits<uint64_t>::max() - 1);
+    struct parse_unit_test_case
+    {
+        std::string input;
+        bool expected_result;
+        uint64_t expected_units;
+        std::string expected_hint_message_piece;
+    } tests[] = {
+        {"", false, 0, "integer"},
+        {"-", false, 0, "integer"},
+        {"A", false, 0, "integer"},
+        {"M", false, 0, "integer"},
+        {"K", false, 0, "integer"},
+        {"aM", false, 0, "integer"},
+        {"aK", false, 0, "integer"},
+        {fmt::format("{}M", max_minus_1), false, 0, "overflow"},
+        {fmt::format("{}K", max_minus_1), false, 0, "overflow"},
+        {"10M", true, 10 << 20, ""},
+        {"1K", true, 1 << 10, ""},
+        {"100", true, 100, ""},
+    };
+
+    for (const auto &test : tests) {
+        // Starts from 0, so a failed parse is expected to leave it at 0.
+        uint64_t actual_units = 0;
+        std::string actual_hint_message;
+        ASSERT_EQ(
+            test.expected_result,
+            throttling_controller::parse_unit(test.input, actual_units, actual_hint_message));
+        ASSERT_EQ(test.expected_units, actual_units);
+        if (test.expected_result) {
+            ASSERT_EQ(actual_hint_message, "");
+        } else {
+            ASSERT_STR_CONTAINS(actual_hint_message, test.expected_hint_message_piece);
+        }
+    }
+}
 } // namespace utils
 } // namespace dsn
diff --git a/src/utils/throttling_controller.cpp 
b/src/utils/throttling_controller.cpp
index 2350adb5a..3079b5748 100644
--- a/src/utils/throttling_controller.cpp
+++ b/src/utils/throttling_controller.cpp
@@ -17,6 +17,7 @@
 
 #include "throttling_controller.h"
 
+#include <fmt/core.h>
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <algorithm>
 #include <memory>
@@ -27,7 +28,7 @@
 #include "utils/strings.h"
 
 namespace dsn {
-namespace replication {
+namespace utils {
 
 throttling_controller::throttling_controller()
     : _enabled(false),
@@ -41,87 +42,177 @@ throttling_controller::throttling_controller()
 {
 }
 
+// Parse a string in the form '<unsigned integer>[K|M]' into a number of units.
+//
+// A trailing 'K' scales the number by 2^10 and a trailing 'M' by 2^20; any other last
+// character is treated as part of the number.
+//
+// Returns true and writes the scaled value to 'units' on success. Returns false, leaves
+// 'units' untouched and fills 'hint_message' if the numeric part is not accepted by
+// buf2uint64() or the scaled value overflows uint64_t. 'hint_message' is cleared on entry,
+// so it is empty on success.
+bool throttling_controller::parse_unit(std::string arg,
+                                       /*out*/ uint64_t &units,
+                                       /*out*/ std::string &hint_message)
+{
+    hint_message.clear();
+
+    // Extract the multiplier, and strip the tail 'M' or 'K' from the numeric part.
+    // 'arg' itself is kept intact so that it can be reported in full on overflow.
+    std::string number(arg);
+    uint64_t unit_multiplier = 1;
+    if (!number.empty()) {
+        switch (number.back()) {
+        case 'M':
+            unit_multiplier = 1 << 20;
+            break;
+        case 'K':
+            unit_multiplier = 1 << 10;
+            break;
+        default:
+            // Maybe a number, it's valid.
+            break;
+        }
+        if (unit_multiplier != 1) {
+            number.pop_back();
+        }
+    }
+
+    // Parse value.
+    uint64_t value = 0;
+    if (!buf2uint64(number, value)) {
+        hint_message = fmt::format("'{}' should be an unsigned integer", number);
+        return false;
+    }
+
+    // Check overflow. The checked product is used directly instead of multiplying again.
+    uint64_t result = 0;
+    if (__builtin_mul_overflow(value, unit_multiplier, &result)) {
+        // Report the whole input: the number alone does not overflow, the scaled one does.
+        hint_message = fmt::format("'{}' result is overflow", arg);
+        return false;
+    }
+    units = result;
+
+    return true;
+}
+
+// Parse a single throttling item in the form '<units>*<type>*<delay_ms>',
+// e.g. "100K*delay*100".
+//
+// Returns:
+//  - kSuccess: the item has 3 fields, the first is a valid unit string, the second equals
+//    'type' and the third is an unsigned integer. 'units' and 'delay_ms' are set.
+//  - kIgnore: the item has 3 fields and a valid first field, but the second field is not
+//    'type'. The third field is not checked in this case.
+//  - kFail: the field count is not 3, the first field is malformed, or the second field
+//    equals 'type' but the third field is not an unsigned integer.
+// 'units' and 'delay_ms' are written only on kSuccess. 'hint_message' is cleared on entry
+// and describes the problem for kFail and kIgnore.
+throttling_controller::ParseResult
+throttling_controller::parse_from_env(const std::string &arg,
+                                      const std::string &type,
+                                      /*out*/ uint64_t &units,
+                                      /*out*/ uint64_t &delay_ms,
+                                      /*out*/ std::string &hint_message)
+{
+    hint_message.clear();
+    std::vector<std::string> sub_args;
+    utils::split_args(arg.c_str(), sub_args, '*', true);
+    if (sub_args.size() != 3) {
+        hint_message =
+            fmt::format("The field count of '{}' separated by '*' must be 3", arg);
+        return ParseResult::kFail;
+    }
+
+    // 1. Check the first part, which must be an unsigned integer, optionally followed with
+    // 'K' or 'M'.
+    uint64_t u = 0;
+    if (!parse_unit(sub_args[0], u, hint_message)) {
+        return ParseResult::kFail;
+    }
+
+    // 2. Check the second part, which must be the requested 'type' ("delay" or "reject").
+    if (sub_args[1] != type) {
+        hint_message = fmt::format("'{}' should be 'delay' or 'reject'", sub_args[1]);
+        return ParseResult::kIgnore;
+    }
+
+    // 3. Check the third part, which must be an unsigned integer.
+    uint64_t ms = 0;
+    if (!buf2uint64(sub_args[2], ms)) {
+        hint_message = fmt::format("'{}' should be an unsigned integer", sub_args[2]);
+        return ParseResult::kFail;
+    }
+
+    // Only touch the outputs once the whole item has been validated.
+    units = u;
+    delay_ms = ms;
+    return ParseResult::kSuccess;
+}
+
 bool throttling_controller::parse_from_env(const std::string &env_value,
-                                           int partition_count,
-                                           std::string &parse_error,
-                                           bool &changed,
-                                           std::string &old_env_value)
+                                           /*out*/ uint64_t &delay_units,
+                                           /*out*/ uint64_t &delay_ms,
+                                           /*out*/ uint64_t &reject_units,
+                                           /*out*/ uint64_t &reject_delay_ms,
+                                           /*out*/ std::string &hint_message)
 {
-    changed = false;
-    if (_enabled && env_value == _env_value && partition_count == 
_partition_count)
-        return true;
+    hint_message.clear();
     std::vector<std::string> sargs;
-    utils::split_args(env_value.c_str(), sargs, ',', true);
+    utils::split_args(env_value.c_str(), sargs, ',');
     if (sargs.empty()) {
-        parse_error = "empty env value";
+        hint_message = "The value shouldn't be empty";
         return false;
     }
+
+    // Example for sarg: 100K*delay*100, 100M*reject*100, etc.
     bool delay_parsed = false;
-    int64_t delay_units = 0;
-    int64_t delay_ms = 0;
     bool reject_parsed = false;
-    int64_t reject_units = 0;
-    int64_t reject_delay_ms = 0;
-    for (std::string &s : sargs) {
-        std::vector<std::string> sargs1;
-        utils::split_args(s.c_str(), sargs1, '*', true);
-        if (sargs1.size() != 3) {
-            parse_error = "invalid field count, should be 3";
+    for (const auto &sarg : sargs) {
+        uint64_t units = 0;
+        uint64_t ms = 0;
+        // Check the "delay" args.
+        auto result = parse_from_env(sarg, "delay", units, ms, hint_message);
+        if (result == ParseResult::kFail) {
             return false;
         }
 
-        int64_t unit_multiplier = 1;
-        if (!sargs1[0].empty()) {
-            if (*sargs1[0].rbegin() == 'M') {
-                unit_multiplier = 1000 * 1000;
-            } else if (*sargs1[0].rbegin() == 'K') {
-                unit_multiplier = 1000;
-            }
-            if (unit_multiplier != 1) {
-                sargs1[0].pop_back();
-            }
-        }
-        int64_t units = 0;
-        if (!buf2int64(sargs1[0], units) || units < 0) {
-            parse_error = "invalid units, should be non-negative int";
-            return false;
-        }
-        units *= unit_multiplier;
-
-        int64_t ms = 0;
-        if (!buf2int64(sargs1[2], ms) || ms < 0) {
-            parse_error = "invalid delay ms, should be non-negative int";
-            return false;
-        }
-        if (sargs1[1] == "delay") {
+        if (result == ParseResult::kSuccess) {
             if (delay_parsed) {
-                parse_error = "duplicate delay config";
+                hint_message = "duplicate 'delay' config";
                 return false;
             }
             delay_parsed = true;
-            delay_units = units / partition_count + 1;
+            delay_units = units;
             delay_ms = ms;
-        } else if (sargs1[1] == "reject") {
+            continue;
+        }
+
+        // Check the "reject" args.
+        result = parse_from_env(sarg, "reject", units, ms, hint_message);
+        if (result == ParseResult::kSuccess) {
             if (reject_parsed) {
-                parse_error = "duplicate reject config";
+                hint_message = "duplicate 'reject' config";
                 return false;
             }
             reject_parsed = true;
-            reject_units = units / partition_count + 1;
+            reject_units = units;
             reject_delay_ms = ms;
-        } else {
-            parse_error = "invalid throttling type";
-            return false;
+            continue;
         }
+
+        if (hint_message.empty()) {
+            hint_message = fmt::format("only 'delay' or 'reject' is 
supported", sarg);
+        }
+        return false;
     }
+    return true;
+}
+
+bool throttling_controller::parse_from_env(const std::string &env_value,
+                                           int partition_count,
+                                           std::string &hint_message,
+                                           bool &changed,
+                                           std::string &old_env_value)
+{
+    hint_message.clear();
+    changed = false;
+    if (_enabled && env_value == _env_value && partition_count == 
_partition_count) {
+        return true;
+    }
+
+    uint64_t delay_units = 0;
+    uint64_t delay_ms = 0;
+    uint64_t reject_units = 0;
+    uint64_t reject_delay_ms = 0;
+    if (!parse_from_env(
+            env_value, delay_units, delay_ms, reject_units, reject_delay_ms, 
hint_message)) {
+        return false;
+    }
+
     changed = true;
     old_env_value = _env_value;
     _enabled = true;
     _env_value = env_value;
     _partition_count = partition_count;
-    _delay_units = delay_units;
+    _delay_units = delay_units == 0 ? 0 : (delay_units / partition_count + 1);
     _delay_ms = delay_ms;
-    _reject_units = reject_units;
+    _reject_units = reject_units == 0 ? 0 : (reject_units / partition_count + 
1);
     _reject_delay_ms = reject_delay_ms;
     return true;
 }
@@ -179,5 +270,5 @@ throttling_controller::throttling_type 
throttling_controller::control(
     return PASS;
 }
 
-} // namespace replication
+} // namespace utils
 } // namespace dsn
diff --git a/src/utils/throttling_controller.h 
b/src/utils/throttling_controller.h
index 40bc85432..f33c06dcf 100644
--- a/src/utils/throttling_controller.h
+++ b/src/utils/throttling_controller.h
@@ -21,8 +21,7 @@
 #include <string>
 
 namespace dsn {
-
-namespace replication {
+namespace utils {
 
 // Used for replica throttling.
 // Different throttling strategies may use different 'request_units', which is
@@ -51,14 +50,35 @@ public:
     // return true if parse succeed.
     // return false if parse failed for the reason of invalid env_value.
     // if return false, the original value will not be changed.
-    // 'parse_error' is set when return false.
+    // 'hint_message' is set when return false.
     // 'changed' is set when return true.
     // 'old_env_value' is set when 'changed' is set to true.
+    enum class ParseResult
+    {
+        kSuccess = 0,
+        kFail,
+        kIgnore
+    };
     bool parse_from_env(const std::string &env_value,
                         int partition_count,
-                        /*out*/ std::string &parse_error,
+                        /*out*/ std::string &hint_message,
                         /*out*/ bool &changed,
                         /*out*/ std::string &old_env_value);
+    static ParseResult parse_from_env(const std::string &arg,
+                                      const std::string &type,
+                                      /*out*/ uint64_t &units,
+                                      /*out*/ uint64_t &delay_ms,
+                                      /*out*/ std::string &hint_message);
+    static bool parse_from_env(const std::string &env_value,
+                               /*out*/ uint64_t &delay_units,
+                               /*out*/ uint64_t &delay_ms,
+                               /*out*/ uint64_t &reject_units,
+                               /*out*/ uint64_t &reject_delay_ms,
+                               /*out*/ std::string &hint_message);
+
+    // Return true if it's in format of '<unsigned integer>[K|M]'
+    static bool
+    parse_unit(std::string arg, /*out*/ uint64_t &units, /*out*/ std::string &hint_message);
 
     // reset to no throttling.
     void reset(/*out*/ bool &changed, /*out*/ std::string &old_env_value);
@@ -85,5 +105,5 @@ private:
     int64_t _cur_units;
 };
 
-} // namespace replication
+} // namespace utils
 } // namespace dsn
diff --git a/src/utils/token_bucket_throttling_controller.cpp b/src/utils/token_bucket_throttling_controller.cpp
index e1d5df5d7..323c11cff 100644
--- a/src/utils/token_bucket_throttling_controller.cpp
+++ b/src/utils/token_bucket_throttling_controller.cpp
@@ -23,6 +23,7 @@
 #include "string_conv.h"
 #include "utils/fmt_logging.h"
 #include "utils/ports.h"
+#include "utils/throttling_controller.h"
 
 namespace dsn {
 namespace utils {
@@ -85,7 +86,7 @@ bool token_bucket_throttling_controller::parse_from_env(const std::string &env_v
         return true;
     }
 
-    int64_t reject_size_value;
+    uint64_t reject_size_value;
     bool enabled;
     if (!transform_env_string(env_value, reject_size_value, enabled, parse_error)) {
         return false;
@@ -101,74 +102,67 @@ bool token_bucket_throttling_controller::parse_from_env(const std::string &env_v
     return true;
 }
 
-bool token_bucket_throttling_controller::string_to_value(std::string str, int64_t &value)
-{
-    int64_t unit_multiplier = 1;
-    if (*str.rbegin() == 'M') {
-        unit_multiplier = 1000 * 1000;
-    } else if (*str.rbegin() == 'K') {
-        unit_multiplier = 1000;
-    }
-    if (unit_multiplier != 1) {
-        str.pop_back();
-    }
-    if (!buf2int64(str, value) || value < 0) {
-        return false;
-    }
-    value *= unit_multiplier;
-    return true;
-}
-
 bool token_bucket_throttling_controller::validate(const std::string &env, std::string &hint_message)
 {
-    int64_t temp;
+    uint64_t temp;
     bool temp_bool;
-    bool validated = transform_env_string(env, temp, temp_bool, hint_message);
-    return validated;
+    return transform_env_string(env, temp, temp_bool, hint_message);
 };
 
 bool token_bucket_throttling_controller::transform_env_string(const std::string &env,
-                                                              int64_t &reject_size_value,
+                                                              uint64_t &reject_size_value,
                                                               bool &enabled,
                                                               std::string &hint_message)
 {
+    hint_message.clear();
     enabled = true;
 
-    if (buf2int64(env, reject_size_value) && reject_size_value > 0) {
+    // format like "200"
+    if (buf2uint64(env, reject_size_value) && reject_size_value > 0) {
         return true;
     }
 
     // format like "200K"
-    if (string_to_value(env, reject_size_value) && reject_size_value > 0) {
+    if (throttling_controller::parse_unit(env, reject_size_value, hint_message) &&
+        reject_size_value > 0) {
         return true;
     }
 
-    // format like "20000*delay*100"
-    if (env.find("delay") != -1 && env.find("reject") == -1) {
-        // rate must > 0 in TokenBucket.h
-        reject_size_value = 1;
-        enabled = false;
-
-        LOG_DEBUG("token_bucket_throttling_controller doesn't support delay method, so throttling "
-                  "controller is disabled now");
-        return true;
+    // format like "20000*delay*100", it's not supported.
+    {
+        uint64_t units = 0;
+        uint64_t ms = 0;
+        if (throttling_controller::parse_from_env(env, "delay", units, ms, hint_message) ==
+            throttling_controller::ParseResult::kSuccess) {
+            // rate must > 0 in TokenBucket.h
+            reject_size_value = 1;
+            enabled = false;
+
+            LOG_DEBUG(
+                "token_bucket_throttling_controller doesn't support delay method, so throttling "
+                "controller is disabled now");
+            return true;
+        }
     }
 
     // format like "20000*delay*100,20000*reject*100"
-    auto comma_index = env.find(",");
-    auto star_index = env.find("*reject", comma_index + 1);
-    if (star_index < 0) {
-        hint_message = "wrong format, you can set like 20000 or 20K";
-        return false;
-    }
-    auto reject_size = env.substr(comma_index + 1, star_index - comma_index - 1);
-
-    if (string_to_value(reject_size, reject_size_value) && reject_size_value > 0) {
-        return true;
+    uint64_t reject_units = 0;
+    {
+        uint64_t delay_units = 0;
+        uint64_t delay_ms = 0;
+        uint64_t reject_delay_ms = 0;
+        if (!throttling_controller::parse_from_env(
+                env, delay_units, delay_ms, reject_units, reject_delay_ms, hint_message)) {
+            return false;
+        }
+
+        if (reject_units == 0) {
+            hint_message = "reject value should be greater than 0";
+            return false;
+        }
     }
-
-    hint_message = "wrong format, you can set like 20000 or 20K";
-    return false;
+    reject_size_value = reject_units;
+    return true;
 }
 
 } // namespace utils
diff --git a/src/utils/token_bucket_throttling_controller.h b/src/utils/token_bucket_throttling_controller.h
index f765e2ab6..a83435ddb 100644
--- a/src/utils/token_bucket_throttling_controller.h
+++ b/src/utils/token_bucket_throttling_controller.h
@@ -85,13 +85,11 @@ public:
                         bool &changed,
                         std::string &old_env_value);
 
-    static bool string_to_value(std::string str, int64_t &value);
-
     // wrapper of transform_env_string, check if the env string is validated.
     static bool validate(const std::string &env, std::string &hint_message);
 
     static bool transform_env_string(const std::string &env,
-                                     int64_t &reject_size_value,
+                                     uint64_t &reject_size_value,
                                      bool &enabled,
                                      std::string &hint_message);
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to