This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 12f331f8a fix(meta): fix null pointer while clearing environment
variables after table was dropped (#2181)
12f331f8a is described below
commit 12f331f8a22a5efad41113991bad3414cb81db92
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jan 16 12:36:28 2025 +0800
fix(meta): fix null pointer while clearing environment variables after
table was dropped (#2181)
https://github.com/apache/incubator-pegasus/issues/2149.
Previously, we fixed the problem that the meta server failed due to a null
pointer when setting environment variables locally immediately after a table
was dropped, in https://github.com/apache/incubator-pegasus/pull/2148. The
same problem also exists when clearing environment variables.
---
src/meta/server_state.cpp | 174 +++++++++++++++++++++++-----------
src/meta/test/server_state_test.cpp | 184 +++++++++++++++++++++---------------
2 files changed, 228 insertions(+), 130 deletions(-)
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index ed309338b..72557a8c5 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -26,6 +26,7 @@
// IWYU pragma: no_include <boost/detail/basic_pointerbuf.hpp>
#include <boost/algorithm/string/join.hpp>
+#include <boost/algorithm/string/predicate.hpp>
#include <boost/lexical_cast.hpp>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
@@ -40,6 +41,7 @@
#include <string>
#include <string_view>
#include <thread>
+#include <type_traits>
#include <unordered_map>
#include "common/duplication_common.h"
@@ -3266,15 +3268,15 @@ void server_state::del_app_envs(const app_env_rpc
&env_rpc)
void server_state::clear_app_envs(const app_env_rpc &env_rpc)
{
- const configuration_update_app_env_request &request = env_rpc.request();
+ const auto &request = env_rpc.request();
if (!request.__isset.clear_prefix) {
env_rpc.response().err = ERR_INVALID_PARAMETERS;
LOG_WARNING("clear app envs failed with invalid request");
return;
}
- const std::string &prefix = request.clear_prefix;
- const std::string &app_name = request.app_name;
+ const auto &prefix = request.clear_prefix;
+ const auto &app_name = request.app_name;
LOG_INFO("clear app envs for app({}) from remote({}): prefix = {}",
app_name,
env_rpc.remote_address(),
@@ -3283,79 +3285,145 @@ void server_state::clear_app_envs(const app_env_rpc
&env_rpc)
app_info ainfo;
std::string app_path;
{
+ FAIL_POINT_INJECT_NOT_RETURN_F(
+ "clear_app_envs_failed", [app_name, this](std::string_view s) {
+ zauto_write_lock l(_lock);
+
+ if (s == "not_found") {
+ CHECK_EQ(_exist_apps.erase(app_name), 1);
+ return;
+ }
+
+ if (s == "dropping") {
+ gutil::FindOrDie(_exist_apps, app_name)->status =
app_status::AS_DROPPING;
+ return;
+ }
+ });
+
zauto_read_lock l(_lock);
- std::shared_ptr<app_state> app = get_app(app_name);
- if (app == nullptr) {
- LOG_WARNING("clear app envs failed with invalid app_name({})",
app_name);
- env_rpc.response().err = ERR_INVALID_PARAMETERS;
- env_rpc.response().hint_message = "invalid app name";
+
+ const auto &app = get_app(app_name);
+ if (!app) {
+ LOG_WARNING("clear app envs failed since app_name({}) cannot be
found", app_name);
+ env_rpc.response().err = ERR_APP_NOT_EXIST;
+ env_rpc.response().hint_message = "app cannot be found";
return;
- } else {
- ainfo = *(reinterpret_cast<app_info *>(app.get()));
- app_path = get_app_path(*app);
}
+
+ if (app->status == app_status::AS_DROPPING) {
+ LOG_WARNING("clear app envs failed since app(name={}, id={}) is
being dropped",
+ app_name,
+ app->app_id);
+ env_rpc.response().err = ERR_BUSY_DROPPING;
+ env_rpc.response().hint_message = "app is being dropped";
+ return;
+ }
+
+ ainfo = *app;
+ app_path = get_app_path(*app);
}
if (ainfo.envs.empty()) {
- LOG_INFO("no key need to delete");
- env_rpc.response().hint_message = "no key need to delete";
+ LOG_INFO("no key needs to be deleted for app({})", app_name);
+ env_rpc.response().err = ERR_OK;
+ env_rpc.response().hint_message = "no key needs to be deleted";
return;
}
- std::set<std::string> erase_keys;
- std::ostringstream oss;
- oss << "deleted keys:";
+ std::set<std::string> deleted_keys;
+ std::string deleted_keys_info("deleted keys:");
if (prefix.empty()) {
- // ignore prefix
- for (auto &kv : ainfo.envs) {
- oss << std::endl << " " << kv.first;
+ // Empty prefix means deleting all environments.
+ for (const auto &[key, _] : ainfo.envs) {
+ fmt::format_to(std::back_inserter(deleted_keys_info), "\n {}",
key);
}
ainfo.envs.clear();
} else {
- // acquire key
- for (const auto &pair : ainfo.envs) {
- const std::string &key = pair.first;
- // normal : key = prefix.xxx
- if (key.size() > prefix.size() + 1) {
- if (key.substr(0, prefix.size()) == prefix &&
key.at(prefix.size()) == '.') {
- erase_keys.emplace(key);
- }
+ // The full prefix is the prefix plus the separator dot(.).
+ const size_t full_prefix_len = prefix.size() + sizeof('.');
+ for (const auto &[key, _] : ainfo.envs) {
+ // The key is not the target if it is shorter than, or just has
the same length
+ // as the full prefix.
+ if (key.size() <= full_prefix_len) {
+ continue;
+ }
+
+ // The key is not the target if the prefix is not matched.
+ if (!boost::algorithm::starts_with(key, prefix)) {
+ continue;
}
+
+ // The key is not the target if the separator is not dot(.).
+ if (key[prefix.size()] != '.') {
+ continue;
+ }
+
+ deleted_keys.emplace(key);
}
- // erase
- for (const auto &key : erase_keys) {
- oss << std::endl << " " << key;
+
+ for (const auto &key : deleted_keys) {
+ fmt::format_to(std::back_inserter(deleted_keys_info), "\n {}",
key);
ainfo.envs.erase(key);
}
- }
- if (!prefix.empty() && erase_keys.empty()) {
- // no need update app_info
- LOG_INFO("no key need to delete");
- env_rpc.response().hint_message = "no key need to delete";
- return;
- } else {
- env_rpc.response().hint_message = oss.str();
+ if (deleted_keys.empty()) {
+ LOG_INFO("no key needs to be deleted for app({})", app_name);
+ env_rpc.response().err = ERR_OK;
+ env_rpc.response().hint_message = "no key needs to be deleted";
+ return;
+ }
}
- do_update_app_info(
- app_path, ainfo, [this, app_name, prefix, erase_keys,
env_rpc](error_code ec) {
- CHECK_EQ_MSG(ec, ERR_OK, "update app info to remote storage
failed");
+ env_rpc.response().hint_message = std::move(deleted_keys_info);
- zauto_write_lock l(_lock);
- std::shared_ptr<app_state> app = get_app(app_name);
- std::string old_envs = dsn::utils::kv_map_to_string(app->envs,
',', '=');
- if (prefix.empty()) {
- app->envs.clear();
- } else {
- for (const auto &key : erase_keys) {
- app->envs.erase(key);
- }
+ do_update_app_info(app_path, ainfo, [this, app_name, deleted_keys,
env_rpc](error_code ec) {
+ CHECK_EQ_MSG(ec, ERR_OK, "update app({}) info to remote storage
failed", app_name);
+
+ zauto_write_lock l(_lock);
+
+ FAIL_POINT_INJECT_NOT_RETURN_F("clear_app_envs_failed",
+ [app_name, this](std::string_view s) {
+ if (s == "dropped_after") {
+
CHECK_EQ(_exist_apps.erase(app_name), 1);
+ return;
+ }
+ });
+
+ auto app = get_app(app_name);
+
+ // The table might be removed just before the callback function is
invoked, thus we must
+ // check if this table still exists.
+ //
+ // TODO(wangdan): should make updates to remote storage sequential by
supporting atomic
+ // set, otherwise update might be missing. For example, an update is
setting the envs
+ // while another is dropping a table. The update setting the envs does
not contain the
+ // dropped state. Once it is applied by remote storage after another
update dropping
+ // the table, the state of the table would always be non-dropped on
remote storage.
+ if (!app) {
+ LOG_ERROR("clear app envs failed since app({}) has just been
dropped", app_name);
+ env_rpc.response().err = ERR_APP_DROPPED;
+ env_rpc.response().hint_message = "app has just been dropped";
+ return;
+ }
+
+ env_rpc.response().err = ERR_OK;
+
+ const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',',
'=');
+
+ if (deleted_keys.empty()) {
+ // `deleted_keys` would be empty only when `prefix` is empty.
Therefore, empty
+ // `deleted_keys` means deleting all environments.
+ app->envs.clear();
+ } else {
+ for (const auto &key : deleted_keys) {
+ app->envs.erase(key);
}
- std::string new_envs = dsn::utils::kv_map_to_string(app->envs,
',', '=');
- LOG_INFO("app envs changed: old_envs = {}, new_envs = {}",
old_envs, new_envs);
- });
+ }
+
+ const auto &new_envs = dsn::utils::kv_map_to_string(app->envs, ',',
'=');
+ LOG_INFO("app envs changed: old_envs = {}, new_envs = {}", old_envs,
new_envs);
+ });
}
namespace {
diff --git a/src/meta/test/server_state_test.cpp
b/src/meta/test/server_state_test.cpp
index 90af81185..6e15dd56c 100644
--- a/src/meta/test/server_state_test.cpp
+++ b/src/meta/test/server_state_test.cpp
@@ -56,53 +56,23 @@
DSN_DECLARE_string(cluster_root);
DSN_DECLARE_string(meta_state_service_type);
-namespace dsn {
-namespace replication {
-
-static const std::vector<std::string> keys = {
- dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
- dsn::replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
- dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION,
- dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
- dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
- dsn::replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
- dsn::replica_envs::ROCKSDB_USAGE_SCENARIO,
- dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
- dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS};
-static const std::vector<std::string> values = {
- "1712846598",
- "6",
- dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
- "1712846598",
- "-1",
- dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
- dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
- "1",
- "0"};
-
-static const std::vector<std::string> del_keys = {
- dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
- dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
- dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
-static const std::set<std::string> del_keys_set = {
- dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
- dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
- dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
-
-static const std::string clear_prefix = "rocksdb";
-
-// if str = "prefix.xxx" then return prefix
-// else return ""
-static std::string acquire_prefix(const std::string &str)
+namespace dsn::replication {
+
+namespace {
+
+// If `str` is <prefix>.xxx, return <prefix>; otherwise return empty
string("").
+std::string acquire_prefix(const std::string &str)
{
- auto index = str.find('.');
- if (index == std::string::npos) {
- return "";
- } else {
- return str.substr(0, index);
+ const auto &dot = str.find('.');
+ if (dot == std::string::npos) {
+ return {};
}
+
+ return str.substr(0, dot);
}
+} // anonymous namespace
+
class server_state_test
{
public:
@@ -178,6 +148,19 @@ public:
return rpc;
}
+ void test_clear_app_envs(const std::string &app_name,
+ const std::string &prefix,
+ const error_code expected_err)
+ {
+ configuration_update_app_env_request request;
+ request.__set_app_name(app_name);
+ request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR);
+ request.__set_clear_prefix(prefix);
+
+ auto rpc = clear_app_envs(request);
+ ASSERT_EQ(expected_err, rpc.response().err);
+ }
+
private:
static std::shared_ptr<app_state> fake_app_state(const std::string
&app_name,
const int32_t app_id)
@@ -244,6 +227,28 @@ private:
void meta_service_test_app::app_envs_basic_test()
{
+ static const std::vector<std::string> kKeys = {
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
+ dsn::replica_envs::ROCKSDB_USAGE_SCENARIO,
+ dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
+ dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS};
+
+ static const std::vector<std::string> kValues = {
+ "1712846598",
+ "6",
+ dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
+ "1712846598",
+ "-1",
+ dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
+ dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+ "1",
+ "0"};
+
server_state_test test;
test.load_apps({"test_app1",
"test_set_app_envs_not_found",
@@ -251,7 +256,10 @@ void meta_service_test_app::app_envs_basic_test()
"test_set_app_envs_dropped_after",
"test_del_app_envs_not_found",
"test_del_app_envs_dropping",
- "test_del_app_envs_dropped_after"});
+ "test_del_app_envs_dropped_after",
+ "test_clear_app_envs_not_found",
+ "test_clear_app_envs_dropping",
+ "test_clear_app_envs_dropped_after"});
#define TEST_SET_APP_ENVS_FAILED(action, err_code)
\
std::cout << "test server_state::set_app_envs(" #action ")..." <<
std::endl; \
@@ -282,17 +290,17 @@ void meta_service_test_app::app_envs_basic_test()
// Normal case for setting envs.
std::cout << "test server_state::set_app_envs(success)..." << std::endl;
{
- test.test_set_app_envs("test_app1", keys, values, ERR_OK);
+ test.test_set_app_envs("test_app1", kKeys, kValues, ERR_OK);
const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
- for (size_t idx = 0; idx < keys.size(); ++idx) {
- const auto &key = keys[idx];
+ for (size_t idx = 0; idx < kKeys.size(); ++idx) {
+ const auto &key = kKeys[idx];
// Every env should be inserted.
ASSERT_EQ(1, app->envs.count(key));
- ASSERT_EQ(values[idx], app->envs.at(key));
+ ASSERT_EQ(kValues[idx], app->envs.at(key));
}
}
@@ -325,52 +333,81 @@ void meta_service_test_app::app_envs_basic_test()
#undef TEST_DEL_APP_ENVS_FAILED
+ static const std::vector<std::string> kDelKeyList = {
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+ dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
+ static const std::set<std::string> kDelKeySet(kDelKeyList.begin(),
kDelKeyList.end());
+
// Normal case for deleting envs.
std::cout << "test server_state::del_app_envs(success)..." << std::endl;
{
- test.test_del_app_envs("test_app1", del_keys, ERR_OK);
+ test.test_del_app_envs("test_app1", kDelKeyList, ERR_OK);
const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
- for (size_t idx = 0; idx < keys.size(); ++idx) {
- const std::string &key = keys[idx];
- if (del_keys_set.count(key) > 0) {
- // The env in `del_keys_set` should be deleted.
+ for (size_t idx = 0; idx < kKeys.size(); ++idx) {
+ const std::string &key = kKeys[idx];
+ if (kDelKeySet.count(key) > 0) {
+ // The env in `kDelKeySet` should be deleted.
ASSERT_EQ(0, app->envs.count(key));
continue;
}
- // The env should still exist if it is not in `del_keys_set`.
+ // The env should still exist if it is not in `kDelKeySet`.
ASSERT_EQ(1, app->envs.count(key));
- ASSERT_EQ(values[idx], app->envs.at(key));
+ ASSERT_EQ(kValues[idx], app->envs.at(key));
}
}
- std::cout << "test server_state::clear_app_envs()..." << std::endl;
+#define TEST_CLEAR_APP_ENVS_FAILED(action, err_code)
\
+ std::cout << "test server_state::clear_app_envs(" #action ")..." <<
std::endl; \
+ do {
\
+ test.test_set_app_envs("test_clear_app_envs_" #action,
\
+ {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE},
\
+ {"67108864"},
\
+ ERR_OK);
\
+
\
+ fail::setup();
\
+ fail::cfg("clear_app_envs_failed", "void(" #action ")");
\
+
\
+ test.test_clear_app_envs("test_clear_app_envs_" #action, "",
err_code); \
+
\
+ fail::teardown();
\
+ } while (0)
+
+ // Failed to clearing envs while table was not found.
+ TEST_CLEAR_APP_ENVS_FAILED(not_found, ERR_APP_NOT_EXIST);
+
+ // Failed to clearing envs while table was being dropped as the
intermediate state.
+ TEST_CLEAR_APP_ENVS_FAILED(dropping, ERR_BUSY_DROPPING);
+
+ // The table was found dropped after the new envs had been persistent on
the remote
+ // meta storage.
+ TEST_CLEAR_APP_ENVS_FAILED(dropped_after, ERR_APP_DROPPED);
+
+#undef TEST_CLEAR_APP_ENVS_FAILED
+
+ std::cout << "test server_state::clear_app_envs(success)..." << std::endl;
{
// Test specifying prefix.
{
- configuration_update_app_env_request request;
- request.__set_app_name("test_app1");
- request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR);
- request.__set_clear_prefix(clear_prefix);
-
- auto rpc = test.clear_app_envs(request);
- ASSERT_EQ(ERR_OK, rpc.response().err);
+ static const std::string kClearPrefix = "rocksdb";
+ test.test_clear_app_envs("test_app1", kClearPrefix, ERR_OK);
const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
- for (size_t idx = 0; idx < keys.size(); ++idx) {
- const std::string &key = keys[idx];
- if (del_keys_set.count(key) > 0) {
+ for (size_t idx = 0; idx < kKeys.size(); ++idx) {
+ const std::string &key = kKeys[idx];
+ if (kDelKeySet.count(key) > 0) {
// The env should have been deleted during test for
`del_app_envs`.
ASSERT_EQ(0, app->envs.count(key));
continue;
}
- if (acquire_prefix(key) == clear_prefix) {
+ if (acquire_prefix(key) == kClearPrefix) {
// The env with specified prefix should be deleted.
ASSERT_EQ(0, app->envs.count(key));
continue;
@@ -378,19 +415,13 @@ void meta_service_test_app::app_envs_basic_test()
// Otherwise, the env should still exist.
ASSERT_EQ(1, app->envs.count(key));
- ASSERT_EQ(values[idx], app->envs.at(key));
+ ASSERT_EQ(kValues[idx], app->envs.at(key));
}
}
// Test clearing all.
{
- configuration_update_app_env_request request;
- request.__set_app_name("test_app1");
- request.__set_op(app_env_operation::type::APP_ENV_OP_CLEAR);
- request.__set_clear_prefix("");
-
- auto rpc = test.clear_app_envs(request);
- ASSERT_EQ(ERR_OK, rpc.response().err);
+ test.test_clear_app_envs("test_app1", "", ERR_OK);
const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
@@ -401,5 +432,4 @@ void meta_service_test_app::app_envs_basic_test()
}
}
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]