This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 9303c3aba fix: Fix the corruption RocksDB instance will be reused bug (#1422)
9303c3aba is described below

commit 9303c3aba87c8392705e52baeeee768ad385d5b3
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Apr 11 14:19:36 2023 +0800

    fix: Fix the corruption RocksDB instance will be reused bug (#1422)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    This patch deals with the `kCorruption` error returned by the storage
    engine for write requests. After a replica server gets such an error,
    it moves the replica to a trash path
    `<app_id>.<pid>.pegasus.<timestamp>.err`.
    
    Note that the replica server may crash because the corrupted replica
    has been trashed and closed; fixing that is left to subsequent
    patches.
---
 src/common/fs_manager.h                            |   2 +
 src/replica/replica.h                              |   7 ++
 src/replica/replica_failover.cpp                   |   4 +
 src/replica/replica_learn.cpp                      |  10 +-
 src/replica/replica_stub.cpp                       |   6 +
 src/replica/replica_stub.h                         |   1 +
 src/replica/replication_app_base.cpp               |   9 +-
 src/replica/replication_app_base.h                 |   4 +-
 src/replica/split/replica_split_manager.cpp        |  26 +++--
 src/replica/test/clear.sh                          |   2 +-
 src/replica/test/replica_test.cpp                  |  39 +++++++
 src/server/rocksdb_wrapper.cpp                     |  11 ++
 .../base_api_test/integration_test.cpp             | 130 +++++++++++++++++++++
 src/utils/error_code.h                             |   2 +
 14 files changed, 238 insertions(+), 15 deletions(-)

diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 3c29973c2..75427cc89 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 #include <functional>
+#include <gtest/gtest_prod.h>
 #include <map>
 #include <memory>
 #include <set>
@@ -147,6 +148,7 @@ private:
     friend class replica_disk_migrator;
     friend class replica_disk_test_base;
     friend class open_replica_test;
+    FRIEND_TEST(replica_test, test_auto_trash);
 };
 } // replication
 } // dsn
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 59218d17a..fdcdfbb19 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -42,6 +42,7 @@
 // which is binded to this replication partition
 //
 
+#include <gtest/gtest_prod.h>
 #include <stddef.h>
 #include <stdint.h>
 #include <atomic>
@@ -78,6 +79,7 @@ namespace dsn {
 class gpid;
 class perf_counter;
 class rpc_address;
+
 namespace dist {
 namespace block_service {
 class block_filesystem;
@@ -520,6 +522,8 @@ private:
     void update_app_max_replica_count(int32_t max_replica_count);
     void update_app_name(const std::string &app_name);
 
+    bool is_data_corrupted() const { return _data_corrupted; }
+
 private:
     friend class ::dsn::replication::test::test_checker;
     friend class ::dsn::replication::mutation_queue;
@@ -540,6 +544,7 @@ private:
     friend class replica_disk_migrate_test;
     friend class open_replica_test;
     friend class replica_follower;
+    FRIEND_TEST(replica_test, test_auto_trash);
 
     // replica configuration, updated by update_local_configuration ONLY
     replica_configuration _config;
@@ -661,6 +666,8 @@ private:
     disk_status::type _disk_status{disk_status::NORMAL};
 
     bool _allow_ingest_behind{false};
+    // Indicates whether the storage engine data is corrupted and unrecoverable.
+    bool _data_corrupted{false};
 };
 typedef dsn::ref_ptr<replica> replica_ptr;
 } // namespace replication
diff --git a/src/replica/replica_failover.cpp b/src/replica/replica_failover.cpp
index 64fee2988..94edaab83 100644
--- a/src/replica/replica_failover.cpp
+++ b/src/replica/replica_failover.cpp
@@ -54,6 +54,10 @@ void replica::handle_local_failure(error_code error)
 {
     LOG_INFO_PREFIX("handle local failure error {}, status = {}", error, 
enum_to_string(status()));
 
+    if (error == ERR_RDB_CORRUPTION) {
+        _data_corrupted = true;
+    }
+
     if (status() == partition_status::PS_PRIMARY) {
         _stub->remove_replica_on_meta_server(_app_info, 
_primary_states.membership);
     }
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 1d1abadfc..d3bbfe638 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -1236,6 +1236,10 @@ void replica::handle_learning_error(error_code err, bool 
is_local_error)
         err,
         is_local_error ? "local_error" : "remote error");
 
+    if (is_local_error && err == ERR_RDB_CORRUPTION) {
+        _data_corrupted = true;
+    }
+
     _stub->_counter_replicas_learning_recent_learn_fail_count->increment();
 
     update_local_configuration_with_no_ballot_change(
@@ -1495,7 +1499,11 @@ error_code 
replica::apply_learned_state_from_private_log(learn_state &state)
                            }
 
                            // TODO: assign the returned error_code to err and 
check it
-                           _app->apply_mutation(mu);
+                           auto ec = _app->apply_mutation(mu);
+                           if (ec != ERR_OK) {
+                               handle_local_failure(ec);
+                               return;
+                           }
 
                            // appends logs-in-cache into plog to ensure them 
can be duplicated.
                            // if current case is step back, it means the logs 
has been reserved
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 2b3030295..deb912342 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -2383,6 +2383,12 @@ void replica_stub::close_replica(replica_ptr r)
         _counter_replicas_closing_count->decrement();
     }
 
+    if (r->is_data_corrupted()) {
+        _fs_manager.remove_replica(id);
+        move_to_err_path(r->dir(), "trash replica");
+        _counter_replicas_recent_replica_move_error_count->increment();
+    }
+
     LOG_INFO("{}: finish to close replica", name);
 }
 
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index af04af6c0..81be17212 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -387,6 +387,7 @@ private:
     friend class replica_follower_test;
     friend class replica_http_service_test;
     FRIEND_TEST(replica_test, test_clear_on_failure);
+    FRIEND_TEST(replica_test, test_auto_trash);
 
     typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
     typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, 
app_info, replica_info>>
diff --git a/src/replica/replication_app_base.cpp 
b/src/replica/replication_app_base.cpp
index e7d3ef8ec..8b20d6df8 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -448,7 +448,14 @@ error_code replication_app_base::apply_mutation(const 
mutation *mu)
         //      because the external sst files may not exist, in this case, we 
won't consider it as
         //      an error.
         if (!has_ingestion_request) {
-            return ERR_LOCAL_APP_FAILURE;
+            switch (storage_error) {
+            // TODO(yingchun): Only kCorruption is handled for now; consider handling more
+            //  storage engine errors.
+            case rocksdb::Status::kCorruption:
+                return ERR_RDB_CORRUPTION;
+            default:
+                return ERR_LOCAL_APP_FAILURE;
+            }
         }
     }
 
diff --git a/src/replica/replication_app_base.h 
b/src/replica/replication_app_base.h
index b543a9111..66ac6fb9b 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -139,8 +139,10 @@ public:
 
     // Return code:
     //   - ERR_OK: everything is OK.
+    //   - ERR_RDB_CORRUPTION: encountered some unrecoverable data errors, 
i.e. kCorruption from
+    //     storage engine.
     //   - ERR_LOCAL_APP_FAILURE: other type of errors.
-    error_code apply_mutation(const mutation *mu);
+    error_code apply_mutation(const mutation *mu) WARN_UNUSED_RESULT;
 
     // methods need to implement on storage engine side
     virtual error_code start(int argc, char **argv) = 0;
diff --git a/src/replica/split/replica_split_manager.cpp 
b/src/replica/split/replica_split_manager.cpp
index f366407c1..0c63a5ebf 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -427,17 +427,21 @@ 
replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
     error_code ec;
     int64_t offset;
     // temp prepare_list used for apply states
-    prepare_list plist(_replica,
-                       _replica->_app->last_committed_decree(),
-                       FLAGS_max_mutation_count_in_prepare_list,
-                       [this](mutation_ptr &mu) {
-                           if (mu->data.header.decree !=
-                               _replica->_app->last_committed_decree() + 1) {
-                               return;
-                           }
-
-                           _replica->_app->apply_mutation(mu);
-                       });
+    prepare_list plist(
+        _replica,
+        _replica->_app->last_committed_decree(),
+        FLAGS_max_mutation_count_in_prepare_list,
+        [this](mutation_ptr &mu) {
+            if (mu->data.header.decree != 
_replica->_app->last_committed_decree() + 1) {
+                return;
+            }
+
+            auto e = _replica->_app->apply_mutation(mu);
+            if (e != ERR_OK) {
+                LOG_ERROR_PREFIX("got an error({}) in commit stage of 
prepare_list", e);
+                return;
+            }
+        });
 
     // replay private log
     ec = mutation_log::replay(plog_files,
diff --git a/src/replica/test/clear.sh b/src/replica/test/clear.sh
index e11f60c6d..4dd4f0840 100755
--- a/src/replica/test/clear.sh
+++ b/src/replica/test/clear.sh
@@ -17,4 +17,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-rm -rf core.* data/ log.* replica.* tag* test* test_cluster/
+rm -rf *.err core.* data/ log.* replica.* tag* test* test_cluster/
diff --git a/src/replica/test/replica_test.cpp 
b/src/replica/test/replica_test.cpp
index 42be2a2ae..8e09a8e03 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -40,6 +40,7 @@
 #include "metadata_types.h"
 #include "perf_counter/perf_counter.h"
 #include "perf_counter/perf_counter_wrapper.h"
+#include "replica/disk_cleaner.h"
 #include "replica/replica.h"
 #include "replica/replica_http_service.h"
 #include "replica/replica_stub.h"
@@ -59,6 +60,7 @@
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
+#include "utils/string_conv.h"
 
 namespace dsn {
 namespace replication {
@@ -480,6 +482,43 @@ TEST_F(replica_test, test_clear_on_failure)
     ASSERT_FALSE(has_gpid(pid));
 }
 
+TEST_F(replica_test, test_auto_trash)
+{
+    // Disable the failure detector to avoid connecting to the meta server, which is not started.
+    FLAGS_fd_disabled = true;
+
+    replica *rep =
+        stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 
1, false, true);
+    auto path = rep->dir();
+    dsn::utils::filesystem::create_directory(path);
+    ASSERT_TRUE(has_gpid(pid));
+
+    rep->handle_local_failure(ERR_RDB_CORRUPTION);
+    stub->wait_closing_replicas_finished();
+
+    ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
+    dir_node *dn = stub->get_fs_manager()->get_dir_node(path);
+    ASSERT_NE(dn, nullptr);
+    std::vector<std::string> subs;
+    ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, 
false));
+    bool found = false;
+    const int ts_length = 16;
+    size_t err_pos = path.size() + ts_length + 1; // Add 1 for dot in path.
+    for (const auto &sub : subs) {
+        if (sub.size() <= path.size()) {
+            continue;
+        }
+        uint64_t ts = 0;
+        if (sub.find(path) == 0 && sub.find(kFolderSuffixErr) == err_pos &&
+            dsn::buf2uint64(sub.substr(path.size() + 1, ts_length), ts)) {
+            found = true;
+            break;
+        }
+    }
+    ASSERT_TRUE(found);
+    ASSERT_FALSE(has_gpid(pid));
+}
+
 TEST_F(replica_test, update_deny_client_test)
 {
     struct update_deny_client_test
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index c5872ec13..64d37d9be 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -35,12 +35,19 @@
 #include "server/pegasus_write_service.h"
 #include "utils/blob.h"
 #include "utils/fail_point.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/ports.h"
 
 namespace pegasus {
 namespace server {
 
+DSN_DEFINE_int32(pegasus.server,
+                 inject_write_error_for_test,
+                 0,
+                 "Which error code to inject in write path, 0 means no error. 
Only for test.");
+DSN_TAG_VARIABLE(inject_write_error_for_test, FT_MUTABLE);
+
 rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
     : replica_base(server),
       _db(server->_db),
@@ -154,6 +161,10 @@ int rocksdb_wrapper::write(int64_t decree)
 {
     CHECK_GT(_write_batch->Count(), 0);
 
+    if (dsn_unlikely(FLAGS_inject_write_error_for_test != 
rocksdb::Status::kOk)) {
+        return FLAGS_inject_write_error_for_test;
+    }
+
     FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return 
FAIL_DB_WRITE; });
 
     rocksdb::Status status =
diff --git a/src/test/function_test/base_api_test/integration_test.cpp 
b/src/test/function_test/base_api_test/integration_test.cpp
new file mode 100644
index 000000000..8ab3cc56c
--- /dev/null
+++ b/src/test/function_test/base_api_test/integration_test.cpp
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <fmt/core.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+#include <unistd.h>
+#include <iostream>
+#include <string>
+
+#include "include/pegasus/client.h"
+#include "pegasus/error.h"
+#include "test/function_test/utils/test_util.h"
+#include "test/function_test/utils/utils.h"
+
+using namespace ::pegasus;
+
+typedef pegasus_client::internal_info internal_info;
+
+class integration_test : public test_util
+{
+};
+
+TEST_F(integration_test, write_corrupt_db)
+{
+    // Inject a write error kCorruption to RS-0.
+    ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
+        "curl 'localhost:34801/updateConfig?inject_write_error_for_test=2'"));
+
+    std::string skey = "skey";
+    std::string value = "value";
+    int ok_count = 0;
+    int corruption_count = 0;
+    for (int i = 0; i < 1000; i++) {
+        std::string hkey = fmt::format("hkey1_{}", i);
+        int ret = PERR_OK;
+        do {
+            ret = client_->set(hkey, skey, value);
+            if (ret == PERR_OK) {
+                ok_count++;
+                break;
+            } else if (ret == PERR_CORRUPTION) {
+                // Assume there must be some primaries on RS-0.
+                corruption_count++;
+                break;
+            } else if (ret == PERR_TIMEOUT) {
+                // If RS-0 crashed earlier (learning failed because writing to the storage
+                // engine returned kCorruption), a new write to a primary replica it previously
+                // held will time out.
+                // Force to fetch the latest route table.
+                client_ =
+                    pegasus_client_factory::get_client(cluster_name_.c_str(), 
app_name_.c_str());
+                ASSERT_TRUE(client_ != nullptr);
+            } else {
+                ASSERT_TRUE(false) << ret;
+            }
+        } while (true);
+
+        // Since only one replica server failed, the correct value can still be read from the
+        // other replica servers.
+        std::string got_value;
+        ret = client_->get(hkey, skey, got_value);
+        do {
+            if (ret == PERR_OK) {
+                break;
+            }
+            ASSERT_EQ(PERR_NOT_FOUND, ret);
+            client_ = 
pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
+            ASSERT_TRUE(client_ != nullptr);
+
+            ret = client_->get(hkey, skey, got_value);
+        } while (true);
+        ASSERT_EQ(value, got_value);
+    }
+
+    EXPECT_GT(ok_count, 0);
+    EXPECT_GT(corruption_count, 0);
+    std::cout << "ok_count: " << ok_count << ", corruption_count: " << 
corruption_count;
+
+    // Now only 2 RS left.
+    std::string rs_count;
+    ASSERT_NO_FATAL_FAILURE(run_cmd(
+        "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v 
grep | wc -l",
+        &rs_count));
+    ASSERT_EQ("2", rs_count);
+
+    // Replica server 0 is able to start normally.
+    // After restart, the 'inject_write_error_for_test' config value will be 
reset to 0 (i.e. OK).
+    ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("./run.sh 
start_onebox_instance -r 1"));
+    ASSERT_NO_FATAL_FAILURE(run_cmd(
+        "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v 
grep | wc -l",
+        &rs_count));
+    ASSERT_EQ("3", rs_count);
+
+    // Make a best effort to rebalance the cluster.
+    ASSERT_NO_FATAL_FAILURE(
+        run_cmd_from_project_root("echo 'set_meta_level lively' | ./run.sh 
shell"));
+    usleep(10 * 1000 * 1000);
+
+    for (int i = 0; i < 1000; i++) {
+        std::string hkey = fmt::format("hkey2_{}", i);
+        int ret = client_->set(hkey, skey, value);
+        ASSERT_EQ(PERR_OK, ret) << ret;
+        std::string got_value;
+        ASSERT_EQ(PERR_OK, client_->get(hkey, skey, got_value));
+        ASSERT_EQ(value, got_value);
+    }
+
+    ASSERT_NO_FATAL_FAILURE(run_cmd(
+        "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v 
grep | wc -l",
+        &rs_count));
+    ASSERT_EQ("3", rs_count);
+}
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index 613a67efe..998b21872 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -172,4 +172,6 @@ DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED)
 DEFINE_ERR_CODE(ERR_SYNC_RANGER_POLICIES_FAILED)
 DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL)
 DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE)
+
+DEFINE_ERR_CODE(ERR_RDB_CORRUPTION)
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to