This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9e23a2fd1cd [feat](snapshot) Support storage vault for clone instance (#63217)
9e23a2fd1cd is described below
commit 9e23a2fd1cdd4565492ac408dc862b325a230403
Author: Yixuan Wang <[email protected]>
AuthorDate: Fri May 15 11:11:24 2026 +0800
[feat](snapshot) Support storage vault for clone instance (#63217)
---
cloud/src/meta-service/meta_service_resource.cpp | 520 ++++++++++++++++++-----
cloud/test/resource_test.cpp | 166 ++++++++
2 files changed, 589 insertions(+), 97 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp
index 1c511488673..ed36b54322c 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -325,6 +325,96 @@ static int find_cascade_instances(TxnKv* txn_kv, const std::string& root_instanc
return 0;
}
+static int find_storage_vault_position_by_id(const InstanceInfoPB& instance,
+ std::string_view vault_id) {
+ auto id_itr =
+ std::find(instance.resource_ids().begin(),
instance.resource_ids().end(), vault_id);
+ if (id_itr == instance.resource_ids().end()) {
+ return -1;
+ }
+ return static_cast<int>(id_itr - instance.resource_ids().begin());
+}
+
+static int find_storage_vault_id_by_name(const InstanceInfoPB& instance,
+ std::string_view vault_name,
std::string* vault_id) {
+ auto name_itr = std::find_if(
+ instance.storage_vault_names().begin(),
instance.storage_vault_names().end(),
+ [&](const auto& current_name) { return current_name == vault_name;
});
+ if (name_itr == instance.storage_vault_names().end()) {
+ return -1;
+ }
+ int pos = static_cast<int>(name_itr -
instance.storage_vault_names().begin());
+ *vault_id = instance.resource_ids().Get(pos);
+ return 0;
+}
+
+static int alter_instance_obj_store_info_by_id(InstanceInfoPB& instance,
+ std::string_view target_obj_id,
std::string_view ak,
+ std::string_view sk,
std::string_view role_arn,
+ std::string_view external_id,
+ const EncryptionInfoPB&
encryption_info,
+ MetaServiceCode& code,
std::string& msg) {
+ auto& obj_info =
const_cast<std::decay_t<decltype(instance.obj_info())>&>(instance.obj_info());
+ for (auto& it : obj_info) {
+ if (it.id() != target_obj_id) {
+ continue;
+ }
+
+ if (role_arn.empty()) {
+ if (it.ak() == ak && it.sk() == sk) {
+ code = MetaServiceCode::OK;
+ msg = "ak/sk not changed";
+ return 1;
+ }
+ it.clear_role_arn();
+ it.clear_external_id();
+ it.clear_cred_provider_type();
+
+ it.set_ak(std::string(ak));
+ it.set_sk(std::string(sk));
+ it.mutable_encryption_info()->CopyFrom(encryption_info);
+ } else {
+ if (!ak.empty() || !sk.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invaild argument, both set ak/sk and role_arn is not
allowed";
+ LOG(INFO) << msg;
+ return -1;
+ }
+
+ if (it.provider() != ObjectStoreInfoPB::S3) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "role_arn is only supported for s3 provider";
+ LOG(INFO) << msg << " provider=" << it.provider();
+ return -1;
+ }
+
+ if (it.role_arn() == role_arn && it.external_id() == external_id) {
+ code = MetaServiceCode::OK;
+ msg = "ak/sk not changed";
+ return 1;
+ }
+ it.clear_ak();
+ it.clear_sk();
+ it.clear_encryption_info();
+
+ it.set_role_arn(std::string(role_arn));
+ it.set_external_id(std::string(external_id));
+ it.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
+ }
+
+ auto now_time = std::chrono::system_clock::now();
+ uint64_t time =
+
std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch())
+ .count();
+ it.set_mtime(time);
+ return 0;
+ }
+
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("obj info id={} not found", target_obj_id);
+ return -1;
+}
+
// Helper function to update AK/SK for a single instance
// Returns 0 on success, -1 on error
static int update_instance_ak_sk(InstanceInfoPB& instance, const
UpdateAkSkRequest* request,
@@ -805,9 +895,11 @@ static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_v
return false;
}
-static int alter_hdfs_storage_vault(InstanceInfoPB& instance,
std::unique_ptr<Transaction>& txn,
- const StorageVaultPB& vault,
MetaServiceCode& code,
- std::string& msg,
AlterObjStoreInfoResponse* response) {
+static int alter_hdfs_storage_vault_by_id(InstanceInfoPB& instance,
+ std::unique_ptr<Transaction>& txn,
+ std::string_view target_vault_id,
+ const StorageVaultPB& vault,
MetaServiceCode& code,
+ std::string& msg,
AlterObjStoreInfoResponse* response) {
if (!vault.has_hdfs_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
@@ -825,19 +917,25 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
return -1;
}
const auto& name = vault.name();
- // Here we try to get mutable iter since we might need to alter the vault
name
- auto name_itr =
std::find_if(instance.mutable_storage_vault_names()->begin(),
- instance.mutable_storage_vault_names()->end(),
- [&](const auto& vault_name) { return name ==
vault_name; });
- if (name_itr == instance.storage_vault_names().end()) {
+ int pos = find_storage_vault_position_by_id(instance, target_vault_id);
+ if (pos < 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
- ss << "invalid storage vault name, not found, name =" << name;
+ ss << "invalid storage vault id, not found, id =" << target_vault_id;
msg = ss.str();
return -1;
}
- auto pos = name_itr - instance.storage_vault_names().begin();
- std::string vault_id = instance.resource_ids().begin()[pos];
+ auto* storage_vault_names = instance.mutable_storage_vault_names();
+ auto* name_ptr = storage_vault_names->Mutable(pos);
+ DCHECK(name_ptr != nullptr);
+ const std::string old_name = *name_ptr;
+ if (old_name != name) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("storage vault id={} name mismatch, expected={},
actual={}",
+ target_vault_id, name, old_name);
+ return -1;
+ }
+ std::string vault_id = instance.resource_ids().Get(pos);
auto vault_key = storage_vault_key({instance.instance_id(), vault_id});
std::string val;
@@ -881,7 +979,10 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
}
new_vault.set_name(vault.alter_name());
- *name_itr = vault.alter_name();
+ *name_ptr = vault.alter_name();
+ if (instance.default_storage_vault_id() == vault_id) {
+ instance.set_default_storage_vault_name(vault.alter_name());
+ }
}
auto* alter_hdfs_info = new_vault.mutable_hdfs_info();
if (hdfs_info.build_conf().has_hdfs_kerberos_keytab()) {
@@ -918,9 +1019,24 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
return 0;
}
-static int alter_s3_storage_vault(InstanceInfoPB& instance,
std::unique_ptr<Transaction>& txn,
- const StorageVaultPB& vault,
MetaServiceCode& code,
- std::string& msg, AlterObjStoreInfoResponse*
response) {
+static int alter_hdfs_storage_vault(InstanceInfoPB& instance,
std::unique_ptr<Transaction>& txn,
+ const StorageVaultPB& vault,
MetaServiceCode& code,
+ std::string& msg,
AlterObjStoreInfoResponse* response) {
+ std::string vault_id;
+ if (find_storage_vault_id_by_name(instance, vault.name(), &vault_id) != 0)
{
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ std::stringstream ss;
+ ss << "invalid storage vault name, not found, name =" << vault.name();
+ msg = ss.str();
+ return -1;
+ }
+ return alter_hdfs_storage_vault_by_id(instance, txn, vault_id, vault,
code, msg, response);
+}
+
+static int alter_s3_storage_vault_by_id(InstanceInfoPB& instance,
std::unique_ptr<Transaction>& txn,
+ std::string_view target_vault_id,
+ const StorageVaultPB& vault,
MetaServiceCode& code,
+ std::string& msg,
AlterObjStoreInfoResponse* response) {
if (!vault.has_obj_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
@@ -939,19 +1055,25 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
}
const auto& name = vault.name();
- // Here we try to get mutable iter since we might need to alter the vault
name
- auto name_itr =
std::find_if(instance.mutable_storage_vault_names()->begin(),
- instance.mutable_storage_vault_names()->end(),
- [&](const auto& vault_name) { return name ==
vault_name; });
- if (name_itr == instance.storage_vault_names().end()) {
+ int pos = find_storage_vault_position_by_id(instance, target_vault_id);
+ if (pos < 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
- ss << "invalid storage vault name, not found, name =" << name;
+ ss << "invalid storage vault id, not found, id =" << target_vault_id;
msg = ss.str();
return -1;
}
- auto pos = name_itr - instance.storage_vault_names().begin();
- std::string vault_id = instance.resource_ids().begin()[pos];
+ auto* storage_vault_names = instance.mutable_storage_vault_names();
+ auto* name_ptr = storage_vault_names->Mutable(pos);
+ DCHECK(name_ptr != nullptr);
+ const std::string old_name = *name_ptr;
+ if (old_name != name) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("storage vault id={} name mismatch, expected={},
actual={}",
+ target_vault_id, name, old_name);
+ return -1;
+ }
+ std::string vault_id = instance.resource_ids().Get(pos);
auto vault_key = storage_vault_key({instance.instance_id(), vault_id});
std::string val;
@@ -995,7 +1117,10 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
}
new_vault.set_name(vault.alter_name());
- *name_itr = vault.alter_name();
+ *name_ptr = vault.alter_name();
+ if (instance.default_storage_vault_id() == vault_id) {
+ instance.set_default_storage_vault_name(vault.alter_name());
+ }
}
if (obj_info.has_role_arn() && (obj_info.has_ak() || obj_info.has_sk())) {
@@ -1074,6 +1199,20 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
return 0;
}
+static int alter_s3_storage_vault(InstanceInfoPB& instance,
std::unique_ptr<Transaction>& txn,
+ const StorageVaultPB& vault,
MetaServiceCode& code,
+ std::string& msg, AlterObjStoreInfoResponse*
response) {
+ std::string vault_id;
+ if (find_storage_vault_id_by_name(instance, vault.name(), &vault_id) != 0)
{
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ std::stringstream ss;
+ ss << "invalid storage vault name, not found, name =" << vault.name();
+ msg = ss.str();
+ return -1;
+ }
+ return alter_s3_storage_vault_by_id(instance, txn, vault_id, vault, code,
msg, response);
+}
+
struct ObjectStorageDesc {
std::string& ak;
std::string& sk;
@@ -1307,6 +1446,17 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
return;
}
+ bool supports_cascade = request->op() ==
AlterObjStoreInfoRequest::ALTER_S3_VAULT ||
+ request->op() ==
AlterObjStoreInfoRequest::ALTER_HDFS_VAULT;
+ std::string root_vault_id;
+ if (supports_cascade &&
+ find_storage_vault_id_by_name(instance, request->vault().name(),
&root_vault_id) != 0) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("invalid storage vault name, not found, name ={}",
+ request->vault().name());
+ return;
+ }
+
switch (request->op()) {
case AlterObjStoreInfoRequest::ADD_S3_VAULT: {
if (!instance.enable_storage_vault()) {
@@ -1494,6 +1644,115 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
+ return;
+ }
+
+ async_notify_refresh_instance(txn_kv_, instance_id, true);
+
+ if (!supports_cascade) {
+ return;
+ }
+
+ if (!instance.has_snapshot_switch_status() ||
+ instance.snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) {
+ LOG(INFO) << "snapshot disabled for instance_id=" << instance_id
+ << ", skip cascade updating derived instances after
alter_storage_vault";
+ return;
+ }
+
+ std::vector<std::string> cascade_instance_ids;
+ if (find_cascade_instances(txn_kv_.get(), instance_id,
&cascade_instance_ids) != 0) {
+ LOG(WARNING) << "failed to find derived instances for storage vault
cascade, instance_id="
+ << instance_id;
+ return;
+ }
+
+ for (const auto& cascade_id : cascade_instance_ids) {
+ std::unique_ptr<Transaction> cascade_txn;
+ TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn);
+ if (cascade_err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(cascade_err);
+ msg = fmt::format(
+ "failed to create txn for derived storage vault update,
instance_id={}, "
+ "err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ std::string cascade_key;
+ std::string cascade_val;
+ instance_key({cascade_id}, &cascade_key);
+ cascade_err = cascade_txn->get(cascade_key, &cascade_val);
+ if (cascade_err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(cascade_err);
+ msg = fmt::format(
+ "failed to get derived instance for storage vault update,
instance_id={}, "
+ "err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ InstanceInfoPB cascade_instance;
+ if (!cascade_instance.ParseFromString(cascade_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format(
+ "failed to parse derived InstanceInfoPB for storage vault
update, "
+ "instance_id={}",
+ cascade_id);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ MetaServiceCode cascade_code = MetaServiceCode::OK;
+ std::string cascade_msg;
+ AlterObjStoreInfoResponse cascade_response;
+ int ret = -1;
+ if (request->op() == AlterObjStoreInfoRequest::ALTER_S3_VAULT) {
+ ret = alter_s3_storage_vault_by_id(cascade_instance, cascade_txn,
root_vault_id,
+ request->vault(), cascade_code,
cascade_msg,
+ &cascade_response);
+ } else if (request->op() ==
AlterObjStoreInfoRequest::ALTER_HDFS_VAULT) {
+ ret = alter_hdfs_storage_vault_by_id(cascade_instance,
cascade_txn, root_vault_id,
+ request->vault(),
cascade_code, cascade_msg,
+ &cascade_response);
+ }
+ if (ret != 0) {
+ code = cascade_code;
+ msg = fmt::format(
+ "failed to cascade storage vault update, instance_id={},
vault_id={}, msg={}",
+ cascade_id, root_vault_id, cascade_msg);
+ LOG(WARNING) << msg << " code=" << static_cast<int>(code);
+ return;
+ }
+
+ cascade_val = cascade_instance.SerializeAsString();
+ if (cascade_val.empty()) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ msg = fmt::format(
+ "failed to serialize derived instance after storage vault
update, "
+ "instance_id={}",
+ cascade_id);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ cascade_txn->atomic_add(system_meta_service_instance_update_key(), 1);
+ cascade_txn->put(cascade_key, cascade_val);
+ cascade_err = cascade_txn->commit();
+ if (cascade_err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(cascade_err);
+ msg = fmt::format(
+ "failed to commit derived storage vault update,
instance_id={}, err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ async_notify_refresh_instance(txn_kv_, cascade_id, true);
+ LOG(INFO) << "cascade storage vault update finished,
root_instance_id=" << instance_id
+ << " derived_instance_id=" << cascade_id << " vault_id=" <<
root_vault_id;
}
}
@@ -1585,73 +1844,28 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
return;
}
+ bool supports_cascade = request->op() ==
AlterObjStoreInfoRequest::ALTER_OBJ_INFO ||
+ request->op() ==
AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK;
+ std::string root_obj_id =
+ request->has_obj() && request->obj().has_id() ?
request->obj().id() : "0";
+
switch (request->op()) {
case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK:
case AlterObjStoreInfoRequest::ALTER_OBJ_INFO: {
- // get id
- std::string id = request->obj().has_id() ? request->obj().id() : "0";
- int idx = std::stoi(id);
+ int idx = std::stoi(root_obj_id);
if (idx < 1 || idx > instance.obj_info().size()) {
// err
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "id invalid, please check it";
return;
}
- auto& obj_info =
-
const_cast<std::decay_t<decltype(instance.obj_info())>&>(instance.obj_info());
- for (auto& it : obj_info) {
- if (std::stoi(it.id()) == idx) {
- if (role_arn.empty()) {
- if (it.ak() == ak && it.sk() == sk) {
- // not change, just return ok
- code = MetaServiceCode::OK;
- msg = "ak/sk not changed";
- return;
- }
- it.clear_role_arn();
- it.clear_external_id();
- it.clear_cred_provider_type();
-
- it.set_ak(ak);
- it.set_sk(sk);
- it.mutable_encryption_info()->CopyFrom(encryption_info);
- } else {
- if (!ak.empty() || !sk.empty()) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "invaild argument, both set ak/sk and role_arn
is not allowed";
- LOG(INFO) << msg;
- return;
- }
-
- if (it.provider() != ObjectStoreInfoPB::S3) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "role_arn is only supported for s3 provider";
- LOG(INFO) << msg << " provider=" << it.provider();
- return;
- }
-
- if (it.role_arn() == role_arn && it.external_id() ==
external_id &&
- get_cred_provider_type(it) ==
get_cred_provider_type(request->obj())) {
- // not change, just return ok
- code = MetaServiceCode::OK;
- msg = "ak/sk not changed";
- return;
- }
- it.clear_ak();
- it.clear_sk();
- it.clear_encryption_info();
-
- it.set_role_arn(role_arn);
- it.set_external_id(external_id);
-
it.set_cred_provider_type(get_cred_provider_type(request->obj()));
- }
-
- auto now_time = std::chrono::system_clock::now();
- uint64_t time =
std::chrono::duration_cast<std::chrono::seconds>(
- now_time.time_since_epoch())
- .count();
- it.set_mtime(time);
+ int ret = alter_instance_obj_store_info_by_id(instance, root_obj_id,
ak, sk, role_arn,
+ external_id,
encryption_info, code, msg);
+ if (ret != 0) {
+ if (ret > 0) {
+ return;
}
+ return;
}
} break;
case AlterObjStoreInfoRequest::ADD_OBJ_INFO: {
@@ -1727,6 +1941,105 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
+ return;
+ }
+
+ async_notify_refresh_instance(txn_kv_, instance_id, true);
+
+ if (!supports_cascade) {
+ return;
+ }
+
+ if (!instance.has_snapshot_switch_status() ||
+ instance.snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) {
+ LOG(INFO) << "snapshot disabled for instance_id=" << instance_id
+ << ", skip cascade updating derived instances after
alter_obj_store_info";
+ return;
+ }
+
+ std::vector<std::string> cascade_instance_ids;
+ if (find_cascade_instances(txn_kv_.get(), instance_id,
&cascade_instance_ids) != 0) {
+ LOG(WARNING) << "failed to find derived instances for obj store
cascade, instance_id="
+ << instance_id;
+ return;
+ }
+
+ for (const auto& cascade_id : cascade_instance_ids) {
+ std::unique_ptr<Transaction> cascade_txn;
+ TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn);
+ if (cascade_err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(cascade_err);
+ msg = fmt::format(
+ "failed to create txn for derived obj store update,
instance_id={}, err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ std::string cascade_key;
+ std::string cascade_val;
+ instance_key({cascade_id}, &cascade_key);
+ cascade_err = cascade_txn->get(cascade_key, &cascade_val);
+ if (cascade_err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(cascade_err);
+ msg = fmt::format(
+ "failed to get derived instance for obj store update,
instance_id={}, err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ InstanceInfoPB cascade_instance;
+ if (!cascade_instance.ParseFromString(cascade_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format(
+ "failed to parse derived InstanceInfoPB for obj store
update, instance_id={}",
+ cascade_id);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ MetaServiceCode cascade_code = MetaServiceCode::OK;
+ std::string cascade_msg;
+ int ret = alter_instance_obj_store_info_by_id(cascade_instance,
root_obj_id, ak, sk,
+ role_arn, external_id,
encryption_info,
+ cascade_code,
cascade_msg);
+ if (ret != 0) {
+ if (ret < 0) {
+ code = cascade_code;
+ msg = fmt::format(
+ "failed to cascade obj store update, instance_id={},
obj_info_id={}, "
+ "msg={}",
+ cascade_id, root_obj_id, cascade_msg);
+ LOG(WARNING) << msg << " code=" << static_cast<int>(code);
+ return;
+ }
+ }
+
+ cascade_val = cascade_instance.SerializeAsString();
+ if (cascade_val.empty()) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ msg = fmt::format(
+ "failed to serialize derived instance after obj store
update, instance_id={}",
+ cascade_id);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ cascade_txn->atomic_add(system_meta_service_instance_update_key(), 1);
+ cascade_txn->put(cascade_key, cascade_val);
+ cascade_err = cascade_txn->commit();
+ if (cascade_err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(cascade_err);
+ msg = fmt::format("failed to commit derived obj store update,
instance_id={}, err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ async_notify_refresh_instance(txn_kv_, cascade_id, true);
+ LOG(INFO) << "cascade obj store update finished, root_instance_id=" <<
instance_id
+ << " derived_instance_id=" << cascade_id << " obj_info_id="
<< root_obj_id;
}
}
@@ -1849,9 +2162,11 @@ void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
std::unique_ptr<Transaction> cascade_txn;
TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn);
if (cascade_err != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to create txn for derived instance,
instance_id=" << cascade_id
- << " err=" << cascade_err;
- continue;
+ code = cast_as<ErrCategory::CREATE>(cascade_err);
+ msg = fmt::format("failed to create txn for derived instance,
instance_id={}, err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
}
InstanceKeyInfo cascade_key_info {cascade_id};
@@ -1861,15 +2176,20 @@ void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
cascade_err = cascade_txn->get(cascade_key, &cascade_val);
if (cascade_err != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to get derived instance, instance_id=" <<
cascade_id
- << " err=" << cascade_err;
- continue;
+ code = cast_as<ErrCategory::READ>(cascade_err);
+ msg = fmt::format("failed to get derived instance, instance_id={},
err={}", cascade_id,
+ cascade_err);
+ LOG(WARNING) << msg;
+ return;
}
InstanceInfoPB cascade_instance;
if (!cascade_instance.ParseFromString(cascade_val)) {
- LOG(WARNING) << "failed to parse InstanceInfoPB for derived
instance_id=" << cascade_id;
- continue;
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse InstanceInfoPB for derived
instance_id={}",
+ cascade_id);
+ LOG(WARNING) << msg;
+ return;
}
// Update the cascade instance using helper function
@@ -1878,24 +2198,30 @@ void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
std::string cascade_msg;
if (update_instance_ak_sk(cascade_instance, request, time,
cascade_code, cascade_msg,
cascade_update_record) != 0) {
- LOG(WARNING) << "failed to update derived instance, instance_id="
<< cascade_id
- << " msg=" << cascade_msg;
- continue;
+ code = cascade_code;
+ msg = fmt::format("failed to update derived instance,
instance_id={}, msg={}",
+ cascade_id, cascade_msg);
+ LOG(WARNING) << msg << " code=" << static_cast<int>(code);
+ return;
}
cascade_val = cascade_instance.SerializeAsString();
if (cascade_val.empty()) {
- LOG(WARNING) << "failed to serialize derived instance,
instance_id=" << cascade_id;
- continue;
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ msg = fmt::format("failed to serialize derived instance,
instance_id={}", cascade_id);
+ LOG(WARNING) << msg;
+ return;
}
cascade_txn->put(cascade_key, cascade_val);
cascade_err = cascade_txn->commit();
if (cascade_err != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to commit derived instance txn,
instance_id=" << cascade_id
- << " err=" << cascade_err;
- continue;
+ code = cast_as<ErrCategory::COMMIT>(cascade_err);
+ msg = fmt::format("failed to commit derived instance txn,
instance_id={}, err={}",
+ cascade_id, cascade_err);
+ LOG(WARNING) << msg;
+ return;
}
async_notify_refresh_instance(txn_kv_, cascade_id, true);
diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp
index ee3f8598df8..2c02d9bfccf 100644
--- a/cloud/test/resource_test.cpp
+++ b/cloud/test/resource_test.cpp
@@ -567,6 +567,84 @@ static void verify_instance_aksk(MetaServiceProxy* meta_service, const std::stri
EXPECT_EQ(instance.obj_info(0).sk(), expected_sk);
}
+static void create_instance_with_storage_vault(MetaServiceProxy* meta_service,
+ const std::string& instance_id,
+ const std::string&
source_instance_id,
+ const std::string& vault_id,
+ const std::string& vault_name,
const std::string& ak,
+ const std::string& sk, bool
enable_snapshot = true) {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ instance.set_enable_storage_vault(true);
+ instance.add_resource_ids(vault_id);
+ instance.add_storage_vault_names(vault_name);
+ instance.set_default_storage_vault_id(vault_id);
+ instance.set_default_storage_vault_name(vault_name);
+
+ std::optional<Versionstamp> snapshot_version;
+ if (!source_instance_id.empty()) {
+ instance.set_source_instance_id(source_instance_id);
+ snapshot_version = next_test_snapshot_versionstamp();
+ instance.set_source_snapshot_id(snapshot_version->to_string());
+ }
+ instance.set_snapshot_switch_status(enable_snapshot ? SNAPSHOT_SWITCH_ON
+ :
SNAPSHOT_SWITCH_DISABLED);
+
+ StorageVaultPB vault;
+ vault.set_id(vault_id);
+ vault.set_name(vault_name);
+ auto* obj_info = vault.mutable_obj_info();
+ obj_info->set_id(vault_id);
+ obj_info->set_ak(ak);
+ obj_info->set_sk(sk);
+ obj_info->set_bucket("bucket");
+ obj_info->set_prefix("prefix");
+ obj_info->set_endpoint("endpoint");
+ obj_info->set_external_endpoint("external-endpoint");
+ obj_info->set_region("region");
+ obj_info->set_provider(ObjectStoreInfoPB::S3);
+
+ txn->put(instance_key({instance_id}), instance.SerializeAsString());
+ txn->put(storage_vault_key({instance_id, vault_id}),
vault.SerializeAsString());
+
+ if (snapshot_version.has_value()) {
+ versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id,
*snapshot_version,
+ instance_id};
+ txn->put(versioned::snapshot_reference_key(ref_key_info), "");
+ }
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+static void verify_storage_vault(MetaServiceProxy* meta_service, const
std::string& instance_id,
+ const std::string& vault_id, const
std::string& expected_name,
+ const std::string& expected_ak, const
std::string& expected_sk) {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ std::string val;
+ ASSERT_EQ(txn->get(storage_vault_key({instance_id, vault_id}), &val),
TxnErrorCode::TXN_OK);
+
+ StorageVaultPB vault;
+ ASSERT_TRUE(vault.ParseFromString(val));
+ ASSERT_TRUE(vault.has_obj_info());
+ EXPECT_EQ(vault.name(), expected_name);
+ EXPECT_EQ(vault.obj_info().ak(), expected_ak);
+ EXPECT_EQ(vault.obj_info().sk(), expected_sk);
+
+ ASSERT_EQ(txn->get(instance_key({instance_id}), &val),
TxnErrorCode::TXN_OK);
+ InstanceInfoPB instance;
+ ASSERT_TRUE(instance.ParseFromString(val));
+ ASSERT_EQ(instance.resource_ids_size(), 1);
+ ASSERT_EQ(instance.storage_vault_names_size(), 1);
+ EXPECT_EQ(instance.resource_ids(0), vault_id);
+ EXPECT_EQ(instance.storage_vault_names(0), expected_name);
+ EXPECT_EQ(instance.default_storage_vault_id(), vault_id);
+ EXPECT_EQ(instance.default_storage_vault_name(), expected_name);
+}
+
// Test AK/SK cascade update: two-level cascade
TEST(AkSkCascadeTest, TwoLevelCascade) {
auto meta_service = get_meta_service();
@@ -856,6 +934,94 @@ TEST(AkSkCascadeTest, ChildWithoutObjInfo) {
sp->clear_all_call_backs();
}
+TEST(StorageVaultCascadeTest, AlterS3VaultCascadesToDerivedInstances) {
+ auto meta_service = get_meta_service();
+
+ auto sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+ auto* ret = try_any_cast<int*>(args[0]);
+ *ret = 0;
+ auto* key = try_any_cast<std::string*>(args[1]);
+ *key = "selectdbselectdbselectdbselectdb";
+ auto* key_id = try_any_cast<int64_t*>(args[2]);
+ *key_id = 1;
+ });
+
+ create_instance_with_storage_vault(meta_service.get(), "vault_parent", "",
"2", "vault_old",
+ "old_ak", "old_sk");
+ create_instance_with_storage_vault(meta_service.get(), "vault_child",
"vault_parent", "2",
+ "vault_old", "old_ak", "old_sk");
+
+ AlterObjStoreInfoRequest req;
+ req.set_cloud_unique_id("1:vault_parent:test");
+ req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT);
+ StorageVaultPB vault;
+ vault.set_name("vault_old");
+ vault.set_alter_name("vault_new");
+ vault.mutable_obj_info()->set_ak("new_ak");
+ vault.mutable_obj_info()->set_sk("new_sk");
+ req.mutable_vault()->CopyFrom(vault);
+
+ brpc::Controller cntl;
+ AlterObjStoreInfoResponse res;
+ meta_service->alter_storage_vault(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+ std::string cipher_sk = "HNAGUf23voYuuqV2BCX9Tw==";
+ verify_storage_vault(meta_service.get(), "vault_parent", "2", "vault_new",
"new_ak", cipher_sk);
+ verify_storage_vault(meta_service.get(), "vault_child", "2", "vault_new",
"new_ak", cipher_sk);
+
+ sp->disable_processing();
+ sp->clear_all_call_backs();
+}
+
+TEST(StorageVaultCascadeTest, SnapshotDisabledSkipsCascade) {
+ auto meta_service = get_meta_service();
+
+ auto sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+ auto* ret = try_any_cast<int*>(args[0]);
+ *ret = 0;
+ auto* key = try_any_cast<std::string*>(args[1]);
+ *key = "selectdbselectdbselectdbselectdb";
+ auto* key_id = try_any_cast<int64_t*>(args[2]);
+ *key_id = 1;
+ });
+
+ create_instance_with_storage_vault(meta_service.get(),
"vault_parent_disabled", "", "2",
+ "vault_old", "old_ak", "old_sk",
+ /*enable_snapshot=*/false);
+ create_instance_with_storage_vault(meta_service.get(),
"vault_child_disabled",
+ "vault_parent_disabled", "2",
"vault_old", "old_ak",
+ "old_sk");
+
+ AlterObjStoreInfoRequest req;
+ req.set_cloud_unique_id("1:vault_parent_disabled:test");
+ req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT);
+ StorageVaultPB vault;
+ vault.set_name("vault_old");
+ vault.set_alter_name("vault_new");
+ vault.mutable_obj_info()->set_ak("new_ak");
+ vault.mutable_obj_info()->set_sk("new_sk");
+ req.mutable_vault()->CopyFrom(vault);
+
+ brpc::Controller cntl;
+ AlterObjStoreInfoResponse res;
+ meta_service->alter_storage_vault(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+ std::string cipher_sk = "HNAGUf23voYuuqV2BCX9Tw==";
+ verify_storage_vault(meta_service.get(), "vault_parent_disabled", "2",
"vault_new", "new_ak",
+ cipher_sk);
+ verify_storage_vault(meta_service.get(), "vault_child_disabled", "2",
"vault_old", "old_ak",
+ "old_sk");
+
+ sp->disable_processing();
+ sp->clear_all_call_backs();
+}
+
TEST(ResourceTest, RollbackInstance) {
auto sp = SyncPoint::get_instance();
sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]