This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dd2b7ab8 refactor(function_test): abstract some functions to reduce duplicate code (#1756)
4dd2b7ab8 is described below

commit 4dd2b7ab8d7815dccd9f378b1737616a422b83c0
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Dec 11 17:12:14 2023 +0800

    refactor(function_test): abstract some functions to reduce duplicate code (#1756)
---
 .../function_test/backup_restore/CMakeLists.txt    |  20 +-
 .../backup_restore/test_backup_and_restore.cpp     | 151 ++-------
 .../function_test/base_api/integration_test.cpp    |   7 +-
 src/test/function_test/base_api/test_batch_get.cpp |   2 +-
 src/test/function_test/base_api/test_recall.cpp    |   4 +-
 src/test/function_test/base_api/test_scan.cpp      |  25 +-
 src/test/function_test/base_api/test_ttl.cpp       |  33 +-
 .../function_test/bulk_load/test_bulk_load.cpp     |  34 +-
 .../function_test/detect_hotspot/CMakeLists.txt    |  20 +-
 .../detect_hotspot/test_detect_hotspot.cpp         |  41 +--
 .../function_test/partition_split/test_split.cpp   |  71 +++--
 src/test/function_test/recovery/CMakeLists.txt     |  20 +-
 src/test/function_test/recovery/test_recovery.cpp  |   8 +-
 src/test/function_test/restore/CMakeLists.txt      |  21 +-
 src/test/function_test/restore/test_restore.cpp    | 129 ++------
 src/test/function_test/throttle/test_throttle.cpp  | 344 ++++++++-------------
 src/test/function_test/utils/test_util.cpp         |  80 ++++-
 src/test/function_test/utils/test_util.h           |  12 +-
 18 files changed, 398 insertions(+), 624 deletions(-)

diff --git a/src/test/function_test/backup_restore/CMakeLists.txt 
b/src/test/function_test/backup_restore/CMakeLists.txt
index 4b12451ec..41a6c6ed2 100644
--- a/src/test/function_test/backup_restore/CMakeLists.txt
+++ b/src/test/function_test/backup_restore/CMakeLists.txt
@@ -21,16 +21,16 @@ set(MY_PROJ_SRC "")
 set(MY_SRC_SEARCH_MODE "GLOB")
 
 set(MY_PROJ_LIBS
-    dsn_client
-    dsn_replication_common
-    dsn_utils
-    gtest
-    sasl2
-    gssapi_krb5
-    krb5
-    function_test_utils
-    pegasus_client_static
-    )
+        dsn_client
+        dsn_replication_common
+        dsn_utils
+        gtest
+        sasl2
+        gssapi_krb5
+        krb5
+        function_test_utils
+        pegasus_client_static
+        test_utils)
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem)
 
diff --git a/src/test/function_test/backup_restore/test_backup_and_restore.cpp 
b/src/test/function_test/backup_restore/test_backup_and_restore.cpp
index 12dceb930..80e7497bd 100644
--- a/src/test/function_test/backup_restore/test_backup_and_restore.cpp
+++ b/src/test/function_test/backup_restore/test_backup_and_restore.cpp
@@ -16,8 +16,6 @@
 // under the License.
 
 #include <stdint.h>
-#include <unistd.h>
-#include <iostream>
 #include <memory>
 #include <string>
 #include <vector>
@@ -25,14 +23,12 @@
 #include "backup_types.h"
 #include "base/pegasus_const.h"
 #include "client/replication_ddl_client.h"
-#include "dsn.layer2_types.h"
 #include "gtest/gtest.h"
-#include "include/pegasus/client.h"
-#include "include/pegasus/error.h"
-#include "runtime/rpc/rpc_address.h"
 #include "test/function_test/utils/test_util.h"
+#include "test_util/test_util.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
+#include "utils/test_macros.h"
 #include "utils/utils.h"
 
 using namespace dsn;
@@ -44,135 +40,50 @@ class backup_restore_test : public test_util
 public:
     void TearDown() override
     {
-        ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
+        ASSERT_EQ(ERR_OK, ddl_client_->drop_app(table_name_, 0));
         ASSERT_EQ(ERR_OK, ddl_client_->drop_app(s_new_app_name, 0));
     }
 
-    bool write_data()
+    void wait_backup_complete(int64_t backup_id)
     {
-        for (int i = 0; i < s_num_of_rows; ++i) {
-            int ret = client_->set("hashkey_" + std::to_string(i),
-                                   "sortkey_" + std::to_string(i),
-                                   "value_" + std::to_string(i));
-            if (ret != pegasus::PERR_OK) {
-                std::cout << "write data failed. " << std::endl;
-                return false;
-            }
-        }
-        return true;
-    }
-
-    bool verify_data(const std::string &app_name)
-    {
-        for (int i = 0; i < s_num_of_rows; ++i) {
-            const std::string &expected_value = "value_" + std::to_string(i);
-            std::string value;
-            int ret =
-                client_->get("hashkey_" + std::to_string(i), "sortkey_" + 
std::to_string(i), value);
-            if (ret != pegasus::PERR_OK) {
-                return false;
-            }
-            if (value != expected_value) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    start_backup_app_response start_backup(const std::string 
&user_specified_path = "")
-    {
-        return ddl_client_->backup_app(app_id_, s_provider_type, 
user_specified_path).get_value();
-    }
-
-    query_backup_status_response query_backup(int64_t backup_id)
-    {
-        return ddl_client_->query_backup(app_id_, backup_id).get_value();
-    }
-
-    error_code start_restore(int64_t backup_id, const std::string 
&user_specified_path = "")
-    {
-        return ddl_client_->do_restore(s_provider_type,
-                                       cluster_name_,
-                                       /*policy_name=*/"",
-                                       backup_id,
-                                       app_name_,
-                                       app_id_,
-                                       s_new_app_name,
-                                       /*skip_bad_partition=*/false,
-                                       user_specified_path);
-    }
-
-    bool wait_backup_complete(int64_t backup_id, int max_sleep_seconds)
-    {
-        int sleep_sec = 0;
-        bool is_backup_complete = false;
-        while (!is_backup_complete && sleep_sec <= max_sleep_seconds) {
-            std::cout << "sleep a while to wait backup complete." << std::endl;
-            sleep(s_check_interval_sec);
-            sleep_sec += s_check_interval_sec;
-
-            auto resp = query_backup(backup_id);
-            if (resp.err != ERR_OK) {
-                return false;
-            }
-            // we got only one backup_item for a certain app_id and backup_id.
-            auto item = resp.backup_items[0];
-            is_backup_complete = (item.end_time_ms > 0);
-        }
-        return is_backup_complete;
-    }
-
-    bool wait_app_become_healthy(const std::string &app_name, uint32_t 
max_sleep_seconds)
-    {
-        int sleep_sec = 0;
-        bool is_app_healthy = false;
-        while (!is_app_healthy && sleep_sec <= max_sleep_seconds) {
-            std::cout << "sleep a while to wait app become healthy." << 
std::endl;
-            sleep(s_check_interval_sec);
-            sleep_sec += s_check_interval_sec;
-
-            int32_t partition_count;
-            std::vector<partition_configuration> partitions;
-            auto err = ddl_client_->list_app(app_name, app_id_, 
partition_count, partitions);
-            if (err != ERR_OK) {
-                std::cout << "list app " + app_name + " failed" << std::endl;
-                return false;
-            }
-            int32_t healthy_partition_count = 0;
-            for (const auto &partition : partitions) {
-                if (partition.primary.is_invalid()) {
-                    break;
-                }
-                if (partition.secondaries.size() + 1 < 
partition.max_replica_count) {
-                    break;
-                }
-                healthy_partition_count++;
-            }
-            is_app_healthy = (healthy_partition_count == partition_count);
-        }
-        return is_app_healthy;
+        ASSERT_IN_TIME(
+            [&] {
+                auto resp = ddl_client_->query_backup(table_id_, 
backup_id).get_value();
+                ASSERT_EQ(dsn::ERR_OK, resp.err);
+                ASSERT_FALSE(resp.backup_items.empty());
+                // we got only one backup_item for a certain app_id and 
backup_id.
+                ASSERT_GT(resp.backup_items[0].end_time_ms, 0);
+            },
+            180);
     }
 
     void test_backup_and_restore(const std::string &user_specified_path = "")
     {
-        ASSERT_TRUE(wait_app_become_healthy(app_name_, 180));
+        NO_FATALS(wait_table_healthy(table_name_));
+        NO_FATALS(write_data(s_num_of_rows));
+        NO_FATALS(verify_data(table_name_, s_num_of_rows));
 
-        ASSERT_TRUE(write_data());
-        ASSERT_TRUE(verify_data(app_name_));
-
-        auto resp = start_backup(user_specified_path);
+        auto resp =
+            ddl_client_->backup_app(table_id_, s_provider_type, 
user_specified_path).get_value();
         ASSERT_EQ(ERR_OK, resp.err);
         int64_t backup_id = resp.backup_id;
-        ASSERT_TRUE(wait_backup_complete(backup_id, 180));
-        ASSERT_EQ(ERR_OK, start_restore(backup_id, user_specified_path));
-        ASSERT_TRUE(wait_app_become_healthy(s_new_app_name, 180));
-
-        ASSERT_TRUE(verify_data(s_new_app_name));
+        NO_FATALS(wait_backup_complete(backup_id));
+        ASSERT_EQ(ERR_OK,
+                  ddl_client_->do_restore(s_provider_type,
+                                          cluster_name_,
+                                          /* policy_name */ "",
+                                          backup_id,
+                                          table_name_,
+                                          table_id_,
+                                          s_new_app_name,
+                                          /* skip_bad_partition */ false,
+                                          user_specified_path));
+        NO_FATALS(wait_table_healthy(s_new_app_name));
+        NO_FATALS(verify_data(s_new_app_name, s_num_of_rows));
     }
 
 private:
     static const uint32_t s_num_of_rows = 1000;
-    static const uint8_t s_check_interval_sec = 10;
     static const std::string s_new_app_name;
     static const std::string s_provider_type;
 };
diff --git a/src/test/function_test/base_api/integration_test.cpp 
b/src/test/function_test/base_api/integration_test.cpp
index 4229ae58e..d76ca1e82 100644
--- a/src/test/function_test/base_api/integration_test.cpp
+++ b/src/test/function_test/base_api/integration_test.cpp
@@ -70,7 +70,7 @@ TEST_F(integration_test, write_corrupt_db)
                 // cause timeout.
                 // Force to fetch the latest route table.
                 client_ =
-                    pegasus_client_factory::get_client(cluster_name_.c_str(), 
app_name_.c_str());
+                    pegasus_client_factory::get_client(cluster_name_.c_str(), 
table_name_.c_str());
                 ASSERT_TRUE(client_ != nullptr);
             } else {
                 ASSERT_TRUE(false) << ret;
@@ -86,7 +86,8 @@ TEST_F(integration_test, write_corrupt_db)
                 break;
             }
             ASSERT_EQ(PERR_NOT_FOUND, ret);
-            client_ = 
pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
+            client_ =
+                pegasus_client_factory::get_client(cluster_name_.c_str(), 
table_name_.c_str());
             ASSERT_TRUE(client_ != nullptr);
 
             ret = client_->get(hkey, skey, got_value);
@@ -179,7 +180,7 @@ TEST_F(integration_test, read_corrupt_db)
                 // a new read operation on the primary replica it ever held 
will cause timeout.
                 // Force to fetch the latest route table.
                 client_ =
-                    pegasus_client_factory::get_client(cluster_name_.c_str(), 
app_name_.c_str());
+                    pegasus_client_factory::get_client(cluster_name_.c_str(), 
table_name_.c_str());
                 ASSERT_TRUE(client_ != nullptr);
             } else {
                 ASSERT_TRUE(false) << ret;
diff --git a/src/test/function_test/base_api/test_batch_get.cpp 
b/src/test/function_test/base_api/test_batch_get.cpp
index da688fc5f..1ddda11b2 100644
--- a/src/test/function_test/base_api/test_batch_get.cpp
+++ b/src/test/function_test/base_api/test_batch_get.cpp
@@ -49,7 +49,7 @@ class batch_get : public test_util
 TEST_F(batch_get, set_and_then_batch_get)
 {
     auto rrdb_client =
-        new ::dsn::apps::rrdb_client(cluster_name_.c_str(), meta_list_, 
app_name_.c_str());
+        new ::dsn::apps::rrdb_client(cluster_name_.c_str(), meta_list_, 
table_name_.c_str());
 
     int test_data_count = 100;
     int test_timeout_milliseconds = 3000;
diff --git a/src/test/function_test/base_api/test_recall.cpp 
b/src/test/function_test/base_api/test_recall.cpp
index 24a2c7f9a..2ca118bbf 100644
--- a/src/test/function_test/base_api/test_recall.cpp
+++ b/src/test/function_test/base_api/test_recall.cpp
@@ -72,7 +72,7 @@ TEST_F(drop_and_recall, simple)
     }
 
     // drop the table
-    ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(app_name_, 0));
+    ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(table_name_, 0));
 
     // wait for all elements to be dropped
     for (int i = 0; i < partition_count_; ++i) {
@@ -91,7 +91,7 @@ TEST_F(drop_and_recall, simple)
     }
 
     // then recall table
-    ASSERT_EQ(dsn::ERR_OK, ddl_client_->recall_app(app_id_, ""));
+    ASSERT_EQ(dsn::ERR_OK, ddl_client_->recall_app(table_id_, ""));
 
     // then read all keys
     for (int i = 0; i < kv_count; ++i) {
diff --git a/src/test/function_test/base_api/test_scan.cpp 
b/src/test/function_test/base_api/test_scan.cpp
index 2a8add472..a188f880e 100644
--- a/src/test/function_test/base_api/test_scan.cpp
+++ b/src/test/function_test/base_api/test_scan.cpp
@@ -20,14 +20,12 @@
 #include <string.h>
 #include <time.h>
 #include <atomic>
-#include <chrono>
 #include <cstdint>
 #include <cstdlib>
 #include <iterator>
 #include <map>
 #include <memory>
 #include <string>
-#include <thread>
 #include <utility>
 #include <vector>
 
@@ -36,14 +34,13 @@
 #include "client/replication_ddl_client.h"
 #include "gtest/gtest.h"
 #include "include/pegasus/client.h"
-#include "meta_admin_types.h"
 #include "pegasus/error.h"
 #include "test/function_test/utils/test_util.h"
 #include "test/function_test/utils/utils.h"
 #include "utils/error_code.h"
-#include "utils/errors.h"
 #include "utils/fmt_logging.h"
 #include "utils/synchronize.h"
+#include "utils/test_macros.h"
 
 using namespace ::pegasus;
 
@@ -53,14 +50,14 @@ public:
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(app_name_, 0));
-        ASSERT_EQ(dsn::ERR_OK, ddl_client_->create_app(app_name_, "pegasus", 
8, 3, {}, false));
-        client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), 
app_name_.c_str());
+        ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(table_name_, 0));
+        ASSERT_EQ(dsn::ERR_OK, ddl_client_->create_app(table_name_, "pegasus", 
8, 3, {}, false));
+        client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), 
table_name_.c_str());
         ASSERT_TRUE(client_ != nullptr);
         ASSERT_NO_FATAL_FAILURE(fill_database());
     }
 
-    void TearDown() override { ASSERT_EQ(dsn::ERR_OK, 
ddl_client_->drop_app(app_name_, 0)); }
+    void TearDown() override { ASSERT_EQ(dsn::ERR_OK, 
ddl_client_->drop_app(table_name_, 0)); }
 
     // REQUIRED: 'buffer_' has been filled with random chars.
     const std::string random_string() const
@@ -425,12 +422,7 @@ TEST_F(scan_test, REQUEST_EXPIRE_TS)
 TEST_F(scan_test, ITERATION_TIME_LIMIT)
 {
     // update iteration threshold to 1ms
-    auto response = ddl_client_->set_app_envs(
-        client_->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, 
{std::to_string(1)});
-    ASSERT_EQ(true, response.is_ok());
-    ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
-    // wait envs to be synced.
-    std::this_thread::sleep_for(std::chrono::seconds(30));
+    NO_FATALS(update_table_env({ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, 
{std::to_string(1)}));
 
     // write data into table
     int32_t i = 0;
@@ -450,8 +442,5 @@ TEST_F(scan_test, ITERATION_TIME_LIMIT)
     ASSERT_EQ(-1, count);
 
     // set iteration threshold to 100ms
-    response = ddl_client_->set_app_envs(
-        client_->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, 
{std::to_string(100)});
-    ASSERT_TRUE(response.is_ok());
-    ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
+    NO_FATALS(update_table_env({ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, 
{std::to_string(100)}));
 }
diff --git a/src/test/function_test/base_api/test_ttl.cpp 
b/src/test/function_test/base_api/test_ttl.cpp
index d97c13e3a..8c6f2b845 100644
--- a/src/test/function_test/base_api/test_ttl.cpp
+++ b/src/test/function_test/base_api/test_ttl.cpp
@@ -29,11 +29,10 @@
 #include "client/replication_ddl_client.h"
 #include "gtest/gtest.h"
 #include "include/pegasus/client.h"
-#include "meta_admin_types.h"
 #include "pegasus/error.h"
 #include "test/function_test/utils/test_util.h"
 #include "utils/error_code.h"
-#include "utils/errors.h"
+#include "utils/test_macros.h"
 #include "utils/utils.h"
 
 using namespace ::dsn;
@@ -48,7 +47,7 @@ public:
         ASSERT_NO_FATAL_FAILURE(set_default_ttl_secs(0));
     }
 
-    void TearDown() override { ASSERT_EQ(dsn::ERR_OK, 
ddl_client_->drop_app(app_name_, 0)); }
+    void TearDown() override { ASSERT_EQ(dsn::ERR_OK, 
ddl_client_->drop_app(table_name_, 0)); }
 
     void set_default_ttl_secs(int32_t ttl)
     {
@@ -57,13 +56,7 @@ public:
 
         std::string env = envs[TABLE_LEVEL_DEFAULT_TTL];
         if ((env.empty() && ttl != 0) || env != std::to_string(ttl)) {
-            auto response = ddl_client_->set_app_envs(
-                client_->get_app_name(), {TABLE_LEVEL_DEFAULT_TTL}, 
{std::to_string(ttl)});
-            ASSERT_EQ(true, response.is_ok());
-            ASSERT_EQ(ERR_OK, response.get_value().err);
-
-            // wait envs to be synced.
-            
std::this_thread::sleep_for(std::chrono::seconds(sleep_secs_for_envs_effect));
+            NO_FATALS(update_table_env({TABLE_LEVEL_DEFAULT_TTL}, 
{std::to_string(ttl)}));
         }
     }
 
@@ -125,14 +118,8 @@ TEST_F(ttl_test, set_without_default_ttl)
     ASSERT_EQ(ttl_test_value_2, value);
 
     // trigger a manual compaction
-    auto response = ddl_client_->set_app_envs(client_->get_app_name(),
-                                              
{MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY},
-                                              {std::to_string(time(nullptr))});
-    ASSERT_EQ(true, response.is_ok());
-    ASSERT_EQ(ERR_OK, response.get_value().err);
-
-    // wait envs to be synced, and manual lcompaction has been finished.
-    
std::this_thread::sleep_for(std::chrono::seconds(sleep_secs_for_envs_effect));
+    NO_FATALS(
+        update_table_env({MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY}, 
{std::to_string(time(nullptr))}));
 
     // check expired one
     ASSERT_EQ(PERR_NOT_FOUND, client_->ttl(ttl_hash_key, ttl_test_sort_key_1, 
ttl_seconds));
@@ -199,14 +186,8 @@ TEST_F(ttl_test, set_with_default_ttl)
     ASSERT_EQ(ttl_test_value_2, value);
 
     // trigger a manual compaction
-    auto response = ddl_client_->set_app_envs(client_->get_app_name(),
-                                              
{MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY},
-                                              {std::to_string(time(nullptr))});
-    ASSERT_EQ(true, response.is_ok());
-    ASSERT_EQ(ERR_OK, response.get_value().err);
-
-    // wait envs to be synced, and manual compaction has been finished.
-    
std::this_thread::sleep_for(std::chrono::seconds(sleep_secs_for_envs_effect));
+    NO_FATALS(
+        update_table_env({MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY}, 
{std::to_string(time(nullptr))}));
 
     // check forever one
     ASSERT_EQ(PERR_OK, client_->ttl(ttl_hash_key, ttl_test_sort_key_0, 
ttl_seconds));
diff --git a/src/test/function_test/bulk_load/test_bulk_load.cpp 
b/src/test/function_test/bulk_load/test_bulk_load.cpp
index e5457bc1a..661fdc6d3 100644
--- a/src/test/function_test/bulk_load/test_bulk_load.cpp
+++ b/src/test/function_test/bulk_load/test_bulk_load.cpp
@@ -41,7 +41,6 @@
 #include "include/pegasus/client.h"
 #include "include/pegasus/error.h"
 #include "meta/meta_bulk_load_service.h"
-#include "meta_admin_types.h"
 #include "test/function_test/utils/test_util.h"
 #include "utils/blob.h"
 #include "utils/enum_helper.h"
@@ -82,7 +81,7 @@ protected:
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
         bulk_load_local_app_root_ =
-            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
+            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, table_name_);
     }
 
     void SetUp() override
@@ -93,7 +92,7 @@ protected:
 
     void TearDown() override
     {
-        ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
+        ASSERT_EQ(ERR_OK, ddl_client_->drop_app(table_name_, 0));
         NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
     }
 
@@ -119,7 +118,7 @@ protected:
         ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, 
fm.md5));
         std::string value = nlohmann::json(fm).dump();
         auto bulk_load_info_meta_path =
-            fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot, 
kCluster, app_name_);
+            fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot, 
kCluster, table_name_);
         auto s =
             
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
                                        rocksdb::Slice(value),
@@ -149,8 +148,8 @@ protected:
 
         // Generate 'bulk_load_info'.
         auto bulk_load_info_path =
-            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, 
kCluster, app_name_);
-        NO_FATALS(generate_bulk_load_info(bulk_load_info(app_id_, app_name_, 
partition_count_),
+            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, 
kCluster, table_name_);
+        NO_FATALS(generate_bulk_load_info(bulk_load_info(table_id_, 
table_name_, partition_count_),
                                           bulk_load_info_path));
 
         // Generate '.bulk_load_info.meta'.
@@ -160,7 +159,7 @@ protected:
     error_code start_bulk_load(bool ingest_behind = false)
     {
         return ddl_client_
-            ->start_bulk_load(app_name_, kCluster, kProvider, kBulkLoad, 
ingest_behind)
+            ->start_bulk_load(table_name_, kCluster, kProvider, kBulkLoad, 
ingest_behind)
             .get_value()
             .err;
     }
@@ -170,15 +169,6 @@ protected:
         NO_FATALS(run_cmd_from_project_root("rm " + file_path));
     }
 
-    void update_allow_ingest_behind(const string &allow_ingest_behind)
-    {
-        const auto ret = ddl_client_->set_app_envs(
-            app_name_, {ROCKSDB_ALLOW_INGEST_BEHIND}, {allow_ingest_behind});
-        ASSERT_EQ(ERR_OK, ret.get_value().err);
-        std::cout << "sleep 31s to wait app_envs update" << std::endl;
-        std::this_thread::sleep_for(std::chrono::seconds(31));
-    }
-
     bulk_load_status::type wait_bulk_load_finish(int64_t remain_seconds)
     {
         int64_t sleep_time = 5;
@@ -192,7 +182,7 @@ protected:
             std::cout << "sleep " << sleep_time << "s to query bulk status" << 
std::endl;
             std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
 
-            auto resp = ddl_client_->query_bulk_load(app_name_).get_value();
+            auto resp = ddl_client_->query_bulk_load(table_name_).get_value();
             err = resp.err;
             if (err == ERR_OK) {
                 last_status = resp.app_status;
@@ -320,12 +310,12 @@ TEST_F(bulk_load_test, inconsistent_bulk_load_info)
 {
     // Only 'app_id' and 'partition_count' will be checked in Pegasus server, 
so just inject these
     // kind of inconsistencies.
-    bulk_load_info tests[] = {{app_id_ + 1, app_name_, partition_count_},
-                              {app_id_, app_name_, partition_count_ * 2}};
+    bulk_load_info tests[] = {{table_id_ + 1, table_name_, partition_count_},
+                              {table_id_, table_name_, partition_count_ * 2}};
     for (const auto &test : tests) {
         // Generate inconsistent 'bulk_load_info'.
         auto bulk_load_info_path =
-            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, 
kCluster, app_name_);
+            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, 
kCluster, table_name_);
         NO_FATALS(generate_bulk_load_info(test, bulk_load_info_path));
 
         // Generate '.bulk_load_info.meta'.
@@ -347,7 +337,7 @@ TEST_F(bulk_load_test, missing_p0_bulk_load_metadata)
 // Test bulk load failed because the allow_ingest_behind config is 
inconsistent.
 TEST_F(bulk_load_test, allow_ingest_behind_inconsistent)
 {
-    NO_FATALS(update_allow_ingest_behind("false"));
+    NO_FATALS(update_table_env({ROCKSDB_ALLOW_INGEST_BEHIND}, {"false"}));
     ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load(true));
 }
 
@@ -358,6 +348,6 @@ TEST_F(bulk_load_test, normal) { check_bulk_load(false, 
"oldValue", "valueAfterB
 // load data.
 TEST_F(bulk_load_test, allow_ingest_behind)
 {
-    NO_FATALS(update_allow_ingest_behind("true"));
+    NO_FATALS(update_table_env({ROCKSDB_ALLOW_INGEST_BEHIND}, {"true"}));
     check_bulk_load(true, "oldValue", "valueAfterBulkLoad");
 }
diff --git a/src/test/function_test/detect_hotspot/CMakeLists.txt 
b/src/test/function_test/detect_hotspot/CMakeLists.txt
index 64f5d09d4..8a8070d85 100644
--- a/src/test/function_test/detect_hotspot/CMakeLists.txt
+++ b/src/test/function_test/detect_hotspot/CMakeLists.txt
@@ -28,16 +28,16 @@ set(MY_PROJ_SRC "")
 set(MY_SRC_SEARCH_MODE "GLOB")
 
 set(MY_PROJ_LIBS
-    dsn_client
-    dsn_replication_common
-    dsn_utils
-    pegasus_client_static
-    gtest
-    sasl2
-    gssapi_krb5
-    krb5
-    function_test_utils
-    )
+        dsn_client
+        dsn_replication_common
+        dsn_utils
+        pegasus_client_static
+        gtest
+        sasl2
+        gssapi_krb5
+        krb5
+        function_test_utils
+        test_utils)
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem)
 
diff --git a/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp 
b/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp
index 785513370..659d37905 100644
--- a/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp
+++ b/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp
@@ -34,6 +34,7 @@
 #include "test/function_test/utils/test_util.h"
 #include "test/function_test/utils/utils.h"
 #include "utils/error_code.h"
+#include "utils/test_macros.h"
 #include "utils/utils.h"
 
 using namespace ::dsn;
@@ -62,7 +63,7 @@ protected:
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
         test_util::SetUp();
 
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
+        NO_FATALS(run_cmd_from_project_root(
             "curl 'localhost:34101/updateConfig?enable_detect_hotkey=true'"));
     }
 
@@ -96,7 +97,7 @@ protected:
         bool find_hotkey = false;
         dsn::replication::detect_hotkey_response resp;
         for (int partition_index = 0; partition_index < partitions_.size(); 
partition_index++) {
-            req.pid = dsn::gpid(app_id_, partition_index);
+            req.pid = dsn::gpid(table_id_, partition_index);
             ASSERT_EQ(dsn::ERR_OK,
                       
ddl_client_->detect_hotkey(partitions_[partition_index].primary, req, resp));
             if (!resp.hotkey_result.empty()) {
@@ -124,7 +125,7 @@ protected:
 
         req.action = dsn::replication::detect_action::QUERY;
         for (int partition_index = 0; partition_index < partitions_.size(); 
partition_index++) {
-            req.pid = dsn::gpid(app_id_, partition_index);
+            req.pid = dsn::gpid(table_id_, partition_index);
             ASSERT_EQ(dsn::ERR_OK,
                       
ddl_client_->detect_hotkey(partitions_[partition_index].primary, req, resp));
             ASSERT_EQ("Can't get hotkey now, now state: 
hotkey_collector_state::STOPPED",
@@ -134,18 +135,18 @@ protected:
 
     void write_hotspot_data()
     {
-        ASSERT_NO_FATAL_FAILURE(
+        NO_FATALS(
             generate_dataset(warmup_second, detection_type::write_data, 
key_type::random_dataset));
-        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+        NO_FATALS(generate_dataset(
             max_detection_second, detection_type::write_data, 
key_type::hotspot_dataset));
-        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::write_data, 
key_type::hotspot_dataset));
+        NO_FATALS(get_result(detection_type::write_data, 
key_type::hotspot_dataset));
     }
 
     void write_random_data()
     {
-        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+        NO_FATALS(generate_dataset(
             max_detection_second, detection_type::write_data, 
key_type::random_dataset));
-        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::write_data, 
key_type::random_dataset));
+        NO_FATALS(get_result(detection_type::write_data, 
key_type::random_dataset));
     }
 
     void capture_until_maxtime()
@@ -154,7 +155,7 @@ protected:
         dsn::replication::detect_hotkey_request req;
         req.type = dsn::replication::hotkey_type::type::WRITE;
         req.action = dsn::replication::detect_action::START;
-        req.pid = dsn::gpid(app_id_, target_partition);
+        req.pid = dsn::gpid(table_id_, target_partition);
 
         dsn::replication::detect_hotkey_response resp;
         ASSERT_EQ(dsn::ERR_OK,
@@ -169,7 +170,7 @@ protected:
 
         // max_detection_second > max_seconds_to_detect_hotkey
         int max_seconds_to_detect_hotkey = 160;
-        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+        NO_FATALS(generate_dataset(
             max_seconds_to_detect_hotkey, detection_type::write_data, 
key_type::random_dataset));
 
         req.action = dsn::replication::detect_action::QUERY;
@@ -181,36 +182,36 @@ protected:
 
     void read_hotspot_data()
     {
-        ASSERT_NO_FATAL_FAILURE(
+        NO_FATALS(
             generate_dataset(warmup_second, detection_type::read_data, 
key_type::hotspot_dataset));
-        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+        NO_FATALS(generate_dataset(
             max_detection_second, detection_type::read_data, 
key_type::hotspot_dataset));
-        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::read_data, 
key_type::hotspot_dataset));
+        NO_FATALS(get_result(detection_type::read_data, 
key_type::hotspot_dataset));
     }
 
     void read_random_data()
     {
-        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+        NO_FATALS(generate_dataset(
             max_detection_second, detection_type::read_data, 
key_type::random_dataset));
-        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::read_data, 
key_type::random_dataset));
+        NO_FATALS(get_result(detection_type::read_data, 
key_type::random_dataset));
     }
 };
 
 TEST_F(detect_hotspot_test, write_hotspot_data_test)
 {
     std::cout << "start testing write hotspot data..." << std::endl;
-    ASSERT_NO_FATAL_FAILURE(write_hotspot_data());
+    NO_FATALS(write_hotspot_data());
     std::cout << "write hotspot data passed....." << std::endl;
     std::cout << "start testing write random data..." << std::endl;
-    ASSERT_NO_FATAL_FAILURE(write_random_data());
+    NO_FATALS(write_random_data());
     std::cout << "write random data passed....." << std::endl;
     std::cout << "start testing max detection time..." << std::endl;
-    ASSERT_NO_FATAL_FAILURE(capture_until_maxtime());
+    NO_FATALS(capture_until_maxtime());
     std::cout << "max detection time passed....." << std::endl;
     std::cout << "start testing read hotspot data..." << std::endl;
-    ASSERT_NO_FATAL_FAILURE(read_hotspot_data());
+    NO_FATALS(read_hotspot_data());
     std::cout << "read hotspot data passed....." << std::endl;
     std::cout << "start testing read random data..." << std::endl;
-    ASSERT_NO_FATAL_FAILURE(read_random_data());
+    NO_FATALS(read_random_data());
     std::cout << "read random data passed....." << std::endl;
 }
diff --git a/src/test/function_test/partition_split/test_split.cpp 
b/src/test/function_test/partition_split/test_split.cpp
index 23c7329a5..42942121b 100644
--- a/src/test/function_test/partition_split/test_split.cpp
+++ b/src/test/function_test/partition_split/test_split.cpp
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <fmt/core.h>
 #include <stdint.h>
+#include <stdio.h>
 #include <stdlib.h>
 #include <chrono>
 #include <iostream>
@@ -38,6 +40,7 @@
 #include "test_util/test_util.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
+#include "utils/test_macros.h"
 
 using namespace dsn;
 using namespace dsn::replication;
@@ -50,30 +53,30 @@ public:
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
         static int32_t test_case = 0;
-        app_name_ = table_name_prefix + std::to_string(test_case++);
+        table_name_ = table_name_prefix + std::to_string(test_case++);
     }
 
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_NO_FATAL_FAILURE(write_data_before_split());
+        NO_FATALS(write_data_before_split());
         ASSERT_EQ(
             ERR_OK,
-            ddl_client_->start_partition_split(app_name_, partition_count_ * 
2).get_value().err);
+            ddl_client_->start_partition_split(table_name_, partition_count_ * 
2).get_value().err);
     }
 
-    void TearDown() override { ASSERT_EQ(ERR_OK, 
ddl_client_->drop_app(app_name_, 0)); }
+    void TearDown() override { ASSERT_EQ(ERR_OK, 
ddl_client_->drop_app(table_name_, 0)); }
 
     bool is_split_finished()
     {
-        auto err_resp = ddl_client_->query_partition_split(app_name_);
+        auto err_resp = ddl_client_->query_partition_split(table_name_);
         auto status_map = err_resp.get_value().status;
         return err_resp.get_value().err == ERR_INVALID_STATE;
     }
 
     bool check_partition_split_status(int32_t target_pidx, split_status::type 
target_status)
     {
-        auto err_resp = ddl_client_->query_partition_split(app_name_);
+        auto err_resp = ddl_client_->query_partition_split(table_name_);
         auto status_map = err_resp.get_value().status;
         // is_single_partition
         if (target_pidx > 0) {
@@ -95,8 +98,8 @@ public:
                                        int32_t parent_pidx,
                                        int32_t old_partition_count = 0)
     {
-        auto err_resp =
-            ddl_client_->control_partition_split(app_name_, type, parent_pidx, 
old_partition_count);
+        auto err_resp = ddl_client_->control_partition_split(
+            table_name_, type, parent_pidx, old_partition_count);
         return err_resp.get_value().err;
     }
 
@@ -144,9 +147,8 @@ public:
     {
         std::cout << "Verify data(count=" << dataset_count + 
count_during_split_
                   << ") after partition split......" << std::endl;
-        ASSERT_NO_FATAL_FAILURE(
-            verify_data(dataset_hashkey_prefix, dataset_sortkey_prefix, 
dataset_count));
-        ASSERT_NO_FATAL_FAILURE(
+        NO_FATALS(verify_data(dataset_hashkey_prefix, dataset_sortkey_prefix, 
dataset_count));
+        NO_FATALS(
             verify_data(splitting_hashkey_prefix, splitting_sortkey_prefix, 
count_during_split_));
     }
 
@@ -230,13 +232,13 @@ TEST_F(partition_split_test, split_with_write)
         [&] {
             // write data during partition split
             // TODO(yingchun): better to write data background continuously
-            ASSERT_NO_FATAL_FAILURE(write_data_during_split());
+            NO_FATALS(write_data_during_split());
             ASSERT_TRUE(is_split_finished());
         },
         300);
 
     std::cout << "Partition split succeed" << std::endl;
-    ASSERT_NO_FATAL_FAILURE(verify_data_after_split());
+    NO_FATALS(verify_data_after_split());
 }
 
 TEST_F(partition_split_test, split_with_read)
@@ -245,13 +247,13 @@ TEST_F(partition_split_test, split_with_read)
         [&] {
             // read data during partition split
             // TODO(yingchun): better to read data background continuously
-            ASSERT_NO_FATAL_FAILURE(read_data_during_split());
+            NO_FATALS(read_data_during_split());
             ASSERT_TRUE(is_split_finished());
         },
         300);
 
     std::cout << "Partition split succeed" << std::endl;
-    ASSERT_NO_FATAL_FAILURE(verify_data_after_split());
+    NO_FATALS(verify_data_after_split());
 }
 
 TEST_F(partition_split_test, split_with_scan)
@@ -260,15 +262,15 @@ TEST_F(partition_split_test, split_with_scan)
     ASSERT_IN_TIME_WITH_FIXED_INTERVAL(
         [&] {
             // TODO(yingchun): better to scan data background continuously
-            ASSERT_NO_FATAL_FAILURE(hash_scan_during_split(count++));
+            NO_FATALS(hash_scan_during_split(count++));
             ASSERT_TRUE(is_split_finished());
         },
         300);
 
-    std::cout << "Partition split succeed" << std::endl;
-    ASSERT_NO_FATAL_FAILURE(verify_data_after_split());
+    fmt::print(stdout, "Partition split succeed\n");
+    NO_FATALS(verify_data_after_split());
     std::this_thread::sleep_for(std::chrono::seconds(30));
-    ASSERT_NO_FATAL_FAILURE(full_scan_after_split());
+    NO_FATALS(full_scan_after_split());
 }
 
 TEST_F(partition_split_test, pause_split)
@@ -277,13 +279,15 @@ TEST_F(partition_split_test, pause_split)
     const int32_t target_partition = 2, count = 30;
     ASSERT_IN_TIME_WITH_FIXED_INTERVAL(
         [&] {
-            ASSERT_NO_FATAL_FAILURE(write_data_during_split());
+            NO_FATALS(write_data_during_split());
             // pause target partition split
             if (!already_pause && check_partition_split_status(-1, 
split_status::SPLITTING)) {
                 ASSERT_EQ(ERR_OK,
                           control_partition_split(split_control_type::PAUSE, 
target_partition));
-                std::cout << "Table(" << app_name_ << ") pause partition[" << 
target_partition
-                          << "] split succeed" << std::endl;
+                fmt::print(stdout,
+                           "Table({}) pause partition[{}] split succeed\n",
+                           table_name_,
+                           target_partition);
                 already_pause = true;
             }
             // restart target partition split
@@ -291,16 +295,18 @@ TEST_F(partition_split_test, pause_split)
                 check_partition_split_status(target_partition, 
split_status::PAUSED)) {
                 ASSERT_EQ(ERR_OK,
                           control_partition_split(split_control_type::RESTART, 
target_partition));
-                std::cout << "Table(" << app_name_ << ") restart split 
partition["
-                          << target_partition << "] succeed" << std::endl;
+                fmt::print(stdout,
+                           "Table({}) restart split partition[{}] succeed\n",
+                           table_name_,
+                           target_partition);
                 already_restart = true;
             }
             ASSERT_TRUE(is_split_finished());
         },
         300);
 
-    std::cout << "Partition split succeed" << std::endl;
-    ASSERT_NO_FATAL_FAILURE(verify_data_after_split());
+    fmt::print(stdout, "Partition split succeed\n");
+    NO_FATALS(verify_data_after_split());
 }
 
 TEST_F(partition_split_test, cancel_split)
@@ -309,12 +315,11 @@ TEST_F(partition_split_test, cancel_split)
     bool already_pause = false;
     ASSERT_IN_TIME_WITH_FIXED_INTERVAL(
         [&] {
-            ASSERT_NO_FATAL_FAILURE(write_data_during_split());
+            NO_FATALS(write_data_during_split());
             // pause all partition split
             if (!already_pause && check_partition_split_status(-1, 
split_status::SPLITTING)) {
                 ASSERT_EQ(ERR_OK, 
control_partition_split(split_control_type::PAUSE, -1));
-                std::cout << "Table(" << app_name_ << ") pause all partitions 
split succeed"
-                          << std::endl;
+                fmt::print(stdout, "Table({}) pause all partitions split 
succeed\n", table_name_);
                 already_pause = true;
             }
             ASSERT_TRUE(check_partition_split_status(-1, 
split_status::PAUSED));
@@ -323,15 +328,15 @@ TEST_F(partition_split_test, cancel_split)
 
     // cancel partition split
     ASSERT_EQ(ERR_OK, control_partition_split(split_control_type::CANCEL, -1, 
partition_count_));
-    std::cout << "Table(" << app_name_ << ") cancel partitions split succeed" 
<< std::endl;
+    fmt::print(stdout, "Table({}) cancel partitions split succeed\n", 
table_name_);
     // write data during cancel partition split
     ASSERT_IN_TIME_WITH_FIXED_INTERVAL(
         [&] {
-            ASSERT_NO_FATAL_FAILURE(write_data_during_split());
+            NO_FATALS(write_data_during_split());
             ASSERT_TRUE(is_split_finished());
         },
         300);
 
-    std::cout << "Partition split succeed" << std::endl;
-    ASSERT_NO_FATAL_FAILURE(verify_data_after_split());
+    fmt::print(stdout, "Partition split succeed\n");
+    NO_FATALS(verify_data_after_split());
 }
diff --git a/src/test/function_test/recovery/CMakeLists.txt 
b/src/test/function_test/recovery/CMakeLists.txt
index c83e94324..a13d12734 100644
--- a/src/test/function_test/recovery/CMakeLists.txt
+++ b/src/test/function_test/recovery/CMakeLists.txt
@@ -28,16 +28,16 @@ set(MY_PROJ_SRC "")
 set(MY_SRC_SEARCH_MODE "GLOB")
 
 set(MY_PROJ_LIBS
-    dsn_client
-    dsn_replication_common
-    dsn_utils
-    pegasus_client_static
-    gtest
-    sasl2
-    gssapi_krb5
-    krb5
-    function_test_utils
-    )
+        dsn_client
+        dsn_replication_common
+        dsn_utils
+        pegasus_client_static
+        gtest
+        sasl2
+        gssapi_krb5
+        krb5
+        function_test_utils
+        test_utils)
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem)
 
diff --git a/src/test/function_test/recovery/test_recovery.cpp 
b/src/test/function_test/recovery/test_recovery.cpp
index 7b406254b..a2f59e709 100644
--- a/src/test/function_test/recovery/test_recovery.cpp
+++ b/src/test/function_test/recovery/test_recovery.cpp
@@ -191,7 +191,7 @@ TEST_F(recovery_test, recovery)
 
         // then wait the apps to ready
         ASSERT_EQ(dsn::ERR_OK,
-                  ddl_client_->create_app(app_name_, "pegasus", 
partition_count_, 3, {}, false));
+                  ddl_client_->create_app(table_name_, "pegasus", 
partition_count_, 3, {}, false));
 
         ASSERT_NO_FATAL_FAILURE(verify_data(dataset_count));
     }
@@ -216,7 +216,7 @@ TEST_F(recovery_test, recovery)
 
         // then wait the app to ready
         ASSERT_EQ(dsn::ERR_OK,
-                  ddl_client_->create_app(app_name_, "pegasus", 
partition_count_, 3, {}, false));
+                  ddl_client_->create_app(table_name_, "pegasus", 
partition_count_, 3, {}, false));
 
         ASSERT_NO_FATAL_FAILURE(verify_data(dataset_count));
     }
@@ -247,7 +247,7 @@ TEST_F(recovery_test, recovery)
 
         // then wait the apps to ready
         ASSERT_EQ(dsn::ERR_OK,
-                  ddl_client_->create_app(app_name_, "pegasus", 
partition_count_, 3, {}, false));
+                  ddl_client_->create_app(table_name_, "pegasus", 
partition_count_, 3, {}, false));
 
         ASSERT_NO_FATAL_FAILURE(verify_data(dataset_count));
     }
@@ -277,7 +277,7 @@ TEST_F(recovery_test, recovery)
 
         // then wait the apps to ready
         ASSERT_EQ(dsn::ERR_OK,
-                  ddl_client_->create_app(app_name_, "pegasus", 
partition_count_, 3, {}, false));
+                  ddl_client_->create_app(table_name_, "pegasus", 
partition_count_, 3, {}, false));
 
         ASSERT_NO_FATAL_FAILURE(verify_data(dataset_count));
     }
diff --git a/src/test/function_test/restore/CMakeLists.txt 
b/src/test/function_test/restore/CMakeLists.txt
index f30f90185..18b0ad444 100644
--- a/src/test/function_test/restore/CMakeLists.txt
+++ b/src/test/function_test/restore/CMakeLists.txt
@@ -28,17 +28,16 @@ set(MY_PROJ_SRC "")
 set(MY_SRC_SEARCH_MODE "GLOB")
 
 set(MY_PROJ_LIBS
-    dsn_client
-    dsn_replication_common
-    dsn_utils
-    pegasus_client_static
-    gtest
-    sasl2
-    gssapi_krb5
-    krb5
-    function_test_utils
-    test_utils
-    )
+        dsn_client
+        dsn_replication_common
+        dsn_utils
+        gtest
+        sasl2
+        gssapi_krb5
+        krb5
+        function_test_utils
+        pegasus_client_static
+        test_utils)
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem)
 
diff --git a/src/test/function_test/restore/test_restore.cpp 
b/src/test/function_test/restore/test_restore.cpp
index a2350a408..1862373cb 100644
--- a/src/test/function_test/restore/test_restore.cpp
+++ b/src/test/function_test/restore/test_restore.cpp
@@ -19,6 +19,8 @@
 
 #include <boost/cstdint.hpp>
 #include <boost/lexical_cast.hpp>
+#include <fmt/core.h>
+#include <stdio.h>
 #include <unistd.h>
 #include <algorithm>
 #include <chrono>
@@ -33,12 +35,7 @@
 #include "client/partition_resolver.h"
 #include "client/replication_ddl_client.h"
 #include "common/gpid.h"
-#include "dsn.layer2_types.h"
 #include "gtest/gtest.h"
-#include "include/pegasus/client.h"
-#include "pegasus/error.h"
-#include "runtime/api_layer1.h"
-#include "runtime/rpc/rpc_address.h"
 #include "test/function_test/utils/global_env.h"
 #include "test/function_test/utils/test_util.h"
 #include "test_util/test_util.h"
@@ -46,6 +43,7 @@
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
 #include "utils/process_utils.h"
+#include "utils/test_macros.h"
 
 using namespace ::dsn;
 using namespace ::dsn::replication;
@@ -63,9 +61,9 @@ public:
                      dsn::utils::filesystem::path_combine(cluster_name_, 
policy_name);
         backup_dir = "onebox/" + provider_dir + '/' + cluster_name_;
 
-        write_data();
+        NO_FATALS(write_data(kv_pair_cnt));
 
-        std::vector<int32_t> app_ids({app_id_});
+        std::vector<int32_t> app_ids({table_id_});
         auto err = ddl_client_->add_backup_policy(policy_name,
                                                   backup_provider_name,
                                                   app_ids,
@@ -76,51 +74,7 @@ public:
         ASSERT_EQ(err, ERR_OK);
     }
 
-    void TearDown() override { ASSERT_EQ(ERR_OK, 
ddl_client_->drop_app(app_name_, 0)); }
-
-    void write_data()
-    {
-        std::cout << "start to write " << kv_pair_cnt << " key-value pairs, 
using set().."
-                  << std::endl;
-        int64_t start = dsn_now_ms();
-        int err = PERR_OK;
-        ASSERT_NE(client_, nullptr);
-        for (int i = 1; i <= kv_pair_cnt; i++) {
-            std::string index = std::to_string(i);
-            std::string h_key = hash_key_prefix + "_" + index;
-            std::string s_key = sort_key_prefix + "_" + index;
-            std::string value = value_prefix + "_" + index;
-            err = client_->set(h_key, s_key, value);
-            ASSERT_EQ(err, PERR_OK);
-        }
-        int64_t end = dsn_now_ms();
-        int64_t ts = (end - start) / 1000;
-        std::cout << "write data complete, total time = " << ts << "s" << 
std::endl;
-    }
-
-    void verify_data()
-    {
-        std::cout << "start to get " << kv_pair_cnt << " key-value pairs, 
using get()..."
-                  << std::endl;
-        pegasus_client *new_pg_client = 
pegasus::pegasus_client_factory::get_client(
-            cluster_name_.c_str(), new_app_name.c_str());
-        ASSERT_NE(nullptr, new_pg_client);
-
-        int64_t start = dsn_now_ms();
-        for (int i = 1; i <= kv_pair_cnt; i++) {
-            std::string index = std::to_string(i);
-            std::string h_key = hash_key_prefix + "_" + index;
-            std::string s_key = sort_key_prefix + "_" + index;
-            std::string value = value_prefix + "_" + index;
-            std::string value_new;
-            ASSERT_EQ(PERR_OK, new_pg_client->get(h_key, s_key, value_new)) << 
h_key << " : "
-                                                                            << 
s_key;
-            ASSERT_EQ(value, value_new);
-        }
-        int64_t end = dsn_now_ms();
-        int64_t ts = (end - start) / 1000;
-        std::cout << "verify data complete, total time = " << ts << "s" << 
std::endl;
-    }
+    void TearDown() override { ASSERT_EQ(ERR_OK, 
ddl_client_->drop_app(table_name_, 0)); }
 
     void restore()
     {
@@ -128,57 +82,28 @@ public:
         ASSERT_EQ(ERR_OK,
                   ddl_client_->do_restore(backup_provider_name,
                                           cluster_name_,
-                                          /*old_policy_name=*/"",
+                                          /* policy_name */ "",
                                           time_stamp,
-                                          app_name_,
-                                          app_id_,
+                                          table_name_,
+                                          table_id_,
                                           new_app_name,
                                           false));
-        ASSERT_NO_FATAL_FAILURE(wait_app_healthy());
+        NO_FATALS(wait_table_healthy(new_app_name));
     }
 
-    void wait_app_healthy()
+    void wait_backup_complete()
     {
         ASSERT_IN_TIME(
             [&] {
-                int32_t app_id = 0;
-                int32_t partition_cnt = 0;
-                std::vector<partition_configuration> p_confs;
-                ASSERT_EQ(ERR_OK,
-                          ddl_client_->list_app(new_app_name, app_id, 
partition_cnt, p_confs));
-                for (int i = 0; i < p_confs.size(); i++) {
-                    const auto &pc = p_confs[i];
-                    ASSERT_FALSE(pc.primary.is_invalid());
-                    ASSERT_EQ(1 + pc.secondaries.size(), pc.max_replica_count);
-                }
+                time_stamp = get_first_backup_timestamp();
+                fmt::print(stdout, "first backup_timestamp = {}", time_stamp);
+
+                auto backup_info = fmt::format("{}/{}/backup_info", 
backup_dir, time_stamp);
+                ASSERT_TRUE(dsn::utils::filesystem::file_exists(backup_info));
             },
             180);
     }
 
-    bool wait_backup_complete(int64_t seconds)
-    {
-        // wait backup the first backup complete at most (seconds)second
-        int64_t sleep_time = 0;
-        bool is_backup_complete = false;
-        while (seconds > 0 && !is_backup_complete) {
-            sleep_time = 0;
-            if (seconds >= 3) {
-                sleep_time = 3;
-            } else {
-                sleep_time = seconds;
-            }
-            seconds -= sleep_time;
-            std::cout << "sleep " << sleep_time << "s to wait backup 
complete..." << std::endl;
-            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
-
-            time_stamp = get_first_backup_timestamp();
-            std::cout << "first backup_timestamp = " << time_stamp << 
std::endl;
-
-            is_backup_complete = is_app_info_backup_complete();
-        }
-        return is_backup_complete;
-    }
-
     int64_t get_first_backup_timestamp()
     {
         std::string pegasus_root_dir = global_env::instance()._pegasus_root;
@@ -199,26 +124,12 @@ public:
         }
         result = result.substr(0, index);
         if (!result.empty()) {
-            auto res = boost::lexical_cast<int64_t>(result);
-            return res;
+            return boost::lexical_cast<int64_t>(result);
         } else {
             return 0;
         }
     }
 
-    bool find_second_backup_timestamp()
-    {
-        std::vector<std::string> dirs;
-        ::dsn::utils::filesystem::get_subdirectories(policy_dir, dirs, false);
-        return (dirs.size() >= 2);
-    }
-
-    bool is_app_info_backup_complete()
-    {
-        std::string backup_info = backup_dir + "/" + 
std::to_string(time_stamp) + "/backup_info";
-        return dsn::utils::filesystem::file_exists(backup_info);
-    }
-
 public:
     std::string policy_dir;
     std::string backup_dir;
@@ -250,10 +161,10 @@ TEST_F(restore_test, restore)
 {
     std::cout << "start testing restore..." << std::endl;
     // step1: wait backup complete
-    ASSERT_TRUE(wait_backup_complete(180));
+    NO_FATALS(wait_backup_complete());
     // step2: test restore
-    ASSERT_NO_FATAL_FAILURE(restore());
+    NO_FATALS(restore());
     // step3: verify_data
-    ASSERT_NO_FATAL_FAILURE(verify_data());
+    NO_FATALS(verify_data(new_app_name, kv_pair_cnt));
     std::cout << "restore passed....." << std::endl;
 }
diff --git a/src/test/function_test/throttle/test_throttle.cpp 
b/src/test/function_test/throttle/test_throttle.cpp
index e30ef726a..d7e5f61b5 100644
--- a/src/test/function_test/throttle/test_throttle.cpp
+++ b/src/test/function_test/throttle/test_throttle.cpp
@@ -19,9 +19,7 @@
 
 #include <fmt/core.h>
 #include <stdint.h>
-#include <unistd.h>
 #include <atomic>
-#include <iostream>
 #include <map>
 #include <memory>
 #include <string>
@@ -39,10 +37,10 @@
 #include "test/function_test/utils/utils.h"
 #include "test_util/test_util.h"
 #include "utils/error_code.h"
-#include "utils/errors.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/rand.h"
+#include "utils/test_macros.h"
 
 using namespace dsn;
 using namespace dsn::replication;
@@ -155,18 +153,18 @@ public:
             keys.emplace_back("replica.write_throttling_by_size");
             values.emplace_back(fmt::format("{}*reject*200", value));
         }
-        ASSERT_EQ(ERR_OK, ddl_client_->set_app_envs(app_name_, keys, 
values).get_error().code());
+        NO_FATALS(update_table_env(keys, values));
     }
 
     void restore_throttle()
     {
         std::map<string, string> envs;
-        ASSERT_EQ(ERR_OK, ddl_client_->get_app_envs(app_name_, envs));
+        ASSERT_EQ(ERR_OK, ddl_client_->get_app_envs(table_name_, envs));
         std::vector<string> keys;
         for (const auto &env : envs) {
             keys.emplace_back(env.first);
         }
-        ASSERT_EQ(ERR_OK, ddl_client_->del_app_envs(app_name_, keys));
+        ASSERT_EQ(ERR_OK, ddl_client_->del_app_envs(table_name_, keys));
     }
 
     void start_test(const throttle_test_plan &test_plan)
@@ -329,42 +327,34 @@ TEST_F(throttle_test, test)
     throttle_test_plan plan;
 
     plan = {"set test / throttle by size / normal value size", 
operation_type::set, 1024, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
     plan = {"set test / throttle by qps / normal value size", 
operation_type::set, 1024, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env " << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
     plan = {"get test / throttle by size / normal value size", 
operation_type::get, 1024, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     auto actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                         plan.multi_count * plan.limit_qps;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
 
     plan = {"get test / throttle by qps", operation_type::get, 1024, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
@@ -373,14 +363,11 @@ TEST_F(throttle_test, test)
             1024,
             50,
             50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::read_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
@@ -391,39 +378,32 @@ TEST_F(throttle_test, test)
             1024,
             50,
             50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::write_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
     plan = {
         "set test / throttle by qps&size / normal value size", 
operation_type::set, 1024, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(set_throttle(throttle_type::write_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
     plan = {
         "get test / throttle by qps&size / normal value size", 
operation_type::get, 1024, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(set_throttle(throttle_type::read_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
@@ -433,15 +413,13 @@ TEST_F(throttle_test, test)
             1024,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(
+    NO_FATALS(set_throttle(
         throttle_type::write_by_size,
         plan.limit_qps * (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
             plan.multi_count * 1000));
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
@@ -450,15 +428,13 @@ TEST_F(throttle_test, test)
             1024,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(
+    NO_FATALS(set_throttle(
         throttle_type::read_by_size,
         plan.limit_qps * (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
             plan.multi_count * 1000));
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
@@ -467,13 +443,11 @@ TEST_F(throttle_test, test)
             1024,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_qps, 
plan.limit_qps * 1000));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(set_throttle(throttle_type::write_by_qps, plan.limit_qps * 
1000));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
@@ -484,13 +458,11 @@ TEST_F(throttle_test, test)
             1024,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_qps, 
plan.limit_qps * 1000));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(set_throttle(throttle_type::read_by_qps, plan.limit_qps * 1000));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
@@ -502,14 +474,11 @@ TEST_F(throttle_test, test)
             1024 * FLAGS_throttle_test_medium_value_kb,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::write_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
@@ -520,14 +489,11 @@ TEST_F(throttle_test, test)
             1024 * FLAGS_throttle_test_medium_value_kb,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::read_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
@@ -538,14 +504,11 @@ TEST_F(throttle_test, test)
             1024 * FLAGS_throttle_test_large_value_kb,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::write_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
@@ -556,70 +519,55 @@ TEST_F(throttle_test, test)
             1024 * FLAGS_throttle_test_large_value_kb,
             1,
             50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::read_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
 
     plan = {"set test / throttle by size / 100b value size", 
operation_type::set, 100, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::write_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
 
     plan = {"get test / throttle by size / 100b value size", 
operation_type::get, 100, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::read_by_size,
-                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           (uint64_t)(plan.single_value_sz + test_hashkey_len 
+ test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
 
     plan = {"set test / throttle by size / 10b value size", 
operation_type::set, 10, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::write_by_size,
-                     (plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           (plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
 
     plan = {"get test / throttle by size / 10b value size", 
operation_type::get, 10, 1, 50};
-    ASSERT_NO_FATAL_FAILURE(
-        set_throttle(throttle_type::read_by_size,
-                     (plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
-                         plan.multi_count * plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           (plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                               plan.multi_count * plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
                    plan.multi_count * plan.limit_qps;
     result.finalize();
@@ -632,11 +580,9 @@ TEST_F(throttle_test, test)
             50,
             50,
             true};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_size, 
5000000));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size, 5000000));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, (uint64_t)5000000, 
(uint64_t)5000000 * 0.3);
 
@@ -646,55 +592,45 @@ TEST_F(throttle_test, test)
             50,
             50,
             true};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_size, 
5000000));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size, 5000000));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, (uint64_t)5000000, 
(uint64_t)5000000 * 0.3);
 
     // hotkey test
     plan = {
         "get test / throttle by qps / hotkey test", operation_type::get, 1024, 
1, 50, false, true};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
     plan = {
         "set test / throttle by qps / hotkey test", operation_type::set, 1024, 
1, 50, false, true};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_qps, 
plan.limit_qps));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_qps, plan.limit_qps));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 15);
 
     plan = {
         "set test / throttle by size / hotkey test", operation_type::set, 
1024, 1, 50, false, true};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::write_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::write_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)plan.limit_qps * plan.single_value_sz * 
plan.multi_count;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
 
     plan = {
         "get test / throttle by size / hotkey test", operation_type::get, 
1024, 1, 50, false, true};
-    ASSERT_NO_FATAL_FAILURE(set_throttle(throttle_type::read_by_size,
-                                         plan.limit_qps * plan.single_value_sz 
* plan.multi_count));
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(set_throttle(throttle_type::read_by_size,
+                           plan.limit_qps * plan.single_value_sz * 
plan.multi_count));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     actual_value = (uint64_t)plan.limit_qps * plan.single_value_sz * 
plan.multi_count;
     result.finalize();
     ASSERT_NEAR(result.total_size_per_sec, actual_value, actual_value * 0.3);
@@ -702,55 +638,31 @@ TEST_F(throttle_test, test)
     // mix delay&reject test
     plan = {
         "set test / throttle by qps 500 / no delay throttle", 
operation_type::set, 1024, 1, 500};
-    ASSERT_EQ(ERR_OK,
-              ddl_client_->set_app_envs(app_name_, 
{"replica.write_throttling"}, {"500*reject*200"})
-                  .get_error()
-                  .code());
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(update_table_env({"replica.write_throttling"}, 
{"500*reject*200"}));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 100);
 
     plan = {
         "get test / throttle by qps 500 / no delay throttle", 
operation_type::get, 1024, 1, 500};
-    ASSERT_EQ(ERR_OK,
-              ddl_client_->set_app_envs(app_name_, 
{"replica.read_throttling"}, {"500*reject*200"})
-                  .get_error()
-                  .code());
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(update_table_env({"replica.read_throttling"}, 
{"500*reject*200"}));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 100);
 
     plan = {"set test / throttle by qps 500 / delay throttle", 
operation_type::set, 1024, 1, 500};
-    ASSERT_EQ(ERR_OK,
-              ddl_client_
-                  ->set_app_envs(
-                      app_name_, {"replica.write_throttling"}, 
{"300*delay*100,500*reject*200"})
-                  .get_error()
-                  .code());
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(update_table_env({"replica.write_throttling"}, 
{"300*delay*100,500*reject*200"}));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 100);
 
     plan = {"get test / throttle by qps 500 / delay throttle", 
operation_type::get, 1024, 1, 500};
-    ASSERT_EQ(
-        ERR_OK,
-        ddl_client_
-            ->set_app_envs(app_name_, {"replica.read_throttling"}, 
{"300*delay*100,500*reject*200"})
-            .get_error()
-            .code());
-    std::cout << "wait 30s for setting env" << std::endl;
-    sleep(30);
-    ASSERT_NO_FATAL_FAILURE(start_test(plan));
-    ASSERT_NO_FATAL_FAILURE(restore_throttle());
+    NO_FATALS(update_table_env({"replica.read_throttling"}, 
{"300*delay*100,500*reject*200"}));
+    NO_FATALS(start_test(plan));
+    NO_FATALS(restore_throttle());
     result.finalize();
     ASSERT_NEAR(result.total_qps, plan.limit_qps, 100);
 }
diff --git a/src/test/function_test/utils/test_util.cpp 
b/src/test/function_test/utils/test_util.cpp
index 3a7cb3870..563aa3c9e 100644
--- a/src/test/function_test/utils/test_util.cpp
+++ b/src/test/function_test/utils/test_util.cpp
@@ -20,9 +20,13 @@
 #include "test_util.h"
 
 #include <nlohmann/json.hpp>
+#include <pegasus/error.h>
+#include <stdio.h>
 #include <unistd.h>
+#include <chrono>
 #include <fstream>
 #include <initializer_list>
+#include <thread>
 #include <utility>
 #include <vector>
 
@@ -32,13 +36,17 @@
 #include "fmt/core.h"
 #include "gtest/gtest.h"
 #include "include/pegasus/client.h"
+#include "meta_admin_types.h"
 #include "nlohmann/detail/iterators/iter_impl.hpp"
 #include "nlohmann/json_fwd.hpp"
+#include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_address.h"
 #include "test/function_test/utils/global_env.h"
 #include "test/function_test/utils/utils.h"
+#include "test_util/test_util.h"
 #include "utils/defer.h"
 #include "utils/error_code.h"
+#include "utils/errors.h"
 #include "utils/filesystem.h"
 #include "utils/rand.h"
 
@@ -54,7 +62,7 @@ using std::vector;
 namespace pegasus {
 
 test_util::test_util(map<string, string> create_envs)
-    : cluster_name_("onebox"), app_name_("temp"), 
create_envs_(std::move(create_envs))
+    : cluster_name_("onebox"), table_name_("temp"), 
create_envs_(std::move(create_envs))
 {
 }
 
@@ -73,21 +81,22 @@ void test_util::SetUp()
     ddl_client_->set_max_wait_app_ready_secs(120);
 
     dsn::error_code ret =
-        ddl_client_->create_app(app_name_, "pegasus", partition_count_, 3, 
create_envs_, false);
+        ddl_client_->create_app(table_name_, "pegasus", partition_count_, 3, 
create_envs_, false);
     if (ret == dsn::ERR_INVALID_PARAMETERS) {
-        ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(app_name_, 0));
+        ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(table_name_, 0));
         ASSERT_EQ(dsn::ERR_OK,
                   ddl_client_->create_app(
-                      app_name_, "pegasus", partition_count_, 3, create_envs_, 
false));
+                      table_name_, "pegasus", partition_count_, 3, 
create_envs_, false));
     } else {
         ASSERT_EQ(dsn::ERR_OK, ret);
     }
-    client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), 
app_name_.c_str());
+    client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), 
table_name_.c_str());
     ASSERT_TRUE(client_ != nullptr);
 
     int32_t partition_count;
-    ASSERT_EQ(dsn::ERR_OK, ddl_client_->list_app(app_name_, app_id_, 
partition_count, partitions_));
-    ASSERT_NE(0, app_id_);
+    ASSERT_EQ(dsn::ERR_OK,
+              ddl_client_->list_app(table_name_, table_id_, partition_count, 
partitions_));
+    ASSERT_NE(0, table_id_);
     ASSERT_EQ(partition_count_, partition_count);
     ASSERT_EQ(partition_count_, partitions_.size());
 }
@@ -143,4 +152,61 @@ int test_util::get_leader_count(const string &table_name, 
int replica_server_ind
     return leader_count;
 }
 
+void test_util::wait_table_healthy(const std::string &table_name) const
+{ // Lists the partitions of 'table_name' and asserts each one has a primary and a full replica set.
+    ASSERT_IN_TIME( // NOTE(review): presumably re-runs the lambda until it passes - confirm in macro
+        [&] {
+            int32_t table_id = 0;
+            int32_t pcount = 0;
+            std::vector<partition_configuration> pcs;
+            ASSERT_EQ(dsn::ERR_OK, ddl_client_->list_app(table_name, table_id, 
pcount, pcs));
+            for (const auto &pc : pcs) {
+                ASSERT_FALSE(pc.primary.is_invalid()); // a partition without a primary fails
+                ASSERT_EQ(1 + pc.secondaries.size(), pc.max_replica_count); // primary + secondaries
+            }
+        },
+        180); // NOTE(review): looks like a time limit in seconds - confirm against ASSERT_IN_TIME
+}
+
+void test_util::write_data(int count) const
+{ // Writes 'count' rows (hash_key_<i>, sort_key_<i>) -> value_<i> through client_.
+    fmt::print(stdout, "start to write {} key-value pairs...\n", count);
+    ASSERT_NE(client_, nullptr);
+    int64_t start = dsn_now_ms();
+    for (int i = 0; i < count; i++) {
+        ASSERT_EQ(PERR_OK, // any failed set is a fatal assertion and ends the write
+                  client_->set(fmt::format("hash_key_{}", i),
+                               fmt::format("sort_key_{}", i),
+                               fmt::format("value_{}", i)));
+    }
+    fmt::print(stdout, "write data complete, total time = {}s\n", (dsn_now_ms() - start) / 1000);
+}
+
+void test_util::verify_data(const std::string &table_name, int count) const
+{ // Reads rows hash_key_<i>/sort_key_<i> from 'table_name' and expects value_<i> for i < count.
+    fmt::print(stdout, "start to get {} key-value pairs...\n", count);
+    auto *client = pegasus_client_factory::get_client(cluster_name_.c_str(), table_name.c_str());
+    ASSERT_NE(client, nullptr);
+    int64_t start = dsn_now_ms();
+    for (int i = 0; i < count; i++) {
+        std::string value_new;
+        ASSERT_EQ(
+            PERR_OK,
+            client->get(fmt::format("hash_key_{}", i), fmt::format("sort_key_{}", i), value_new));
+        ASSERT_EQ(fmt::format("value_{}", i), value_new);
+    }
+    int64_t end = dsn_now_ms(); // sampled once, right after the last read
+    fmt::print(stdout, "verify data complete, total time = {}s\n", (end - start) / 1000);
+}
+
+void test_util::update_table_env(const std::vector<std::string> &keys,
+                                 const std::vector<std::string> &values) const
+{ // Sets the given env keys/values on table_name_, then blocks for a fixed 31 seconds.
+    auto resp = ddl_client_->set_app_envs(table_name_, keys, values);
+    ASSERT_TRUE(resp.is_ok()); // the call itself must succeed ...
+    ASSERT_EQ(dsn::ERR_OK, resp.get_value().err); // ... and the response must carry ERR_OK
+    // TODO(yingchun): update the sync interval to reduce time.
+    fmt::print(stdout, "sleep 31s to wait app_envs update\n");
+    std::this_thread::sleep_for(std::chrono::seconds(31)); // callers no longer sleep themselves
+}
 } // namespace pegasus
diff --git a/src/test/function_test/utils/test_util.h 
b/src/test/function_test/utils/test_util.h
index d296bd3ff..2b1bad2b1 100644
--- a/src/test/function_test/utils/test_util.h
+++ b/src/test/function_test/utils/test_util.h
@@ -63,11 +63,19 @@ public:
     // on the 'table_name'.
     static int get_leader_count(const std::string &table_name, int 
replica_server_index);
 
+    void wait_table_healthy(const std::string &table_name) const;
+
+    void write_data(int count) const;
+    void verify_data(const std::string &table_name, int count) const;
+
+    void update_table_env(const std::vector<std::string> &keys,
+                          const std::vector<std::string> &values) const;
+
 protected:
     const std::string cluster_name_;
-    std::string app_name_;
+    std::string table_name_;
     const std::map<std::string, std::string> create_envs_;
-    int32_t app_id_;
+    int32_t table_id_;
     int32_t partition_count_ = 8;
     std::vector<dsn::partition_configuration> partitions_;
     pegasus_client *client_ = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to