GehaFearless commented on code in PR #1930:
URL: https://github.com/apache/incubator-pegasus/pull/1930#discussion_r1522715844


##########
src/shell/commands/local_partition_split.cpp:
##########
@@ -0,0 +1,752 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// IWYU pragma: no_include <ext/alloc_traits.h>
+#include <fmt/core.h>
+#include <fmt/format.h>
+#include <rocksdb/db.h>
+#include <rocksdb/env.h>
+#include <rocksdb/iterator.h>
+#include <rocksdb/metadata.h>
+#include <rocksdb/options.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/sst_file_reader.h>
+#include <rocksdb/sst_file_writer.h>
+#include <rocksdb/table_properties.h>
+#include <rocksdb/threadpool.h>
+#include <stdio.h>
+#include <algorithm>
+#include <cmath>
+#include <cstdint>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "base/meta_store.h"
+#include "base/pegasus_key_schema.h"
+#include "base/value_schema_manager.h"
+#include "client/partition_resolver.h"
+#include "client/replication_ddl_client.h"
+#include "common/gpid.h"
+#include "common/replication_common.h"
+#include "dsn.layer2_types.h"
+#include "pegasus_value_schema.h"
+#include "replica/replica_stub.h"
+#include "replica/replication_app_base.h"
+#include "shell/argh.h"
+#include "shell/command_executor.h"
+#include "shell/command_helper.h"
+#include "shell/commands.h"
+#include "utils/blob.h"
+#include "utils/errors.h"
+#include "utils/filesystem.h"
+#include "utils/fmt_logging.h"
+#include "utils/load_dump_object.h"
+#include "utils/output_utils.h"
+
+const std::string local_partition_split_help =
+    "<src_data_dirs> <dst_data_dirs> <src_app_id> "
+    "<dst_app_id> <src_partition_ids> <src_partition_count> "
+    "<dst_partition_count> <dst_app_name> [--post_full_compact] [--post_count] 
"
+    "[--threads_per_data_dir num] [--threads_per_partition num]";
+
+struct ToSplitPartition
+{
+    std::string replica_dir;
+    dsn::app_info ai;
+    dsn::replication::replica_init_info rii;
+    int32_t pidx = 0;
+};
+
+struct LocalPartitionSplitContext
+{
+    // Parameters from the command line.
+    std::vector<std::string> src_data_dirs;
+    std::vector<std::string> dst_data_dirs;
+    uint32_t src_app_id = 0;
+    uint32_t dst_app_id = 0;
+    std::set<uint32_t> src_partition_ids;
+    uint32_t src_partition_count = 0;
+    uint32_t dst_partition_count = 0;
+    uint32_t threads_per_data_dir = 1;
+    uint32_t threads_per_partition = 1;
+    std::string dst_app_name;
+    bool post_full_compact = false;
+    bool post_count = false;
+
+    // Calculate from the parameters above.
+    uint32_t split_count = 0;
+};
+
+struct FileSplitResult
+{
+    std::string filename;
+    bool success = false;
+    std::vector<uint64_t> split_counts;
+};
+
+struct PartitionSplitResult
+{
+    std::string src_replica_dir;
+    std::map<std::string, int64_t> key_count_by_dst_replica_dirs;
+    bool success = false;
+    std::vector<FileSplitResult> fsrs;
+};
+
+struct DataDirSplitResult
+{
+    std::string src_data_dir;
+    std::string dst_data_dir;
+    bool success = false;
+    std::vector<PartitionSplitResult> psrs;
+};
+
+bool validate_parameters(LocalPartitionSplitContext &lpsc)
+{
+    // TODO(yingchun): check disk space.
+    // Check <src_data_dirs> and <dst_data_dirs>.
+    RETURN_FALSE_IF_NOT(
+        lpsc.src_data_dirs.size() == lpsc.dst_data_dirs.size(),
+        "invalid command, the list size of <src_data_dirs> and <dst_data_dirs> 
must be equal");
+
+    // Check <dst_app_id>.
+    RETURN_FALSE_IF_NOT(
+        lpsc.src_app_id != lpsc.dst_app_id,
+        "invalid command, <src_app_id> and <dst_app_id> should not be equal 
({} vs. {})",
+        lpsc.src_app_id,
+        lpsc.dst_app_id);
+
+    // Check <src_partition_ids>.
+    for (const auto src_partition_id : lpsc.src_partition_ids) {
+        RETURN_FALSE_IF_NOT(
+            src_partition_id < lpsc.src_partition_count,
+            "invalid command, partition ids in <src_partition_ids> should be 
in range [0, {})",
+            lpsc.src_partition_count);
+    }
+
+    // Check <dst_partition_count>.
+    RETURN_FALSE_IF_NOT(lpsc.dst_partition_count > lpsc.src_partition_count,
+                        "invalid command, <dst_partition_count> should be 
larger than "
+                        "<src_partition_count> ({} vs. {})",
+                        lpsc.dst_partition_count,
+                        lpsc.src_partition_count);
+    lpsc.split_count = lpsc.dst_partition_count / lpsc.src_partition_count;
+    const auto log2n = static_cast<uint32_t>(log2(lpsc.split_count));
+    RETURN_FALSE_IF_NOT(pow(2, log2n) == lpsc.split_count,
+                        "invalid command, <dst_partition_count> should be 2^n 
times of "
+                        "<src_partition_count> ({} vs. {})",
+                        lpsc.dst_partition_count,
+                        lpsc.src_partition_count);
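+    // e.g. src_partition_count = 4 and dst_partition_count = 16 give
+    // split_count = 4 (= 2^2): each source partition is split into 4 pieces.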
+
+    const auto es = replication_ddl_client::validate_app_name(lpsc.dst_app_name);
+    RETURN_FALSE_IF_NOT(es.is_ok(),
+                        "invalid command, <dst_app_name> '{}' is invalid: {}",
+                        lpsc.dst_app_name,
+                        es.description());
+
+    return true;
+}
+
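+// Replica directories are named "<app_id>.<partition_index>.pegasus". Assuming
+// the usual 'hash % partition_count' routing (see get_partition_index below), a
+// key in source partition 'pidx' can only be rehashed to destination partitions
+// 'pidx + k * src_partition_count' with k in [0, split_count), so the k-th
+// split of a source replica maps to exactly one destination directory.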
+std::string construct_split_directory(const std::string &parent_dir,
+                                      const ToSplitPartition &tsp,
+                                      uint32_t dst_app_id,
+                                      uint32_t split_index)
+{
+    return fmt::format("{}/{}.{}.pegasus",
+                       parent_dir,
+                       dst_app_id,
+                       tsp.pidx + split_index * tsp.ai.partition_count);
+}
+
+bool split_file(const LocalPartitionSplitContext &lpsc,
+                const ToSplitPartition &tsp,
+                const rocksdb::LiveFileMetaData &file,
+                const std::string &tmp_split_replicas_dir,
+                uint32_t pegasus_data_version,
+                FileSplitResult &sfr)
+{
+    const auto src_sst_file = dsn::utils::filesystem::path_combine(file.db_path, file.name);
+
+    // 1. Open reader.
+    // TODO(yingchun): improve options.
+    auto reader = std::make_unique<rocksdb::SstFileReader>(rocksdb::Options());
+    RETURN_FALSE_IF_NON_RDB_OK(
+        reader->Open(src_sst_file), "open reader file '{}' failed", src_sst_file);
+    RETURN_FALSE_IF_NON_RDB_OK(
+        reader->VerifyChecksum(), "verify reader file '{}' failed", src_sst_file);
+
+    // 2. Validate the files.
+    const auto tbl_ppts = reader->GetTableProperties();
+    // The metadata column family file has been skipped in the previous steps.
+    CHECK_NE(tbl_ppts->column_family_name, pegasus::server::meta_store::META_COLUMN_FAMILY_NAME);
+    // TODO(yingchun): It seems the SstFileReader can only read the live key-value
+    //  pairs in the sst file. If a key-value pair is put in a higher level and
+    //  deleted in a lower level, it can still be read when iterating the higher
+    //  level sst file, which means the deleted key-value pair would appear again.
+    //  So a full compaction is needed before using the 'local_partition_split'
+    //  tool, to remove such keys from the DB.
+    //  We use the following validators to check the sst file.
+    RETURN_FALSE_IF_NOT(tbl_ppts->num_deletions == 0,
+                        "invalid sst file '{}', it contains {} deletions",
+                        src_sst_file,
+                        tbl_ppts->num_deletions);
+    RETURN_FALSE_IF_NOT(tbl_ppts->num_merge_operands == 0,
+                        "invalid sst file '{}', it contains {} merge_operands",
+                        src_sst_file,
+                        tbl_ppts->num_merge_operands);
+    RETURN_FALSE_IF_NOT(tbl_ppts->num_range_deletions == 0,
+                        "invalid sst file '{}', it contains {} 
range_deletions",
+                        src_sst_file,
+                        tbl_ppts->num_range_deletions);
+
+    // 3. Prepare the split temporary directories.
+    std::vector<std::string> dst_tmp_rdb_dirs;
+    dst_tmp_rdb_dirs.resize(lpsc.split_count);
+    for (int i = 0; i < lpsc.split_count; i++) {
+        const auto dst_tmp_rdb_dir =
+            construct_split_directory(tmp_split_replicas_dir, tsp, lpsc.dst_app_id, i);
+
+        RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(dst_tmp_rdb_dir),
+                            "create directory '{}' failed",
+                            dst_tmp_rdb_dir);
+
+        dst_tmp_rdb_dirs[i] = dst_tmp_rdb_dir;
+    }
+
+    // 4. Iterate the sst file through the sst reader, then split it into multiple
+    // sst files through sst writers.
+    std::vector<std::shared_ptr<rocksdb::SstFileWriter>> writers(lpsc.split_count);
+    std::unique_ptr<rocksdb::Iterator> iter(reader->NewIterator({}));
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        const auto &skey = iter->key();
+        const auto &svalue = iter->value();
+        // Skip empty write, see:
+        // https://pegasus.apache.org/zh/2018/03/07/last_flushed_decree.html.
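+        // (Such a record only advances the last-flushed decree and carries no
+        // user data, so it is safe to drop it while rewriting the sst files.)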
+        if (skey.empty() &&
+            pegasus::value_schema_manager::instance()
+                .get_value_schema(pegasus_data_version)
+                ->extract_user_data(svalue.ToString())
+                .empty()) {
+            continue;
+        }
+
+        // i. Calculate the hash value and corresponding new partition index.
+        dsn::blob bb_key(skey.data(), 0, skey.size());
+        uint64_t hash_value = pegasus::pegasus_key_hash(bb_key);
+        const auto new_pidx = dsn::replication::partition_resolver::get_partition_index(
+            static_cast<int>(lpsc.dst_partition_count), hash_value);
+        CHECK_LE(0, new_pidx);
+        CHECK_LT(new_pidx, lpsc.dst_partition_count);
+
+        // ii. Calculate the writer index.
+        const auto writer_idx = new_pidx / lpsc.src_partition_count;
+        CHECK_LE(0, writer_idx);
+        CHECK_LT(writer_idx, lpsc.split_count);
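+        // e.g. (illustrative numbers) with src_partition_count = 4 and
+        // dst_partition_count = 16, a key rehashed to new_pidx = 13 comes from
+        // source partition 13 % 4 = 1 and goes to writer 13 / 4 = 3, i.e. the
+        // directory of destination partition 1 + 3 * 4 = 13.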
+
+        // TODO(yingchun): improve to check expired data.
+
+        // iii. Create the writer if needed.
+        auto &dst_writer = writers[writer_idx];
+        if (!dst_writer) {
+            const auto dst_tmp_rdb_file =
+                fmt::format("{}{}", dst_tmp_rdb_dirs[writer_idx], file.name);
+            // TODO(yingchun): improve options.
+            dst_writer =
+                std::make_shared<rocksdb::SstFileWriter>(rocksdb::EnvOptions(), rocksdb::Options());
+            RETURN_FALSE_IF_NON_RDB_OK(dst_writer->Open(dst_tmp_rdb_file),
+                                       "open writer file '{}' failed",
+                                       dst_tmp_rdb_file);
+        }
+
+        // iv. Write data to the new partition sst file.
+        sfr.split_counts[writer_idx]++;
+        RETURN_FALSE_IF_NON_RDB_OK(dst_writer->Put(skey, svalue),
+                                   "write data failed when split from file {}",
+                                   src_sst_file);
+    }
+
+    // 5. Finalize the writers.
+    for (int i = 0; i < lpsc.split_count; i++) {
+        // Skip the non-opened writer.
+        if (sfr.split_counts[i] == 0) {
+            CHECK_TRUE(writers[i] == nullptr);
+            continue;
+        }
+
+        RETURN_FALSE_IF_NON_RDB_OK(writers[i]->Finish(nullptr),
+                                   "finalize writer split from file '{}' 
failed",
+                                   src_sst_file);
+    }
+    return true;
+}
+
+bool open_rocksdb(const rocksdb::DBOptions &db_opts,
+                  const std::string &rdb_dir,
+                  bool read_only,
+                  const std::vector<rocksdb::ColumnFamilyDescriptor> &cf_dscs,
+                  std::vector<rocksdb::ColumnFamilyHandle *> *cf_hdls,
+                  rocksdb::DB **db)
+{
+    CHECK_NOTNULL(cf_hdls, "");
+    CHECK_NOTNULL(db, "");
+    if (read_only) {
+        RETURN_FALSE_IF_NON_RDB_OK(
+            rocksdb::DB::OpenForReadOnly(db_opts, rdb_dir, cf_dscs, cf_hdls, db),
+            "open rocksdb in '{}' failed",
+            rdb_dir);
+    } else {
+        RETURN_FALSE_IF_NON_RDB_OK(rocksdb::DB::Open(db_opts, rdb_dir, cf_dscs, cf_hdls, db),
+                                   "open rocksdb in '{}' failed",
+                                   rdb_dir);
+    }
+    CHECK_EQ(2, cf_hdls->size());
+    CHECK_EQ(pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, (*cf_hdls)[0]->GetName());
+    CHECK_EQ(pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, (*cf_hdls)[1]->GetName());
+
+    return true;
+}
+
+void release_db(std::vector<rocksdb::ColumnFamilyHandle *> *cf_hdls, rocksdb::DB **db)
+{
+    CHECK_NOTNULL(cf_hdls, "");
+    CHECK_NOTNULL(db, "");
+    for (auto cf_hdl : *cf_hdls) {
+        delete cf_hdl;
+    }
+    cf_hdls->clear();
+    delete *db;
+    *db = nullptr;
+}
+
+bool split_partition(const LocalPartitionSplitContext &lpsc,
+                     const ToSplitPartition &tsp,
+                     const std::string &dst_replicas_dir,
+                     const std::string &tmp_split_replicas_dir,
+                     PartitionSplitResult &psr)
+{
+    static const std::string kRdbDirPostfix =
+        dsn::utils::filesystem::path_combine(dsn::replication::replication_app_base::kDataDir,
+                                             dsn::replication::replication_app_base::kRdbDir);
+    const auto rdb_dir = dsn::utils::filesystem::path_combine(tsp.replica_dir, kRdbDirPostfix);
+    fmt::print(stdout, " start to split '{}'\n", rdb_dir);
+
+    // 1. Open the original rocksdb in read-only mode.
+    rocksdb::DBOptions db_opts;
+    // The following options should be set in Pegasus 2.0 and lower versions.
+    // db_opts.pegasus_data = true;
+    // db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX;
+    const std::vector<rocksdb::ColumnFamilyDescriptor> cf_dscs(
+        {{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, {}},
+         {pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, {}}});
+    std::vector<rocksdb::ColumnFamilyHandle *> cf_hdls;
+    rocksdb::DB *db = nullptr;
+    RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, rdb_dir, true, cf_dscs, &cf_hdls, &db), "");
+
+    // 2. Get metadata from rocksdb.
+    // - In Pegasus versions lower than 2.0, the metadata is only stored in the
+    //   MANIFEST file.
+    // - In Pegasus 2.0, the metadata is stored both in the metadata column family
+    //   and the MANIFEST file.
+    // - Since Pegasus 2.1, the metadata is only stored in the metadata column
+    //   family.
+    auto ms = std::make_unique<pegasus::server::meta_store>(rdb_dir.c_str(), db, cf_hdls[1]);
+    uint64_t last_committed_decree;
+    RETURN_FALSE_IF_NON_OK(ms->get_last_flushed_decree(&last_committed_decree),
+                           "get_last_flushed_decree from '{}' failed",
+                           rdb_dir);
+
+    uint32_t pegasus_data_version;
+    RETURN_FALSE_IF_NON_OK(
+        ms->get_data_version(&pegasus_data_version), "get_data_version from 
'{}' failed", rdb_dir);
+
+    uint64_t last_manual_compact_finish_time;
+    RETURN_FALSE_IF_NON_OK(
+        ms->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
+        "get_last_manual_compact_finish_time from '{}' failed",
+        rdb_dir);
+
+    // 3. Get all live sst files.
+    std::vector<rocksdb::LiveFileMetaData> files;
+    db->GetLiveFilesMetaData(&files);
+
+    // 4. Close rocksdb.
+    release_db(&cf_hdls, &db);
+
+    // 5. Split the sst files.
+    auto files_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
+        rocksdb::NewThreadPool(static_cast<int>(lpsc.threads_per_partition)));
+    psr.fsrs.reserve(files.size());
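+    // Note: reserving up front keeps the 'sfr' references captured by the jobs
+    // below valid, since the vector never reallocates while the pool is running.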
+    for (const auto &file : files) {
+        // Skip metadata column family files; we will write the metadata manually
+        // later in the new DB.
+        if (file.column_family_name == pegasus::server::meta_store::META_COLUMN_FAMILY_NAME) {
+            fmt::print(
+                stdout, "  skip [{}]: {}: {}\n", file.column_family_name, 
file.db_path, file.name);
+            continue;
+        }
+
+        // Record the per-file split result.
+        psr.fsrs.emplace_back();
+        auto &sfr = psr.fsrs.back();
+        sfr.filename = file.name;
+        sfr.split_counts.resize(lpsc.split_count);
+
+        files_thread_pool->SubmitJob([=, &sfr]() {
+            sfr.success =
+                split_file(lpsc, tsp, file, tmp_split_replicas_dir, pegasus_data_version, sfr);
+        });
+    }
+    files_thread_pool->WaitForJobsAndJoinAllThreads();
+    files_thread_pool.reset();
+
+    // 6. Create new rocksdb instances for the new partitions.
+    // TODO(yingchun): poolize the following operations if necessary.
+    for (int i = 0; i < lpsc.split_count; i++) {
+        // The new replica is placed in 'dst_replicas_dir'.
+        const auto new_replica_dir =
+            construct_split_directory(dst_replicas_dir, tsp, lpsc.dst_app_id, i);
+        const auto new_rdb_dir =
+            dsn::utils::filesystem::path_combine(new_replica_dir, kRdbDirPostfix);
+
+        // i. Create the directory for the split rocksdb.
+        // TODO(yingchun): make sure it doesn't exist!
+        RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(new_rdb_dir),
+                            "create directory '{}' failed",
+                            new_rdb_dir);
+
+        // ii. Open new rocksdb.
+        rocksdb::DBOptions new_db_opts;
+        new_db_opts.create_if_missing = true;
+        // Create the 'pegasus_meta_cf' column family.
+        new_db_opts.create_missing_column_families = true;
+        RETURN_FALSE_IF_NOT(open_rocksdb(new_db_opts, new_rdb_dir, false, cf_dscs, &cf_hdls, &db),
+                            "");
+        const auto count_of_new_replica =
+            psr.key_count_by_dst_replica_dirs.insert({new_replica_dir, -1});
+        CHECK_TRUE(count_of_new_replica.second);
+
+        // iii. Ingest the split sst files to the new rocksdb.
+        do {
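+            // The 'do { ... } while (false)' wrapper lets the steps below bail
+            // out early with 'break' instead of nested conditionals.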
+            // Skip non-existent directories.
+            const auto dst_tmp_rdb_dir =
+                construct_split_directory(tmp_split_replicas_dir, tsp, lpsc.dst_app_id, i);
+            if (!dsn::utils::filesystem::directory_exists(dst_tmp_rdb_dir)) {
+                break;
+            }
+
+            // Gather all files.
+            rocksdb::IngestExternalFileArg arg;
+            arg.column_family = cf_hdls[0];
+            RETURN_FALSE_IF_NOT(
+                dsn::utils::filesystem::get_subfiles(dst_tmp_rdb_dir, arg.external_files, false),
+                "get sub-files from '{}' failed",
+                dst_tmp_rdb_dir);
+
+            // Skip empty directory.
+            if (arg.external_files.empty()) {
+                break;
+            }
+
+            // Ingest files.
+            RETURN_FALSE_IF_NON_RDB_OK(db->IngestExternalFiles({arg}),
+                                       "ingest files from '{}' to '{}' failed",
+                                       dst_tmp_rdb_dir,
+                                       new_rdb_dir);
+
+            // Optional full compaction.
+            if (lpsc.post_full_compact) {
+                RETURN_FALSE_IF_NON_RDB_OK(
+                    db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr),
+                    "full compact rocksdb in '{}' failed",
+                    new_rdb_dir);
+            }
+
+            // Optional data counting.
+            if (lpsc.post_count) {
+                std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator({}));
+                int new_total_count = 0;
+                for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+                    new_total_count++;
+                }
+                count_of_new_replica.first->second = new_total_count;
+            }
+        } while (false);
+
+        // iv. Set metadata to rocksdb.
+        // - In Pegasus versions lower than 2.0, the metadata is only stored in
+        //   the MANIFEST file.
+        // - In Pegasus 2.0, the metadata is stored both in the metadata column
+        //   family and the MANIFEST file.
+        // - Since Pegasus 2.1, the metadata is only stored in the metadata column
+        //   family.
+
+        // TODO(yingchun): these metadata are only written to the metadata column
+        //  family, not the MANIFEST file, so this tool does not support Pegasus
+        //  versions lower than 2.0.
+        //  For Pegasus 2.0, it's needed to set [pegasus.server]get_meta_store_type =
+        //  "metacf" when restarting replica servers after using this tool.
+        auto new_ms =
+            std::make_unique<pegasus::server::meta_store>(new_rdb_dir.c_str(), db, cf_hdls[1]);
+        new_ms->set_data_version(pegasus_data_version);
+        new_ms->set_last_flushed_decree(last_committed_decree);
+        new_ms->set_last_manual_compact_finish_time(last_manual_compact_finish_time);
+        rocksdb::FlushOptions options;
+        options.wait = true;
+        RETURN_FALSE_IF_NON_RDB_OK(
+            db->Flush(options, cf_hdls), "flush rocksdb in '{}' failed", 
new_rdb_dir);
+
+        // v. Close rocksdb.
+        release_db(&cf_hdls, &db);
+
+        // vi. Generate new ".app-info".
+        dsn::app_info new_ai(tsp.ai);
+        new_ai.app_name = lpsc.dst_app_name;
+        new_ai.app_id = static_cast<int32_t>(lpsc.dst_app_id);
+        new_ai.partition_count = static_cast<int32_t>(lpsc.dst_partition_count);
+        // Note that the 'init_partition_count' field used by online partition
+        // split will be reset.
+        new_ai.init_partition_count = -1;
+        dsn::replication::replica_app_info rai(&new_ai);
+        const auto rai_path = dsn::utils::filesystem::path_combine(
+            new_replica_dir, dsn::replication::replica_app_info::kAppInfo);
+        RETURN_FALSE_IF_NON_OK(rai.store(rai_path), "write replica_app_info 
'{}' failed", rai_path);
+
+        // vii. Generate new ".init-info".
+        dsn::replication::replica_init_info new_rii(tsp.rii);
+        new_rii.init_offset_in_shared_log = 0;
+        new_rii.init_offset_in_private_log = 0;
+        const auto rii_path =
+            dsn::utils::filesystem::path_combine(new_replica_dir, replica_init_info::kInitInfo);
+        RETURN_FALSE_IF_NON_OK(dsn::utils::dump_rjobj_to_file(new_rii, rii_path),
+                               "write replica_init_info '{}' failed",
+                               rii_path);
+    }
+    if (std::any_of(psr.fsrs.begin(), psr.fsrs.end(), [](const FileSplitResult &fsr) {
+            return !fsr.success;
+        })) {
+        return false;
+    }
+    return true;
+}
+
+bool split_data_directory(const LocalPartitionSplitContext &lpsc,
+                          const std::string &src_data_dir,
+                          const std::string &dst_data_dir,
+                          DataDirSplitResult &ddsr)
+{
+    static const std::string kReplicasDir =
+        dsn::utils::filesystem::path_combine(dsn::replication::replication_options::kReplicaAppType,
+                                             dsn::replication::replication_options::kRepsDir);
+
+    // 1. Collect all replica directories from 'src_data_dir'.
+    const auto src_replicas_dir = dsn::utils::filesystem::path_combine(src_data_dir, kReplicasDir);
+    std::vector<std::string> replica_dirs;
+    RETURN_FALSE_IF_NOT(
+        dsn::utils::filesystem::get_subdirectories(src_replicas_dir, replica_dirs, false),
+        "get sub-directories from '{}' failed",
+        src_replicas_dir);
+
+    // 2. Create temporary split directory on 'dst_data_dir'.
+    const auto tmp_split_replicas_dir = dsn::utils::filesystem::path_combine(dst_data_dir, "split");
+    RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(tmp_split_replicas_dir),
+                        "create split directory '{}' failed",
+                        tmp_split_replicas_dir);
+
+    // 3. Gather partitions to split.
+    std::vector<ToSplitPartition> to_split_partitions;
+    std::set<uint32_t> exist_app_ids;
+    std::set<std::string> exist_app_names;
+    std::set<uint32_t> remain_partition_ids(lpsc.src_partition_ids);
+    const std::set<std::string> ordered_replica_dirs(replica_dirs.begin(), replica_dirs.end());
+    for (const auto &replica_dir : ordered_replica_dirs) {
+        // i. Validate the replica directory.
+        dsn::app_info ai;
+        dsn::gpid pid;
+        std::string hint_message;
+        if (!replica_stub::validate_replica_dir(replica_dir, ai, pid, hint_message)) {
+            fmt::print(stderr, "invalid replica dir '{}': {}\n", replica_dir, hint_message);
+            continue;
+        }
+
+        // ii. Skip replicas that do not belong to <src_app_id>.
+        CHECK_EQ(pid.get_app_id(), ai.app_id);
+        if (ai.app_id != lpsc.src_app_id) {
+            continue;
+        }
+
+        // iii. Skip, with a warning, replicas that have the same app id but whose
+        // partition index is not in <src_partition_ids>.
+        const auto cur_pidx = pid.get_partition_index();
+        if (lpsc.src_partition_ids.count(cur_pidx) == 0) {
+            fmt::print(stdout,
+                       "WARNING: the partition index {} of the <src_app_id> {} 
is skipped\n",
+                       cur_pidx,
+                       lpsc.src_app_id);
+            continue;
+        }
+
+        // iv. Warn and continue if the <dst_app_id> already exists.
+        exist_app_ids.insert(ai.app_id);
+        if (exist_app_ids.count(lpsc.dst_app_id) != 0) {
+            fmt::print(
+                stdout,
+                "WARNING: there is already a replica {} with the same 
<dst_app_id> {} exists\n",
+                replica_dir,
+                lpsc.dst_app_id);
+        }
+
+        // v. Warn and continue if the <dst_app_name> already exists.
+        exist_app_names.insert(ai.app_name);
+        if (exist_app_names.count(lpsc.dst_app_name) != 0) {
+            fmt::print(
+                stdout,
+                "WARNING: there is already a replica {} with the same 
<dst_app_name> {} exists\n",
+                replica_dir,
+                lpsc.dst_app_name);
+        }
+
+        // vi. Check if <src_partition_count> matches.
+        RETURN_FALSE_IF_NOT(ai.partition_count == lpsc.src_partition_count,
+                            "unmatched <src_partition_count> ({} vs {})",
+                            ai.partition_count,
+                            lpsc.src_partition_count);
+
+        // vii. Check the app status.
+        RETURN_FALSE_IF_NOT(ai.status == dsn::app_status::AS_AVAILABLE,
+                            "not support to split app '{}' in non-AVAILABLE 
status",
+                            ai.app_name);
+
+        // viii. Check if the app is duplicating or bulk loading.
+        RETURN_FALSE_IF_NOT(!ai.duplicating && !ai.is_bulk_loading,
+                            "not support to split app '{}' which is 
duplicating or bulk loading",
+                            ai.app_name);
+
+        // ix. Load the replica_init_info.
+        dsn::replication::replica_init_info rii;
+        const auto rii_path =
+            dsn::utils::filesystem::path_combine(replica_dir, replica_init_info::kInitInfo);
+        RETURN_FALSE_IF_NON_OK(dsn::utils::load_rjobj_from_file(rii_path, &rii),
+                               "load replica_init_info from '{}' failed",
+                               rii_path);
+
+        // x. Gather the replica.
+        to_split_partitions.push_back({replica_dir, ai, rii, pid.get_partition_index()});
+        remain_partition_ids.erase(cur_pidx);
+    }
+
+    if (!remain_partition_ids.empty()) {
+        fmt::print(stdout,
+                   "WARNING: the partitions {} are skipped to be split\n",
+                   fmt::join(remain_partition_ids, ","));
+    }
+
+    // 4. Split the partitions.
+    const auto dst_replicas_dir = dsn::utils::filesystem::path_combine(dst_data_dir, kReplicasDir);
+    auto partitions_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
+        rocksdb::NewThreadPool(static_cast<int>(lpsc.threads_per_data_dir)));
+    ddsr.psrs.reserve(to_split_partitions.size());
+    for (const auto &tsp : to_split_partitions) {
+        // Record the per-partition split result.
+        ddsr.psrs.emplace_back();
+        auto &psr = ddsr.psrs.back();
+        psr.src_replica_dir = tsp.replica_dir;
+
+        partitions_thread_pool->SubmitJob([=, &psr]() {
+            psr.success = split_partition(lpsc, tsp, dst_replicas_dir, tmp_split_replicas_dir, psr);
+        });
+    }
+    partitions_thread_pool->WaitForJobsAndJoinAllThreads();
+    if (std::any_of(ddsr.psrs.begin(), ddsr.psrs.end(), [](const PartitionSplitResult &psr) {
+            return !psr.success;
+        })) {
+        return false;
+    }
+    return true;
+}
+
+bool local_partition_split(command_executor *e, shell_context *sc, arguments args)
+{
+    // 1. Parse parameters.
+    argh::parser cmd(args.argc, args.argv);
+    RETURN_FALSE_IF_NOT(cmd.pos_args().size() >= 8,
+                        "invalid command, should be in the form of '{}'",
+                        local_partition_split_help);
+    int param_index = 1;
+    LocalPartitionSplitContext lpsc;
+    PARSE_STRS(lpsc.src_data_dirs);
+    PARSE_STRS(lpsc.dst_data_dirs);
+    PARSE_UINT(lpsc.src_app_id);
+    PARSE_UINT(lpsc.dst_app_id);
+    PARSE_UINTS(lpsc.src_partition_ids);
+    PARSE_UINT(lpsc.src_partition_count);
+    PARSE_UINT(lpsc.dst_partition_count);
+    lpsc.dst_app_name = cmd(param_index++).str();
+    PARSE_OPT_UINT("threads_per_data_dir", lpsc.threads_per_data_dir, 1);
+    PARSE_OPT_UINT("threads_per_partition", lpsc.threads_per_partition, 1);
+    lpsc.post_full_compact = cmd["--post_full_compact"];
+    lpsc.post_count = cmd["--post_count"];
+
+    // 2. Check parameters.
+    if (!validate_parameters(lpsc)) {
+        return false;
+    }
+
+    // 3. Split each data directory.
+    auto data_dirs_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
+        rocksdb::NewThreadPool(static_cast<int>(lpsc.src_data_dirs.size())));
+    CHECK_EQ(lpsc.src_data_dirs.size(), lpsc.dst_data_dirs.size());

Review Comment:
   `validate_parameters` already checks it


