This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4939d8c6d1e branch-3.0: [Enhancement](recycler) Add some UT for 
recycler #47739 (#47803)
4939d8c6d1e is described below

commit 4939d8c6d1e8f8d59b2cbeafb586bde99b6a8826
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 12 12:11:09 2025 +0800

    branch-3.0: [Enhancement](recycler) Add some UT for recycler #47739 (#47803)
    
    Cherry-picked from #47739
    
    Co-authored-by: abmdocrt <[email protected]>
---
 cloud/src/recycler/recycler.cpp    |   6 +-
 cloud/src/recycler/s3_accessor.cpp |   2 +
 cloud/test/hdfs_accessor_test.cpp  |   4 +
 cloud/test/recycler_test.cpp       | 310 +++++++++++++++++++++++++++++++++++++
 4 files changed, 320 insertions(+), 2 deletions(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 633a0e081c6..52597fad04b 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -282,8 +282,8 @@ void Recycler::recycle_callback() {
             recycling_instance_map_.erase(instance_id);
         }
         auto elpased_ms =
-                ctime_ms -
-                
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+                
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() -
+                ctime_ms;
         LOG_INFO("finish recycle instance")
                 .tag("instance_id", instance_id)
                 .tag("cost_ms", elpased_ms);
@@ -1595,6 +1595,8 @@ int InstanceRecycler::delete_rowset_data(const 
std::vector<doris::RowsetMetaClou
             DCHECK(accessor_map_.count(*rid))
                     << "uninitilized accessor, instance_id=" << instance_id_
                     << " resource_id=" << resource_id << " path[0]=" << 
(*paths)[0];
+            
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::delete_rowset_data.no_resource_id",
+                                     &accessor_map_);
             if (!accessor_map_.contains(*rid)) {
                 LOG_WARNING("delete rowset data accessor_map_ does not 
contains resouce id")
                         .tag("resource_id", resource_id)
diff --git a/cloud/src/recycler/s3_accessor.cpp 
b/cloud/src/recycler/s3_accessor.cpp
index 224b36c277c..0ef150e20d1 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -22,6 +22,7 @@
 #include <aws/core/client/DefaultRetryStrategy.h>
 #include <aws/s3/S3Client.h>
 #include <bvar/reducer.h>
+#include <cpp/sync_point.h>
 #include <gen_cpp/cloud.pb.h>
 
 #include <algorithm>
@@ -224,6 +225,7 @@ std::string S3Accessor::to_uri(const std::string& 
relative_path) const {
 }
 
 int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) {
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", 
(int)-1);
     switch (conf.provider) {
     case S3Conf::GCS:
         *accessor = std::make_shared<GcsAccessor>(std::move(conf));
diff --git a/cloud/test/hdfs_accessor_test.cpp 
b/cloud/test/hdfs_accessor_test.cpp
index 11c0af3853b..40b9fdda3b4 100644
--- a/cloud/test/hdfs_accessor_test.cpp
+++ b/cloud/test/hdfs_accessor_test.cpp
@@ -258,6 +258,10 @@ TEST(HdfsAccessorTest, delete_prefix) {
     put_and_verify("data/20000/1_0.dat");
     put_and_verify("data111/10000/1_0.dat");
 
+    ret = accessor.delete_prefix("nonexist");
+    EXPECT_EQ(ret, -1);
+    ret = accessor.delete_prefix("/");
+    EXPECT_EQ(ret, -1);
     ret = accessor.delete_prefix("data/10000/1_");
     EXPECT_EQ(ret, 0);
     ret = accessor.delete_prefix("data/10000/2_");
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 57ebb8c1347..f1834ad8003 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -3406,6 +3406,216 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
     
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 1);
 }
 
+TEST(RecyclerTest, recycle_tablet_without_resource_id) {
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {
+        sp->clear_all_call_backs();
+        sp->clear_trace();
+        sp->disable_processing();
+    });
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    EXPECT_EQ(txn_kv->init(), 0);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string key;
+    std::string val;
+
+    InstanceKeyInfo key_info {"test_instance"};
+    instance_key(key_info, &key);
+    InstanceInfoPB instance;
+    instance.set_instance_id("GetObjStoreInfoTestInstance");
+
+    auto accessor = std::make_shared<MockAccessor>();
+    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+    sp->set_call_back(
+            "InstanceRecycler::init_storage_vault_accessors.mock_vault", 
[&accessor](auto&& args) {
+                auto* map = try_any_cast<
+                        std::unordered_map<std::string, 
std::shared_ptr<StorageVaultAccessor>>*>(
+                        args[0]);
+                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+                if (vault->name() == "test_success_hdfs_vault") {
+                    map->emplace(vault->id(), accessor);
+                }
+            });
+    sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", 
[](auto&& args) {
+        auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
+        auto* rs = resp->add_rowset_meta();
+        EXPECT_EQ(rs->has_resource_id(), false);
+    });
+    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = -1;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    // succeed to init MockAccessor
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("test_success_hdfs_vault");
+        vault.set_id("success_vault");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "4"}), 
vault.SerializeAsString());
+    }
+
+    val = instance.SerializeAsString();
+    txn->put(key, val);
+    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    EXPECT_EQ(recycler.init(), 0);
+    EXPECT_EQ(recycler.accessor_map_.size(), 1);
+
+    // useful mock accessor
+    
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
+
+    // recycle tablet will fail because the unusable obj accessor cannot connect
+    EXPECT_EQ(recycler.recycle_tablet(0), -1);
+    // no resource id, cannot recycle
+    
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
+}
+
+TEST(RecyclerTest, recycle_tablet_with_wrong_resource_id) {
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {
+        sp->clear_all_call_backs();
+        sp->clear_trace();
+        sp->disable_processing();
+    });
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    EXPECT_EQ(txn_kv->init(), 0);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string key;
+    std::string val;
+
+    InstanceKeyInfo key_info {"test_instance"};
+    instance_key(key_info, &key);
+    InstanceInfoPB instance;
+    instance.set_instance_id("GetObjStoreInfoTestInstance");
+
+    auto accessor = std::make_shared<MockAccessor>();
+    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+    sp->set_call_back(
+            "InstanceRecycler::init_storage_vault_accessors.mock_vault", 
[&accessor](auto&& args) {
+                auto* map = try_any_cast<
+                        std::unordered_map<std::string, 
std::shared_ptr<StorageVaultAccessor>>*>(
+                        args[0]);
+                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+                if (vault->name() == "test_success_hdfs_vault") {
+                    map->emplace(vault->id(), accessor);
+                }
+            });
+    sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", 
[](auto&& args) {
+        auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
+        auto* rs = resp->add_rowset_meta();
+        rs->set_resource_id("no_id");
+    });
+    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = -1;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    // succeed to init MockAccessor
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("test_success_hdfs_vault");
+        vault.set_id("success_vault");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "4"}), 
vault.SerializeAsString());
+    }
+
+    val = instance.SerializeAsString();
+    txn->put(key, val);
+    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    EXPECT_EQ(recycler.init(), 0);
+    EXPECT_EQ(recycler.accessor_map_.size(), 1);
+
+    // useful mock accessor
+    
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
+
+    // recycle tablet will fail because the unusable obj accessor cannot connect
+    EXPECT_EQ(recycler.recycle_tablet(0), -1);
+    // no resource id, cannot recycle
+    
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
 0);
+}
+
+TEST(RecyclerTest, init_all_vault_accessors_failed_test) {
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {
+        sp->clear_all_call_backs();
+        sp->clear_trace();
+        sp->disable_processing();
+    });
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    EXPECT_EQ(txn_kv->init(), 0);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string key;
+    std::string val;
+
+    InstanceKeyInfo key_info {"test_instance"};
+    instance_key(key_info, &key);
+    InstanceInfoPB instance;
+    instance.set_instance_id("GetObjStoreInfoTestInstance");
+    // failed to init because S3Conf::from_obj_store_info() fails
+    {
+        ObjectStoreInfoPB obj_info;
+        StorageVaultPB vault;
+        obj_info.set_id("id");
+        obj_info.set_ak("ak");
+        obj_info.set_sk("sk");
+        vault.mutable_obj_info()->MergeFrom(obj_info);
+        vault.set_name("test_failed_s3_vault");
+        vault.set_id("failed_s3");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        txn->put(storage_vault_key({instance.instance_id(), "1"}), 
vault.SerializeAsString());
+    }
+
+    sp->set_call_back("S3Accessor::init.s3_init_failed", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = -1;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    EXPECT_EQ(recycler.init(), -2);
+}
+
 TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
     auto* sp = SyncPoint::get_instance();
     std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {
@@ -3563,4 +3773,104 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
     }
 }
 
+TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) {
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {
+        sp->clear_all_call_backs();
+        sp->clear_trace();
+        sp->disable_processing();
+    });
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("delete_tmp_rowset_data_with_idx_v2");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("delete_tmp_rowset_data_with_idx_v2");
+
+    doris::TabletSchemaCloudPB schema;
+    schema.set_schema_version(1);
+    auto index = schema.add_index();
+    index->set_index_id(1);
+    index->set_index_type(IndexType::INVERTED);
+
+    sp->set_call_back("InstanceRecycler::delete_rowset_data.tmp_rowset", 
[](auto&& args) {
+        auto* ret = try_any_cast<int*>(args[0]);
+        *ret = 1;
+    });
+    sp->set_call_back("InstanceRecycler::delete_rowset_data.no_resource_id", 
[](auto&& args) {
+        auto* map = try_any_cast<
+                std::unordered_map<std::string, 
std::shared_ptr<StorageVaultAccessor>>*>(args[0]);
+        map->erase("no_resource_id");
+        ;
+    });
+    sp->enable_processing();
+
+    {
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        ASSERT_EQ(recycler.init(), 0);
+        auto accessor = recycler.accessor_map_.begin()->second;
+        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+        doris::RowsetMetaCloudPB rowset;
+        rowset.set_rowset_id(0); // useless but required
+        rowset.set_rowset_id_v2("1");
+        rowset.set_num_segments(1);
+        rowset.set_tablet_id(10000);
+        rowset.set_index_id(10001);
+        rowset.set_resource_id("delete_tmp_rowset_data_with_idx_v2");
+        rowset.set_schema_version(schema.schema_version());
+        rowset.mutable_tablet_schema()->CopyFrom(schema);
+        create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
+        rowset.clear_tablet_schema();
+        rowset_pbs.emplace_back(rowset);
+
+        rowset.set_rowset_id(0); // useless but required
+        rowset.set_rowset_id_v2("2");
+        rowset.set_num_segments(1);
+        rowset.set_tablet_id(20000);
+        rowset.set_index_id(20001);
+        rowset.set_resource_id("no_resource_id");
+        rowset.set_schema_version(schema.schema_version());
+        rowset.mutable_tablet_schema()->CopyFrom(schema);
+        create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
+        rowset.clear_tablet_schema();
+        rowset_pbs.emplace_back(rowset);
+
+        std::unordered_set<std::string> list_files;
+        std::unique_ptr<ListIterator> iter;
+        EXPECT_EQ(accessor->list_all(&iter), 0);
+        EXPECT_TRUE(iter->has_next());
+        list_files.clear();
+        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
+            list_files.insert(file->path);
+        }
+        EXPECT_EQ(list_files.size(), 4);
+        // before delete tmp rowset, segment file and idx v2 exist
+        EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
+        EXPECT_TRUE(list_files.contains("data/10000/1_0.idx"));
+        EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
+        EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));
+
+        EXPECT_EQ(-1, recycler.delete_rowset_data(rowset_pbs, 
RowsetRecyclingState::TMP_ROWSET));
+        list_files.clear();
+        iter.reset();
+        EXPECT_EQ(accessor->list_all(&iter), 0);
+        EXPECT_TRUE(iter->has_next());
+        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
+            list_files.insert(file->path);
+        }
+        EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
+        EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));
+        // after delete tmp rowset, for a valid resource id rowset, both file and idx v2 are removed
+        EXPECT_EQ(list_files.size(), 2);
+    }
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to