This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 226067a5984 branch-3.0: [fix](restore) Make the DirMoveTask idempotent. #47313 (#47426)
226067a5984 is described below

commit 226067a598442237ed59649eac6190a09401fc30
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 19 10:28:21 2025 +0800

    branch-3.0: [fix](restore) Make the DirMoveTask idempotent. #47313 (#47426)
    
    Cherry-picked from #47313
    
    Co-authored-by: walter <[email protected]>
---
 be/src/runtime/snapshot_loader.cpp       |  26 +++-
 be/test/runtime/snapshot_loader_test.cpp | 257 ++++++++++++++++++++++++++++++-
 2 files changed, 278 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 109fe479368..97955d94051 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -58,7 +58,17 @@
 #include "util/thrift_rpc_helper.h"
 
 namespace doris {
-namespace {
+
+static std::string get_loaded_tag_path(const std::string& snapshot_path) {
+    return snapshot_path + "/LOADED";
+}
+
+static Status write_loaded_tag(const std::string& snapshot_path, int64_t tablet_id) {
+    std::unique_ptr<io::FileWriter> writer;
+    std::string file = get_loaded_tag_path(snapshot_path);
+    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file, &writer));
+    return writer->close();
+}
 
 Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path,
                             std::string_view remote_path, std::string_view checksum) {
@@ -85,8 +95,6 @@ bool _end_with(std::string_view str, std::string_view match) {
            str.compare(str.size() - match.size(), match.size(), match) == 0;
 }
 
-} // namespace
-
 SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id,
                                const TNetworkAddress& broker_addr,
                                const std::map<std::string, std::string>& prop)
@@ -751,6 +759,14 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
         return Status::InternalError(ss.str());
     }
 
+    std::string loaded_tag_path = get_loaded_tag_path(snapshot_path);
+    bool already_loaded = false;
+    RETURN_IF_ERROR(io::global_local_filesystem()->exists(loaded_tag_path, &already_loaded));
+    if (already_loaded) {
+        LOG(INFO) << "snapshot path already moved: " << snapshot_path;
+        return Status::OK();
+    }
+
     // rename the rowset ids and tabletid info in rowset meta
     auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id,
                                                           tablet->replica_id(), tablet->table_id(),
@@ -820,6 +836,10 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
+
+    // mark the snapshot path as loaded
+    RETURN_IF_ERROR(write_loaded_tag(snapshot_path, tablet_id));
+
     LOG(INFO) << "finished to reload header of tablet: " << tablet_id;
 
     return status;
diff --git a/be/test/runtime/snapshot_loader_test.cpp b/be/test/runtime/snapshot_loader_test.cpp
index 9e4de5f3f33..cb75760bb6d 100644
--- a/be/test/runtime/snapshot_loader_test.cpp
+++ b/be/test/runtime/snapshot_loader_test.cpp
@@ -17,18 +17,215 @@
 
 #include "runtime/snapshot_loader.h"
 
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
+#include <gtest/gtest_pred_impl.h>
 
+#include <cstdint>
 #include <filesystem>
+#include <iostream>
+#include <string>
 
-#include "gtest/gtest_pred_impl.h"
+#include "common/config.h"
+#include "common/object_pool.h"
+#include "exec/tablet_info.h"
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/delta_writer.h"
+#include "olap/iterators.h"
+#include "olap/olap_define.h"
+#include "olap/options.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "olap/schema.h"
+#include "olap/segment_loader.h"
+#include "olap/snapshot_manager.h"
 #include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_manager.h"
+#include "olap/task/engine_publish_version_task.h"
+#include "olap/txn_manager.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/runtime/vdatetime_value.h"
 
 namespace doris {
 
-TEST(SnapshotLoaderTest, NormalCase) {
+static const uint32_t MAX_PATH_LEN = 1024;
+static StorageEngine* engine_ref = nullptr;
+static std::string storage_root_path;
+
+static void set_up() {
+    char buffer[MAX_PATH_LEN];
+    EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+    storage_root_path = std::string(buffer) + "/snapshot_data_test";
+    auto st = io::global_local_filesystem()->delete_directory(storage_root_path);
+    ASSERT_TRUE(st.ok()) << st;
+    st = io::global_local_filesystem()->create_directory(storage_root_path);
+    ASSERT_TRUE(st.ok()) << st;
+    std::vector<StorePath> paths;
+    paths.emplace_back(storage_root_path, -1);
+
+    doris::EngineOptions options;
+    options.store_paths = paths;
+    options.backend_uid = UniqueId::gen_uid();
+    auto engine = std::make_unique<StorageEngine>(options);
+    engine_ref = engine.get();
+    Status s = engine->open();
+    ASSERT_TRUE(s.ok()) << s;
+    ASSERT_TRUE(s.ok()) << s;
+
+    ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+    exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+    exec_env->set_storage_engine(std::move(engine));
+}
+
+static void tear_down() {
+    ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+    exec_env->set_memtable_memory_limiter(nullptr);
+    engine_ref = nullptr;
+    exec_env->set_storage_engine(nullptr);
+
+    if (storage_root_path.empty()) {
+        return;
+    }
+
+    // Status s = io::global_local_filesystem()->delete_directory(storage_root_path);
+    // EXPECT_TRUE(s.ok()) << "delete directory " << s;
+}
+
+static TCreateTabletReq create_tablet(int64_t partition_id, int64_t tablet_id,
+                                      int32_t schema_hash) {
+    TColumnType col_type;
+    col_type.__set_type(TPrimitiveType::SMALLINT);
+    TColumn col1;
+    col1.__set_column_name("col1");
+    col1.__set_column_type(col_type);
+    col1.__set_is_key(true);
+    std::vector<TColumn> cols;
+    cols.push_back(col1);
+    TTabletSchema tablet_schema;
+    tablet_schema.__set_short_key_column_count(1);
+    tablet_schema.__set_schema_hash(schema_hash);
+    tablet_schema.__set_keys_type(TKeysType::AGG_KEYS);
+    tablet_schema.__set_storage_type(TStorageType::COLUMN);
+    tablet_schema.__set_columns(cols);
+    TCreateTabletReq create_tablet_req;
+    create_tablet_req.__set_tablet_schema(tablet_schema);
+    create_tablet_req.__set_tablet_id(tablet_id);
+    create_tablet_req.__set_partition_id(partition_id);
+    create_tablet_req.__set_version(2);
+    return create_tablet_req;
+}
+
+static TDescriptorTable create_descriptor_tablet() {
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder tuple_builder;
+    tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("col1").column_pos(0).build());
+    tuple_builder.build(&dtb);
+    return dtb.desc_tbl();
+}
+
+static void add_rowset(int64_t tablet_id, int32_t schema_hash, int64_t partition_id, int64_t txn_id,
+                       int16_t value) {
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    auto param = std::make_shared<OlapTableSchemaParam>();
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req;
+    write_req.tablet_id = tablet_id;
+    write_req.schema_hash = schema_hash;
+    write_req.txn_id = txn_id;
+    write_req.partition_id = partition_id;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = param;
+    auto profile = std::make_unique<RuntimeProfile>("LoadChannels");
+    auto delta_writer =
+            std::make_unique<DeltaWriter>(*engine_ref, write_req, profile.get(), TUniqueId {});
+
+    vectorized::Block block;
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        std::cout << "slot_desc: " << slot_desc->col_name() << std::endl;
+        block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                       slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+
+    std::cout << "total column " << block.mutate_columns().size() << std::endl;
+    auto columns = block.mutate_columns();
+    int16_t c1 = value;
+    columns[0]->insert_data((const char*)&c1, sizeof(c1));
+    Status res = delta_writer->write(&block, {0});
+    EXPECT_TRUE(res.ok());
+
+    res = delta_writer->close();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->wait_flush();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->build_rowset();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->submit_calc_delete_bitmap_task();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->wait_calc_delete_bitmap();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->commit_txn(PSlaveTabletNodes());
+    ASSERT_TRUE(res.ok()) << res;
+
+    TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
+    ASSERT_TRUE(tablet != nullptr);
+
+    std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
+    Version version;
+    version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
+    std::cout << "start to add rowset version:" << version.first << "-" << version.second
+              << std::endl;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    engine_ref->txn_manager()->get_txn_related_tablets(txn_id, partition_id, &tablet_related_rs);
+    ASSERT_EQ(1, tablet_related_rs.size());
+
+    std::cout << "start to publish txn" << std::endl;
+    RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
+
+    TabletPublishStatistics stats;
+    res = engine_ref->txn_manager()->publish_txn(partition_id, tablet, txn_id, version, &stats);
+    ASSERT_TRUE(res.ok()) << res;
+    std::cout << "start to add inc rowset:" << rowset->rowset_id()
+              << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first
+              << "-" << rowset->version().second << std::endl;
+    res = tablet->add_inc_rowset(rowset);
+    ASSERT_TRUE(res.ok()) << res;
+}
+
+class SnapshotLoaderTest : public ::testing::Test {
+public:
+    SnapshotLoaderTest() {}
+    ~SnapshotLoaderTest() {}
+    static void SetUpTestSuite() { set_up(); }
+
+    static void TearDownTestSuite() { tear_down(); }
+};
+
+TEST_F(SnapshotLoaderTest, NormalCase) {
     StorageEngine engine({});
     SnapshotLoader loader(engine, ExecEnv::GetInstance(), 1L, 2L);
 
@@ -87,4 +284,60 @@ TEST(SnapshotLoaderTest, NormalCase) {
     EXPECT_EQ(10005, tablet_id);
 }
 
+TEST_F(SnapshotLoaderTest, DirMoveTaskIsIdempotent) {
+    // 1. create a tablet
+    int64_t tablet_id = 111;
+    int32_t schema_hash = 222;
+    int64_t partition_id = 333;
+    TCreateTabletReq req = create_tablet(partition_id, tablet_id, schema_hash);
+    RuntimeProfile profile("CreateTablet");
+    Status status = engine_ref->create_tablet(req, &profile);
+    EXPECT_TRUE(status.ok());
+    TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
+    EXPECT_TRUE(tablet != nullptr);
+
+    // 2. add a rowset
+    add_rowset(tablet_id, schema_hash, partition_id, 100, 100);
+    auto version = tablet->max_version();
+    std::cout << "version: " << version.first << ", " << version.second << std::endl;
+
+    // 3. make a snapshot
+    string snapshot_path;
+    bool allow_incremental_clone = false; // not used
+    TSnapshotRequest snapshot_request;
+    snapshot_request.tablet_id = tablet_id;
+    snapshot_request.schema_hash = schema_hash;
+    snapshot_request.version = version.second;
+    status = engine_ref->snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path,
+                                                       &allow_incremental_clone);
+    ASSERT_TRUE(status.ok());
+
+    // 4. load the snapshot to another tablet
+    snapshot_path = fmt::format("{}/{}/{}", snapshot_path, tablet_id, schema_hash);
+    SnapshotLoader loader1(*engine_ref, ExecEnv::GetInstance(), 1L, tablet_id);
+    status = loader1.move(snapshot_path, tablet, true);
+    ASSERT_TRUE(status.ok()) << status;
+
+    // 5. Insert a rowset to the tablet
+    // reload tablet
+    tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
+    EXPECT_TRUE(tablet != nullptr);
+    add_rowset(tablet_id, schema_hash, partition_id, 200, 200);
+    version = tablet->max_version();
+    std::cout << "version: " << version.first << ", " << version.second << std::endl;
+
+    // 6. load the snapshot to the tablet again, this request should be idempotent
+    SnapshotLoader loader2(*engine_ref, ExecEnv::GetInstance(), 2L, tablet_id);
+    status = loader2.move(snapshot_path, tablet, true);
+    ASSERT_TRUE(status.ok()) << status;
+
+    // reload tablet
+    tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
+    EXPECT_TRUE(tablet != nullptr);
+    auto last_version = tablet->max_version();
+    std::cout << "last version: " << last_version.first << ", " << last_version.second << std::endl;
+    ASSERT_EQ(version.first, last_version.first);
+    ASSERT_EQ(version.second, last_version.second);
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to