This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 9303c3aba fix: Fix the corruption RocksDB instance will be reused bug
(#1422)
9303c3aba is described below
commit 9303c3aba87c8392705e52baeeee768ad385d5b3
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Apr 11 14:19:36 2023 +0800
fix: Fix the corruption RocksDB instance will be reused bug (#1422)
https://github.com/apache/incubator-pegasus/issues/1383
This patch handles the `kCorruption` error returned by the storage
engine for write requests. When a replica server gets such an error,
it moves the replica to a trash path
`<app_id>.<pid>.pegasus.<timestamp>.err`.
Note that the replica server may still crash because the corrupted replica
has been trashed and closed; fixing that is left to follow-up
patches.
---
src/common/fs_manager.h | 2 +
src/replica/replica.h | 7 ++
src/replica/replica_failover.cpp | 4 +
src/replica/replica_learn.cpp | 10 +-
src/replica/replica_stub.cpp | 6 +
src/replica/replica_stub.h | 1 +
src/replica/replication_app_base.cpp | 9 +-
src/replica/replication_app_base.h | 4 +-
src/replica/split/replica_split_manager.cpp | 26 +++--
src/replica/test/clear.sh | 2 +-
src/replica/test/replica_test.cpp | 39 +++++++
src/server/rocksdb_wrapper.cpp | 11 ++
.../base_api_test/integration_test.cpp | 130 +++++++++++++++++++++
src/utils/error_code.h | 2 +
14 files changed, 238 insertions(+), 15 deletions(-)
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 3c29973c2..75427cc89 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -19,6 +19,7 @@
#include <stdint.h>
#include <functional>
+#include <gtest/gtest_prod.h>
#include <map>
#include <memory>
#include <set>
@@ -147,6 +148,7 @@ private:
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
+ FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
} // dsn
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 59218d17a..fdcdfbb19 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -42,6 +42,7 @@
// which is binded to this replication partition
//
+#include <gtest/gtest_prod.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
@@ -78,6 +79,7 @@ namespace dsn {
class gpid;
class perf_counter;
class rpc_address;
+
namespace dist {
namespace block_service {
class block_filesystem;
@@ -520,6 +522,8 @@ private:
void update_app_max_replica_count(int32_t max_replica_count);
void update_app_name(const std::string &app_name);
+ bool is_data_corrupted() const { return _data_corrupted; }
+
private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
@@ -540,6 +544,7 @@ private:
friend class replica_disk_migrate_test;
friend class open_replica_test;
friend class replica_follower;
+ FRIEND_TEST(replica_test, test_auto_trash);
// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
@@ -661,6 +666,8 @@ private:
disk_status::type _disk_status{disk_status::NORMAL};
bool _allow_ingest_behind{false};
+ // Indicate whether the storage engine data is corrupted and unrecoverable.
+ bool _data_corrupted{false};
};
typedef dsn::ref_ptr<replica> replica_ptr;
} // namespace replication
diff --git a/src/replica/replica_failover.cpp b/src/replica/replica_failover.cpp
index 64fee2988..94edaab83 100644
--- a/src/replica/replica_failover.cpp
+++ b/src/replica/replica_failover.cpp
@@ -54,6 +54,10 @@ void replica::handle_local_failure(error_code error)
{
LOG_INFO_PREFIX("handle local failure error {}, status = {}", error,
enum_to_string(status()));
+ if (error == ERR_RDB_CORRUPTION) {
+ _data_corrupted = true;
+ }
+
if (status() == partition_status::PS_PRIMARY) {
_stub->remove_replica_on_meta_server(_app_info,
_primary_states.membership);
}
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 1d1abadfc..d3bbfe638 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -1236,6 +1236,10 @@ void replica::handle_learning_error(error_code err, bool
is_local_error)
err,
is_local_error ? "local_error" : "remote error");
+ if (is_local_error && err == ERR_RDB_CORRUPTION) {
+ _data_corrupted = true;
+ }
+
_stub->_counter_replicas_learning_recent_learn_fail_count->increment();
update_local_configuration_with_no_ballot_change(
@@ -1495,7 +1499,11 @@ error_code
replica::apply_learned_state_from_private_log(learn_state &state)
}
// TODO: assign the returned error_code to err and
check it
- _app->apply_mutation(mu);
+ auto ec = _app->apply_mutation(mu);
+ if (ec != ERR_OK) {
+ handle_local_failure(ec);
+ return;
+ }
// appends logs-in-cache into plog to ensure them
can be duplicated.
// if current case is step back, it means the logs
has been reserved
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 2b3030295..deb912342 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -2383,6 +2383,12 @@ void replica_stub::close_replica(replica_ptr r)
_counter_replicas_closing_count->decrement();
}
+ if (r->is_data_corrupted()) {
+ _fs_manager.remove_replica(id);
+ move_to_err_path(r->dir(), "trash replica");
+ _counter_replicas_recent_replica_move_error_count->increment();
+ }
+
LOG_INFO("{}: finish to close replica", name);
}
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index af04af6c0..81be17212 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -387,6 +387,7 @@ private:
friend class replica_follower_test;
friend class replica_http_service_test;
FRIEND_TEST(replica_test, test_clear_on_failure);
+ FRIEND_TEST(replica_test, test_auto_trash);
typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr,
app_info, replica_info>>
diff --git a/src/replica/replication_app_base.cpp
b/src/replica/replication_app_base.cpp
index e7d3ef8ec..8b20d6df8 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -448,7 +448,14 @@ error_code replication_app_base::apply_mutation(const
mutation *mu)
// because the external sst files may not exist, in this case, we
won't consider it as
// an error.
if (!has_ingestion_request) {
- return ERR_LOCAL_APP_FAILURE;
+ switch (storage_error) {
+ // TODO(yingchun): Now only kCorruption is handled, consider handling
more storage
+ // engine errors.
+ case rocksdb::Status::kCorruption:
+ return ERR_RDB_CORRUPTION;
+ default:
+ return ERR_LOCAL_APP_FAILURE;
+ }
}
}
diff --git a/src/replica/replication_app_base.h
b/src/replica/replication_app_base.h
index b543a9111..66ac6fb9b 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -139,8 +139,10 @@ public:
// Return code:
// - ERR_OK: everything is OK.
+ // - ERR_RDB_CORRUPTION: encountered some unrecoverable data errors,
i.e. kCorruption from
+ // storage engine.
// - ERR_LOCAL_APP_FAILURE: other type of errors.
- error_code apply_mutation(const mutation *mu);
+ error_code apply_mutation(const mutation *mu) WARN_UNUSED_RESULT;
// methods need to implement on storage engine side
virtual error_code start(int argc, char **argv) = 0;
diff --git a/src/replica/split/replica_split_manager.cpp
b/src/replica/split/replica_split_manager.cpp
index f366407c1..0c63a5ebf 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -427,17 +427,21 @@
replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
error_code ec;
int64_t offset;
// temp prepare_list used for apply states
- prepare_list plist(_replica,
- _replica->_app->last_committed_decree(),
- FLAGS_max_mutation_count_in_prepare_list,
- [this](mutation_ptr &mu) {
- if (mu->data.header.decree !=
- _replica->_app->last_committed_decree() + 1) {
- return;
- }
-
- _replica->_app->apply_mutation(mu);
- });
+ prepare_list plist(
+ _replica,
+ _replica->_app->last_committed_decree(),
+ FLAGS_max_mutation_count_in_prepare_list,
+ [this](mutation_ptr &mu) {
+ if (mu->data.header.decree !=
_replica->_app->last_committed_decree() + 1) {
+ return;
+ }
+
+ auto e = _replica->_app->apply_mutation(mu);
+ if (e != ERR_OK) {
+ LOG_ERROR_PREFIX("got an error({}) in commit stage of
prepare_list", e);
+ return;
+ }
+ });
// replay private log
ec = mutation_log::replay(plog_files,
diff --git a/src/replica/test/clear.sh b/src/replica/test/clear.sh
index e11f60c6d..4dd4f0840 100755
--- a/src/replica/test/clear.sh
+++ b/src/replica/test/clear.sh
@@ -17,4 +17,4 @@
# specific language governing permissions and limitations
# under the License.
-rm -rf core.* data/ log.* replica.* tag* test* test_cluster/
+rm -rf *.err core.* data/ log.* replica.* tag* test* test_cluster/
diff --git a/src/replica/test/replica_test.cpp
b/src/replica/test/replica_test.cpp
index 42be2a2ae..8e09a8e03 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -40,6 +40,7 @@
#include "metadata_types.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
+#include "replica/disk_cleaner.h"
#include "replica/replica.h"
#include "replica/replica_http_service.h"
#include "replica/replica_stub.h"
@@ -59,6 +60,7 @@
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
+#include "utils/string_conv.h"
namespace dsn {
namespace replication {
@@ -480,6 +482,43 @@ TEST_F(replica_test, test_clear_on_failure)
ASSERT_FALSE(has_gpid(pid));
}
+TEST_F(replica_test, test_auto_trash)
+{
+ // Disable failure detector to avoid connecting with meta server which is
not started.
+ FLAGS_fd_disabled = true;
+
+ replica *rep =
+ stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY,
1, false, true);
+ auto path = rep->dir();
+ dsn::utils::filesystem::create_directory(path);
+ ASSERT_TRUE(has_gpid(pid));
+
+ rep->handle_local_failure(ERR_RDB_CORRUPTION);
+ stub->wait_closing_replicas_finished();
+
+ ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
+ dir_node *dn = stub->get_fs_manager()->get_dir_node(path);
+ ASSERT_NE(dn, nullptr);
+ std::vector<std::string> subs;
+ ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs,
false));
+ bool found = false;
+ const int ts_length = 16;
+ size_t err_pos = path.size() + ts_length + 1; // Add 1 for dot in path.
+ for (const auto &sub : subs) {
+ if (sub.size() <= path.size()) {
+ continue;
+ }
+ uint64_t ts = 0;
+ if (sub.find(path) == 0 && sub.find(kFolderSuffixErr) == err_pos &&
+ dsn::buf2uint64(sub.substr(path.size() + 1, ts_length), ts)) {
+ found = true;
+ break;
+ }
+ }
+ ASSERT_TRUE(found);
+ ASSERT_FALSE(has_gpid(pid));
+}
+
TEST_F(replica_test, update_deny_client_test)
{
struct update_deny_client_test
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index c5872ec13..64d37d9be 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -35,12 +35,19 @@
#include "server/pegasus_write_service.h"
#include "utils/blob.h"
#include "utils/fail_point.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
namespace pegasus {
namespace server {
+DSN_DEFINE_int32(pegasus.server,
+ inject_write_error_for_test,
+ 0,
+ "Which error code to inject in write path, 0 means no error.
Only for test.");
+DSN_TAG_VARIABLE(inject_write_error_for_test, FT_MUTABLE);
+
rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
@@ -154,6 +161,10 @@ int rocksdb_wrapper::write(int64_t decree)
{
CHECK_GT(_write_batch->Count(), 0);
+ if (dsn_unlikely(FLAGS_inject_write_error_for_test !=
rocksdb::Status::kOk)) {
+ return FLAGS_inject_write_error_for_test;
+ }
+
FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return
FAIL_DB_WRITE; });
rocksdb::Status status =
diff --git a/src/test/function_test/base_api_test/integration_test.cpp
b/src/test/function_test/base_api_test/integration_test.cpp
new file mode 100644
index 000000000..8ab3cc56c
--- /dev/null
+++ b/src/test/function_test/base_api_test/integration_test.cpp
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <fmt/core.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+#include <unistd.h>
+#include <iostream>
+#include <string>
+
+#include "include/pegasus/client.h"
+#include "pegasus/error.h"
+#include "test/function_test/utils/test_util.h"
+#include "test/function_test/utils/utils.h"
+
+using namespace ::pegasus;
+
+typedef pegasus_client::internal_info internal_info;
+
+class integration_test : public test_util
+{
+};
+
+TEST_F(integration_test, write_corrupt_db)
+{
+ // Inject a write error kCorruption to RS-0.
+ ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
+ "curl 'localhost:34801/updateConfig?inject_write_error_for_test=2'"));
+
+ std::string skey = "skey";
+ std::string value = "value";
+ int ok_count = 0;
+ int corruption_count = 0;
+ for (int i = 0; i < 1000; i++) {
+ std::string hkey = fmt::format("hkey1_{}", i);
+ int ret = PERR_OK;
+ do {
+ ret = client_->set(hkey, skey, value);
+ if (ret == PERR_OK) {
+ ok_count++;
+ break;
+ } else if (ret == PERR_CORRUPTION) {
+ // Assume there must be some primaries on RS-0.
+ corruption_count++;
+ break;
+ } else if (ret == PERR_TIMEOUT) {
+ // If RS-0 crashed earlier (learning failed because writing to the storage
engine got
+ // kCorruption),
+ // a new write operation on a primary replica it previously held
will time out.
+ // Force to fetch the latest route table.
+ client_ =
+ pegasus_client_factory::get_client(cluster_name_.c_str(),
app_name_.c_str());
+ ASSERT_TRUE(client_ != nullptr);
+ } else {
+ ASSERT_TRUE(false) << ret;
+ }
+ } while (true);
+
+ // Since only 1 replica server failed, we can still get the correct
value from other replica
+ // servers.
+ std::string got_value;
+ ret = client_->get(hkey, skey, got_value);
+ do {
+ if (ret == PERR_OK) {
+ break;
+ }
+ ASSERT_EQ(PERR_NOT_FOUND, ret);
+ client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
+ ASSERT_TRUE(client_ != nullptr);
+
+ ret = client_->get(hkey, skey, got_value);
+ } while (true);
+ ASSERT_EQ(value, got_value);
+ }
+
+ EXPECT_GT(ok_count, 0);
+ EXPECT_GT(corruption_count, 0);
+ std::cout << "ok_count: " << ok_count << ", corruption_count: " <<
corruption_count;
+
+ // Now only 2 RS left.
+ std::string rs_count;
+ ASSERT_NO_FATAL_FAILURE(run_cmd(
+ "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v
grep | wc -l",
+ &rs_count));
+ ASSERT_EQ("2", rs_count);
+
+ // Replica server 0 is able to start normally.
+ // After restart, the 'inject_write_error_for_test' config value will be
reset to 0 (i.e. OK).
+ ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("./run.sh
start_onebox_instance -r 1"));
+ ASSERT_NO_FATAL_FAILURE(run_cmd(
+ "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v
grep | wc -l",
+ &rs_count));
+ ASSERT_EQ("3", rs_count);
+
+ // Make a best effort to rebalance the cluster.
+ ASSERT_NO_FATAL_FAILURE(
+ run_cmd_from_project_root("echo 'set_meta_level lively' | ./run.sh
shell"));
+ usleep(10 * 1000 * 1000);
+
+ for (int i = 0; i < 1000; i++) {
+ std::string hkey = fmt::format("hkey2_{}", i);
+ int ret = client_->set(hkey, skey, value);
+ ASSERT_EQ(PERR_OK, ret) << ret;
+ std::string got_value;
+ ASSERT_EQ(PERR_OK, client_->get(hkey, skey, got_value));
+ ASSERT_EQ(value, got_value);
+ }
+
+ ASSERT_NO_FATAL_FAILURE(run_cmd(
+ "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v
grep | wc -l",
+ &rs_count));
+ ASSERT_EQ("3", rs_count);
+}
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index 613a67efe..998b21872 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -172,4 +172,6 @@ DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED)
DEFINE_ERR_CODE(ERR_SYNC_RANGER_POLICIES_FAILED)
DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL)
DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE)
+
+DEFINE_ERR_CODE(ERR_RDB_CORRUPTION)
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]