This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e957773463c [enhancement](cloud) Add checker and more bvar for cloud
restore (#54912)
e957773463c is described below
commit e957773463c1a86a43c76ba0c0c516530659ca2b
Author: xy720 <[email protected]>
AuthorDate: Fri Aug 22 02:32:27 2025 +0800
[enhancement](cloud) Add checker and more bvar for cloud restore (#54912)
To enable the restore job check, set both of the following configs:
config::enable_checker = true;
config::enable_restore_job_check = true;
Example bvar output (511112 is the instance id):
```
curl http://127.0.0.1:5000/vars | grep restore_job | grep 511112
checker_restore_job_committed_state_511112 : 0
checker_restore_job_completed_state_511112 : 269
checker_restore_job_cost_many_time_511112 : 0
checker_restore_job_dropped_state_511112 : 0
checker_restore_job_prepared_state_511112 : 20
checker_restore_job_recycling_state_511112 : 322
ms_commit_restore_job_511112_count : 660
ms_commit_restore_job_511112_latency : 0
ms_commit_restore_job_511112_latency_80 : 0
ms_commit_restore_job_511112_latency_90 : 0
ms_commit_restore_job_511112_latency_99 : 0
ms_commit_restore_job_511112_latency_999 : 0
ms_commit_restore_job_511112_latency_9999 : 0
ms_commit_restore_job_511112_max_latency : 0
ms_commit_restore_job_511112_qps : 0
ms_finish_restore_job_511112_count : 660
ms_finish_restore_job_511112_latency : 0
ms_finish_restore_job_511112_latency_80 : 0
ms_finish_restore_job_511112_latency_90 : 0
ms_finish_restore_job_511112_latency_99 : 0
ms_finish_restore_job_511112_latency_999 : 0
ms_finish_restore_job_511112_latency_9999 : 0
ms_finish_restore_job_511112_max_latency : 0
ms_finish_restore_job_511112_qps : 0
ms_prepare_restore_job_511112_count : 660
ms_prepare_restore_job_511112_latency : 0
ms_prepare_restore_job_511112_latency_80 : 0
ms_prepare_restore_job_511112_latency_90 : 0
ms_prepare_restore_job_511112_latency_99 : 0
ms_prepare_restore_job_511112_latency_999 : 0
ms_prepare_restore_job_511112_latency_9999 : 0
ms_prepare_restore_job_511112_max_latency : 0
ms_prepare_restore_job_511112_qps : 0
```
---
be/src/cloud/cloud_meta_mgr.cpp | 4 +-
cloud/src/common/bvars.cpp | 6 ++
cloud/src/common/bvars.h | 19 ++++++
cloud/src/common/config.h | 1 +
cloud/src/meta-service/meta_service.cpp | 37 ++++++++++-
cloud/src/recycler/checker.cpp | 113 +++++++++++++++++++++++++++++++-
cloud/src/recycler/checker.h | 2 +
cloud/test/meta_service_test.cpp | 56 +++++++++++++++-
cloud/test/recycler_test.cpp | 75 +++++++++++++++++++++
gensrc/proto/cloud.proto | 9 ++-
10 files changed, 313 insertions(+), 9 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 86de6080585..1eb647fe39e 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1155,6 +1155,7 @@ Status CloudMetaMgr::prepare_restore_job(const
TabletMetaPB& tablet_meta) {
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet_meta.tablet_id());
req.set_expiration(config::snapshot_expire_time_sec);
+ req.set_action(RestoreJobRequest::PREPARE);
doris_tablet_meta_to_cloud(req.mutable_tablet_meta(),
std::move(tablet_meta));
return retry_rpc("prepare restore job", req, &resp,
&MetaService_Stub::prepare_restore_job);
@@ -1166,6 +1167,7 @@ Status CloudMetaMgr::commit_restore_job(const int64_t
tablet_id) {
RestoreJobResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
return retry_rpc("commit restore job", req, &resp,
&MetaService_Stub::commit_restore_job);
}
@@ -1177,7 +1179,7 @@ Status CloudMetaMgr::finish_restore_job(const int64_t
tablet_id, bool is_complet
RestoreJobResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet_id);
- req.set_is_completed(is_completed);
+ req.set_action(is_completed ? RestoreJobRequest::COMPLETE :
RestoreJobRequest::ABORT);
return retry_rpc("finish restore job", req, &resp,
&MetaService_Stub::finish_restore_job);
}
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 1678f29bdff..eb8618d4048 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -231,6 +231,12 @@ BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_leaked_delete_bitmaps("checke
BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_abnormal_delete_bitmaps("checker",
"abnormal_delete_bitmaps");
BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_delete_bitmaps_scanned("checker",
"delete_bitmap_keys_scanned");
BvarStatusWithTag<int64_t>
g_bvar_max_rowsets_with_useless_delete_bitmap_version("checker",
"max_rowsets_with_useless_delete_bitmap_version");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_prepared_state("checker",
"restore_job_prepared_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_committed_state("checker",
"restore_job_committed_state");
+BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state("checker",
"restore_job_dropped_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_completed_state("checker",
"restore_job_completed_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_recycling_state("checker",
"restore_job_recycling_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_cost_many_time("checker",
"restore_job_cost_many_time");
// rpc kv rw count
// get_rowset
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index c5f4d93d2a0..92c5bd326e1 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -375,6 +375,13 @@ extern BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_abnormal_delete_bitmap
extern BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_delete_bitmaps_scanned;
extern BvarStatusWithTag<int64_t>
g_bvar_max_rowsets_with_useless_delete_bitmap_version;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_prepared_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_committed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_completed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_recycling_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_cost_many_time;
+
// rpc kv
extern mBvarInt64Adder g_bvar_rpc_kv_get_rowset_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_version_get_counter;
@@ -422,6 +429,12 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_commit_partition_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_counter;
@@ -528,6 +541,12 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_commit_partition_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_bytes;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 26c5c2d8615..10aa7b99dd0 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -118,6 +118,7 @@ CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
CONF_Bool(force_immediate_recycle, "false");
CONF_mBool(enable_mow_job_key_check, "false");
+CONF_mBool(enable_restore_job_check, "false");
CONF_mBool(enable_checker_for_meta_key_check, "false");
CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index b51738bc471..53aa719e84c 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1328,6 +1328,13 @@ void
MetaServiceImpl::prepare_restore_job(::google::protobuf::RpcController* con
RestoreJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(prepare_restore_job);
+ if (request->action() != RestoreJobRequest::PREPARE) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid action, expected PREPARE but got " +
+ RestoreJobRequest::Action_Name(request->action());
+ return;
+ }
+
if (!request->has_tablet_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty tablet_id";
@@ -1509,6 +1516,13 @@ void
MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
RestoreJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(commit_restore_job);
+ if (request->action() != RestoreJobRequest::COMMIT) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid action, expected COMMIT but got " +
+ RestoreJobRequest::Action_Name(request->action());
+ return;
+ }
+
if (!request->has_tablet_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty tablet_id";
@@ -1846,6 +1860,14 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
RestoreJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(finish_restore_job);
+ if (request->action() != RestoreJobRequest::COMPLETE &&
+ request->action() != RestoreJobRequest::ABORT) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid action, expected COMPLETE or ABORT but got " +
+ RestoreJobRequest::Action_Name(request->action());
+ return;
+ }
+
if (!request->has_tablet_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty tablet_id";
@@ -1900,7 +1922,6 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
return;
}
- bool is_completed = request->has_is_completed() && request->is_completed();
if (restore_job_pb.state() == RestoreJobCloudPB::DROPPED ||
restore_job_pb.state() == RestoreJobCloudPB::COMPLETED) {
LOG_INFO("restore job already finished")
@@ -1917,7 +1938,8 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
return;
} else {
// PREPARED, COMMITTED state
- if (is_completed && restore_job_pb.state() !=
RestoreJobCloudPB::COMMITTED) {
+ if (request->action() == RestoreJobRequest::COMPLETE &&
+ restore_job_pb.state() != RestoreJobCloudPB::COMMITTED) {
// Only allow COMMITTED -> COMPLETED
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("restore tablet {} in invalid state to complete,
state: {}",
@@ -1925,10 +1947,21 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
return;
}
+ if (request->action() == RestoreJobRequest::ABORT &&
+ (restore_job_pb.state() != RestoreJobCloudPB::PREPARED &&
+ restore_job_pb.state() != RestoreJobCloudPB::COMMITTED)) {
+ // Only allow PREPARED/COMMITTED -> DROPPED
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("restore tablet {} in invalid state to abort,
state: {}",
+ tablet_idx.tablet_id(),
+
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
+ return;
+ }
}
// 2. update restore job
std::string to_save_val;
+ bool is_completed = request->action() == RestoreJobRequest::COMPLETE;
restore_job_pb.set_state(is_completed ? RestoreJobCloudPB::COMPLETED
: RestoreJobCloudPB::DROPPED);
restore_job_pb.set_need_recycle_data(!is_completed);
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 7d3cf1f1347..07faef16077 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -198,6 +198,12 @@ int Checker::start() {
}
}
+ if (config::enable_restore_job_check) {
+ if (int ret = checker->do_restore_job_check(); ret != 0) {
+ success = false;
+ }
+ }
+
if (config::enable_delete_bitmap_storage_optimize_v2_check) {
if (int ret =
checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
ret != 0) {
@@ -1375,7 +1381,7 @@ int
InstanceChecker::check_inverted_index_file_storage_format_v1(
// Garbage data leak
// clang-format off
LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains
segment_id, rowset should be recycled,"
- << " key = " << file_path
+ << " key = " << file_path
<< " segment_id = " << segment_id;
// clang-format on
return 1;
@@ -1385,7 +1391,7 @@ int
InstanceChecker::check_inverted_index_file_storage_format_v1(
// Garbage data leak
// clang-format off
LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains
index_id_with_suffix_name,"
- << " rowset with inde meta should be recycled, key=" <<
file_path
+ << " rowset with inde meta should be recycled, key=" <<
file_path
<< " index_id_with_suffix_name=" <<
index_id_with_suffix_name;
// clang-format on
return 1;
@@ -1758,4 +1764,107 @@ int InstanceChecker::do_mow_job_key_check() {
return 0;
}
+// Scans the restore job KVs of this instance and reports their state distribution.
+//
+// Every value read from the job_restore_tablet_key range (tablet_id 0 .. INT64_MAX)
+// is parsed as a RestoreJobCloudPB and counted by state. A job that is still
+// PREPARED or COMMITTED more than COST_MANY_THRESHOLD seconds after its ctime_s is
+// additionally counted and logged as costing too much time.
+//
+// The counters are written to the g_bvar_checker_restore_job_* bvars, tagged with
+// the instance id, inside the DORIS_CLOUD_DEFER block (presumably executed when
+// the function exits, so partial counts are published on error paths too).
+//
+// Returns 0 when the scan finishes or is interrupted by stopped(); returns -1 if a
+// txn cannot be created, the range read fails, or a value cannot be parsed.
+int InstanceChecker::do_restore_job_check() {
+    int64_t num_prepared = 0;
+    int64_t num_committed = 0;
+    int64_t num_dropped = 0;
+    int64_t num_completed = 0;
+    int64_t num_recycling = 0;
+    int64_t num_cost_many_time = 0;
+    // Unfinished jobs older than this many seconds (1 hour) are reported.
+    constexpr int64_t COST_MANY_THRESHOLD = 3600;
+
+    using namespace std::chrono;
+    auto start_time = steady_clock::now();
+    DORIS_CLOUD_DEFER {
+        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
+        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
+        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
+        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
+        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
+        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
+        auto cost_ms =
+                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
+        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
+                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
+                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
+                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
+                  << " num_cost_many_time=" << num_cost_many_time;
+    };
+
+    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
+
+    // Key range covering the restore jobs of every tablet of this instance.
+    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
+    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
+    std::string begin;
+    std::string end;
+    job_restore_tablet_key(restore_job_key_info0, &begin);
+    job_restore_tablet_key(restore_job_key_info1, &end);
+    std::unique_ptr<RangeGetIterator> it;
+    do {
+        // Each batch of the range scan is read in its own txn.
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to create txn, err=" << err;
+            return -1;
+        }
+        err = txn->get(begin, end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to get restore job key, err=" << err;
+            return -1;
+        }
+        if (!it->has_next()) {
+            break;
+        }
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            RestoreJobCloudPB restore_job_pb;
+            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
+                LOG_WARNING("malformed restore job value").tag("key", hex(k));
+                return -1;
+            }
+
+            switch (restore_job_pb.state()) {
+            case RestoreJobCloudPB::PREPARED:
+                ++num_prepared;
+                break;
+            case RestoreJobCloudPB::COMMITTED:
+                ++num_committed;
+                break;
+            case RestoreJobCloudPB::DROPPED:
+                ++num_dropped;
+                break;
+            case RestoreJobCloudPB::COMPLETED:
+                ++num_completed;
+                break;
+            case RestoreJobCloudPB::RECYCLING:
+                ++num_recycling;
+                break;
+            default:
+                // States not listed above are not counted.
+                break;
+            }
+
+            int64_t current_time = ::time(nullptr);
+            if ((restore_job_pb.state() == RestoreJobCloudPB::PREPARED ||
+                 restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) &&
+                current_time > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
+                // restore job run more than 1 hour
+                ++num_cost_many_time;
+                LOG_WARNING("restore job cost too many time")
+                        .tag("key", hex(k))
+                        .tag("tablet_id", restore_job_pb.tablet_id())
+                        .tag("state", RestoreJobCloudPB::State_Name(restore_job_pb.state()))
+                        .tag("ctime_s", restore_job_pb.ctime_s())
+                        .tag("mtime_s", restore_job_pb.mtime_s());
+            }
+
+            if (!it->has_next()) {
+                // Last key of this batch: resume the next range read right after it.
+                begin = k;
+                begin.push_back('\x00'); // Update to next smallest key for iteration
+                break;
+            }
+        }
+    } while (it->more() && !stopped());
+    return 0;
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index fb8f084297d..436e1302b07 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -107,6 +107,8 @@ public:
int do_mow_job_key_check();
+ int do_restore_job_check();
+
// If there are multiple buckets, return the minimum lifecycle; if there
are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
// Return 0 if success, otherwise error
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 4471b5a3658..5912d51b8ee 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10604,6 +10604,11 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
txn->put(meta_tablet_idx_key({instance_id, tablet_id}),
tablet_idx_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ // empty action
+ meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ ASSERT_TRUE(res.status().msg().find("invalid action") !=
std::string::npos);
+ req.set_action(RestoreJobRequest::PREPARE);
// empty tablet id
meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
@@ -10616,6 +10621,13 @@ TEST(MetaServiceTest, RestoreJobTest) {
meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "no tablet meta");
+
+ // check key existence
+ std::string restore_job_key = job_restore_tablet_key({instance_id,
tablet_id});
+ std::string val;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(restore_job_key, &val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
req.Clear();
res.Clear();
}
@@ -10627,6 +10639,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
req.set_expiration(time(nullptr) + 3600);
+ req.set_action(RestoreJobRequest::PREPARE);
// set tablet meta
auto* tablet_meta = req.mutable_tablet_meta();
@@ -10667,6 +10680,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
txn->put(meta_tablet_idx_key({instance_id, tablet_id}),
tablet_idx_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::PREPARE);
// set tablet meta
auto* tablet_meta = req.mutable_tablet_meta();
@@ -10709,10 +10723,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
// invalid args commit restore job
{
reset_meta_service();
+ // empty action
+ meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ ASSERT_TRUE(res.status().msg().find("invalid action") !=
std::string::npos);
+ req.set_action(RestoreJobRequest::COMMIT);
+
// empty tablet_id
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+ // check key existence
+ std::string restore_job_key = job_restore_tablet_key({instance_id,
tablet_id});
+ std::string val;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(restore_job_key, &val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
req.Clear();
res.Clear();
}
@@ -10724,6 +10751,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "restore job not exists or has been
recycled");
@@ -10742,6 +10770,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
tablet_meta->set_index_id(index_id);
@@ -10767,6 +10796,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// commit_restore_job
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
std::string tablet_key =
@@ -10812,6 +10842,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
@@ -10843,6 +10874,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// commit_restore_job
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
@@ -10876,10 +10908,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
// invalid args finish restore job
{
reset_meta_service();
+ // empty action
+ meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ ASSERT_TRUE(res.status().msg().find("invalid action") !=
std::string::npos);
+ req.set_action(RestoreJobRequest::COMPLETE);
+
// empty tablet_id
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+ // check key existence
+ std::string restore_job_key = job_restore_tablet_key({instance_id,
tablet_id});
+ std::string val;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(restore_job_key, &val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
req.Clear();
res.Clear();
}
@@ -10891,6 +10936,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMPLETE);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "restore job not exists or has been
recycled");
@@ -10909,6 +10955,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
@@ -10934,6 +10981,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->get(restore_job_rs_key, &val), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
@@ -10942,7 +10990,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// finish_restore_job to COMPLETED
req.set_tablet_id(tablet_id);
- req.set_is_completed(true);
+ req.set_action(RestoreJobRequest::COMPLETE);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
@@ -10975,6 +11023,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
@@ -10996,7 +11045,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// finish_restore_job to DROPPED
req.set_tablet_id(tablet_id);
- req.set_is_completed(false);
+ req.set_action(RestoreJobRequest::ABORT);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
@@ -11029,6 +11078,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
// set tablet meta
auto* tablet_meta = make_req.mutable_tablet_meta();
@@ -11045,7 +11095,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// finish_restore_job to COMPLETED
req.set_tablet_id(tablet_id);
- req.set_is_completed(true);
+ req.set_action(RestoreJobRequest::COMPLETE);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_TRUE(res.status().msg().find("invalid state to complete") !=
std::string::npos);
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 7030d6d418e..fd7a5699ec6 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -2275,6 +2275,20 @@ TEST(RecyclerTest, recycle_restore_jobs) {
auto begin_key = job_restore_tablet_key({instance_id, 0});
auto end_key = job_restore_tablet_key({instance_id, INT64_MAX});
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(it->size(), 20);
+
+ begin_key = job_restore_rowset_key({instance_id, 0, 0});
+ end_key = job_restore_rowset_key({instance_id, INT64_MAX, 0});
+ ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(it->size(), 100);
+
+ ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
+
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ begin_key = job_restore_tablet_key({instance_id, 0});
+ end_key = job_restore_tablet_key({instance_id, INT64_MAX});
+ ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
ASSERT_EQ(it->size(), 0);
begin_key = job_restore_rowset_key({instance_id, 0, 0});
@@ -4540,6 +4554,67 @@ TEST(CheckerTest, check_job_key) {
ASSERT_EQ(checker.do_mow_job_key_check(), -1);
}
+TEST(CheckerTest, do_restore_job_check) { // Smoke test: do_restore_job_check() returns 0 over jobs in several states
+    config::enable_restore_job_check = true;
+    std::string instance_id = "test_do_restore_job_check";
+    [[maybe_unused]] auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) { // callback writes this test's instance id into the returned pair
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    // Prepare test data: simulate restore jobs in different states, all written in a single txn
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+    // Add a PREPARED restore job whose ctime_s is within the checker's 3600s threshold
+    RestoreJobCloudPB prepared_job;
+    prepared_job.set_tablet_id(10001);
+    prepared_job.set_state(RestoreJobCloudPB::PREPARED);
+    prepared_job.set_ctime_s(::time(nullptr) - 1800); // 30 minutes ago
+    std::string prepared_key;
+    job_restore_tablet_key({instance_id, prepared_job.tablet_id()},
&prepared_key);
+    txn->put(prepared_key, prepared_job.SerializeAsString());
+
+    // Add a COMMITTED restore job older than the checker's 3600s threshold (expected to be flagged as cost-many-time)
+    RestoreJobCloudPB committed_job;
+    committed_job.set_tablet_id(10002);
+    committed_job.set_state(RestoreJobCloudPB::COMMITTED);
+    committed_job.set_ctime_s(::time(nullptr) - 7200); // 2 hours ago
+    std::string committed_key;
+    job_restore_tablet_key({instance_id, committed_job.tablet_id()},
&committed_key);
+    txn->put(committed_key, committed_job.SerializeAsString());
+
+    // Add a COMPLETED restore job (the checker's threshold test only applies to PREPARED/COMMITTED jobs)
+    RestoreJobCloudPB completed_job;
+    completed_job.set_tablet_id(10003);
+    completed_job.set_state(RestoreJobCloudPB::COMPLETED);
+    completed_job.set_ctime_s(::time(nullptr) - 3600); // 1 hour ago
+    std::string completed_key;
+    job_restore_tablet_key({instance_id, completed_job.tablet_id()},
&completed_key);
+    txn->put(completed_key, completed_job.SerializeAsString());
+
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+    // Run the check; only the return code is asserted, the per-state counters are not verified here
+    ASSERT_EQ(checker.do_restore_job_check(), 0);
+}
+
TEST(CheckerTest, delete_bitmap_storage_optimize_v2_check_normal) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 52ddcebcc83..6636b31af57 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1277,12 +1277,19 @@ message PartitionResponse {
}
message RestoreJobRequest {
+ enum Action {
+ UNKONWN = 0;
+ PREPARE = 1;
+ COMMIT = 2;
+ ABORT = 3;
+ COMPLETE = 4;
+ }
optional string cloud_unique_id = 1;
optional int64 tablet_id = 2;
optional doris.TabletMetaCloudPB tablet_meta = 3;
optional int64 expiration = 4;
optional string request_ip = 5;
- optional bool is_completed = 6;
+ optional Action action = 6;
}
message RestoreJobResponse {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]