empiredan commented on code in PR #1477:
URL: 
https://github.com/apache/incubator-pegasus/pull/1477#discussion_r1203667395


##########
src/replica/test/mock_utils.h:
##########
@@ -231,23 +231,27 @@ class mock_replica : public replica
 };
 typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;
 
-inline std::unique_ptr<mock_replica> create_mock_replica(replica_stub *stub,
-                                                         int appid = 1,
-                                                         int partition_index = 
1,
-                                                         const char *dir = 
"./")
+inline std::unique_ptr<mock_replica>
+create_mock_replica(replica_stub *stub, int appid = 1, int partition_index = 1)
 {
     gpid gpid(appid, partition_index);
     app_info app_info;
     app_info.app_type = "replica";
     app_info.app_name = "temp";
 
-    return std::make_unique<mock_replica>(stub, gpid, app_info, dir);
+    dir_node *dn =

Review Comment:
   ```suggestion
       const auto * const dn =
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =
+        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
+    utils::filesystem::remove_path(kReplicaOriginDir);
+    stub->get_fs_manager()->remove_replica(test_pid);
+
+    // Create the related dirs.
     const std::string kReplicaOriginSuffixDir = fmt::format(
         "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, 
request.pid.to_string());
     const std::string kReplicaNewDir =
         fmt::format("./{}/{}.replica/", request.target_disk, 
request.pid.to_string());
     utils::filesystem::create_directory(kReplicaOriginSuffixDir);
     utils::filesystem::create_directory(kReplicaNewDir);
 
+    // The replica can be opened normally. In fact, the original dir is opened, 
and the new dir will
+    // be garbage.
     fail::cfg("mock_replica_load", "return()");
-    const std::string kReplicaOriginDir =
-        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
-    const std::string kReplicaGarDir =
-        fmt::format("./{}/{}.replica.gar", request.target_disk, 
request.pid.to_string());
     open_replica(app_info_1, request.pid);
 
+    // Check it works as expected.
+    const std::string kReplicaGarDir =

Review Comment:
   ```suggestion
       const auto kReplicaGarDir =
   ```



##########
src/replica/test/mock_utils.h:
##########
@@ -231,23 +231,27 @@ class mock_replica : public replica
 };
 typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;
 
-inline std::unique_ptr<mock_replica> create_mock_replica(replica_stub *stub,
-                                                         int appid = 1,
-                                                         int partition_index = 
1,
-                                                         const char *dir = 
"./")
+inline std::unique_ptr<mock_replica>
+create_mock_replica(replica_stub *stub, int appid = 1, int partition_index = 1)
 {
     gpid gpid(appid, partition_index);
     app_info app_info;
     app_info.app_type = "replica";
     app_info.app_name = "temp";
 
-    return std::make_unique<mock_replica>(stub, gpid, app_info, dir);
+    dir_node *dn =
+        
stub->get_fs_manager()->create_replica_dir_if_necessary(app_info.app_type.c_str(),
 gpid);
+    CHECK_NOTNULL(dn, "");
+    const auto replica_path = dn->replica_dir(app_info.app_type.c_str(), gpid);

Review Comment:
   ```suggestion
       const auto replica_path = dn->replica_dir(app_info.app_type, gpid);
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =
+        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());

Review Comment:
   ```suggestion
           fmt::format("./{}/{}.replica", request.origin_disk, request.pid);
   ```



##########
src/common/fs_manager.cpp:
##########
@@ -236,44 +277,40 @@ void fs_manager::add_replica(const gpid &pid, const 
std::string &pid_dir)
     LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, 
dn->tag);
 }
 
-void fs_manager::allocate_dir(const gpid &pid, const std::string &type, 
/*out*/ std::string &dir)
+dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
 {
-    char buffer[256];
-    sprintf(buffer, "%d.%d.%s", pid.get_app_id(), pid.get_partition_index(), 
type.c_str());
-
-    zauto_write_lock l(_lock);
-
     dir_node *selected = nullptr;
-
-    unsigned least_app_replicas_count = 0;
-    unsigned least_total_replicas_count = 0;
-
-    for (auto &n : _dir_nodes) {
-        CHECK(!n->has(pid), "gpid({}) already in dir_node({})", pid, n->tag);
-        unsigned app_replicas = n->replicas_count(pid.get_app_id());
-        unsigned total_replicas = n->replicas_count();
-
-        if (selected == nullptr || least_app_replicas_count > app_replicas) {
-            least_app_replicas_count = app_replicas;
-            least_total_replicas_count = total_replicas;
-            selected = n.get();
-        } else if (least_app_replicas_count == app_replicas &&
-                   least_total_replicas_count > total_replicas) {
-            least_total_replicas_count = total_replicas;
-            selected = n.get();
+    uint64_t least_app_replicas_count = 0;
+    uint64_t least_total_replicas_count = 0;
+    {
+        zauto_write_lock l(_lock);
+        // Try to find the dir_node with the least replica count.
+        for (const auto &dn : _dir_nodes) {
+            CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", 
pid, dn->tag);
+            uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
+            uint64_t total_replicas_count = dn->replicas_count();
+
+            if (selected == nullptr || least_app_replicas_count > 
app_replicas_count) {
+                least_app_replicas_count = app_replicas_count;
+                least_total_replicas_count = total_replicas_count;
+                selected = dn.get();
+            } else if (least_app_replicas_count == app_replicas_count &&
+                       least_total_replicas_count > total_replicas_count) {
+                least_total_replicas_count = total_replicas_count;
+                selected = dn.get();
+            }
         }
     }
-
-    LOG_INFO(
-        "{}: put pid({}) to dir({}), which has {} replicas of current app, {} 
replicas totally",
-        dsn_primary_address(),
-        pid,
-        selected->tag,
-        least_app_replicas_count,
-        least_total_replicas_count);
-
-    selected->holding_replicas[pid.get_app_id()].emplace(pid);
-    dir = utils::filesystem::path_combine(selected->full_dir, buffer);
+    if (selected) {

Review Comment:
   ```suggestion
       if (selected != nullptr) {
   ```



##########
src/common/test/fs_manager_test.cpp:
##########
@@ -64,19 +102,113 @@ TEST(fs_manager, dir_update_disk_status)
 TEST(fs_manager, get_dir_node)
 {
     fs_manager fm;
-    fm.initialize({"/data1"}, {"data1"});
+    fm.initialize({"./data1"}, {"data1"});
+    const auto &dns = fm.get_dir_nodes();
+    ASSERT_EQ(1, dns.size());
+    const auto &base_dir =
+        dns[0]->full_dir.substr(0, dns[0]->full_dir.size() - 
std::string("/data1").size());
 
     ASSERT_EQ(nullptr, fm.get_dir_node(""));
     ASSERT_EQ(nullptr, fm.get_dir_node("/"));
 
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/replica1"));
+
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/replica1"));
+}
+
+TEST(fs_manager, find_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", 
"data3"});
+
+    const char *app_type = "find_replica_dir";
+    gpid test_pid(1, 0);
+
+    // Clear up the remaining directories if exist.
+    for (const auto &dn : fm.get_dir_nodes()) {
+        remove_path(dn->replica_dir(app_type, test_pid));
+    }
+
+    ASSERT_EQ(nullptr, fm.find_replica_dir(app_type, test_pid));
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    const auto dir = dn->replica_dir(app_type, test_pid);
+    ASSERT_TRUE(directory_exists(dir));
+    dir_node *dn1 = fm.find_replica_dir(app_type, test_pid);

Review Comment:
   ```suggestion
       auto dn1 = fm.find_replica_dir(app_type, test_pid);
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =
+        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
+    utils::filesystem::remove_path(kReplicaOriginDir);
+    stub->get_fs_manager()->remove_replica(test_pid);
+
+    // Create the related dirs.
     const std::string kReplicaOriginSuffixDir = fmt::format(

Review Comment:
   ```suggestion
       const auto kReplicaOriginSuffixDir = fmt::format(
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =
+        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
+    utils::filesystem::remove_path(kReplicaOriginDir);
+    stub->get_fs_manager()->remove_replica(test_pid);
+
+    // Create the related dirs.
     const std::string kReplicaOriginSuffixDir = fmt::format(
         "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, 
request.pid.to_string());
     const std::string kReplicaNewDir =
         fmt::format("./{}/{}.replica/", request.target_disk, 
request.pid.to_string());
     utils::filesystem::create_directory(kReplicaOriginSuffixDir);
     utils::filesystem::create_directory(kReplicaNewDir);
 
+    // The replica can be opened normally. In fact, the original dir is opened, 
and the new dir will
+    // be garbage.
     fail::cfg("mock_replica_load", "return()");
-    const std::string kReplicaOriginDir =
-        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
-    const std::string kReplicaGarDir =
-        fmt::format("./{}/{}.replica.gar", request.target_disk, 
request.pid.to_string());
     open_replica(app_info_1, request.pid);
 
+    // Check it works as expected.
+    const std::string kReplicaGarDir =
+        fmt::format("./{}/{}.replica.gar", request.target_disk, 
request.pid.to_string());

Review Comment:
   ```suggestion
           fmt::format("./{}/{}.replica.gar", request.target_disk, request.pid);
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =
+        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
+    utils::filesystem::remove_path(kReplicaOriginDir);
+    stub->get_fs_manager()->remove_replica(test_pid);
+
+    // Create the related dirs.
     const std::string kReplicaOriginSuffixDir = fmt::format(
         "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, 
request.pid.to_string());
     const std::string kReplicaNewDir =

Review Comment:
   ```suggestion
       const auto kReplicaNewDir =
   ```



##########
src/common/test/fs_manager_test.cpp:
##########
@@ -64,19 +102,113 @@ TEST(fs_manager, dir_update_disk_status)
 TEST(fs_manager, get_dir_node)
 {
     fs_manager fm;
-    fm.initialize({"/data1"}, {"data1"});
+    fm.initialize({"./data1"}, {"data1"});
+    const auto &dns = fm.get_dir_nodes();
+    ASSERT_EQ(1, dns.size());
+    const auto &base_dir =
+        dns[0]->full_dir.substr(0, dns[0]->full_dir.size() - 
std::string("/data1").size());
 
     ASSERT_EQ(nullptr, fm.get_dir_node(""));
     ASSERT_EQ(nullptr, fm.get_dir_node("/"));
 
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/replica1"));
+
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/replica1"));
+}
+
+TEST(fs_manager, find_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", 
"data3"});
+
+    const char *app_type = "find_replica_dir";
+    gpid test_pid(1, 0);
+
+    // Clear up the remaining directories if exist.
+    for (const auto &dn : fm.get_dir_nodes()) {
+        remove_path(dn->replica_dir(app_type, test_pid));
+    }
+
+    ASSERT_EQ(nullptr, fm.find_replica_dir(app_type, test_pid));
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);

Review Comment:
   ```suggestion
       auto dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =
+        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
+    utils::filesystem::remove_path(kReplicaOriginDir);
+    stub->get_fs_manager()->remove_replica(test_pid);
+
+    // Create the related dirs.
     const std::string kReplicaOriginSuffixDir = fmt::format(
         "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, 
request.pid.to_string());
     const std::string kReplicaNewDir =
         fmt::format("./{}/{}.replica/", request.target_disk, 
request.pid.to_string());

Review Comment:
   ```suggestion
           fmt::format("./{}/{}.replica/", request.target_disk, request.pid);
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =
+        fmt::format("./{}/{}.replica", request.origin_disk, 
request.pid.to_string());
+    utils::filesystem::remove_path(kReplicaOriginDir);
+    stub->get_fs_manager()->remove_replica(test_pid);
+
+    // Create the related dirs.
     const std::string kReplicaOriginSuffixDir = fmt::format(
         "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, 
request.pid.to_string());

Review Comment:
   ```suggestion
           "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, 
request.pid);
   ```



##########
src/replica/test/replica_disk_migrate_test.cpp:
##########
@@ -362,31 +362,44 @@ TEST_F(replica_disk_migrate_test, 
disk_migrate_replica_update)
     }
 }
 
+// Test load from new replica dir failed, then fall back to load from origin 
dir succeed,
+// and then mark the "new" replica dir as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const std::string kReplicaOriginDir =

Review Comment:
   ```suggestion
       const auto kReplicaOriginDir =
   ```



##########
src/replica/replica_stub.cpp:
##########
@@ -1768,36 +1738,38 @@ void replica_stub::init_gc_for_test()
 
 void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
 {
-    std::string replica_path;
     std::pair<app_info, replica_info> closed_info;
-
     {
         zauto_write_lock l(_replicas_lock);
         auto iter = _closed_replicas.find(id);
         if (iter == _closed_replicas.end())
             return;
         closed_info = iter->second;
         _closed_replicas.erase(iter);
-        _fs_manager.remove_replica(id);
     }
+    _fs_manager.remove_replica(id);
 
-    replica_path = get_replica_dir(closed_info.first.app_type.c_str(), id, 
false);
-    if (replica_path.empty()) {
+    const auto dn = 
_fs_manager.find_replica_dir(closed_info.first.app_type.c_str(), id);

Review Comment:
   ```suggestion
       const auto * const dn = 
_fs_manager.find_replica_dir(closed_info.first.app_type.c_str(), id);
   ```



##########
src/common/test/fs_manager_test.cpp:
##########
@@ -64,19 +102,113 @@ TEST(fs_manager, dir_update_disk_status)
 TEST(fs_manager, get_dir_node)
 {
     fs_manager fm;
-    fm.initialize({"/data1"}, {"data1"});
+    fm.initialize({"./data1"}, {"data1"});
+    const auto &dns = fm.get_dir_nodes();
+    ASSERT_EQ(1, dns.size());
+    const auto &base_dir =
+        dns[0]->full_dir.substr(0, dns[0]->full_dir.size() - 
std::string("/data1").size());
 
     ASSERT_EQ(nullptr, fm.get_dir_node(""));
     ASSERT_EQ(nullptr, fm.get_dir_node("/"));
 
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/replica1"));
+
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/replica1"));
+}
+
+TEST(fs_manager, find_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", 
"data3"});
+
+    const char *app_type = "find_replica_dir";
+    gpid test_pid(1, 0);
+
+    // Clear up the remaining directories if exist.
+    for (const auto &dn : fm.get_dir_nodes()) {
+        remove_path(dn->replica_dir(app_type, test_pid));
+    }
+
+    ASSERT_EQ(nullptr, fm.find_replica_dir(app_type, test_pid));
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    const auto dir = dn->replica_dir(app_type, test_pid);
+    ASSERT_TRUE(directory_exists(dir));
+    dir_node *dn1 = fm.find_replica_dir(app_type, test_pid);
+    ASSERT_EQ(dn, dn1);
+}
+
+TEST(fs_manager, create_replica_dir_if_necessary)
+{
+    fs_manager fm;
+
+    const char *app_type = "create_replica_dir_if_necessary";
+    gpid test_pid(1, 0);
 
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2"));
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/"));
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/replica1"));
+    // Could not find a valid dir_node.
+    ASSERT_EQ(nullptr, fm.create_replica_dir_if_necessary(app_type, test_pid));
+
+    // It's able to create a dir for the replica after a valid dir_node has 
been added.
+    fm.add_new_dir_node("./data1", "data1");
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    ASSERT_EQ("data1", dn->tag);
 }
 
+TEST(fs_manager, create_child_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", 
"data3"});
+
+    const char *app_type = "create_child_replica_dir";
+    gpid test_pid(1, 0);
+    gpid test_child_pid(1, 0);
+
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    const auto dir = dn->replica_dir(app_type, test_pid);
+
+    dir_node *child_dn = fm.create_child_replica_dir(app_type, test_child_pid, 
dir);

Review Comment:
   ```suggestion
       auto child_dn = fm.create_child_replica_dir(app_type, test_child_pid, 
dir);
   ```



##########
src/common/test/fs_manager_test.cpp:
##########
@@ -64,19 +102,113 @@ TEST(fs_manager, dir_update_disk_status)
 TEST(fs_manager, get_dir_node)
 {
     fs_manager fm;
-    fm.initialize({"/data1"}, {"data1"});
+    fm.initialize({"./data1"}, {"data1"});
+    const auto &dns = fm.get_dir_nodes();
+    ASSERT_EQ(1, dns.size());
+    const auto &base_dir =
+        dns[0]->full_dir.substr(0, dns[0]->full_dir.size() - 
std::string("/data1").size());
 
     ASSERT_EQ(nullptr, fm.get_dir_node(""));
     ASSERT_EQ(nullptr, fm.get_dir_node("/"));
 
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/replica1"));
+
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/replica1"));
+}
+
+TEST(fs_manager, find_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", 
"data3"});
+
+    const char *app_type = "find_replica_dir";
+    gpid test_pid(1, 0);
+
+    // Clear up the remaining directories if exist.
+    for (const auto &dn : fm.get_dir_nodes()) {
+        remove_path(dn->replica_dir(app_type, test_pid));
+    }
+
+    ASSERT_EQ(nullptr, fm.find_replica_dir(app_type, test_pid));
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    const auto dir = dn->replica_dir(app_type, test_pid);
+    ASSERT_TRUE(directory_exists(dir));
+    dir_node *dn1 = fm.find_replica_dir(app_type, test_pid);
+    ASSERT_EQ(dn, dn1);
+}
+
+TEST(fs_manager, create_replica_dir_if_necessary)
+{
+    fs_manager fm;
+
+    const char *app_type = "create_replica_dir_if_necessary";
+    gpid test_pid(1, 0);
 
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2"));
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/"));
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/replica1"));
+    // Could not find a valid dir_node.
+    ASSERT_EQ(nullptr, fm.create_replica_dir_if_necessary(app_type, test_pid));
+
+    // It's able to create a dir for the replica after a valid dir_node has 
been added.
+    fm.add_new_dir_node("./data1", "data1");
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    ASSERT_EQ("data1", dn->tag);
 }
 
+TEST(fs_manager, create_child_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", 
"data3"});
+
+    const char *app_type = "create_child_replica_dir";
+    gpid test_pid(1, 0);
+    gpid test_child_pid(1, 0);
+
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    const auto dir = dn->replica_dir(app_type, test_pid);
+
+    dir_node *child_dn = fm.create_child_replica_dir(app_type, test_child_pid, 
dir);
+    ASSERT_EQ(dn, child_dn);
+    const auto child_dir = child_dn->replica_dir(app_type, test_child_pid);
+    ASSERT_TRUE(directory_exists(child_dir));
+    ASSERT_EQ(dir, child_dir);
+}
+
+TEST(fs_manager, find_best_dir_for_new_replica)
+{
+    // dn1 | 1.0,  1.1 +1.6
+    // dn2 | 1.2,  1.3 +1.7   2.0
+    // dn3 | 1.4  +1.5 +1.7   2.1
+    auto dn1 = std::make_shared<dir_node>("./data1", "data1");
+    dn1->holding_replicas[1] = {gpid(1, 0), gpid(1, 1)};
+    auto dn2 = std::make_shared<dir_node>("./data2", "data2");
+    dn2->holding_replicas[1] = {gpid(1, 2), gpid(1, 3)};
+    dn2->holding_replicas[2] = {gpid(2, 0)};
+    auto dn3 = std::make_shared<dir_node>("./data3", "data3");
+    dn3->holding_replicas[1] = {gpid(1, 4)};
+    dn3->holding_replicas[2] = {gpid(2, 1)};
+    fs_manager fm;
+    fm._dir_nodes = {dn1, dn2, dn3};
+
+    gpid pid_1_5(1, 5);
+    dir_node *dn = fm.find_best_dir_for_new_replica(pid_1_5);

Review Comment:
   ```suggestion
       auto dn = fm.find_best_dir_for_new_replica(pid_1_5);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to