This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dcbb1e28 feat: skip IO_ERROR dir_node when assign replicas (#1512)
4dcbb1e28 is described below

commit 4dcbb1e28188fe855043dd84b3c1f03bd439aa7e
Author: Yingchun Lai <[email protected]>
AuthorDate: Thu Jun 8 10:53:29 2023 +0800

    feat: skip IO_ERROR dir_node when assign replicas (#1512)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    A disk (a.k.a. dir_node in Pegasus) can change from NORMAL to
    SPACE_INSUFFICIENT or IO_ERROR, and it can also recover from
    SPACE_INSUFFICIENT back to NORMAL. So we keep all dir_nodes in the system,
    but refuse to assign replicas to abnormal dir_nodes and refuse write
    operations on abnormal dir_nodes.
    
    This patch also updates some unit tests.
---
 idl/metadata.thrift                    |  7 ++++++-
 src/common/fs_manager.cpp              | 35 ++++++++++++++++++++++++++++------
 src/common/fs_manager.h                |  5 ++++-
 src/common/replication_enums.h         |  1 +
 src/common/test/fs_manager_test.cpp    |  7 ++++---
 src/replica/disk_cleaner.cpp           |  7 +++++++
 src/replica/replica_2pc.cpp            | 21 +++++++++++++++++---
 src/replica/replica_check.cpp          |  1 +
 src/replica/replica_context.cpp        |  9 +++++----
 src/replica/replica_context.h          |  2 +-
 src/replica/replica_disk_migrator.cpp  |  1 +
 src/replica/replica_stub.cpp           |  4 ++++
 src/replica/test/replica_disk_test.cpp |  1 +
 src/utils/error_code.h                 |  2 ++
 14 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/idl/metadata.thrift b/idl/metadata.thrift
index 7c8448bee..5a7d3e4b3 100644
--- a/idl/metadata.thrift
+++ b/idl/metadata.thrift
@@ -58,7 +58,12 @@ enum split_status
 enum disk_status
 {
     NORMAL = 0,
-    SPACE_INSUFFICIENT
+    // Indicate the disk is in space insufficiency. See config
+    // [replication].disk_min_available_space_ratio for more details.
+    SPACE_INSUFFICIENT,
+    // Indicate the disk is in IO error. The disk will be marked as IO_ERROR
+    // when it's read/write unavailable.
+    IO_ERROR
 }
 
 enum manual_compaction_status
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index a33d20e54..3d3eda94f 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -127,7 +127,8 @@ void dir_node::update_disk_stat()
     disk_available_ratio = static_cast<int>(
         disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / 
disk_capacity_mb));
 
-    auto old_status = status;
+    // It's able to change status from NORMAL to SPACE_INSUFFICIENT, and vice 
versa.
+    disk_status::type old_status = status;
     auto new_status = disk_available_ratio < 
FLAGS_disk_min_available_space_ratio
                           ? disk_status::SPACE_INSUFFICIENT
                           : disk_status::NORMAL;
@@ -202,6 +203,7 @@ void fs_manager::initialize(const std::vector<std::string> 
&data_dirs,
         // Check the status of this directory.
         std::string cdir;
         std::string err_msg;
+        disk_status::type status = disk_status::NORMAL;
         if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, 
err_msg) ||
                          !utils::filesystem::check_dir_rw(dir, err_msg))) {
             if (FLAGS_ignore_broken_disk) {
@@ -209,9 +211,7 @@ void fs_manager::initialize(const std::vector<std::string> 
&data_dirs,
             } else {
                 CHECK(false, err_msg);
             }
-            // TODO(yingchun): Remove the 'continue' and mark its io error 
status, regardless
-            //  the status of the disks, add all disks.
-            continue;
+            status = disk_status::IO_ERROR;
         }
 
         // Normalize the data directories.
@@ -219,9 +219,12 @@ void fs_manager::initialize(const std::vector<std::string> 
&data_dirs,
         utils::filesystem::get_normalized_path(cdir, norm_path);
 
         // Create and add this dir_node.
-        auto dn = std::make_shared<dir_node>(dir_tag, norm_path);
+        auto dn = std::make_shared<dir_node>(dir_tag, norm_path, 0, 0, 0, 
status);
         dir_nodes.emplace_back(dn);
-        LOG_INFO("mark data dir({}) as tag({})", norm_path, dir_tag);
+        LOG_INFO("mark data dir({}) as tag({}) with status({})",
+                 norm_path,
+                 dir_tag,
+                 enum_to_string(status));
     }
     CHECK_FALSE(dir_nodes.empty());
 
@@ -268,6 +271,10 @@ dir_node *fs_manager::find_best_dir_for_new_replica(const 
gpid &pid) const
         zauto_write_lock l(_lock);
         // Try to find the dir_node with the least replica count.
         for (const auto &dn : _dir_nodes) {
+            // Do not allocate new replica on dir_node which is not NORMAL.
+            if (dn->status != disk_status::NORMAL) {
+                continue;
+            }
             CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", 
pid, dn->tag);
             uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
             uint64_t total_replicas_count = dn->replicas_count();
@@ -308,6 +315,7 @@ void 
fs_manager::specify_dir_for_new_replica_for_test(dir_node *specified_dn,
         }
     }
     CHECK(dn_found, "dir_node({}) is not exist", specified_dn->tag);
+    CHECK_EQ(disk_status::NORMAL, specified_dn->status);
     const auto dir = specified_dn->replica_dir(app_type, pid);
     CHECK_TRUE(dsn::utils::filesystem::create_directory(dir));
     specified_dn->holding_replicas[pid.get_app_id()].emplace(pid);
@@ -346,6 +354,13 @@ void fs_manager::update_disk_stat()
     zauto_write_lock l(_lock);
     reset_disk_stat();
     for (auto &dn : _dir_nodes) {
+        // If the disk is already in IO_ERROR status, it will not change to 
other status, just skip
+        // it.
+        if (dn->status == disk_status::IO_ERROR) {
+            LOG_WARNING("skip to update disk stat for dir({}), because it is 
in IO_ERROR status",
+                        dn->tag);
+            continue;
+        }
         dn->update_disk_stat();
         _total_capacity_mb += dn->disk_capacity_mb;
         _total_available_mb += dn->disk_available_mb;
@@ -401,6 +416,10 @@ dir_node *fs_manager::find_replica_dir(dsn::string_view 
app_type, gpid pid)
     {
         zauto_read_lock l(_lock);
         for (const auto &dn : _dir_nodes) {
+            // Skip IO error dir_node.
+            if (dn->status == disk_status::IO_ERROR) {
+                continue;
+            }
             const auto dir = dn->replica_dir(app_type, pid);
             if (utils::filesystem::directory_exists(dir)) {
                 // Check if there are duplicate replica instance directories.
@@ -455,6 +474,10 @@ dir_node 
*fs_manager::create_child_replica_dir(dsn::string_view app_type,
     {
         zauto_read_lock l(_lock);
         for (const auto &dn : _dir_nodes) {
+            // Skip non-available dir_node.
+            if (dn->status != disk_status::NORMAL) {
+                continue;
+            }
             child_dir = dn->replica_dir(app_type, child_pid);
             // <parent_dir> = <prefix>/<gpid>.<app_type>
             // check if <parent_dir>'s <prefix> is equal to <data_dir>
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index be19d79b6..e4c51667d 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -19,6 +19,7 @@
 
 #include <gtest/gtest_prod.h>
 #include <stdint.h>
+#include <atomic>
 #include <functional>
 #include <map>
 #include <memory>
@@ -48,7 +49,7 @@ public:
     int64_t disk_capacity_mb;
     int64_t disk_available_mb;
     int disk_available_ratio;
-    disk_status::type status;
+    std::atomic<disk_status::type> status;
     std::map<app_id, std::set<gpid>> holding_replicas;
     std::map<app_id, std::set<gpid>> holding_primary_replicas;
     std::map<app_id, std::set<gpid>> holding_secondary_replicas;
@@ -147,6 +148,8 @@ private:
     int _min_available_ratio = 100;
     int _max_available_ratio = 0;
 
+    // Once dir_node has been added to '_dir_nodes', it will not be removed, 
it will be marked
+    // as non-NORMAL status if it is not available.
     std::vector<std::shared_ptr<dir_node>> _dir_nodes;
     // ] end of lock
 
diff --git a/src/common/replication_enums.h b/src/common/replication_enums.h
index 8f07a70b9..5eef1710e 100644
--- a/src/common/replication_enums.h
+++ b/src/common/replication_enums.h
@@ -152,6 +152,7 @@ ENUM_END2(replication::disk_migration_status::type, 
disk_migration_status)
 ENUM_BEGIN2(replication::disk_status::type, disk_status, 
replication::disk_status::NORMAL)
 ENUM_REG(replication::disk_status::NORMAL)
 ENUM_REG(replication::disk_status::SPACE_INSUFFICIENT)
+ENUM_REG(replication::disk_status::IO_ERROR)
 ENUM_END2(replication::disk_status::type, disk_status)
 
 ENUM_BEGIN2(replication::manual_compaction_status::type,
diff --git a/src/common/test/fs_manager_test.cpp 
b/src/common/test/fs_manager_test.cpp
index a60ddc1f7..542b122f1 100644
--- a/src/common/test/fs_manager_test.cpp
+++ b/src/common/test/fs_manager_test.cpp
@@ -54,15 +54,16 @@ TEST(fs_manager, initialize)
     {
         std::string create_dir_ok;
         std::string check_dir_rw_ok;
-        int32_t data_dir_size;
-    } tests[]{{"true", "true", 3}, {"true", "false", 2}, {"false", "false", 
2}};
+        // Regardless of the status of the disk, the number of dir_nodes 
should be 3.
+        int32_t dir_node_size;
+    } tests[]{{"true", "true", 3}, {"true", "false", 3}, {"false", "false", 
3}};
     int i = 0;
     for (const auto &test : tests) {
         fail::cfg("filesystem_create_directory", "return(" + 
test.create_dir_ok + ")");
         fail::cfg("filesystem_check_dir_rw", "return(" + test.check_dir_rw_ok 
+ ")");
         fs_manager fm;
         fm.initialize({"disk1", "disk2", "disk3"}, {"tag1", "tag2", "tag3"});
-        ASSERT_EQ(test.data_dir_size, fm.get_dir_nodes().size()) << i;
+        ASSERT_EQ(test.dir_node_size, fm.get_dir_nodes().size()) << i;
         i++;
     }
     fail::teardown();
diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp
index 04dd174bb..9488c32a6 100644
--- a/src/replica/disk_cleaner.cpp
+++ b/src/replica/disk_cleaner.cpp
@@ -23,8 +23,10 @@
 #include <stdint.h>
 #include <sys/types.h>
 #include <algorithm>
+#include <atomic>
 
 #include "common/fs_manager.h"
+#include "metadata_types.h"
 #include "runtime/api_layer1.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
@@ -73,6 +75,11 @@ error_s disk_remove_useless_dirs(const 
std::vector<std::shared_ptr<dir_node>> &d
 {
     std::vector<std::string> sub_list;
     for (const auto &dn : dir_nodes) {
+        // It's allowed to clear up the directory when it's 
SPACE_INSUFFICIENT, but not allowed when
+        // it's IO_ERROR.
+        if (dn->status == disk_status::IO_ERROR) {
+            continue;
+        }
         std::vector<std::string> tmp_list;
         if (!dsn::utils::filesystem::get_subdirectories(dn->full_dir, 
tmp_list, false)) {
             LOG_WARNING("gc_disk: failed to get subdirectories in {}", 
dn->full_dir);
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index be1328b73..2a8e47787 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -27,6 +27,7 @@
 #include <fmt/core.h>
 #include <inttypes.h>
 #include <stddef.h>
+#include <atomic>
 #include <chrono>
 #include <functional>
 #include <memory>
@@ -116,6 +117,21 @@ DSN_DEFINE_uint64(
 DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
 DSN_DECLARE_int32(staleness_for_commit);
 
+namespace {
+error_code disk_status_to_error_code(disk_status::type ds) // Translates a dir_node disk_status into the corresponding error_code.
+{
+    switch (ds) {
+    case disk_status::SPACE_INSUFFICIENT:
+        return dsn::ERR_DISK_INSUFFICIENT;
+    case disk_status::IO_ERROR:
+        return dsn::ERR_DISK_IO_ERROR;
+    default: // Any status other than the two above must be NORMAL; CHECK_EQ aborts otherwise.
+        CHECK_EQ(disk_status::NORMAL, ds);
+        return dsn::ERR_OK; // NOTE(review): on_client_write passes _dir_node->status even when only a secondary disk is abnormal, so that rejected write would reach the client as ERR_OK — confirm.
+    }
+}
+} // anonymous namespace
+
 void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
 {
     _checker.only_one_thread_access();
@@ -179,9 +195,8 @@ void replica::on_client_write(dsn::message_ex *request, 
bool ignore_throttling)
     }
 
     if (FLAGS_reject_write_when_disk_insufficient &&
-        (_dir_node->status == disk_status::SPACE_INSUFFICIENT ||
-         _primary_states.secondary_disk_space_insufficient())) {
-        response_client_write(request, ERR_DISK_INSUFFICIENT);
+        (_dir_node->status != disk_status::NORMAL || 
_primary_states.secondary_disk_abnormal())) {
+        response_client_write(request, 
disk_status_to_error_code(_dir_node->status));
         return;
     }
 
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index e978a91db..bf4b0bff8 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -33,6 +33,7 @@
  *     xxxx-xx-xx, author, fix bug about xxx
  */
 
+#include <atomic>
 #include <chrono>
 #include <memory>
 #include <unordered_map>
diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp
index 6bb5dd48d..67833799e 100644
--- a/src/replica/replica_context.cpp
+++ b/src/replica/replica_context.cpp
@@ -180,13 +180,14 @@ void primary_context::cleanup_split_states()
     split_stopped_secondary.clear();
 }
 
-bool primary_context::secondary_disk_space_insufficient() const
+bool primary_context::secondary_disk_abnormal() const
 {
     for (const auto &kv : secondary_disk_status) {
-        if (kv.second == disk_status::SPACE_INSUFFICIENT) {
-            LOG_INFO("partition[{}] secondary[{}] disk space is insufficient",
+        if (kv.second != disk_status::NORMAL) {
+            LOG_INFO("partition[{}] secondary[{}] disk space is {}",
                      membership.pid,
-                     kv.first.to_string());
+                     kv.first.to_string(),
+                     enum_to_string(kv.second));
             return true;
         }
     }
diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h
index 5d8c15eb0..472492616 100644
--- a/src/replica/replica_context.h
+++ b/src/replica/replica_context.h
@@ -115,7 +115,7 @@ public:
 
     void cleanup_split_states();
 
-    bool secondary_disk_space_insufficient() const;
+    bool secondary_disk_abnormal() const;
 
 public:
     // membership mgr, including learners
diff --git a/src/replica/replica_disk_migrator.cpp 
b/src/replica/replica_disk_migrator.cpp
index 0054f2734..1afc5c62e 100644
--- a/src/replica/replica_disk_migrator.cpp
+++ b/src/replica/replica_disk_migrator.cpp
@@ -129,6 +129,7 @@ bool 
replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc)
     bool valid_origin_disk = false;
     bool valid_target_disk = false;
     // _dir_nodes: std::vector<std::shared_ptr<dir_node>>
+    // TODO(yingchun): skip disks which are SPACE_INSUFFICIENT or IO_ERROR.
     for (const auto &dir_node : 
_replica->get_replica_stub()->_fs_manager._dir_nodes) {
         if (dir_node->tag == req.origin_disk) {
             valid_origin_disk = true;
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 54b2879e9..dcacf4baf 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -618,6 +618,10 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
     LOG_INFO("start to load replicas");
     std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
     for (const auto &dn : _fs_manager.get_dir_nodes()) {
+        // Skip IO error dir_node.
+        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
+            continue;
+        }
         std::vector<std::string> sub_directories;
         CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, 
sub_directories, false),
               "fail to get sub_directories in {}",
diff --git a/src/replica/test/replica_disk_test.cpp 
b/src/replica/test/replica_disk_test.cpp
index ce7441199..11b1399ac 100644
--- a/src/replica/test/replica_disk_test.cpp
+++ b/src/replica/test/replica_disk_test.cpp
@@ -21,6 +21,7 @@
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
 #include <unistd.h>
+#include <atomic>
 #include <cstdint>
 #include <map>
 #include <memory>
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index 998b21872..25feabd48 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -174,4 +174,6 @@ DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL)
 DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE)
 
 DEFINE_ERR_CODE(ERR_RDB_CORRUPTION)
+
+DEFINE_ERR_CODE(ERR_DISK_IO_ERROR)
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to