This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 12f331f8a fix(meta): fix null pointer while clearing environment 
variables after table was dropped (#2181)
12f331f8a is described below

commit 12f331f8a22a5efad41113991bad3414cb81db92
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jan 16 12:36:28 2025 +0800

    fix(meta): fix null pointer while clearing environment variables after 
table was dropped (#2181)
    
    https://github.com/apache/incubator-pegasus/issues/2149.
    
    Previously we've fixed the problem that meta server failed due to null 
pointer while
    setting environment variables locally immediately after a table was dropped 
in
    https://github.com/apache/incubator-pegasus/pull/2148. There's the same 
problem
    while clearing environment variables.
---
 src/meta/server_state.cpp           | 174 +++++++++++++++++++++++-----------
 src/meta/test/server_state_test.cpp | 184 +++++++++++++++++++++---------------
 2 files changed, 228 insertions(+), 130 deletions(-)

diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index ed309338b..72557a8c5 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -26,6 +26,7 @@
 
 // IWYU pragma: no_include <boost/detail/basic_pointerbuf.hpp>
 #include <boost/algorithm/string/join.hpp>
+#include <boost/algorithm/string/predicate.hpp>
 #include <boost/lexical_cast.hpp>
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <fmt/core.h>
@@ -40,6 +41,7 @@
 #include <string>
 #include <string_view>
 #include <thread>
+#include <type_traits>
 #include <unordered_map>
 
 #include "common/duplication_common.h"
@@ -3266,15 +3268,15 @@ void server_state::del_app_envs(const app_env_rpc 
&env_rpc)
 
 void server_state::clear_app_envs(const app_env_rpc &env_rpc)
 {
-    const configuration_update_app_env_request &request = env_rpc.request();
+    const auto &request = env_rpc.request();
     if (!request.__isset.clear_prefix) {
         env_rpc.response().err = ERR_INVALID_PARAMETERS;
         LOG_WARNING("clear app envs failed with invalid request");
         return;
     }
 
-    const std::string &prefix = request.clear_prefix;
-    const std::string &app_name = request.app_name;
+    const auto &prefix = request.clear_prefix;
+    const auto &app_name = request.app_name;
     LOG_INFO("clear app envs for app({}) from remote({}): prefix = {}",
              app_name,
              env_rpc.remote_address(),
@@ -3283,79 +3285,145 @@ void server_state::clear_app_envs(const app_env_rpc 
&env_rpc)
     app_info ainfo;
     std::string app_path;
     {
+        FAIL_POINT_INJECT_NOT_RETURN_F(
+            "clear_app_envs_failed", [app_name, this](std::string_view s) {
+                zauto_write_lock l(_lock);
+
+                if (s == "not_found") {
+                    CHECK_EQ(_exist_apps.erase(app_name), 1);
+                    return;
+                }
+
+                if (s == "dropping") {
+                    gutil::FindOrDie(_exist_apps, app_name)->status = 
app_status::AS_DROPPING;
+                    return;
+                }
+            });
+
         zauto_read_lock l(_lock);
-        std::shared_ptr<app_state> app = get_app(app_name);
-        if (app == nullptr) {
-            LOG_WARNING("clear app envs failed with invalid app_name({})", 
app_name);
-            env_rpc.response().err = ERR_INVALID_PARAMETERS;
-            env_rpc.response().hint_message = "invalid app name";
+
+        const auto &app = get_app(app_name);
+        if (!app) {
+            LOG_WARNING("clear app envs failed since app_name({}) cannot be 
found", app_name);
+            env_rpc.response().err = ERR_APP_NOT_EXIST;
+            env_rpc.response().hint_message = "app cannot be found";
             return;
-        } else {
-            ainfo = *(reinterpret_cast<app_info *>(app.get()));
-            app_path = get_app_path(*app);
         }
+
+        if (app->status == app_status::AS_DROPPING) {
+            LOG_WARNING("clear app envs failed since app(name={}, id={}) is 
being dropped",
+                        app_name,
+                        app->app_id);
+            env_rpc.response().err = ERR_BUSY_DROPPING;
+            env_rpc.response().hint_message = "app is being dropped";
+            return;
+        }
+
+        ainfo = *app;
+        app_path = get_app_path(*app);
     }
 
     if (ainfo.envs.empty()) {
-        LOG_INFO("no key need to delete");
-        env_rpc.response().hint_message = "no key need to delete";
+        LOG_INFO("no key needs to be deleted for app({})", app_name);
+        env_rpc.response().err = ERR_OK;
+        env_rpc.response().hint_message = "no key needs to be deleted";
         return;
     }
 
-    std::set<std::string> erase_keys;
-    std::ostringstream oss;
-    oss << "deleted keys:";
+    std::set<std::string> deleted_keys;
+    std::string deleted_keys_info("deleted keys:");
 
     if (prefix.empty()) {
-        // ignore prefix
-        for (auto &kv : ainfo.envs) {
-            oss << std::endl << "    " << kv.first;
+        // Empty prefix means deleting all environments.
+        for (const auto &[key, _] : ainfo.envs) {
+            fmt::format_to(std::back_inserter(deleted_keys_info), "\n    {}", 
key);
         }
         ainfo.envs.clear();
     } else {
-        // acquire key
-        for (const auto &pair : ainfo.envs) {
-            const std::string &key = pair.first;
-            // normal : key = prefix.xxx
-            if (key.size() > prefix.size() + 1) {
-                if (key.substr(0, prefix.size()) == prefix && 
key.at(prefix.size()) == '.') {
-                    erase_keys.emplace(key);
-                }
+        // The full prefix is the prefix plus the separator dot(.).
+        const size_t full_prefix_len = prefix.size() + sizeof('.');
+        for (const auto &[key, _] : ainfo.envs) {
+            // The key is not the target if it is shorter than, or just has 
the same length
+            // as the full prefix.
+            if (key.size() <= full_prefix_len) {
+                continue;
+            }
+
+            // The key is not the target if the prefix is not matched.
+            if (!boost::algorithm::starts_with(key, prefix)) {
+                continue;
             }
+
+            // The key is not the target if the separator is not dot(.).
+            if (key[prefix.size()] != '.') {
+                continue;
+            }
+
+            deleted_keys.emplace(key);
         }
-        // erase
-        for (const auto &key : erase_keys) {
-            oss << std::endl << "    " << key;
+
+        for (const auto &key : deleted_keys) {
+            fmt::format_to(std::back_inserter(deleted_keys_info), "\n    {}", 
key);
             ainfo.envs.erase(key);
         }
-    }
 
-    if (!prefix.empty() && erase_keys.empty()) {
-        // no need update app_info
-        LOG_INFO("no key need to delete");
-        env_rpc.response().hint_message = "no key need to delete";
-        return;
-    } else {
-        env_rpc.response().hint_message = oss.str();
+        if (deleted_keys.empty()) {
+            LOG_INFO("no key needs to be deleted for app({})", app_name);
+            env_rpc.response().err = ERR_OK;
+            env_rpc.response().hint_message = "no key needs to be deleted";
+            return;
+        }
     }
 
-    do_update_app_info(
-        app_path, ainfo, [this, app_name, prefix, erase_keys, 
env_rpc](error_code ec) {
-            CHECK_EQ_MSG(ec, ERR_OK, "update app info to remote storage 
failed");
+    env_rpc.response().hint_message = std::move(deleted_keys_info);
 
-            zauto_write_lock l(_lock);
-            std::shared_ptr<app_state> app = get_app(app_name);
-            std::string old_envs = dsn::utils::kv_map_to_string(app->envs, 
',', '=');
-            if (prefix.empty()) {
-                app->envs.clear();
-            } else {
-                for (const auto &key : erase_keys) {
-                    app->envs.erase(key);
-                }
+    do_update_app_info(app_path, ainfo, [this, app_name, deleted_keys, 
env_rpc](error_code ec) {
+        CHECK_EQ_MSG(ec, ERR_OK, "update app({}) info to remote storage 
failed", app_name);
+
+        zauto_write_lock l(_lock);
+
+        FAIL_POINT_INJECT_NOT_RETURN_F("clear_app_envs_failed",
+                                       [app_name, this](std::string_view s) {
+                                           if (s == "dropped_after") {
+                                               
CHECK_EQ(_exist_apps.erase(app_name), 1);
+                                               return;
+                                           }
+                                       });
+
+        auto app = get_app(app_name);
+
+        // The table might be removed just before the callback function is 
invoked, thus we must
+        // check if this table still exists.
+        //
+        // TODO(wangdan): should make updates to remote storage sequential by 
supporting atomic
+        // set, otherwise update might be missing. For example, an update is 
setting the envs
+        // while another is dropping a table. The update setting the envs does 
not contain the
+        // dropped state. Once it is applied by remote storage after another 
update dropping
+        // the table, the state of the table would always be non-dropped on 
remote storage.
+        if (!app) {
+            LOG_ERROR("clear app envs failed since app({}) has just been 
dropped", app_name);
+            env_rpc.response().err = ERR_APP_DROPPED;
+            env_rpc.response().hint_message = "app has just been dropped";
+            return;
+        }
+
+        env_rpc.response().err = ERR_OK;
+
+        const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', 
'=');
+
+        if (deleted_keys.empty()) {
+            // `deleted_keys` would be empty only when `prefix` is empty. 
Therefore, empty
+            // `deleted_keys` means deleting all environments.
+            app->envs.clear();
+        } else {
+            for (const auto &key : deleted_keys) {
+                app->envs.erase(key);
             }
-            std::string new_envs = dsn::utils::kv_map_to_string(app->envs, 
',', '=');
-            LOG_INFO("app envs changed: old_envs = {}, new_envs = {}", 
old_envs, new_envs);
-        });
+        }
+
+        const auto &new_envs = dsn::utils::kv_map_to_string(app->envs, ',', 
'=');
+        LOG_INFO("app envs changed: old_envs = {}, new_envs = {}", old_envs, 
new_envs);
+    });
 }
 
 namespace {
diff --git a/src/meta/test/server_state_test.cpp 
b/src/meta/test/server_state_test.cpp
index 90af81185..6e15dd56c 100644
--- a/src/meta/test/server_state_test.cpp
+++ b/src/meta/test/server_state_test.cpp
@@ -56,53 +56,23 @@
 DSN_DECLARE_string(cluster_root);
 DSN_DECLARE_string(meta_state_service_type);
 
-namespace dsn {
-namespace replication {
-
-static const std::vector<std::string> keys = {
-    dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
-    dsn::replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
-    dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION,
-    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
-    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
-    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
-    dsn::replica_envs::ROCKSDB_USAGE_SCENARIO,
-    dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
-    dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS};
-static const std::vector<std::string> values = {
-    "1712846598",
-    "6",
-    dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
-    "1712846598",
-    "-1",
-    dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
-    dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
-    "1",
-    "0"};
-
-static const std::vector<std::string> del_keys = {
-    dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
-    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
-    dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
-static const std::set<std::string> del_keys_set = {
-    dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
-    dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
-    dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
-
-static const std::string clear_prefix = "rocksdb";
-
-// if str = "prefix.xxx" then return prefix
-// else return ""
-static std::string acquire_prefix(const std::string &str)
+namespace dsn::replication {
+
+namespace {
+
+// If `str` is <prefix>.xxx, return <prefix>; otherwise return empty 
string("").
+std::string acquire_prefix(const std::string &str)
 {
-    auto index = str.find('.');
-    if (index == std::string::npos) {
-        return "";
-    } else {
-        return str.substr(0, index);
+    const auto &dot = str.find('.');
+    if (dot == std::string::npos) {
+        return {};
     }
+
+    return str.substr(0, dot);
 }
 
+} // anonymous namespace
+
 class server_state_test
 {
 public:
@@ -178,6 +148,19 @@ public:
         return rpc;
     }
 
+    void test_clear_app_envs(const std::string &app_name,
+                             const std::string &prefix,
+                             const error_code expected_err)
+    {
+        configuration_update_app_env_request request;
+        request.__set_app_name(app_name);
+        request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR);
+        request.__set_clear_prefix(prefix);
+
+        auto rpc = clear_app_envs(request);
+        ASSERT_EQ(expected_err, rpc.response().err);
+    }
+
 private:
     static std::shared_ptr<app_state> fake_app_state(const std::string 
&app_name,
                                                      const int32_t app_id)
@@ -244,6 +227,28 @@ private:
 
 void meta_service_test_app::app_envs_basic_test()
 {
+    static const std::vector<std::string> kKeys = {
+        dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+        dsn::replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
+        dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION,
+        dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+        dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
+        dsn::replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
+        dsn::replica_envs::ROCKSDB_USAGE_SCENARIO,
+        dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
+        dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS};
+
+    static const std::vector<std::string> kValues = {
+        "1712846598",
+        "6",
+        dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
+        "1712846598",
+        "-1",
+        dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
+        dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+        "1",
+        "0"};
+
     server_state_test test;
     test.load_apps({"test_app1",
                     "test_set_app_envs_not_found",
@@ -251,7 +256,10 @@ void meta_service_test_app::app_envs_basic_test()
                     "test_set_app_envs_dropped_after",
                     "test_del_app_envs_not_found",
                     "test_del_app_envs_dropping",
-                    "test_del_app_envs_dropped_after"});
+                    "test_del_app_envs_dropped_after",
+                    "test_clear_app_envs_not_found",
+                    "test_clear_app_envs_dropping",
+                    "test_clear_app_envs_dropped_after"});
 
 #define TEST_SET_APP_ENVS_FAILED(action, err_code)                             
                    \
     std::cout << "test server_state::set_app_envs(" #action ")..." << 
std::endl;                   \
@@ -282,17 +290,17 @@ void meta_service_test_app::app_envs_basic_test()
     // Normal case for setting envs.
     std::cout << "test server_state::set_app_envs(success)..." << std::endl;
     {
-        test.test_set_app_envs("test_app1", keys, values, ERR_OK);
+        test.test_set_app_envs("test_app1", kKeys, kValues, ERR_OK);
 
         const auto &app = test.get_app("test_app1");
         ASSERT_TRUE(app);
 
-        for (size_t idx = 0; idx < keys.size(); ++idx) {
-            const auto &key = keys[idx];
+        for (size_t idx = 0; idx < kKeys.size(); ++idx) {
+            const auto &key = kKeys[idx];
 
             // Every env should be inserted.
             ASSERT_EQ(1, app->envs.count(key));
-            ASSERT_EQ(values[idx], app->envs.at(key));
+            ASSERT_EQ(kValues[idx], app->envs.at(key));
         }
     }
 
@@ -325,52 +333,81 @@ void meta_service_test_app::app_envs_basic_test()
 
 #undef TEST_DEL_APP_ENVS_FAILED
 
+    static const std::vector<std::string> kDelKeyList = {
+        dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+        dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+        dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
+    static const std::set<std::string> kDelKeySet(kDelKeyList.begin(), 
kDelKeyList.end());
+
     // Normal case for deleting envs.
     std::cout << "test server_state::del_app_envs(success)..." << std::endl;
     {
-        test.test_del_app_envs("test_app1", del_keys, ERR_OK);
+        test.test_del_app_envs("test_app1", kDelKeyList, ERR_OK);
 
         const auto &app = test.get_app("test_app1");
         ASSERT_TRUE(app);
 
-        for (size_t idx = 0; idx < keys.size(); ++idx) {
-            const std::string &key = keys[idx];
-            if (del_keys_set.count(key) > 0) {
-                // The env in `del_keys_set` should be deleted.
+        for (size_t idx = 0; idx < kKeys.size(); ++idx) {
+            const std::string &key = kKeys[idx];
+            if (kDelKeySet.count(key) > 0) {
+                // The env in `kDelKeySet` should be deleted.
                 ASSERT_EQ(0, app->envs.count(key));
                 continue;
             }
 
-            // The env should still exist if it is not in `del_keys_set`.
+            // The env should still exist if it is not in `kDelKeySet`.
             ASSERT_EQ(1, app->envs.count(key));
-            ASSERT_EQ(values[idx], app->envs.at(key));
+            ASSERT_EQ(kValues[idx], app->envs.at(key));
         }
     }
 
-    std::cout << "test server_state::clear_app_envs()..." << std::endl;
+#define TEST_CLEAR_APP_ENVS_FAILED(action, err_code)                           
                    \
+    std::cout << "test server_state::clear_app_envs(" #action ")..." << 
std::endl;                 \
+    do {                                                                       
                    \
+        test.test_set_app_envs("test_clear_app_envs_" #action,                 
                    \
+                               {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE},      
                    \
+                               {"67108864"},                                   
                    \
+                               ERR_OK);                                        
                    \
+                                                                               
                    \
+        fail::setup();                                                         
                    \
+        fail::cfg("clear_app_envs_failed", "void(" #action ")");               
                    \
+                                                                               
                    \
+        test.test_clear_app_envs("test_clear_app_envs_" #action, "", 
err_code);                    \
+                                                                               
                    \
+        fail::teardown();                                                      
                    \
+    } while (0)
+
+    // Failed to clearing envs while table was not found.
+    TEST_CLEAR_APP_ENVS_FAILED(not_found, ERR_APP_NOT_EXIST);
+
+    // Failed to clearing envs while table was being dropped as the 
intermediate state.
+    TEST_CLEAR_APP_ENVS_FAILED(dropping, ERR_BUSY_DROPPING);
+
+    // The table was found dropped after the new envs had been persistent on 
the remote
+    // meta storage.
+    TEST_CLEAR_APP_ENVS_FAILED(dropped_after, ERR_APP_DROPPED);
+
+#undef TEST_CLEAR_APP_ENVS_FAILED
+
+    std::cout << "test server_state::clear_app_envs(success)..." << std::endl;
     {
         // Test specifying prefix.
         {
-            configuration_update_app_env_request request;
-            request.__set_app_name("test_app1");
-            request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR);
-            request.__set_clear_prefix(clear_prefix);
-
-            auto rpc = test.clear_app_envs(request);
-            ASSERT_EQ(ERR_OK, rpc.response().err);
+            static const std::string kClearPrefix = "rocksdb";
+            test.test_clear_app_envs("test_app1", kClearPrefix, ERR_OK);
 
             const auto &app = test.get_app("test_app1");
             ASSERT_TRUE(app);
 
-            for (size_t idx = 0; idx < keys.size(); ++idx) {
-                const std::string &key = keys[idx];
-                if (del_keys_set.count(key) > 0) {
+            for (size_t idx = 0; idx < kKeys.size(); ++idx) {
+                const std::string &key = kKeys[idx];
+                if (kDelKeySet.count(key) > 0) {
                     // The env should have been deleted during test for 
`del_app_envs`.
                     ASSERT_EQ(0, app->envs.count(key));
                     continue;
                 }
 
-                if (acquire_prefix(key) == clear_prefix) {
+                if (acquire_prefix(key) == kClearPrefix) {
                     // The env with specified prefix should be deleted.
                     ASSERT_EQ(0, app->envs.count(key));
                     continue;
@@ -378,19 +415,13 @@ void meta_service_test_app::app_envs_basic_test()
 
                 // Otherwise, the env should still exist.
                 ASSERT_EQ(1, app->envs.count(key));
-                ASSERT_EQ(values[idx], app->envs.at(key));
+                ASSERT_EQ(kValues[idx], app->envs.at(key));
             }
         }
 
         // Test clearing all.
         {
-            configuration_update_app_env_request request;
-            request.__set_app_name("test_app1");
-            request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR);
-            request.__set_clear_prefix("");
-
-            auto rpc = test.clear_app_envs(request);
-            ASSERT_EQ(ERR_OK, rpc.response().err);
+            test.test_clear_app_envs("test_app1", "", ERR_OK);
 
             const auto &app = test.get_app("test_app1");
             ASSERT_TRUE(app);
@@ -401,5 +432,4 @@ void meta_service_test_app::app_envs_basic_test()
     }
 }
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to