This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e957773463c [enhancement](cloud) Add checker and more bvar for cloud 
restore (#54912)
e957773463c is described below

commit e957773463c1a86a43c76ba0c0c516530659ca2b
Author: xy720 <[email protected]>
AuthorDate: Fri Aug 22 02:32:27 2025 +0800

    [enhancement](cloud) Add checker and more bvar for cloud restore (#54912)
    
    To enable the restore job check, set:
    
    set config::enable_checker = true;
    set config::enable_restore_job_check = true;
    
    Example bvar output, where 511112 is the instance id:
    
    ```
    curl http://127.0.0.1:5000/vars | grep restore_job | grep 511112
    
    checker_restore_job_committed_state_511112 : 0
    checker_restore_job_completed_state_511112 : 269
    checker_restore_job_cost_many_time_511112 : 0
    checker_restore_job_dropped_state_511112 : 0
    checker_restore_job_prepared_state_511112 : 20
    checker_restore_job_recycling_state_511112 : 322
    
    ms_commit_restore_job_511112_count : 660
    ms_commit_restore_job_511112_latency : 0
    ms_commit_restore_job_511112_latency_80 : 0
    ms_commit_restore_job_511112_latency_90 : 0
    ms_commit_restore_job_511112_latency_99 : 0
    ms_commit_restore_job_511112_latency_999 : 0
    ms_commit_restore_job_511112_latency_9999 : 0
    ms_commit_restore_job_511112_max_latency : 0
    ms_commit_restore_job_511112_qps : 0
    
    ms_finish_restore_job_511112_count : 660
    ms_finish_restore_job_511112_latency : 0
    ms_finish_restore_job_511112_latency_80 : 0
    ms_finish_restore_job_511112_latency_90 : 0
    ms_finish_restore_job_511112_latency_99 : 0
    ms_finish_restore_job_511112_latency_999 : 0
    ms_finish_restore_job_511112_latency_9999 : 0
    ms_finish_restore_job_511112_max_latency : 0
    ms_finish_restore_job_511112_qps : 0
    
    ms_prepare_restore_job_511112_count : 660
    ms_prepare_restore_job_511112_latency : 0
    ms_prepare_restore_job_511112_latency_80 : 0
    ms_prepare_restore_job_511112_latency_90 : 0
    ms_prepare_restore_job_511112_latency_99 : 0
    ms_prepare_restore_job_511112_latency_999 : 0
    ms_prepare_restore_job_511112_latency_9999 : 0
    ms_prepare_restore_job_511112_max_latency : 0
    ms_prepare_restore_job_511112_qps : 0
    ```
---
 be/src/cloud/cloud_meta_mgr.cpp         |   4 +-
 cloud/src/common/bvars.cpp              |   6 ++
 cloud/src/common/bvars.h                |  19 ++++++
 cloud/src/common/config.h               |   1 +
 cloud/src/meta-service/meta_service.cpp |  37 ++++++++++-
 cloud/src/recycler/checker.cpp          | 113 +++++++++++++++++++++++++++++++-
 cloud/src/recycler/checker.h            |   2 +
 cloud/test/meta_service_test.cpp        |  56 +++++++++++++++-
 cloud/test/recycler_test.cpp            |  75 +++++++++++++++++++++
 gensrc/proto/cloud.proto                |   9 ++-
 10 files changed, 313 insertions(+), 9 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 86de6080585..1eb647fe39e 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1155,6 +1155,7 @@ Status CloudMetaMgr::prepare_restore_job(const 
TabletMetaPB& tablet_meta) {
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_tablet_id(tablet_meta.tablet_id());
     req.set_expiration(config::snapshot_expire_time_sec);
+    req.set_action(RestoreJobRequest::PREPARE);
 
     doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), 
std::move(tablet_meta));
     return retry_rpc("prepare restore job", req, &resp, 
&MetaService_Stub::prepare_restore_job);
@@ -1166,6 +1167,7 @@ Status CloudMetaMgr::commit_restore_job(const int64_t 
tablet_id) {
     RestoreJobResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_tablet_id(tablet_id);
+    req.set_action(RestoreJobRequest::COMMIT);
 
     return retry_rpc("commit restore job", req, &resp, 
&MetaService_Stub::commit_restore_job);
 }
@@ -1177,7 +1179,7 @@ Status CloudMetaMgr::finish_restore_job(const int64_t 
tablet_id, bool is_complet
     RestoreJobResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_tablet_id(tablet_id);
-    req.set_is_completed(is_completed);
+    req.set_action(is_completed ? RestoreJobRequest::COMPLETE : 
RestoreJobRequest::ABORT);
 
     return retry_rpc("finish restore job", req, &resp, 
&MetaService_Stub::finish_restore_job);
 }
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 1678f29bdff..eb8618d4048 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -231,6 +231,12 @@ BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_leaked_delete_bitmaps("checke
 BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_abnormal_delete_bitmaps("checker", 
"abnormal_delete_bitmaps");
 BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_delete_bitmaps_scanned("checker", 
"delete_bitmap_keys_scanned");
 BvarStatusWithTag<int64_t> 
g_bvar_max_rowsets_with_useless_delete_bitmap_version("checker", 
"max_rowsets_with_useless_delete_bitmap_version");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_prepared_state("checker", 
"restore_job_prepared_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_committed_state("checker", 
"restore_job_committed_state");
+BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state("checker", 
"restore_job_dropped_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_completed_state("checker", 
"restore_job_completed_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_recycling_state("checker", 
"restore_job_recycling_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_cost_many_time("checker", 
"restore_job_cost_many_time");
 
 // rpc kv rw count
 // get_rowset
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index c5f4d93d2a0..92c5bd326e1 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -375,6 +375,13 @@ extern BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_abnormal_delete_bitmap
 extern BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_delete_bitmaps_scanned;
 extern BvarStatusWithTag<int64_t> 
g_bvar_max_rowsets_with_useless_delete_bitmap_version;
 
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_prepared_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_committed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_completed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_recycling_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_cost_many_time;
+
 // rpc kv
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rowset_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_version_get_counter;
@@ -422,6 +429,12 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_commit_partition_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_counter;
@@ -528,6 +541,12 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_commit_partition_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_bytes;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 26c5c2d8615..10aa7b99dd0 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -118,6 +118,7 @@ CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
 CONF_Bool(force_immediate_recycle, "false");
 
 CONF_mBool(enable_mow_job_key_check, "false");
+CONF_mBool(enable_restore_job_check, "false");
 
 CONF_mBool(enable_checker_for_meta_key_check, "false");
 CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index b51738bc471..53aa719e84c 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1328,6 +1328,13 @@ void 
MetaServiceImpl::prepare_restore_job(::google::protobuf::RpcController* con
                                           RestoreJobResponse* response,
                                           ::google::protobuf::Closure* done) {
     RPC_PREPROCESS(prepare_restore_job);
+    if (request->action() != RestoreJobRequest::PREPARE) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid action, expected PREPARE but got " +
+              RestoreJobRequest::Action_Name(request->action());
+        return;
+    }
+
     if (!request->has_tablet_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         msg = "empty tablet_id";
@@ -1509,6 +1516,13 @@ void 
MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
                                          RestoreJobResponse* response,
                                          ::google::protobuf::Closure* done) {
     RPC_PREPROCESS(commit_restore_job);
+    if (request->action() != RestoreJobRequest::COMMIT) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid action, expected COMMIT but got " +
+              RestoreJobRequest::Action_Name(request->action());
+        return;
+    }
+
     if (!request->has_tablet_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         msg = "empty tablet_id";
@@ -1846,6 +1860,14 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
                                          RestoreJobResponse* response,
                                          ::google::protobuf::Closure* done) {
     RPC_PREPROCESS(finish_restore_job);
+    if (request->action() != RestoreJobRequest::COMPLETE &&
+        request->action() != RestoreJobRequest::ABORT) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid action, expected COMPLETE or ABORT but got " +
+              RestoreJobRequest::Action_Name(request->action());
+        return;
+    }
+
     if (!request->has_tablet_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         msg = "empty tablet_id";
@@ -1900,7 +1922,6 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
         return;
     }
 
-    bool is_completed = request->has_is_completed() && request->is_completed();
     if (restore_job_pb.state() == RestoreJobCloudPB::DROPPED ||
         restore_job_pb.state() == RestoreJobCloudPB::COMPLETED) {
         LOG_INFO("restore job already finished")
@@ -1917,7 +1938,8 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
         return;
     } else {
         // PREPARED, COMMITTED state
-        if (is_completed && restore_job_pb.state() != 
RestoreJobCloudPB::COMMITTED) {
+        if (request->action() == RestoreJobRequest::COMPLETE &&
+            restore_job_pb.state() != RestoreJobCloudPB::COMMITTED) {
             // Only allow COMMITTED -> COMPLETED
             code = MetaServiceCode::INVALID_ARGUMENT;
             msg = fmt::format("restore tablet {} in invalid state to complete, 
state: {}",
@@ -1925,10 +1947,21 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
                               
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
             return;
         }
+        if (request->action() == RestoreJobRequest::ABORT &&
+            (restore_job_pb.state() != RestoreJobCloudPB::PREPARED &&
+             restore_job_pb.state() != RestoreJobCloudPB::COMMITTED)) {
+            // Only allow PREPARED/COMMITTED -> DROPPED
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = fmt::format("restore tablet {} in invalid state to abort, 
state: {}",
+                              tablet_idx.tablet_id(),
+                              
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
+            return;
+        }
     }
 
     // 2. update restore job
     std::string to_save_val;
+    bool is_completed = request->action() == RestoreJobRequest::COMPLETE;
     restore_job_pb.set_state(is_completed ? RestoreJobCloudPB::COMPLETED
                                           : RestoreJobCloudPB::DROPPED);
     restore_job_pb.set_need_recycle_data(!is_completed);
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 7d3cf1f1347..07faef16077 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -198,6 +198,12 @@ int Checker::start() {
                 }
             }
 
+            if (config::enable_restore_job_check) {
+                if (int ret = checker->do_restore_job_check(); ret != 0) {
+                    success = false;
+                }
+            }
+
             if (config::enable_delete_bitmap_storage_optimize_v2_check) {
                 if (int ret = 
checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
                     ret != 0) {
@@ -1375,7 +1381,7 @@ int 
InstanceChecker::check_inverted_index_file_storage_format_v1(
         // Garbage data leak
         // clang-format off
         LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains 
segment_id, rowset should be recycled,"
-                     << " key = " << file_path 
+                     << " key = " << file_path
                      << " segment_id = " << segment_id;
         // clang-format on
         return 1;
@@ -1385,7 +1391,7 @@ int 
InstanceChecker::check_inverted_index_file_storage_format_v1(
         // Garbage data leak
         // clang-format off
         LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains 
index_id_with_suffix_name,"
-                     << " rowset with inde meta should be recycled, key=" << 
file_path 
+                     << " rowset with inde meta should be recycled, key=" << 
file_path
                      << " index_id_with_suffix_name=" << 
index_id_with_suffix_name;
         // clang-format on
         return 1;
@@ -1758,4 +1764,116 @@ int InstanceChecker::do_mow_job_key_check() {
     return 0;
 }
 
+// Scans every restore job key of this instance, counts the jobs per state and
+// publishes the counts through the g_bvar_checker_restore_job_* bvars.
+// PREPARED/COMMITTED jobs whose ctime_s is more than COST_MANY_THRESHOLD seconds in
+// the past are additionally counted as "cost many time" and logged.
+// Returns 0 on success or when the checker is stopped, -1 on KV or parse errors.
+int InstanceChecker::do_restore_job_check() {
+    int64_t num_prepared = 0;
+    int64_t num_committed = 0;
+    int64_t num_dropped = 0;
+    int64_t num_completed = 0;
+    int64_t num_recycling = 0;
+    int64_t num_cost_many_time = 0;
+    // Jobs still PREPARED/COMMITTED this many seconds after ctime_s are reported.
+    constexpr int64_t COST_MANY_THRESHOLD = 3600;
+
+    using namespace std::chrono;
+    auto start_time = steady_clock::now();
+    // Publish the counts gathered so far on every exit path, including early error returns.
+    DORIS_CLOUD_DEFER {
+        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
+        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
+        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
+        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
+        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
+        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
+        auto cost_ms =
+                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
+        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
+                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
+                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
+                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
+                  << " num_cost_many_time=" << num_cost_many_time;
+    };
+
+    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
+
+    // Sample the wall clock once so every job in this pass is judged against the same instant.
+    const int64_t current_time = ::time(nullptr);
+
+    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
+    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
+    std::string begin;
+    std::string end;
+    job_restore_tablet_key(restore_job_key_info0, &begin);
+    job_restore_tablet_key(restore_job_key_info1, &end);
+    std::unique_ptr<RangeGetIterator> it;
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to create txn";
+            return -1;
+        }
+        err = txn->get(begin, end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to get restore job key, err=" << err;
+            return -1;
+        }
+        if (!it->has_next()) {
+            break;
+        }
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            RestoreJobCloudPB restore_job_pb;
+            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
+                LOG_WARNING("malformed restore job value").tag("key", hex(k));
+                return -1;
+            }
+
+            switch (restore_job_pb.state()) {
+            case RestoreJobCloudPB::PREPARED:
+                ++num_prepared;
+                break;
+            case RestoreJobCloudPB::COMMITTED:
+                ++num_committed;
+                break;
+            case RestoreJobCloudPB::DROPPED:
+                ++num_dropped;
+                break;
+            case RestoreJobCloudPB::COMPLETED:
+                ++num_completed;
+                break;
+            case RestoreJobCloudPB::RECYCLING:
+                ++num_recycling;
+                break;
+            default:
+                break;
+            }
+
+            if ((restore_job_pb.state() == RestoreJobCloudPB::PREPARED ||
+                 restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) &&
+                current_time > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
+                // restore job has not finished within COST_MANY_THRESHOLD (1 hour)
+                ++num_cost_many_time;
+                LOG_WARNING("restore job cost too many time")
+                        .tag("key", hex(k))
+                        .tag("tablet_id", restore_job_pb.tablet_id())
+                        .tag("state", RestoreJobCloudPB::State_Name(restore_job_pb.state()))
+                        .tag("ctime_s", restore_job_pb.ctime_s())
+                        .tag("mtime_s", restore_job_pb.mtime_s());
+            }
+
+            if (!it->has_next()) {
+                begin = k;
+                begin.push_back('\x00'); // Update to next smallest key for iteration
+                break;
+            }
+        }
+    } while (it->more() && !stopped());
+    return 0;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index fb8f084297d..436e1302b07 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -107,6 +107,8 @@ public:
 
     int do_mow_job_key_check();
 
+    int do_restore_job_check();
+
     // If there are multiple buckets, return the minimum lifecycle; if there 
are no buckets (i.e.
     // all accessors are HdfsAccessor), return INT64_MAX.
     // Return 0 if success, otherwise error
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 4471b5a3658..5912d51b8ee 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10604,6 +10604,11 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
         txn->put(meta_tablet_idx_key({instance_id, tablet_id}), 
tablet_idx_val);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+        // empty action
+        meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        ASSERT_TRUE(res.status().msg().find("invalid action") != 
std::string::npos);
+        req.set_action(RestoreJobRequest::PREPARE);
 
         // empty tablet id
         meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
@@ -10616,6 +10621,13 @@ TEST(MetaServiceTest, RestoreJobTest) {
         meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "no tablet meta");
+
+        // check key existence
+        std::string restore_job_key = job_restore_tablet_key({instance_id, 
tablet_id});
+        std::string val;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(restore_job_key, &val), 
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
         req.Clear();
         res.Clear();
     }
@@ -10627,6 +10639,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
         req.set_tablet_id(tablet_id);
         req.set_expiration(time(nullptr) + 3600);
+        req.set_action(RestoreJobRequest::PREPARE);
 
         // set tablet meta
         auto* tablet_meta = req.mutable_tablet_meta();
@@ -10667,6 +10680,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         txn->put(meta_tablet_idx_key({instance_id, tablet_id}), 
tablet_idx_val);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::PREPARE);
 
         // set tablet meta
         auto* tablet_meta = req.mutable_tablet_meta();
@@ -10709,10 +10723,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
     // invalid args commit restore job
     {
         reset_meta_service();
+        // empty action
+        meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        ASSERT_TRUE(res.status().msg().find("invalid action") != 
std::string::npos);
+        req.set_action(RestoreJobRequest::COMMIT);
+
         // empty tablet_id
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+        // check key existence
+        std::string restore_job_key = job_restore_tablet_key({instance_id, 
tablet_id});
+        std::string val;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(restore_job_key, &val), 
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
         req.Clear();
         res.Clear();
     }
@@ -10724,6 +10751,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
 
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "restore job not exists or has been 
recycled");
@@ -10742,6 +10770,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
         tablet_meta->set_index_id(index_id);
@@ -10767,6 +10796,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // commit_restore_job
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
         std::string tablet_key =
@@ -10812,6 +10842,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
@@ -10843,6 +10874,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // commit_restore_job
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
         ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
@@ -10876,10 +10908,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
     // invalid args finish restore job
     {
         reset_meta_service();
+        // empty action must be rejected by finish_restore_job
+        meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        ASSERT_TRUE(res.status().msg().find("invalid action") != std::string::npos);
+        req.set_action(RestoreJobRequest::COMPLETE);
+
         // empty tablet_id
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+        // check key existence
+        std::string restore_job_key = job_restore_tablet_key({instance_id, tablet_id});
+        std::string val;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(restore_job_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
+
         req.Clear();
         res.Clear();
     }
@@ -10891,6 +10936,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
 
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMPLETE);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "restore job not exists or has been 
recycled");
@@ -10909,6 +10955,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
@@ -10934,6 +10981,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->get(restore_job_rs_key, &val), TxnErrorCode::TXN_OK);
 
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
         ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
@@ -10942,7 +10990,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // finish_restore_job to COMPLETED
         req.set_tablet_id(tablet_id);
-        req.set_is_completed(true);
+        req.set_action(RestoreJobRequest::COMPLETE);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
 
@@ -10975,6 +11023,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
@@ -10996,7 +11045,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // finish_restore_job to DROPPED
         req.set_tablet_id(tablet_id);
-        req.set_is_completed(false);
+        req.set_action(RestoreJobRequest::ABORT);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
 
@@ -11029,6 +11078,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         // set tablet meta
         auto* tablet_meta = make_req.mutable_tablet_meta();
@@ -11045,7 +11095,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // finish_restore_job to COMPLETED
         req.set_tablet_id(tablet_id);
-        req.set_is_completed(true);
+        req.set_action(RestoreJobRequest::COMPLETE);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_TRUE(res.status().msg().find("invalid state to complete") != 
std::string::npos);
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 7030d6d418e..fd7a5699ec6 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -2275,6 +2275,20 @@ TEST(RecyclerTest, recycle_restore_jobs) {
     auto begin_key = job_restore_tablet_key({instance_id, 0});
     auto end_key = job_restore_tablet_key({instance_id, INT64_MAX});
     ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(it->size(), 20);
+
+    begin_key = job_restore_rowset_key({instance_id, 0, 0});
+    end_key = job_restore_rowset_key({instance_id, INT64_MAX, 0});
+    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(it->size(), 100);
+
+    ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
+
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    begin_key = job_restore_tablet_key({instance_id, 0});
+    end_key = job_restore_tablet_key({instance_id, INT64_MAX});
+    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
     ASSERT_EQ(it->size(), 0);
 
     begin_key = job_restore_rowset_key({instance_id, 0, 0});
@@ -4540,6 +4554,75 @@ TEST(CheckerTest, check_job_key) {
     ASSERT_EQ(checker.do_mow_job_key_check(), -1);
 }
 
+TEST(CheckerTest, do_restore_job_check) {
+    config::enable_restore_job_check = true;
+    std::string instance_id = "test_do_restore_job_check";
+    [[maybe_unused]] auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    // Prepare test data: simulate restore jobs in different states
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+    // Add a PREPARED restore job
+    RestoreJobCloudPB prepared_job;
+    prepared_job.set_tablet_id(10001);
+    prepared_job.set_state(RestoreJobCloudPB::PREPARED);
+    prepared_job.set_ctime_s(::time(nullptr) - 1800); // 30 minutes ago
+    std::string prepared_key;
+    job_restore_tablet_key({instance_id, prepared_job.tablet_id()}, &prepared_key);
+    txn->put(prepared_key, prepared_job.SerializeAsString());
+
+    // Add a COMMITTED restore job
+    RestoreJobCloudPB committed_job;
+    committed_job.set_tablet_id(10002);
+    committed_job.set_state(RestoreJobCloudPB::COMMITTED);
+    committed_job.set_ctime_s(::time(nullptr) - 7200); // 2 hours ago
+    std::string committed_key;
+    job_restore_tablet_key({instance_id, committed_job.tablet_id()}, &committed_key);
+    txn->put(committed_key, committed_job.SerializeAsString());
+
+    // Add a COMPLETED restore job
+    RestoreJobCloudPB completed_job;
+    completed_job.set_tablet_id(10003);
+    completed_job.set_state(RestoreJobCloudPB::COMPLETED);
+    completed_job.set_ctime_s(::time(nullptr) - 3600); // 1 hour ago
+    std::string completed_key;
+    job_restore_tablet_key({instance_id, completed_job.tablet_id()}, &completed_key);
+    txn->put(completed_key, completed_job.SerializeAsString());
+
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+    // Run the check: all stored values are well-formed, so it succeeds
+    ASSERT_EQ(checker.do_restore_job_check(), 0);
+
+    // A value that does not parse as RestoreJobCloudPB makes the check fail
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+    std::string malformed_key;
+    job_restore_tablet_key({instance_id, 10004}, &malformed_key);
+    txn->put(malformed_key, std::string("\xff", 1)); // truncated varint, not a valid message
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+    ASSERT_EQ(checker.do_restore_job_check(), -1);
+}
+
 TEST(CheckerTest, delete_bitmap_storage_optimize_v2_check_normal) {
     auto txn_kv = std::make_shared<MemTxnKv>();
     ASSERT_EQ(txn_kv->init(), 0);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 52ddcebcc83..6636b31af57 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1277,12 +1277,21 @@ message PartitionResponse {
 }
 
 message RestoreJobRequest {
+    enum Action {
+        UNKNOWN = 0;
+        PREPARE = 1;
+        COMMIT = 2;
+        ABORT = 3;
+        COMPLETE = 4;
+    }
     optional string cloud_unique_id = 1;
     optional int64 tablet_id = 2;
     optional doris.TabletMetaCloudPB tablet_meta = 3;
     optional int64 expiration = 4;
     optional string request_ip = 5;
-    optional bool is_completed = 6;
+    // Tag 6 previously held `bool is_completed`; both types are varint-encoded, so peers built
+    // against the old schema decode this field as that bool (and vice versa) in mixed versions.
+    optional Action action = 6;
 }
 
 message RestoreJobResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to