This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 991b9f64590 [fix](recycler) Fix premature exit recycling when there is
an invalid storage vault (#46798) (#47159)
991b9f64590 is described below
commit 991b9f645908f9699a8e6a999895d8dcd73eb6c5
Author: abmdocrt <[email protected]>
AuthorDate: Mon Jan 20 10:37:01 2025 +0800
[fix](recycler) Fix premature exit recycling when there is an invalid
storage vault (#46798) (#47159)
Related PR: Pick #46798 #47164
---
cloud/src/recycler/hdfs_accessor.cpp | 1 +
cloud/src/recycler/recycler.cpp | 221 ++++++++++++++++++++++++++++-------
cloud/test/recycler_test.cpp | 136 +++++++++++++++++++++
3 files changed, 314 insertions(+), 44 deletions(-)
diff --git a/cloud/src/recycler/hdfs_accessor.cpp
b/cloud/src/recycler/hdfs_accessor.cpp
index 1999bcfa165..024acd0efe7 100644
--- a/cloud/src/recycler/hdfs_accessor.cpp
+++ b/cloud/src/recycler/hdfs_accessor.cpp
@@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) {
}
int HdfsAccessor::init() {
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed",
(int)-1);
// TODO(plat1ko): Cache hdfsFS
fs_ = HDFSBuilder::create_fs(info_.build_conf());
if (!fs_) {
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 307528011ea..62a161e3edb 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -24,12 +24,14 @@
#include <atomic>
#include <chrono>
+#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
#include "common/stopwatch.h"
+#include "meta-service/meta_service.h"
#include "meta-service/meta_service_schema.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
@@ -249,8 +251,9 @@ void Recycler::recycle_callback() {
auto instance_recycler = std::make_shared<InstanceRecycler>(
txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);
- if (instance_recycler->init() != 0) {
- LOG(WARNING) << "failed to init instance recycler, instance_id="
<< instance_id;
+ if (int r = instance_recycler->init(); r != 0) {
+ LOG(WARNING) << "failed to init instance recycler, instance_id="
<< instance_id
+ << " ret=" << r;
continue;
}
std::string recycle_job_key;
@@ -258,6 +261,8 @@ void Recycler::recycle_callback() {
int ret = prepare_instance_recycle_job(txn_kv_.get(), recycle_job_key,
instance_id,
ip_port_,
config::recycle_interval_seconds * 1000);
if (ret != 0) { // Prepare failed
+ LOG(WARNING) << "failed to prepare recycle_job, instance_id=" <<
instance_id
+ << " ret=" << ret;
continue;
} else {
std::lock_guard lock(mtx_);
@@ -276,7 +281,12 @@ void Recycler::recycle_callback() {
std::lock_guard lock(mtx_);
recycling_instance_map_.erase(instance_id);
}
- LOG_INFO("finish recycle instance").tag("instance_id", instance_id);
+ auto elpased_ms =
+ ctime_ms -
+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ LOG_INFO("finish recycle instance")
+ .tag("instance_id", instance_id)
+ .tag("cost_ms", elpased_ms);
}
}
@@ -523,35 +533,37 @@ int InstanceRecycler::init_storage_vault_accessors() {
LOG(WARNING) << "malformed storage vault, unable to deserialize
key=" << hex(k);
return -1;
}
+
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
+ &accessor_map_, &vault);
if (vault.has_hdfs_info()) {
auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
int ret = accessor->init();
if (ret != 0) {
LOG(WARNING) << "failed to init hdfs accessor. instance_id="
<< instance_id_
- << " resource_id=" << vault.id() << " name=" <<
vault.name();
- return ret;
+ << " resource_id=" << vault.id() << " name=" <<
vault.name()
+ << " hdfs_vault=" <<
vault.hdfs_info().ShortDebugString();
+ continue;
}
accessor_map_.emplace(vault.id(), std::move(accessor));
} else if (vault.has_obj_info()) {
-#ifdef UNIT_TEST
- auto accessor = std::make_shared<MockAccessor>();
-#else
auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
if (!s3_conf) {
- LOG(WARNING) << "failed to init object accessor, instance_id="
<< instance_id_;
- return -1;
+ LOG(WARNING) << "failed to init object accessor, invalid conf,
instance_id="
+ << instance_id_ << " s3_vault=" <<
vault.obj_info().ShortDebugString();
+ continue;
}
std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
if (ret != 0) {
LOG(WARNING) << "failed to init s3 accessor. instance_id=" <<
instance_id_
- << " resource_id=" << vault.id() << " name=" <<
vault.name();
- return ret;
+ << " resource_id=" << vault.id() << " name=" <<
vault.name()
+ << " ret=" << ret
+ << " s3_vault=" <<
vault.obj_info().ShortDebugString();
+ continue;
}
-#endif
accessor_map_.emplace(vault.id(), std::move(accessor));
}
@@ -562,6 +574,13 @@ int InstanceRecycler::init_storage_vault_accessors() {
return -1;
}
+ if (accessor_map_.empty()) {
+ LOG(WARNING) << "no accessors for instance=" << instance_id_;
+ return -2;
+ }
+ LOG_INFO("finish init instance recycler number_accessors={} instance=",
accessor_map_.size(),
+ instance_id_);
+
return 0;
}
@@ -1461,7 +1480,8 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
}
auto it = accessor_map_.find(rs.resource_id());
- if (it == accessor_map_.end()) [[unlikely]] { // impossible
+ // possible if the accessor is not initilized correctly
+ if (it == accessor_map_.end()) [[unlikely]] {
LOG_WARNING("instance has no such resource id")
.tag("instance_id", instance_id_)
.tag("resource_id", rs.resource_id());
@@ -1545,8 +1565,16 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
[](const int& ret) { return
ret != 0; });
for (auto& [resource_id, file_paths] : resource_file_paths) {
concurrent_delete_executor.add([&, rid = &resource_id, paths =
&file_paths]() -> int {
+ DCHECK(accessor_map_.count(*rid))
+ << "uninitilized accessor, instance_id=" << instance_id_
+ << " resource_id=" << resource_id << " path[0]=" <<
(*paths)[0];
+ if (!accessor_map_.contains(*rid)) {
+ LOG_WARNING("delete rowset data accessor_map_ does not
contains resouce id")
+ .tag("resource_id", resource_id)
+ .tag("instance_id", instance_id_);
+ return -1;
+ }
auto& accessor = accessor_map_[*rid];
- DCHECK(accessor);
return accessor->delete_files(*paths);
});
}
@@ -1576,7 +1604,9 @@ int InstanceRecycler::delete_rowset_data(const
std::string& resource_id, int64_t
if (it == accessor_map_.end()) {
LOG_WARNING("instance has no such resource id")
.tag("instance_id", instance_id_)
- .tag("resource_id", resource_id);
+ .tag("resource_id", resource_id)
+ .tag("tablet_id", tablet_id)
+ .tag("rowset_id", rowset_id);
return -1;
}
auto& accessor = it->second;
@@ -1588,42 +1618,107 @@ int InstanceRecycler::recycle_tablet(int64_t
tablet_id) {
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
+ int ret = 0;
auto start_time = steady_clock::now();
+ // collect resource ids
+ std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
+ std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
+ std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id,
""});
+ std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id +
1, ""});
+
+ std::set<std::string> resource_ids;
+ int64_t recycle_rowsets_number = 0;
+ int64_t recycle_segments_number = 0;
+ int64_t recycle_rowsets_data_size = 0;
+ int64_t recycle_rowsets_index_size = 0;
+ int64_t max_rowset_version = 0;
+ int64_t min_rowset_creation_time = INT64_MAX;
+ int64_t max_rowset_creation_time = 0;
+ int64_t min_rowset_expiration_time = INT64_MAX;
+ int64_t max_rowset_expiration_time = 0;
+
std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s",
cost)
.tag("instance_id", instance_id_)
- .tag("tablet_id", tablet_id);
+ .tag("tablet_id", tablet_id)
+ .tag("recycle rowsets number", recycle_rowsets_number)
+ .tag("recycle segments number", recycle_segments_number)
+ .tag("all rowsets recycle data size",
recycle_rowsets_data_size)
+ .tag("all rowsets recycle index size",
recycle_rowsets_index_size)
+ .tag("max rowset version", max_rowset_version)
+ .tag("min rowset creation time", min_rowset_creation_time)
+ .tag("max rowset creation time", max_rowset_creation_time)
+ .tag("min rowset expiration time", min_rowset_expiration_time)
+ .tag("max rowset expiration time", max_rowset_expiration_time)
+ .tag("ret", ret);
});
- // delete all rowset kv in this tablet
- std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
- std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
- std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id,
""});
- std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id +
1, ""});
-
- int ret = 0;
std::unique_ptr<Transaction> txn;
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id;
+ LOG_WARNING("failed to recycle tablet ")
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
ret = -1;
}
- txn->remove(rs_key0, rs_key1);
- txn->remove(recyc_rs_key0, recyc_rs_key1);
-
- // remove delete bitmap for MoW table
- std::string pending_key = meta_pending_delete_bitmap_key({instance_id_,
tablet_id});
- txn->remove(pending_key);
- std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_,
tablet_id, "", 0, 0});
- std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_,
tablet_id + 1, "", 0, 0});
- txn->remove(delete_bitmap_start, delete_bitmap_end);
-
- TxnErrorCode err = txn->commit();
- if (err != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id
<< ", err=" << err;
+ GetRowsetResponse resp;
+ std::string msg;
+ MetaServiceCode code = MetaServiceCode::OK;
+ // get rowsets in tablet
+ internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1,
instance_id_,
+ tablet_id, code, msg, &resp);
+ if (code != MetaServiceCode::OK) {
+ LOG_WARNING("failed to get rowsets of tablet when recycle tablet")
+ .tag("tablet id", tablet_id)
+ .tag("msg", msg)
+ .tag("code", code)
+ .tag("instance id", instance_id_);
ret = -1;
}
+
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta",
&resp);
+
+ for (const auto& rs_meta : resp.rowset_meta()) {
+ if (!rs_meta.has_resource_id()) {
+ LOG_WARNING("rowset meta does not have a resource id, impossible!")
+ .tag("rs_meta", rs_meta.ShortDebugString())
+ .tag("instance_id", instance_id_)
+ .tag("tablet_id", tablet_id);
+ return -1;
+ }
+ auto it = accessor_map_.find(rs_meta.resource_id());
+ // possible if the accessor is not initilized correctly
+ if (it == accessor_map_.end()) [[unlikely]] {
+ LOG_WARNING(
+ "failed to find resource id when recycle tablet, skip this
vault accessor "
+ "recycle process")
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("resource_id", rs_meta.resource_id())
+ .tag("rowset meta pb", rs_meta.ShortDebugString());
+ return -1;
+ }
+ recycle_rowsets_number += 1;
+ recycle_segments_number += rs_meta.num_segments();
+ recycle_rowsets_data_size += rs_meta.data_disk_size();
+ recycle_rowsets_index_size += rs_meta.index_disk_size();
+ max_rowset_version = std::max(max_rowset_version,
rs_meta.end_version());
+ min_rowset_creation_time = std::min(min_rowset_creation_time,
rs_meta.creation_time());
+ max_rowset_creation_time = std::max(max_rowset_creation_time,
rs_meta.creation_time());
+ min_rowset_expiration_time = std::min(min_rowset_expiration_time,
rs_meta.txn_expiration());
+ max_rowset_expiration_time = std::max(max_rowset_expiration_time,
rs_meta.txn_expiration());
+ resource_ids.emplace(rs_meta.resource_id());
+ }
+
+ LOG_INFO("recycle tablet start to delete object")
+ .tag("instance id", instance_id_)
+ .tag("tablet id", tablet_id)
+ .tag("recycle tablet resource ids are",
+ std::accumulate(resource_ids.begin(), resource_ids.end(),
std::string(),
+ [](const std::string& a, const std::string&
b) {
+ return a.empty() ? b : a + "," + b;
+ }));
SyncExecutor<int> concurrent_delete_executor(
_thread_pool_group.s3_producer_pool,
@@ -1631,16 +1726,20 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id)
{
[](const int& ret) { return ret != 0; });
// delete all rowset data in this tablet
- for (auto& [_, accessor] : accessor_map_) {
- concurrent_delete_executor.add([&, accessor_ptr = &accessor]() {
- if
((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
+ // ATTN: there may be data leak if not all accessor initilized successfully
+ // partial data deleted if the tablet is stored cross-storage vault
+ // vault id is not attached to TabletMeta...
+ for (const auto& resource_id : resource_ids) {
+ concurrent_delete_executor.add([&, accessor_ptr =
accessor_map_[resource_id]]() {
+ if (accessor_ptr->delete_directory(tablet_path_prefix(tablet_id))
!= 0) {
LOG(WARNING) << "failed to delete rowset data of tablet " <<
tablet_id
- << " s3_path=" << accessor->uri();
+ << " path=" << accessor_ptr->uri();
return -1;
}
return 0;
});
}
+
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
@@ -1651,6 +1750,40 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
ret = finished ? ret : -1;
+ if (ret != 0) { // failed recycle tablet data
+ LOG_WARNING("ret!=0")
+ .tag("finished", finished)
+ .tag("ret", ret)
+ .tag("instance_id", instance_id_)
+ .tag("tablet_id", tablet_id);
+ return ret;
+ }
+
+ txn.reset();
+ if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to recycle tablet ")
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
+ ret = -1;
+ }
+ // delete all rowset kv in this tablet
+ txn->remove(rs_key0, rs_key1);
+ txn->remove(recyc_rs_key0, recyc_rs_key1);
+
+ // remove delete bitmap for MoW table
+ std::string pending_key = meta_pending_delete_bitmap_key({instance_id_,
tablet_id});
+ txn->remove(pending_key);
+ std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_,
tablet_id, "", 0, 0});
+ std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_,
tablet_id + 1, "", 0, 0});
+ txn->remove(delete_bitmap_start, delete_bitmap_end);
+
+ TxnErrorCode err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id
<< ", err=" << err;
+ ret = -1;
+ }
+
if (ret == 0) {
// All object files under tablet have been deleted
std::lock_guard lock(recycled_tablets_mtx_);
@@ -2233,7 +2366,7 @@ int InstanceRecycler::abort_timeout_txn() {
txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED);
txn_info.set_finish_time(current_time);
txn_info.set_reason("timeout");
- VLOG_DEBUG << "txn_info=" << txn_info.DebugString();
+ VLOG_DEBUG << "txn_info=" << txn_info.ShortDebugString();
txn_inf_val.clear();
if (!txn_info.SerializeToString(&txn_inf_val)) {
LOG_WARNING("failed to serialize txn info").tag("key", hex(k));
@@ -2911,7 +3044,7 @@ int InstanceRecycler::recycle_expired_stage_objects() {
const auto& old_obj = instance_info_.obj_info()[idx - 1];
auto s3_conf = S3Conf::from_obj_store_info(old_obj);
if (!s3_conf) {
- LOG(WARNING) << "failed to init s3_conf with obj_info=" <<
old_obj.DebugString();
+ LOG(WARNING) << "failed to init s3_conf with obj_info=" <<
old_obj.ShortDebugString();
continue;
}
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index e38d25aaa84..42ab8f629cb 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -36,6 +36,7 @@
#include "meta-service/mem_txn_kv.h"
#include "meta-service/meta_service.h"
#include "meta-service/txn_kv_error.h"
+#include "mock_accessor.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
#include "recycler/checker.h"
@@ -3263,4 +3264,139 @@ TEST(RecyclerTest,
delete_rowset_data_without_inverted_index_storage_format) {
}
}
+TEST(RecyclerTest, init_vault_accessor_failed_test) {
+ auto* sp = SyncPoint::get_instance();
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ });
+
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ EXPECT_EQ(txn_kv->init(), 0);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string key;
+ std::string val;
+
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+ InstanceInfoPB instance;
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ // failed to init because S3Conf::from_obj_store_info() fails
+ {
+ ObjectStoreInfoPB obj_info;
+ StorageVaultPB vault;
+ obj_info.set_id("id");
+ obj_info.set_ak("ak");
+ obj_info.set_sk("sk");
+ vault.mutable_obj_info()->MergeFrom(obj_info);
+ vault.set_name("test_failed_s3_vault_1");
+ vault.set_id("failed_s3_1");
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ txn->put(storage_vault_key({instance.instance_id(), "1"}),
vault.SerializeAsString());
+ }
+
+ // succeed to init but unuseful
+ {
+ ObjectStoreInfoPB obj_info;
+ StorageVaultPB vault;
+ obj_info.set_id("id");
+ obj_info.set_ak("ak");
+ obj_info.set_sk("sk");
+ obj_info.set_provider(ObjectStoreInfoPB_Provider_COS);
+ vault.mutable_obj_info()->MergeFrom(obj_info);
+ vault.set_name("test_failed_s3_vault_2");
+ vault.set_id("failed_s3_2");
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ txn->put(storage_vault_key({instance.instance_id(), "2"}),
vault.SerializeAsString());
+ }
+
+ // failed to init because accessor->init() fails
+ {
+ HdfsBuildConf hdfs_build_conf;
+ StorageVaultPB vault;
+ hdfs_build_conf.set_fs_name("fs_name");
+ hdfs_build_conf.set_user("root");
+ HdfsVaultInfo hdfs_info;
+ hdfs_info.set_prefix("root_path");
+ hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+ vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+ vault.set_name("test_failed_hdfs_vault_1");
+ vault.set_id("failed_hdfs_1");
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ txn->put(storage_vault_key({instance.instance_id(), "3"}),
vault.SerializeAsString());
+ }
+
+ auto accessor = std::make_shared<MockAccessor>();
+ EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+ sp->set_call_back(
+ "InstanceRecycler::init_storage_vault_accessors.mock_vault",
[&accessor](auto&& args) {
+ auto* map = try_any_cast<
+ std::unordered_map<std::string,
std::shared_ptr<StorageVaultAccessor>>*>(
+ args[0]);
+ auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+ if (vault->name() == "test_success_hdfs_vault") {
+ map->emplace(vault->id(), accessor);
+ }
+ });
+ sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta",
[](auto&& args) {
+ auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
+ auto* rs = resp->add_rowset_meta();
+ rs->set_resource_id("failed_s3_2");
+ rs = resp->add_rowset_meta();
+ rs->set_resource_id("success_vault");
+ });
+ sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = -1;
+ ret->second = true;
+ });
+
+ sp->enable_processing();
+
+ // succeed to init MockAccessor
+ {
+ HdfsBuildConf hdfs_build_conf;
+ StorageVaultPB vault;
+ hdfs_build_conf.set_fs_name("fs_name");
+ hdfs_build_conf.set_user("root");
+ HdfsVaultInfo hdfs_info;
+ hdfs_info.set_prefix("root_path");
+ hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+ vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+ vault.set_name("test_success_hdfs_vault");
+ vault.set_id("success_vault");
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ txn->put(storage_vault_key({instance.instance_id(), "4"}),
vault.SerializeAsString());
+ }
+ val = instance.SerializeAsString();
+ txn->put(key, val);
+ EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ EXPECT_EQ(recycler.init(), 0);
+ EXPECT_EQ(recycler.accessor_map_.size(), 2);
+
+ // unuseful obj accessor
+
EXPECT_EQ(recycler.accessor_map_.at("failed_s3_2")->exists("data/0/test.csv"),
-1);
+ // useful mock accessor
+
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
+
+ // recycle tablet will fail because unuseful obj accessor can not
connectted
+ EXPECT_EQ(recycler.recycle_tablet(0), -1);
+ // however, useful mock accessor can recycle tablet
+
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
1);
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]