This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bf54a8ebda [Enhancement](Recycler) Add bvar metrics for operation 
logs (#60520)
6bf54a8ebda is described below

commit 6bf54a8ebda2cfc57a6c5e64c8c656a5967b1745
Author: Jimmy <[email protected]>
AuthorDate: Mon Feb 9 10:57:15 2026 +0800

    [Enhancement](Recycler) Add bvar metrics for operation logs (#60520)
    
    - Add dedicated bvar metrics for the recycle_operation_logs path, which
    previously had no observability through the monitoring system
    - Introduce OplogRecycleStats to track per-round recycling statistics
    including total scanned count, not-recycled count, failure count, and
    per-oplog-type recycled counts (commit/drop partition, commit/drop index,
    update tablet, compaction, schema change, commit txn)
    - Add scan_and_statistics_operation_logs() to collect pre-recycling
    statistics via RecyclerMetricsContext, consistent with existing
    scan_and_statistics_* functions
    - Use a PendingCounts pattern in OperationLogRecycler to avoid inflating
    committed metrics on transaction failure — local counts are only merged
    into OplogRecycleStats atomics after successful
    commit()
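    - Only the pre-recycling scan (scan_and_statistics_operation_logs()) is
    gated behind the enable_recycler_stats_metrics config flag; the
    lightweight per-round recycle counters are always collected and reported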
---
 cloud/src/common/bvars.cpp                    |  40 +++
 cloud/src/common/bvars.h                      |  23 ++
 cloud/src/recycler/recycler.cpp               |  40 +++
 cloud/src/recycler/recycler.h                 |  25 +-
 cloud/src/recycler/recycler_operation_log.cpp |  90 ++++-
 cloud/test/recycler_operation_log_test.cpp    | 478 ++++++++++++++++++++++++++
 6 files changed, 692 insertions(+), 4 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 6c3ba1eee50..91b1e0bb649 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -187,6 +187,46 @@ BvarStatusWithTag<int64_t> 
g_bvar_recycler_batch_delete_rowset_plan_count(
 BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_failures(
         "recycler", "batch_delete_failures");
 
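+// Operation Log Recycler BVars (all labelled by instance_id).
+// mBvarStatus entries hold the value of the last recycle round; mBvarIntAdder
+// entries accumulate across rounds.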
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_total_num(
+        "recycler_oplog_last_round_total_num", {"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_not_recycled_num(
+        "recycler_oplog_last_round_not_recycled_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycle_failed_num(
+        "recycler_oplog_recycle_failed_num", {"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_commit_partition_num(
+        "recycler_oplog_last_round_recycled_commit_partition_num", 
{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_drop_partition_num(
+        "recycler_oplog_last_round_recycled_drop_partition_num", 
{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_commit_index_num(
+        "recycler_oplog_last_round_recycled_commit_index_num", 
{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_drop_index_num(
+        "recycler_oplog_last_round_recycled_drop_index_num", {"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_update_tablet_num(
+        "recycler_oplog_last_round_recycled_update_tablet_num", 
{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_compaction_num(
+        "recycler_oplog_last_round_recycled_compaction_num", {"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_schema_change_num(
+        "recycler_oplog_last_round_recycled_schema_change_num", 
{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_txn_num(
+        "recycler_oplog_last_round_recycled_commit_txn_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_partition_num(
+        "recycler_oplog_recycled_commit_partition_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_partition_num(
+        "recycler_oplog_recycled_drop_partition_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_index_num(
+        "recycler_oplog_recycled_commit_index_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_index_num(
+        "recycler_oplog_recycled_drop_index_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_update_tablet_num(
+        "recycler_oplog_recycled_update_tablet_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_compaction_num(
+        "recycler_oplog_recycled_compaction_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_schema_change_num(
+        "recycler_oplog_recycled_schema_change_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_txn_num(
+        "recycler_oplog_recycled_commit_txn_num", {"instance_id"});
+
 // txn_kv's bvars
 bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
 bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 0e7a6314ad2..e4b9789c1bf 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -674,6 +674,29 @@ extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_batch_delete_failures;
 extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_packed_file_bytes_object_deleted;
 extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_packed_file_rowset_scanned_num;
 
+// Operation Log Recycler BVars
+// Note: generic metrics (last_round_to_recycle_num/bytes,
+// last_round_recycled_num/bytes, etc.) are reported by RecyclerMetricsContext
+// with operation_type = "recycle_operation_logs".
+extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_total_num;
+extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_not_recycled_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycle_failed_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_commit_partition_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_drop_partition_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_commit_index_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_drop_index_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_update_tablet_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_compaction_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_schema_change_num;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_oplog_last_round_recycled_commit_txn_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_partition_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_partition_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_index_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_index_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_update_tablet_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_compaction_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_schema_change_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_txn_num;
+
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;
 extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 67484c81605..a92a02f1953 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -7057,6 +7057,46 @@ int InstanceRecycler::scan_and_statistics_restore_jobs() 
{
     return ret;
 }
 
+// Pre-recycling statistics pass for operation logs.
+//
+// Iterates over every versioned operation-log key of this instance and, for
+// each log that OperationLogRecycleChecker::can_recycle() accepts, adds one to
+// total_need_recycle_num and the log's ByteSizeLong() to
+// total_need_recycle_data_size of a RecyclerMetricsContext tagged
+// "recycle_operation_logs". The context is reported once after the scan.
+//
+// Returns without reporting when versioned keys are not recycled for this
+// instance or when the checker fails to initialize. Entries whose value or key
+// cannot be decoded are skipped.
+void InstanceRecycler::scan_and_statistics_operation_logs() {
+    if (!should_recycle_versioned_keys()) {
+        return;
+    }
+
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_operation_logs");
+
+    OperationLogRecycleChecker checker(instance_id_, txn_kv_.get(), instance_info_);
+    if (checker.init() != 0) {
+        return;
+    }
+
+    // Key range covering all versionstamps under this instance's log prefix.
+    std::string prefix = versioned::log_key(instance_id_);
+    std::string range_begin = encode_versioned_key(prefix, Versionstamp::min());
+    std::string range_end = encode_versioned_key(prefix, Versionstamp::max());
+
+    std::unique_ptr<BlobIterator> iter = blob_get_range(txn_kv_, range_begin, range_end);
+    for (; iter->valid(); iter->next()) {
+        OperationLogPB log_pb;
+        if (!iter->parse_value(&log_pb)) {
+            continue;
+        }
+
+        std::string_view raw_key = iter->key();
+        Versionstamp version;
+        if (!decode_versioned_key(&raw_key, &version)) {
+            continue;
+        }
+
+        // The reference info is required by can_recycle() but not used here.
+        OperationLogReferenceInfo ref_info;
+        const bool recyclable = checker.can_recycle(version, log_pb.min_timestamp(), &ref_info);
+        if (recyclable) {
+            metrics_context.total_need_recycle_num++;
+            metrics_context.total_need_recycle_data_size += log_pb.ByteSizeLong();
+        }
+    }
+
+    metrics_context.report(true);
+}
+
 int InstanceRecycler::classify_rowset_task_by_ref_count(
         RowsetDeleteTask& task, std::vector<RowsetDeleteTask>& 
batch_delete_tasks) {
     constexpr int MAX_RETRY = 10;
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index a24a68516d1..f722bef29ef 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -244,6 +244,8 @@ public:
             : RecyclerMetricsContext("global_recycler", "recycle_segment") {}
 };
 
+struct OplogRecycleStats;
+
 class InstanceRecycler {
 public:
     struct PackedFileRecycleStats {
@@ -398,6 +400,8 @@ public:
 
     int scan_and_statistics_restore_jobs();
 
+    void scan_and_statistics_operation_logs();
+
     /**
      * Decode the key of a packed-file metadata record into the persisted 
object path.
      *
@@ -476,7 +480,8 @@ private:
     //
     // Both `operation_log` and `raw_keys` will be removed in the same 
transaction, to ensure atomicity.
     int recycle_operation_log(Versionstamp log_version, const 
std::vector<std::string>& raw_keys,
-                              OperationLogPB operation_log);
+                              OperationLogPB operation_log,
+                              OplogRecycleStats* oplog_stats = nullptr);
 
     // Recycle rowset meta and data, return 0 for success otherwise error
     //
@@ -605,6 +610,24 @@ struct OperationLogReferenceInfo {
     Versionstamp referenced_snapshot_timestamp;
 };
 
+struct OplogRecycleStats { // Counters of one recycle_operation_logs() round, published as oplog bvars
+    // Operation logs processed without error in the round (recycled or kept)
+    std::atomic<int64_t> total_num {0};
+    // Operation logs kept because can_recycle() rejected them (reported via mBvarStatus)
+    std::atomic<int64_t> not_recycled_num {0};
+    // Operation logs whose recycling failed (added to the cumulative mBvarIntAdder when > 0)
+    std::atomic<int64_t> failed_num {0};
+    // Per-oplog-type recycled counts, bumped only after the recycle transaction commits
+    std::atomic<int64_t> recycled_commit_partition {0};
+    std::atomic<int64_t> recycled_drop_partition {0};
+    std::atomic<int64_t> recycled_commit_index {0};
+    std::atomic<int64_t> recycled_drop_index {0};
+    std::atomic<int64_t> recycled_update_tablet {0};
+    std::atomic<int64_t> recycled_compaction {0};
+    std::atomic<int64_t> recycled_schema_change {0};
+    std::atomic<int64_t> recycled_commit_txn {0};
+};
+
 // Helper class to check if operation logs can be recycled based on snapshots 
and versionstamps
 class OperationLogRecycleChecker {
 public:
diff --git a/cloud/src/recycler/recycler_operation_log.cpp 
b/cloud/src/recycler/recycler_operation_log.cpp
index 71eda034d5f..2caa32ef705 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -31,6 +31,8 @@
 #include <utility>
 #include <vector>
 
+#include "common/bvars.h"
+#include "common/config.h"
 #include "common/defer.h"
 #include "common/encryption_util.h"
 #include "common/logging.h"
@@ -650,6 +652,50 @@ static TxnErrorCode get_txn_info(TxnKv* txn_kv, 
std::string_view instance_id, in
     return TxnErrorCode::TXN_OK;
 }
 
+// Publishes the counters of one finished recycle_operation_logs() round to the
+// oplog bvars, labelled with `instance_id`.
+//
+// Each counter in `stats` is loaded exactly once into a local, so the
+// "last round" mBvarStatus and the cumulative mBvarIntAdder of a type are
+// always fed the same value, even if a counter is updated concurrently.
+static void report_oplog_recycle_stats(const std::string& instance_id,
+                                       const OplogRecycleStats& stats) {
+    const int64_t total = stats.total_num.load();
+    const int64_t not_recycled = stats.not_recycled_num.load();
+    const int64_t failed = stats.failed_num.load();
+    const int64_t commit_partition = stats.recycled_commit_partition.load();
+    const int64_t drop_partition = stats.recycled_drop_partition.load();
+    const int64_t commit_index = stats.recycled_commit_index.load();
+    const int64_t drop_index = stats.recycled_drop_index.load();
+    const int64_t update_tablet = stats.recycled_update_tablet.load();
+    const int64_t compaction = stats.recycled_compaction.load();
+    const int64_t schema_change = stats.recycled_schema_change.load();
+    const int64_t commit_txn = stats.recycled_commit_txn.load();
+
+    g_bvar_recycler_oplog_last_round_total_num.put({instance_id}, total);
+    g_bvar_recycler_oplog_last_round_not_recycled_num.put({instance_id}, not_recycled);
+    // The failure adder is cumulative; skip the write when nothing failed.
+    if (failed > 0) {
+        g_bvar_recycler_oplog_recycle_failed_num.put({instance_id}, failed);
+    }
+
+    // Per-type last round counts (mBvarStatus, overwritten each round)
+    g_bvar_recycler_oplog_last_round_recycled_commit_partition_num.put({instance_id},
+                                                                       commit_partition);
+    g_bvar_recycler_oplog_last_round_recycled_drop_partition_num.put({instance_id},
+                                                                     drop_partition);
+    g_bvar_recycler_oplog_last_round_recycled_commit_index_num.put({instance_id}, commit_index);
+    g_bvar_recycler_oplog_last_round_recycled_drop_index_num.put({instance_id}, drop_index);
+    g_bvar_recycler_oplog_last_round_recycled_update_tablet_num.put({instance_id},
+                                                                    update_tablet);
+    g_bvar_recycler_oplog_last_round_recycled_compaction_num.put({instance_id}, compaction);
+    g_bvar_recycler_oplog_last_round_recycled_schema_change_num.put({instance_id},
+                                                                    schema_change);
+    g_bvar_recycler_oplog_last_round_recycled_commit_txn_num.put({instance_id}, commit_txn);
+
+    // Per-type cumulative counts (mBvarIntAdder, accumulated across rounds)
+    g_bvar_recycler_oplog_recycled_commit_partition_num.put({instance_id}, commit_partition);
+    g_bvar_recycler_oplog_recycled_drop_partition_num.put({instance_id}, drop_partition);
+    g_bvar_recycler_oplog_recycled_commit_index_num.put({instance_id}, commit_index);
+    g_bvar_recycler_oplog_recycled_drop_index_num.put({instance_id}, drop_index);
+    g_bvar_recycler_oplog_recycled_update_tablet_num.put({instance_id}, update_tablet);
+    g_bvar_recycler_oplog_recycled_compaction_num.put({instance_id}, compaction);
+    g_bvar_recycler_oplog_recycled_schema_change_num.put({instance_id}, schema_change);
+    g_bvar_recycler_oplog_recycled_commit_txn_num.put({instance_id}, commit_txn);
+}
+
 int InstanceRecycler::recycle_operation_logs() {
     if (!should_recycle_versioned_keys()) {
         VLOG_DEBUG << "instance " << instance_id_
@@ -664,6 +710,18 @@ int InstanceRecycler::recycle_operation_logs() {
     AnnotateTag tag("instance_id", instance_id_);
     LOG_WARNING("begin to recycle operation logs");
 
+    const std::string task_name = "recycle_operation_logs";
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
+    OplogRecycleStats oplog_stats;
+
+    // scan_and_statistics_operation_logs() is expensive (scans lots of KVs),
+    // so it's controlled by enable_recycler_stats_metrics.
+    // The other stats (counting what was actually recycled) are lightweight
+    // and always collected.
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_operation_logs();
+    }
+
     StopWatch stop_watch;
     size_t total_operation_logs = 0;
     size_t recycled_operation_logs = 0;
@@ -672,6 +730,9 @@ int InstanceRecycler::recycle_operation_logs() {
     size_t recycled_operation_log_data_size = 0;
 
     DORIS_CLOUD_DEFER {
+        metrics_context.finish_report();
+        report_oplog_recycle_stats(instance_id_, oplog_stats);
+
         int64_t cost = stop_watch.elapsed_us() / 1000'000;
         LOG_WARNING("recycle operation logs, cost={}s", cost)
                 .tag("total_operation_logs", total_operation_logs)
@@ -705,16 +766,24 @@ int InstanceRecycler::recycle_operation_logs() {
         OperationLogReferenceInfo reference_info;
         if (recycle_checker.can_recycle(log_versionstamp, 
operation_log.min_timestamp(),
                                         &reference_info)) {
+            metrics_context.total_need_recycle_num++;
+            metrics_context.total_need_recycle_data_size += value_size;
+
             AnnotateTag tag("log_key", hex(key));
-            int res = recycle_operation_log(log_versionstamp, raw_keys, 
std::move(operation_log));
+            int res = recycle_operation_log(log_versionstamp, raw_keys, 
std::move(operation_log),
+                                            &oplog_stats);
             if (res != 0) {
                 LOG_WARNING("failed to recycle operation 
log").tag("error_code", res);
+                oplog_stats.failed_num.fetch_add(1, std::memory_order_relaxed);
                 return res;
             }
 
             recycled_operation_logs++;
             recycled_operation_log_data_size += value_size;
+            metrics_context.total_recycled_num++;
+            metrics_context.total_recycled_data_size += value_size;
         } else {
+            oplog_stats.not_recycled_num.fetch_add(1, 
std::memory_order_relaxed);
             int res = calculator.calculate_operation_log_data_size(key, 
operation_log,
                                                                    
reference_info);
             if (res != 0) {
@@ -726,6 +795,8 @@ int InstanceRecycler::recycle_operation_logs() {
         total_operation_logs++;
         operation_log_data_size += value_size;
         max_operation_log_data_size = std::max(max_operation_log_data_size, 
value_size);
+        oplog_stats.total_num.fetch_add(1, std::memory_order_relaxed);
+        metrics_context.report();
         return 0;
     };
 
@@ -800,8 +871,11 @@ int InstanceRecycler::recycle_operation_logs() {
 
 int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
                                             const std::vector<std::string>& 
raw_keys,
-                                            OperationLogPB operation_log) {
+                                            OperationLogPB operation_log,
+                                            OplogRecycleStats* oplog_stats) {
     int recycle_log_count = 0;
+    // Track which oplog type was recycled (only one per log entry)
+    std::atomic<int64_t>* recycled_counter = nullptr;
     OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(), log_version,
                                       operation_log.min_timestamp(), raw_keys);
     RETURN_ON_FAILURE(log_recycler.begin());
@@ -817,6 +891,9 @@ int InstanceRecycler::recycle_operation_log(Versionstamp 
log_version,
                 return res;                                               \
             }                                                             \
             recycle_log_count++;                                          \
+            if (oplog_stats) {                                            \
+                recycled_counter = &oplog_stats->recycled_##log_type;     \
+            }                                                             \
         }                                                                 \
     } while (0)
 
@@ -858,6 +935,9 @@ int InstanceRecycler::recycle_operation_log(Versionstamp 
log_version,
         }
 
         recycle_log_count++;
+        if (oplog_stats) {
+            recycled_counter = &oplog_stats->recycled_commit_txn;
+        }
     }
 
     if (recycle_log_count > 1) {
@@ -868,7 +948,11 @@ int InstanceRecycler::recycle_operation_log(Versionstamp 
log_version,
         return -1; // This is an unexpected condition, should not happen
     }
 
-    return log_recycler.commit();
+    int ret = log_recycler.commit();
+    if (ret == 0 && recycled_counter) {
+        recycled_counter->fetch_add(1, std::memory_order_relaxed);
+    }
+    return ret;
 }
 
 } // namespace doris::cloud
diff --git a/cloud/test/recycler_operation_log_test.cpp 
b/cloud/test/recycler_operation_log_test.cpp
index 8eb1cb6f4b2..1d3c3e65065 100644
--- a/cloud/test/recycler_operation_log_test.cpp
+++ b/cloud/test/recycler_operation_log_test.cpp
@@ -16,15 +16,18 @@
 // under the License.
 
 #include <butil/strings/string_split.h>
+#include <bvar/variable.h>
 #include <fmt/core.h>
 #include <gen_cpp/cloud.pb.h>
 #include <gen_cpp/olap_file.pb.h>
 #include <gtest/gtest.h>
 
+#include <climits>
 #include <cstdint>
 #include <memory>
 #include <string>
 
+#include "common/bvars.h"
 #include "common/config.h"
 #include "common/util.h"
 #include "cpp/sync_point.h"
@@ -2807,4 +2810,479 @@ TEST(RecycleOperationLogTest, 
RecycleCompactionLogKeepKeyBoundsWhenDisabled) {
     }
 }
 
+// ============================================================================
+// Tests for OplogRecycleStats and recycle_operation_logs metrics integration
+// ============================================================================
+
+// Checks that every OplogRecycleStats counter starts at zero and that
+// fetch_add() results are observable through load().
+TEST(RecycleOperationLogTest, OplogRecycleStatsAtomicOperations) {
+    OplogRecycleStats stats;
+    constexpr auto relaxed = std::memory_order_relaxed;
+
+    // A freshly constructed object must report zero for all counters.
+    const std::atomic<int64_t>* const counters[] = {
+            &stats.total_num,
+            &stats.not_recycled_num,
+            &stats.failed_num,
+            &stats.recycled_commit_partition,
+            &stats.recycled_drop_partition,
+            &stats.recycled_commit_index,
+            &stats.recycled_drop_index,
+            &stats.recycled_update_tablet,
+            &stats.recycled_compaction,
+            &stats.recycled_schema_change,
+            &stats.recycled_commit_txn,
+    };
+    for (const auto* counter : counters) {
+        EXPECT_EQ(counter->load(), 0);
+    }
+
+    // Round-level counters.
+    stats.not_recycled_num.fetch_add(3, relaxed);
+    EXPECT_EQ(stats.not_recycled_num.load(), 3);
+
+    stats.failed_num.fetch_add(1, relaxed);
+    EXPECT_EQ(stats.failed_num.load(), 1);
+
+    // Per-type counters are independent of each other.
+    stats.recycled_drop_partition.fetch_add(5, relaxed);
+    stats.recycled_drop_index.fetch_add(2, relaxed);
+    stats.recycled_compaction.fetch_add(10, relaxed);
+    stats.recycled_commit_txn.fetch_add(7, relaxed);
+    EXPECT_EQ(stats.recycled_drop_partition.load(), 5);
+    EXPECT_EQ(stats.recycled_drop_index.load(), 2);
+    EXPECT_EQ(stats.recycled_compaction.load(), 10);
+    EXPECT_EQ(stats.recycled_commit_txn.load(), 7);
+
+    // Total counter.
+    stats.total_num.fetch_add(10, relaxed);
+    EXPECT_EQ(stats.total_num.load(), 10);
+}
+
+// Simulates a sequence of single-log recycle events and checks that only the
+// counters of the touched oplog types advance.
+TEST(RecycleOperationLogTest, OplogRecycleStatsPerTypeCounters) {
+    OplogRecycleStats stats;
+    constexpr auto relaxed = std::memory_order_relaxed;
+
+    // 2 drop_partition, 1 drop_index, 3 compaction, 1 commit_txn events.
+    for (int i = 0; i < 2; ++i) {
+        stats.recycled_drop_partition.fetch_add(1, relaxed);
+    }
+    stats.recycled_drop_index.fetch_add(1, relaxed);
+    for (int i = 0; i < 3; ++i) {
+        stats.recycled_compaction.fetch_add(1, relaxed);
+    }
+    stats.recycled_commit_txn.fetch_add(1, relaxed);
+
+    EXPECT_EQ(stats.recycled_commit_partition.load(), 0);
+    EXPECT_EQ(stats.recycled_drop_partition.load(), 2);
+    EXPECT_EQ(stats.recycled_commit_index.load(), 0);
+    EXPECT_EQ(stats.recycled_drop_index.load(), 1);
+    EXPECT_EQ(stats.recycled_update_tablet.load(), 0);
+    EXPECT_EQ(stats.recycled_compaction.load(), 3);
+    EXPECT_EQ(stats.recycled_schema_change.load(), 0);
+    EXPECT_EQ(stats.recycled_commit_txn.load(), 1);
+}
+
+// Test recycle_operation_logs with stats enabled: drop partition + drop index logs.
+// Verifies per-type recycled counts and bvar reporting.
+//
+// Flow: enable the stats flag, write one drop_partition log (2 partitions) and
+// one drop_index log (2 indexes), run recycle_operation_logs(), then check the
+// generated recycle keys and the oplog bvars for this instance.
+TEST(RecycleOperationLogTest, RecycleOperationLogsWithStatsEnabled) {
+    auto old_flag = config::enable_recycler_stats_metrics;
+    config::enable_recycler_stats_metrics = true;
+    // Restore the global flag so other tests are not affected.
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler_stats_metrics = old_flag;
+    };
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    txn_kv->update_commit_version(1000);
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("recycle_oplog_stats");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_oplog_stats");
+    update_instance_info(txn_kv.get(), instance);
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    uint64_t db_id = 1;
+    uint64_t table_id = 100;
+    uint64_t index_id1 = 201;
+    uint64_t index_id2 = 202;
+    uint64_t partition_id1 = 301;
+    uint64_t partition_id2 = 302;
+    int64_t expiration = ::time(nullptr) + 3600;
+
+    // Create table version so the recycler can remove it
+    {
+        std::string ver_key = versioned::table_version_key({instance_id, table_id});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned_put(txn.get(), ver_key, "");
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Put a drop partition log with 2 partitions
+    {
+        std::string log_key = versioned::log_key(instance_id);
+        OperationLogPB operation_log;
+        operation_log.set_min_timestamp(100);
+        auto* drop_partition = operation_log.mutable_drop_partition();
+        drop_partition->set_db_id(db_id);
+        drop_partition->set_table_id(table_id);
+        drop_partition->add_index_ids(index_id1);
+        drop_partition->add_partition_ids(partition_id1);
+        drop_partition->add_partition_ids(partition_id2);
+        drop_partition->set_expired_at_s(expiration);
+        drop_partition->set_update_table_version(true);
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(txn.get(), log_key, operation_log);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Put a drop index log with 2 indexes
+    {
+        std::string log_key = versioned::log_key(instance_id);
+        OperationLogPB operation_log;
+        operation_log.set_min_timestamp(101);
+        auto* drop_index = operation_log.mutable_drop_index();
+        drop_index->set_db_id(db_id);
+        drop_index->set_table_id(table_id);
+        drop_index->add_index_ids(index_id1);
+        drop_index->add_index_ids(index_id2);
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(txn.get(), log_key, operation_log);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Save cumulative baselines before recycling (mBvarIntAdder accumulates across tests)
+    int64_t baseline_drop_partition =
+            g_bvar_recycler_oplog_recycled_drop_partition_num.get({instance_id});
+    int64_t baseline_drop_index = g_bvar_recycler_oplog_recycled_drop_index_num.get({instance_id});
+
+    // Recycle the operation logs - stats should be collected
+    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+    // Verify recycle partition records were created
+    {
+        std::string recycle_key1 = recycle_partition_key({instance_id, partition_id1});
+        std::string recycle_key2 = recycle_partition_key({instance_id, partition_id2});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string value;
+        ASSERT_EQ(txn->get(recycle_key1, &value), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(recycle_key2, &value), TxnErrorCode::TXN_OK);
+    }
+
+    // Verify recycle index records were created
+    {
+        std::string recycle_key1 = recycle_index_key({instance_id, index_id1});
+        std::string recycle_key2 = recycle_index_key({instance_id, index_id2});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string value;
+        ASSERT_EQ(txn->get(recycle_key1, &value), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(recycle_key2, &value), TxnErrorCode::TXN_OK);
+    }
+
+    // Verify total oplog count: 2 logs (1 drop_partition + 1 drop_index)
+    {
+        int64_t total = g_bvar_recycler_oplog_last_round_total_num.get({instance_id});
+        EXPECT_EQ(total, 2) << "Should have scanned 2 operation logs in total";
+    }
+
+    // Verify per-type last round recycled counts: 1 drop_partition log and 1 drop_index log.
+    // The bvars hold int64_t values, so read them into int64_t to avoid narrowing.
+    {
+        int64_t last_round_drop_partition =
+                g_bvar_recycler_oplog_last_round_recycled_drop_partition_num.get({instance_id});
+        EXPECT_EQ(last_round_drop_partition, 1) << "Should have recycled 1 drop_partition log";
+    }
+    {
+        int64_t last_round_drop_index =
+                g_bvar_recycler_oplog_last_round_recycled_drop_index_num.get({instance_id});
+        EXPECT_EQ(last_round_drop_index, 1) << "Should have recycled 1 drop_index log";
+    }
+
+    // Verify per-type cumulative recycled counts (check delta from baseline)
+    {
+        int64_t recycled_drop_partition =
+                g_bvar_recycler_oplog_recycled_drop_partition_num.get({instance_id});
+        EXPECT_EQ(recycled_drop_partition - baseline_drop_partition, 1)
+                << "Cumulative drop_partition delta should be 1";
+    }
+    {
+        int64_t recycled_drop_index =
+                g_bvar_recycler_oplog_recycled_drop_index_num.get({instance_id});
+        EXPECT_EQ(recycled_drop_index - baseline_drop_index, 1)
+                << "Cumulative drop_index delta should be 1";
+    }
+}
+
+// Test recycle_operation_logs with snapshot protection: some logs skipped
+TEST(RecycleOperationLogTest, RecycleOperationLogsSkippedBySnapshot) {
+    auto old_flag = config::enable_recycler_stats_metrics;
+    config::enable_recycler_stats_metrics = true;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler_stats_metrics = old_flag;
+    };
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    txn_kv->update_commit_version(1000);
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    std::string test_instance_id = "test_oplog_not_recycled";
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(test_instance_id);
+    
instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("recycle_snapshot_skip");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_snapshot_skip");
+
+    // Store instance info
+    {
+        std::string key = instance_key({test_instance_id});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, instance.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    auto insert_empty_value = [&]() {
+        std::unique_ptr<Transaction> txn;
+        EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put("dummy_snapshot_test", "");
+        EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    };
+
+    // Create table version
+    uint64_t table_id = 500;
+    {
+        std::string ver_key = versioned::table_version_key({test_instance_id, table_id});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned_put(txn.get(), ver_key, "");
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Put an old operation log (will be recyclable)
+    {
+        std::string log_key = versioned::log_key(test_instance_id);
+        OperationLogPB operation_log;
+        operation_log.set_min_timestamp(100);
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(txn.get(), log_key, operation_log);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    insert_empty_value();
+
+    // Write a snapshot (this creates a protection boundary)
+    {
+        SnapshotPB snapshot;
+        std::string snapshot_key = versioned::snapshot_full_key(test_instance_id);
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned_put(txn.get(), snapshot_key, snapshot.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    insert_empty_value();
+
+    // Put a new operation log after the snapshot (min_timestamp before snapshot)
+    // This log should be skipped by snapshot protection
+    {
+        std::string log_key = versioned::log_key(test_instance_id);
+        OperationLogPB operation_log;
+        // min_timestamp is set to a value before the snapshot, so the snapshot
+        // falls within [min_timestamp, log_version) and the log can't be recycled
+        operation_log.set_min_timestamp(100);
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(txn.get(), log_key, operation_log);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    insert_empty_value();
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    // Recycle - some logs should be recycled, some not recycled due to snapshot
+    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+    // Verify not_recycled bvar is set
+    {
+        int64_t not_recycled =
+                g_bvar_recycler_oplog_last_round_not_recycled_num.get({test_instance_id});
+        EXPECT_GE(not_recycled, 1) << "At least one log should not be recycled";
+    }
+}
+
+// Test recycle_operation_logs with stats disabled (default)
+TEST(RecycleOperationLogTest, RecycleOperationLogsStatsDisabled) {
+    auto old_flag = config::enable_recycler_stats_metrics;
+    config::enable_recycler_stats_metrics = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler_stats_metrics = old_flag;
+    };
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    txn_kv->update_commit_version(1000);
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("recycle_stats_disabled");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_stats_disabled");
+    update_instance_info(txn_kv.get(), instance);
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    // Put an empty operation log
+    {
+        std::string log_key = versioned::log_key(instance_id);
+        Versionstamp versionstamp(123, 0);
+        OperationLogPB operation_log;
+        operation_log.set_min_timestamp(versionstamp.version());
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(txn.get(), log_key, versionstamp, operation_log);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Recycle should succeed without stats collection
+    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+    // Verify all logs were recycled
+    remove_instance_info(txn_kv.get());
+    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
+}
+
+// Test recycle_operation_logs compaction log with stats: recycled_compaction tracking
+TEST(RecycleOperationLogTest, RecycleCompactionLogWithStats) {
+    auto old_flag = config::enable_recycler_stats_metrics;
+    config::enable_recycler_stats_metrics = true;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler_stats_metrics = old_flag;
+    };
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    txn_kv->update_commit_version(1000);
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    std::string test_instance_id = "test_compaction_log_stats";
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(test_instance_id);
+    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("compaction_stats");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("compaction_stats");
+
+    {
+        std::string key = instance_key({test_instance_id});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, instance.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    int64_t table_id = 600;
+    int64_t tablet_id = 603;
+
+    // Create table version
+    {
+        std::string ver_key = versioned::table_version_key({test_instance_id, table_id});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned_put(txn.get(), ver_key, "");
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Put a compaction log with 3 recycle rowsets
+    {
+        std::string log_key = versioned::log_key(test_instance_id);
+        OperationLogPB operation_log;
+        operation_log.set_min_timestamp(100);
+        auto* compaction = operation_log.mutable_compaction();
+        compaction->set_tablet_id(tablet_id);
+
+        for (int i = 0; i < 3; i++) {
+            auto* recycle_rowset = compaction->add_recycle_rowsets();
+            auto* rowset_meta = recycle_rowset->mutable_rowset_meta();
+            rowset_meta->set_rowset_id(0);
+            rowset_meta->set_tablet_id(tablet_id);
+            rowset_meta->set_rowset_id_v2(fmt::format("compaction_rowset_{}", i));
+            rowset_meta->set_start_version(i * 10);
+            rowset_meta->set_end_version(i * 10 + 9);
+        }
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(txn.get(), log_key, operation_log);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Recycle the operation logs
+    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+    // Verify recycle rowset records were created (3 rowsets from compaction log)
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 3; i++) {
+            std::string rowset_id = fmt::format("compaction_rowset_{}", i);
+            std::string recycle_key = recycle_rowset_key({test_instance_id, tablet_id, rowset_id});
+            std::string value;
+            TxnErrorCode err = txn->get(recycle_key, &value);
+            EXPECT_EQ(err, TxnErrorCode::TXN_OK)
+                    << "Recycle rowset key not found for rowset " << rowset_id;
+        }
+    }
+
+    // Verify last_round recycled_compaction bvar count (1 compaction log recycled)
+    {
+        int last_round_compaction =
+                g_bvar_recycler_oplog_last_round_recycled_compaction_num.get({test_instance_id});
+        EXPECT_EQ(last_round_compaction, 1) << "Last round should have recycled 1 compaction log";
+    }
+    // Verify cumulative recycled_compaction bvar count
+    {
+        int recycled_compaction =
+                g_bvar_recycler_oplog_recycled_compaction_num.get({test_instance_id});
+        EXPECT_EQ(recycled_compaction, 1) << "Cumulative compaction should be 1";
+    }
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to