This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 57ad0b59920 [feat](cloud) support compacting snapshots manually (#61078)
57ad0b59920 is described below

commit 57ad0b59920b529ac1c09e012a022297dba9fcdb
Author: walter <[email protected]>
AuthorDate: Wed Mar 11 14:36:09 2026 +0800

    [feat](cloud) support compacting snapshots manually (#61078)
    
    Compacting snapshots is necessary to break dependency chains. This PR
    supports manual snapshot compaction.
    
    Note: Suppose there is a dependency chain: A → B → C. Compacting A to B
    is allowed, but compacting B to C is not allowed because B depends on A.
    
    This PR adds a new HTTP API to trigger snapshot compaction manually:
    
    ```bash
    # Compact snapshot for a specific instance
    curl -X POST "http://<ms_host>:<ms_port>/v1/compact_snapshot?instance_id=<instance_id>"
    ```
    
    Parameters:
    - `instance_id` (required): The ID of the instance whose snapshot is
    compacted (for a chain A -> B, pass instance_id = B)
    
    Example:
    ```bash
    curl -X POST "http://127.0.0.1:5000/compact_snapshot?instance_id=my-instance"
    ```
---
 cloud/src/common/bvars.cpp                         |  7 ++++++
 cloud/src/common/bvars.h                           |  5 +++++
 cloud/src/meta-service/meta_service.h              | 10 +++++++++
 cloud/src/meta-service/meta_service_http.cpp       | 15 +++++++++++++
 cloud/src/meta-service/meta_service_partition.cpp  | 19 ++++++++++++----
 cloud/src/meta-service/meta_service_resource.cpp   |  2 +-
 cloud/src/meta-service/meta_service_snapshot.cpp   | 26 ++++++++++++++++++++++
 cloud/src/recycler/recycler_operation_log.cpp      |  1 +
 cloud/src/recycler/snapshot_chain_compactor.cpp    | 18 ++++++++++++---
 cloud/src/resource-manager/resource_manager.cpp    | 21 ++++++++++-------
 cloud/src/snapshot/snapshot_manager.cpp            |  5 +++++
 cloud/src/snapshot/snapshot_manager.h              |  3 +++
 .../cloud/datasource/CloudInternalCatalog.java     |  2 ++
 gensrc/proto/cloud.proto                           | 18 +++++++++++++++
 14 files changed, 136 insertions(+), 16 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index f2dd86d8d1c..e2f587e151a 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -109,6 +109,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_abort_snapshot("ms", 
"abort_snapshot");
 BvarLatencyRecorderWithTag g_bvar_ms_drop_snapshot("ms", "drop_snapshot");
 BvarLatencyRecorderWithTag g_bvar_ms_list_snapshot("ms", "list_snapshot");
 BvarLatencyRecorderWithTag g_bvar_ms_clone_instance("ms", "clone_instance");
+BvarLatencyRecorderWithTag g_bvar_ms_compact_snapshot("ms", 
"compact_snapshot");
 BvarLatencyRecorderWithTag g_bvar_ms_update_packed_file_info("ms", 
"update_packed_file_info");
 bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
 bvar::Window<bvar::Adder<int64_t> > 
g_bvar_update_delete_bitmap_fail_counter_minute("ms", 
"update_delete_bitmap_fail", &g_bvar_update_delete_bitmap_fail_counter, 60);
@@ -517,6 +518,9 @@ mBvarInt64Adder 
g_bvar_rpc_kv_drop_snapshot_del_counter("rpc_kv_drop_snapshot_de
 mBvarInt64Adder 
g_bvar_rpc_kv_clone_instance_get_counter("rpc_kv_clone_instance_get_counter",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_clone_instance_put_counter("rpc_kv_clone_instance_put_counter",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_clone_instance_del_counter("rpc_kv_clone_instance_del_counter",{"instance_id"});
+// compact_snapshot
+mBvarInt64Adder 
g_bvar_rpc_kv_compact_snapshot_get_counter("rpc_kv_compact_snapshot_get_counter",{"instance_id"});
+mBvarInt64Adder 
g_bvar_rpc_kv_compact_snapshot_put_counter("rpc_kv_compact_snapshot_put_counter",{"instance_id"});
 
 // bytes
 // get_rowset
@@ -724,5 +728,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_list_snapshot_del_bytes("rpc_kv_list_snapshot_del_
 mBvarInt64Adder 
g_bvar_rpc_kv_clone_instance_get_bytes("rpc_kv_clone_instance_get_bytes",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_clone_instance_put_bytes("rpc_kv_clone_instance_put_bytes",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_clone_instance_del_bytes("rpc_kv_clone_instance_del_bytes",{"instance_id"});
+// compact_snapshot
+mBvarInt64Adder 
g_bvar_rpc_kv_compact_snapshot_get_bytes("rpc_kv_compact_snapshot_get_bytes",{"instance_id"});
+mBvarInt64Adder 
g_bvar_rpc_kv_compact_snapshot_put_bytes("rpc_kv_compact_snapshot_put_bytes",{"instance_id"});
 
 // clang-format on
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 695a9c0206b..5497cf6a754 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -621,6 +621,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_snapshot;
 extern BvarLatencyRecorderWithTag g_bvar_ms_drop_snapshot;
 extern BvarLatencyRecorderWithTag g_bvar_ms_list_snapshot;
 extern BvarLatencyRecorderWithTag g_bvar_ms_clone_instance;
+extern BvarLatencyRecorderWithTag g_bvar_ms_compact_snapshot;
 extern BvarLatencyRecorderWithTag g_bvar_ms_update_packed_file_info;
 extern bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
 extern bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
@@ -922,6 +923,8 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_drop_snapshot_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_del_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_put_counter;
 
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rowset_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_version_get_bytes;
@@ -1062,6 +1065,8 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_list_snapshot_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_del_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_put_bytes;
 
 // meta ranges
 extern mBvarStatus<int64_t> g_bvar_fdb_kv_ranges_count;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index 487b26f3899..b789aab8b21 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -424,6 +424,10 @@ public:
                         const CloneInstanceRequest* request, 
CloneInstanceResponse* response,
                         ::google::protobuf::Closure* done) override;
 
+    void compact_snapshot(::google::protobuf::RpcController* controller,
+                          const CompactSnapshotRequest* request, 
CompactSnapshotResponse* response,
+                          ::google::protobuf::Closure* done) override;
+
 private:
     std::pair<MetaServiceCode, std::string> alter_instance(
             const AlterInstanceRequest* request,
@@ -1009,6 +1013,12 @@ public:
         call_impl(&cloud::MetaService::clone_instance, controller, request, 
response, done);
     }
 
+    void compact_snapshot(::google::protobuf::RpcController* controller,
+                          const CompactSnapshotRequest* request, 
CompactSnapshotResponse* response,
+                          ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::compact_snapshot, controller, request, 
response, done);
+    }
+
 private:
     template <typename Request, typename Response>
     using MetaServiceMethod = void 
(cloud::MetaService::*)(::google::protobuf::RpcController*,
diff --git a/cloud/src/meta-service/meta_service_http.cpp 
b/cloud/src/meta-service/meta_service_http.cpp
index ff2c4ce080d..b6e213dead6 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -702,6 +702,19 @@ static HttpResponse process_list_snapshot(MetaServiceImpl* 
service, brpc::Contro
     return http_json_reply_message(resp.status(), resp);
 }
 
+static HttpResponse process_compact_snapshot(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
+    auto& uri = ctrl->http_request().uri();
+    std::string instance_id(http_query(uri, "instance_id"));
+    if (instance_id.empty()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id 
is empty");
+    }
+    CompactSnapshotRequest req;
+    req.set_instance_id(instance_id);
+    CompactSnapshotResponse resp;
+    service->compact_snapshot(ctrl, &req, &resp, nullptr);
+    return http_json_reply(resp.status());
+}
+
 static HttpResponse process_set_snapshot_property(MetaServiceImpl* service,
                                                   brpc::Controller* ctrl) {
     AlterInstanceRequest req;
@@ -940,6 +953,8 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"v1/set_snapshot_property", process_set_snapshot_property},
             {"v1/get_snapshot_property", process_get_snapshot_property},
             {"v1/set_multi_version_status", process_set_multi_version_status},
+            {"compact_snapshot", process_compact_snapshot},
+            {"v1/compact_snapshot", process_compact_snapshot},
             // misc
             {"abort_txn", process_abort_txn},
             {"abort_tablet_job", process_abort_tablet_job},
diff --git a/cloud/src/meta-service/meta_service_partition.cpp 
b/cloud/src/meta-service/meta_service_partition.cpp
index c28abeadb0f..5f66ca4216e 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -273,7 +273,6 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
             IndexIndexPB index_index_pb;
             index_index_pb.set_db_id(db_id);
             index_index_pb.set_table_id(table_id);
-            LOG(INFO) << index_index_pb.DebugString();
             std::string index_index_value;
             if (!index_index_pb.SerializeToString(&index_index_value)) {
                 code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
@@ -284,6 +283,11 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
             versioned_put(txn.get(), index_meta_key, "");
             txn->put(index_inverted_key, "");
             txn->put(index_index_key, index_index_value);
+            LOG_INFO("put versioned index keys")
+                    .tag("index_id", index_id)
+                    .tag("index_meta_key", hex(index_meta_key))
+                    .tag("index_index_key", hex(index_index_key))
+                    .tag("index_inverted_key", hex(index_inverted_key));
 
             commit_index_log.add_index_ids(index_id);
         }
@@ -313,8 +317,11 @@ void 
MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
             txn->put(part_inverted_index_key, "");
             txn->put(part_index_key, part_index_value);
 
-            LOG(INFO) << "xxx put versioned partition index key=" << 
hex(part_index_key)
-                      << " partition_id=" << partition_id;
+            LOG_INFO("put versioned partition index key")
+                    .tag("partition_id", partition_id)
+                    .tag("part_meta_key", hex(part_meta_key))
+                    .tag("part_index_key", hex(part_index_key))
+                    .tag("part_inverted_index_key", 
hex(part_inverted_index_key));
 
             commit_index_log.add_partition_ids(partition_id);
         }
@@ -809,7 +816,6 @@ void MetaServiceImpl::commit_partition_internal(const 
PartitionRequest* request,
             PartitionIndexPB part_index_pb;
             part_index_pb.set_db_id(db_id);
             part_index_pb.set_table_id(table_id);
-            LOG(INFO) << part_index_pb.DebugString();
             std::string part_index_value;
             if (!part_index_pb.SerializeToString(&part_index_value)) {
                 code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
@@ -820,6 +826,11 @@ void MetaServiceImpl::commit_partition_internal(const 
PartitionRequest* request,
             versioned_put(txn.get(), part_meta_key, "");
             txn->put(part_inverted_index_key, "");
             txn->put(part_index_key, part_index_value);
+            LOG_INFO("put versioned partition index key")
+                    .tag("partition_id", part_id)
+                    .tag("part_meta_key", hex(part_meta_key))
+                    .tag("part_index_key", hex(part_index_key))
+                    .tag("part_inverted_index_key", 
hex(part_inverted_index_key));
 
             commit_partition_log.add_partition_ids(part_id);
         }
diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp
index 5afb92888ce..3c72808f81a 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -2547,7 +2547,7 @@ std::pair<MetaServiceCode, std::string> 
MetaServiceImpl::alter_instance(
         LOG(WARNING) << msg << " err=" << err;
         return std::make_pair(code, msg);
     }
-    LOG(INFO) << "alter instance key=" << hex(key);
+    LOG(INFO) << "alter instance key=" << hex(key) << " instance_id=" << 
instance_id;
     InstanceInfoPB instance;
     if (!instance.ParseFromString(val)) {
         msg = "failed to parse InstanceInfoPB";
diff --git a/cloud/src/meta-service/meta_service_snapshot.cpp 
b/cloud/src/meta-service/meta_service_snapshot.cpp
index 9dd86aac349..c8a21452981 100644
--- a/cloud/src/meta-service/meta_service_snapshot.cpp
+++ b/cloud/src/meta-service/meta_service_snapshot.cpp
@@ -184,4 +184,30 @@ void 
MetaServiceImpl::drop_snapshot(::google::protobuf::RpcController* controlle
     msg = response->status().msg();
 }
 
+void MetaServiceImpl::compact_snapshot(::google::protobuf::RpcController* 
controller,
+                                       const CompactSnapshotRequest* request,
+                                       CompactSnapshotResponse* response,
+                                       ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(compact_snapshot, get, put);
+
+    if (!request->instance_id().empty()) {
+        instance_id = request->instance_id();
+    } else if (request->has_cloud_unique_id() && 
!request->cloud_unique_id().empty()) {
+        instance_id = get_instance_id(resource_mgr_, 
request->cloud_unique_id());
+        if (instance_id.empty()) {
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = "empty instance_id";
+            return;
+        }
+    } else {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "instance_id or cloud_unique_id is required";
+        return;
+    }
+
+    RPC_RATE_LIMIT(compact_snapshot);
+
+    std::tie(code, msg) = snapshot_manager_->compact_snapshot(instance_id);
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler_operation_log.cpp 
b/cloud/src/recycler/recycler_operation_log.cpp
index 2caa32ef705..f6743a5dabb 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -67,6 +67,7 @@ using namespace std::chrono;
 int OperationLogRecycleChecker::init() {
     source_snapshot_versionstamp_ = Versionstamp::min();
     if (instance_info_.has_source_snapshot_id() &&
+        instance_info_.snapshot_compact_status() != SNAPSHOT_COMPACT_DONE &&
         
!SnapshotManager::parse_snapshot_versionstamp(instance_info_.source_snapshot_id(),
                                                       
&source_snapshot_versionstamp_)) {
         LOG_WARNING("failed to parse versionstamp from source snapshot id")
diff --git a/cloud/src/recycler/snapshot_chain_compactor.cpp 
b/cloud/src/recycler/snapshot_chain_compactor.cpp
index 4332d067df8..f25a12e6569 100644
--- a/cloud/src/recycler/snapshot_chain_compactor.cpp
+++ b/cloud/src/recycler/snapshot_chain_compactor.cpp
@@ -230,6 +230,16 @@ int is_instance_cloned(TxnKv* txn_kv, const std::string& 
instance_id, bool* is_c
 }
 
 bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const 
InstanceInfoPB& instance_info) {
+    // Skip instances that have already completed compact
+    if (instance_info.snapshot_compact_status() == 
SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
+        return false;
+    }
+
+    // Instances with DOING status should be compacted (manually triggered)
+    if (instance_info.snapshot_compact_status() == 
SnapshotCompactStatus::SNAPSHOT_COMPACT_DOING) {
+        return true;
+    }
+
     // compact the instance which meets the following conditions:
     // 1. the instance is cloned from snapshot
     // 2. its source instance is not cloned from other snapshots
@@ -247,7 +257,9 @@ bool 
SnapshotChainCompactor::is_snapshot_chain_need_compact(const InstanceInfoPB
                      << instance_info.source_instance_id();
         return false;
     }
-    if (is_instance_cloned_from_snapshot(source_instance_info)) {
+    if (is_instance_cloned_from_snapshot(source_instance_info) &&
+        source_instance_info.snapshot_compact_status() !=
+                SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
         return false;
     }
 
@@ -435,8 +447,8 @@ int InstanceChainCompactor::handle_compaction_completion() {
     std::string reference_key = 
versioned::snapshot_reference_key(ref_key_info);
     txn->remove(reference_key);
 
-    // instance_info.clear_source_instance_id();
-    instance_info.clear_source_snapshot_id();
+    // Preserve source_instance_id and source_snapshot_id, mark compact as done
+    
instance_info.set_snapshot_compact_status(SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE);
     instance_info.clear_compacted_key_sets();
     txn->atomic_add(system_meta_service_instance_update_key(), 1);
     txn->put(key, instance_info.SerializeAsString());
diff --git a/cloud/src/resource-manager/resource_manager.cpp 
b/cloud/src/resource-manager/resource_manager.cpp
index 6af1673bd45..77160ccada6 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -604,7 +604,7 @@ std::pair<MetaServiceCode, std::string> 
ResourceManager::add_cluster(const std::
 
     txn->atomic_add(system_meta_service_instance_update_key(), 1);
     txn->put(key, val);
-    LOG(INFO) << "put instance_key=" << hex(key);
+    LOG(INFO) << "put instance_key=" << hex(key) << " instance_id=" << 
instance_id;
     err = txn->commit();
     if (err != TxnErrorCode::TXN_OK) {
         msg = "failed to commit kv txn";
@@ -618,8 +618,8 @@ std::pair<MetaServiceCode, std::string> 
ResourceManager::add_cluster(const std::
 }
 
 /**
- * The current implementation is to add fe clusters through HTTP API, 
- * such as follower nodes `ABC` in the cluster, and then immediately drop 
follower node `A`, while fe is not yet pulled up, 
+ * The current implementation is to add fe clusters through HTTP API,
+ * such as follower nodes `ABC` in the cluster, and then immediately drop 
follower node `A`, while fe is not yet pulled up,
  * which may result in the formation of a multi master fe cluster
  * This function provides a simple protection mechanism that does not allow 
dropping the fe node within 5 minutes after adding it through the 
API(add_cluster/add_node).
  * If you bypass this protection and do the behavior described above, god 
bless you.
@@ -754,7 +754,7 @@ std::pair<MetaServiceCode, std::string> 
ResourceManager::drop_cluster(
 
     txn->atomic_add(system_meta_service_instance_update_key(), 1);
     txn->put(key, val);
-    LOG(INFO) << "put instance_key=" << hex(key);
+    LOG(INFO) << "put instance_key=" << hex(key) << " instance_id=" << 
instance_id;
     err = txn->commit();
     if (err != TxnErrorCode::TXN_OK) {
         msg = "failed to commit kv txn";
@@ -894,7 +894,7 @@ std::string ResourceManager::update_cluster(
 
     txn->atomic_add(system_meta_service_instance_update_key(), 1);
     txn->put(key, val);
-    LOG(INFO) << "put instanace_key=" << hex(key);
+    LOG(INFO) << "put instanace_key=" << hex(key) << " instance_id=" << 
instance_id;
     TxnErrorCode err_code = txn->commit();
     if (err_code != TxnErrorCode::TXN_OK) {
         msg = "failed to commit kv txn";
@@ -958,7 +958,7 @@ std::pair<TxnErrorCode, std::string> 
ResourceManager::get_instance(std::shared_p
     }
 
     TxnErrorCode err = txn->get(key, &val);
-    LOG(INFO) << "get instance_key=" << hex(key);
+    LOG(INFO) << "get instance_key=" << hex(key) << " instance_id=" << 
instance_id;
 
     if (err != TxnErrorCode::TXN_OK) {
         code = err;
@@ -969,7 +969,7 @@ std::pair<TxnErrorCode, std::string> 
ResourceManager::get_instance(std::shared_p
 
     if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) {
         code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
-        msg = "failed to parse InstanceInfoPB";
+        msg = "failed to parse InstanceInfoPB, instance_id=" + instance_id;
         return ec;
     }
 
@@ -1364,7 +1364,7 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
 
     txn->atomic_add(system_meta_service_instance_update_key(), 1);
     txn->put(key, val);
-    LOG(INFO) << "put instance_key=" << hex(key);
+    LOG(INFO) << "put instance_key=" << hex(key) << " instance_id=" << 
instance_id;
     TxnErrorCode err_code = txn->commit();
     if (err_code != TxnErrorCode::TXN_OK) {
         msg = "failed to commit kv txn";
@@ -1443,6 +1443,11 @@ void ResourceManager::refresh_instance(const 
std::string& instance_id,
     }
 
     if (instance.has_source_instance_id() && 
!instance.source_instance_id().empty()) {
+        // Instances that have completed snapshot compact should not be in the 
map
+        if (instance.snapshot_compact_status() == 
SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
+            instance_source_snapshot_info_.erase(instance_id);
+            return;
+        }
         Versionstamp versionstamp;
         if 
(!SnapshotManager::parse_snapshot_versionstamp(instance.source_snapshot_id(),
                                                           &versionstamp)) {
diff --git a/cloud/src/snapshot/snapshot_manager.cpp 
b/cloud/src/snapshot/snapshot_manager.cpp
index 5d39e9c5683..94d1f4f9c6d 100644
--- a/cloud/src/snapshot/snapshot_manager.cpp
+++ b/cloud/src/snapshot/snapshot_manager.cpp
@@ -117,6 +117,11 @@ void SnapshotManager::clone_instance(const 
CloneInstanceRequest& request,
     response->mutable_status()->set_msg("Not implemented");
 }
 
+std::pair<MetaServiceCode, std::string> SnapshotManager::compact_snapshot(
+        std::string_view instance_id) {
+    return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
+}
+
 std::pair<MetaServiceCode, std::string> 
SnapshotManager::set_multi_version_status(
         std::string_view instance_id, MultiVersionStatus multi_version_status) 
{
     return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
diff --git a/cloud/src/snapshot/snapshot_manager.h 
b/cloud/src/snapshot/snapshot_manager.h
index bc10d82a8f1..0e7ad2c12a3 100644
--- a/cloud/src/snapshot/snapshot_manager.h
+++ b/cloud/src/snapshot/snapshot_manager.h
@@ -52,6 +52,9 @@ public:
     virtual void clone_instance(const CloneInstanceRequest& request,
                                 CloneInstanceResponse* response);
 
+    // Manually trigger snapshot compact for an instance.
+    virtual std::pair<MetaServiceCode, std::string> 
compact_snapshot(std::string_view instance_id);
+
     virtual std::pair<MetaServiceCode, std::string> set_multi_version_status(
             std::string_view instance_id, MultiVersionStatus 
multi_version_status);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index f8e6573bdc3..9d47c68b844 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -672,6 +672,8 @@ public class CloudInternalCatalog extends InternalCatalog {
         if (partitionIds != null) {
             indexRequestBuilder.addAllPartitionIds(partitionIds);
         }
+        LOG.debug("committing materialized index for tableId: {}, 
partitionIds: {}, indexIds: {}",
+                tableId, partitionIds, indexIds);
         final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
 
         Cloud.IndexResponse response = null;
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 945382b16f7..c779506a012 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -68,6 +68,12 @@ enum SnapshotSwitchStatus {
     SNAPSHOT_SWITCH_ON = 2;
 }
 
+enum SnapshotCompactStatus {
+    SNAPSHOT_COMPACT_UNKNOWN = 0;
+    SNAPSHOT_COMPACT_DOING = 1;
+    SNAPSHOT_COMPACT_DONE = 2;
+}
+
 enum KeySetType {
     UNKNOWN_KEY_SET = 0;
 
@@ -140,6 +146,7 @@ message InstanceInfoPB {
 
     optional int64 snapshot_retained_data_size = 121;
     optional int64 snapshot_billable_data_size = 122;
+    optional SnapshotCompactStatus snapshot_compact_status = 123;
 }
 
 message StagePB {
@@ -2198,6 +2205,16 @@ message CloneInstanceResponse {
     optional string image_url = 3;
 }
 
+message CompactSnapshotRequest {
+    optional string instance_id = 1;
+    optional string cloud_unique_id = 2;
+    optional string request_ip = 3;
+}
+
+message CompactSnapshotResponse {
+    optional MetaServiceResponseStatus status = 1;
+}
+
 message PackedSlicePB {
     optional string path = 1;
     optional int64 offset = 2;
@@ -2338,6 +2355,7 @@ service MetaService {
     rpc drop_snapshot(DropSnapshotRequest) returns (DropSnapshotResponse);
     rpc list_snapshot(ListSnapshotRequest) returns (ListSnapshotResponse);
     rpc clone_instance(CloneInstanceRequest) returns (CloneInstanceResponse);
+    rpc compact_snapshot(CompactSnapshotRequest) returns 
(CompactSnapshotResponse);
 
     rpc update_packed_file_info(UpdatePackedFileInfoRequest) returns 
(UpdatePackedFileInfoResponse);
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to