This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3426b30f5bc branch-3.0: [enhancement](txn lazy commit) Add more ut and log for recycler about txn lazy commit #51310 (#51338)
3426b30f5bc is described below

commit 3426b30f5bc2f354305b9a8da39a2d04e6a87c73
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 29 09:39:36 2025 +0800

    branch-3.0: [enhancement](txn lazy commit) Add more ut and log for recycler about txn lazy commit #51310 (#51338)
    
    Cherry-picked from #51310
    
    Co-authored-by: Lei Zhang <[email protected]>
---
 cloud/src/recycler/recycler.cpp     |  62 +++-
 cloud/test/txn_lazy_commit_test.cpp | 697 ++++++++++++++++++++++++++++++++++--
 2 files changed, 733 insertions(+), 26 deletions(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 307beab63d4..f9197a91017 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -941,6 +941,11 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv> 
txn_kv, const std::string in
     }
 
     if (!tablet_idx_pb.has_db_id()) {
+        // In previous versions, the db_id was not set in the index_pb.
+        // After upgrading to a version that enables txn lazy commit, the db_id will be set.
+        LOG(INFO) << "txn index has no db_id, tablet_id=" << tablet_id
+                  << " instance_id=" << instance_id
+                  << " tablet_idx_pb=" << tablet_idx_pb.ShortDebugString();
         return true;
     }
 
@@ -951,6 +956,12 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv> 
txn_kv, const std::string in
     err = txn->get(ver_key, &ver_val);
 
     if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+        LOG(INFO) << ""
+                     "partition version not found, instance_id="
+                  << instance_id << " db_id=" << tablet_idx_pb.db_id()
+                  << " table_id=" << tablet_idx_pb.table_id()
+                  << " partition_id=" << tablet_idx_pb.partition_id() << " 
tablet_id=" << tablet_id
+                  << " key=" << hex(ver_key);
         return true;
     }
 
@@ -974,6 +985,7 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv, 
const std::string in
     }
 
     if (version_pb.pending_txn_ids_size() > 0) {
+        TEST_SYNC_POINT_CALLBACK("check_lazy_txn_finished::txn_not_finished");
         DCHECK(version_pb.pending_txn_ids_size() == 1);
         LOG(WARNING) << "lazy txn not finished, instance_id=" << instance_id
                      << " db_id=" << tablet_idx_pb.db_id()
@@ -1298,7 +1310,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
         int64_t tablet_id = tablet_meta_pb.tablet_id();
 
         if (!check_lazy_txn_finished(txn_kv_, instance_id_, 
tablet_meta_pb.tablet_id())) {
-            LOG(WARNING) << "lazy txn not finished tablet_id=" << 
tablet_meta_pb.tablet_id();
+            LOG(WARNING) << "lazy txn not finished tablet_meta_pb=" << 
tablet_meta_pb.tablet_id();
             return -1;
         }
 
@@ -1672,6 +1684,8 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
     int ret = 0;
     auto start_time = steady_clock::now();
 
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("recycle_tablet::begin", (int)0);
+
     // collect resource ids
     std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
     std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
@@ -2099,7 +2113,8 @@ int InstanceRecycler::recycle_rowsets() {
     return ret;
 }
 
-bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, const std::string& 
instance_id, int64_t txn_id) {
+bool is_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string& 
instance_id,
+                     int64_t txn_id) {
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
@@ -2112,7 +2127,10 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, const 
std::string& instance_i
     err = txn->get(index_key, &index_val);
     if (err != TxnErrorCode::TXN_OK) {
         if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+            TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_recycled");
             // txn has been recycled;
+            LOG(INFO) << "txn index key has been recycled, txn_id=" << txn_id
+                      << " instance_id=" << instance_id;
             return true;
         }
         LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id
@@ -2129,13 +2147,29 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, 
const std::string& instance_i
     }
 
     DCHECK(index_pb.has_tablet_index() == true);
-    DCHECK(index_pb.tablet_index().has_db_id() == true);
+    if (!index_pb.tablet_index().has_db_id()) {
+        // In previous versions, the db_id was not set in the index_pb.
+        // After upgrading to a version that enables txn lazy commit, the db_id will be set.
+        LOG(INFO) << "txn index has no db_id, txn_id=" << txn_id << " 
instance_id=" << instance_id
+                  << " index=" << index_pb.ShortDebugString();
+        return true;
+    }
+
     int64_t db_id = index_pb.tablet_index().db_id();
+    DCHECK_GT(db_id, 0) << "db_id=" << db_id << " txn_id=" << txn_id
+                        << " instance_id=" << instance_id;
 
     std::string info_val;
     const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
     err = txn->get(info_key, &info_val);
     if (err != TxnErrorCode::TXN_OK) {
+        if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+            // txn info has been recycled;
+            LOG(INFO) << "txn info key has been recycled, db_id=" << db_id << 
" txn_id=" << txn_id
+                      << " instance_id=" << instance_id;
+            return true;
+        }
+
         DCHECK(err != TxnErrorCode::TXN_KEY_NOT_FOUND);
         LOG(WARNING) << "failed to get txn info key, txn_id=" << txn_id
                      << " instance_id=" << instance_id << " key=" << 
hex(info_key)
@@ -2149,10 +2183,17 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, 
const std::string& instance_i
                      << " instance_id=" << instance_id;
         return false;
     }
-    DCHECK(txn_info.txn_id() == txn_id);
-    if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status()) {
+
+    DCHECK(txn_info.txn_id() == txn_id) << "txn_id=" << txn_id << " 
instance_id=" << instance_id
+                                        << " txn_info=" << 
txn_info.ShortDebugString();
+
+    if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status() ||
+        TxnStatusPB::TXN_STATUS_VISIBLE == txn_info.status()) {
+        TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_aborted", 
&txn_info);
         return true;
     }
+
+    TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_not_finished", &txn_info);
     return false;
 }
 
@@ -2233,7 +2274,16 @@ int InstanceRecycler::recycle_tmp_rowsets() {
             return 0;
         }
 
-        if (!is_txn_aborted(txn_kv_, instance_id_, rowset.txn_id())) {
+        DCHECK_GT(rowset.txn_id(), 0)
+                << "txn_id=" << rowset.txn_id() << " rowset=" << 
rowset.ShortDebugString();
+        if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
+            LOG(INFO) << "txn is not finished, skip recycle tmp rowset, 
instance_id="
+                      << instance_id_ << " tablet_id=" << rowset.tablet_id()
+                      << " rowset_id=" << rowset.rowset_id_v2() << " version=["
+                      << rowset.start_version() << '-' << rowset.end_version()
+                      << "] txn_id=" << rowset.txn_id()
+                      << " creation_time=" << rowset.creation_time() << " 
expiration=" << expiration
+                      << " txn_expiration=" << rowset.txn_expiration();
             return 0;
         }
 
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index 5737e6e9eef..81a5c8a2d99 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -75,6 +75,7 @@ int main(int argc, char** argv) {
     config::enable_txn_store_retry = false;
 
     config::label_keep_max_second = 0;
+    config::force_immediate_recycle = true;
 
     if (!doris::cloud::init_glog("txn_lazy_commit_test")) {
         std::cerr << "failed to init glog" << std::endl;
@@ -155,21 +156,23 @@ static void create_tablet_with_db_id(MetaServiceProxy* 
meta_service, int64_t db_
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
 }
 
-static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t 
tablet_id, int partition_id,
-                                              int64_t version = -1, int 
num_rows = 100) {
+static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t 
tablet_id, int index_id,
+                                              int partition_id, int64_t 
version = -1,
+                                              int num_rows = 100) {
     doris::RowsetMetaCloudPB rowset;
     rowset.set_rowset_id(0); // required
     rowset.set_rowset_id_v2(next_rowset_id());
     rowset.set_tablet_id(tablet_id);
     rowset.set_partition_id(partition_id);
+    rowset.set_index_id(index_id);
     rowset.set_txn_id(txn_id);
     if (version > 0) {
         rowset.set_start_version(version);
         rowset.set_end_version(version);
     }
-    rowset.set_num_segments(1);
-    rowset.set_num_rows(num_rows);
-    rowset.set_data_disk_size(num_rows * 100);
+    rowset.set_num_segments(0);
+    rowset.set_num_rows(0);
+    rowset.set_data_disk_size(0);
     rowset.mutable_tablet_schema()->set_schema_version(0);
     rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK
     return rowset;
@@ -422,7 +425,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
     for (int i = 0; i < 5; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
-        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
         tmp_rowsets_meta.push_back(std::make_pair("mock_tmp_rowset_key", 
tmp_rowset));
         CreateRowsetResponse res;
         commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -510,7 +513,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) 
{
     for (int i = 0; i < 5; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
-        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
         CreateRowsetResponse res;
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -591,7 +594,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) 
{
     for (int i = 0; i < 5; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
-        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
         CreateRowsetResponse res;
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -700,7 +703,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
     for (int i = 0; i < 5; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
-        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
         CreateRowsetResponse res;
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -778,7 +781,7 @@ TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
     for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) {
         create_tablet_without_db_id(meta_service.get(), table_id, index_id, 
partition_id,
                                     tablet_id_base + i);
-        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
         CreateRowsetResponse res;
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -866,7 +869,7 @@ TEST(TxnLazyCommitTest, 
NotFallThroughCommitTxnEventuallyTest) {
     for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
-        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
         CreateRowsetResponse res;
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -954,7 +957,7 @@ TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest) 
{
     for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) {
         create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
                                  tablet_id_base + i);
-        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, 
partition_id);
         CreateRowsetResponse res;
         commit_rowset(meta_service.get(), tmp_rowset, res);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1111,7 +1114,8 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
         }
         {
             for (int i = 0; i < 10; ++i) {
-                auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, 
partition_id);
+                auto tmp_rowset =
+                        create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
                 ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1158,7 +1162,8 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase1Test) {
         }
         {
             for (int i = 0; i < 10; ++i) {
-                auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, 
partition_id);
+                auto tmp_rowset =
+                        create_rowset(txn_id2, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
                 ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1349,7 +1354,8 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase2Test) {
         }
         {
             for (int i = 0; i < 10; ++i) {
-                auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, 
partition_id);
+                auto tmp_rowset =
+                        create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
                 ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1396,7 +1402,8 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase2Test) {
         }
         {
             for (int i = 0; i < 10; ++i) {
-                auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, 
partition_id);
+                auto tmp_rowset =
+                        create_rowset(txn_id2, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
                 ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1506,7 +1513,8 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase3Test) {
 
         {
             for (int i = 0; i < 10; ++i) {
-                auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id_base + 
i, partition_id);
+                auto tmp_rowset =
+                        create_rowset(tmp_txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
                 ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1615,7 +1623,8 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase3Test) {
 
         {
             for (int i = 0; i < 10; ++i) {
-                auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, 
partition_id);
+                auto tmp_rowset =
+                        create_rowset(txn_id1, tablet_id_base + i, index_id, 
partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
                 ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1766,7 +1775,7 @@ TEST(TxnLazyCommitTest, 
ConcurrentCommitTxnEventuallyCase4Test) {
 
         {
             for (int i = 0; i < 10; ++i) {
-                auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id);
+                auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
                 CreateRowsetResponse res;
                 commit_rowset(meta_service.get(), tmp_rowset, res);
                 ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1851,7 +1860,7 @@ TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
             ASSERT_GT(res.txn_id(), 0);
         }
         {
-            auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, 
partition_id);
+            auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, index_id, 
partition_id);
             CreateRowsetResponse res;
             commit_rowset(meta_service.get(), tmp_rowset, res);
             ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1935,4 +1944,652 @@ TEST(TxnLazyCommitTest, ForceTxnLazyCommit) {
     ASSERT_LT(counter, 70000);
     config::enable_cloud_txn_lazy_commit_fuzzy_test = false;
 }
+
+TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) {
+    // ===========================================================================
+    // threads concurrent execution flow:
+    //
+    //   meta-service             recycler
+    //         |                      |
+    //       begin                    |
+    //         |                      |
+    // prepare/commit rowset          |
+    //         |                      |
+    //   lazy commit submit           |
+    //         |                      |
+    //       dead                     |
+    //         |                      |
+    //         |              recycle_tmp_rowsets
+    //         |                      |
+    //         |              abort_timeout_txn
+    //         |                      |
+    //         |                      |
+    //         v                      v
+
+    auto txn_kv = get_mem_txn_kv();
+
+    int64_t db_id = 988032131;
+    int64_t table_id = 5145043;
+    int64_t index_id = 273456;
+    int64_t partition_id = 44576544;
+    std::string mock_instance = "test_instance";
+    const std::string label = "test_label_67ae2q1231";
+
+    bool commit_txn_eventullay_hit = false;
+    bool is_txn_finished_hit = false;
+    bool abort_timeout_txn_hit = false;
+
+    auto sp = SyncPoint::get_instance();
+
+    sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", 
[&](auto&& args) {
+        commit_txn_eventullay_hit = true;
+        bool* pred = try_any_cast<bool*>(args.back());
+        *pred = true;
+    });
+
+    TxnInfoPB txn_info_pb;
+    sp->set_call_back("is_txn_finished::txn_not_finished", [&](auto&& args) {
+        is_txn_finished_hit = true;
+        txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]);
+        {
+            std::unique_ptr<Transaction> txn;
+            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+            check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label);
+        }
+    });
+
+    txn_info_pb.Clear();
+    sp->set_call_back("abort_timeout_txn::advance_last_pending_txn_id", 
[&](auto&& args) {
+        abort_timeout_txn_hit = true;
+        txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]);
+        {
+            std::unique_ptr<Transaction> txn;
+            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+            check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label);
+        }
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    // mock rowset and tablet
+    int64_t tablet_id_base = 2313324;
+    for (int i = 0; i < 10; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
+                                 tablet_id_base + i);
+    }
+    int txn_id = 0;
+    {
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label(label);
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+            ASSERT_GT(txn_id, 0);
+        }
+
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    }
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(mock_instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+
+    ASSERT_EQ(recycler.init(), 0);
+    ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+    ASSERT_TRUE(is_txn_finished_hit);
+
+    ASSERT_EQ(recycler.abort_timeout_txn(), 0);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        check_txn_visible(txn, db_id, txn_id, label);
+    }
+    sleep(1);
+    ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        check_txn_not_exist(txn, db_id, txn_id, label);
+    }
+    ASSERT_TRUE(abort_timeout_txn_hit);
+
+    ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+    ASSERT_TRUE(commit_txn_eventullay_hit);
+}
+
+TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) {
+    // ===========================================================================
+    // threads concurrent execution flow:
+    //
+    //   meta-service             recycler
+    //         |                      |
+    //    begin txn                   |
+    //         |                      |
+    //    abort txn                   |
+    //         |                      |
+    //         |               recycle_tmp_rowsets
+    //         |                      |
+    //         v                      v
+
+    auto txn_kv = get_mem_txn_kv();
+
+    int64_t db_id = 41414;
+    int64_t table_id = 5454146;
+    int64_t index_id = 27656;
+    int64_t partition_id = 4423544;
+    std::string mock_instance = "test_instance";
+    const std::string label = "test_label_67ae2q1231";
+
+    bool txn_has_been_aborted = false;
+
+    auto sp = SyncPoint::get_instance();
+
+    TxnInfoPB txn_info_pb;
+    sp->set_call_back("is_txn_finished::txn_has_been_aborted", [&](auto&& 
args) {
+        txn_has_been_aborted = true;
+        txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]);
+        ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_ABORTED);
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    // mock rowset and tablet
+    int64_t tablet_id_base = 2313324;
+    for (int i = 0; i < 10; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
+                                 tablet_id_base + i);
+    }
+    int txn_id = 0;
+    {
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label(label);
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+            ASSERT_GT(txn_id, 0);
+        }
+
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+        {
+            brpc::Controller cntl;
+            AbortTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            ASSERT_GT(txn_id, 0);
+            req.set_txn_id(txn_id);
+            req.set_reason("test");
+            AbortTxnResponse res;
+            
meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().status(), 
TxnStatusPB::TXN_STATUS_ABORTED);
+        }
+    }
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(mock_instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+
+    ASSERT_EQ(recycler.init(), 0);
+    ASSERT_FALSE(txn_has_been_aborted);
+    ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+    ASSERT_TRUE(txn_has_been_aborted);
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+}
+
+TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) {
+    // ===========================================================================
+    // threads concurrent execution flow:
+    //
+    //   meta-service             recycler
+    //         |                      |
+    //    begin txn                   |
+    //         |                      |
+    //    abort txn                   |
+    //         |                      |
+    // recycle_expired_txn_label      |
+    //         |                      |
+    //         |               recycle_tmp_rowsets
+    //         |                      |
+    //         v                      v
+
+    auto txn_kv = get_mem_txn_kv();
+
+    int64_t db_id = 42345236;
+    int64_t table_id = 3165524;
+    int64_t index_id = 89089;
+    int64_t partition_id = 434154;
+    std::string mock_instance = "test_instance";
+    const std::string label = "test_label_67ae2q1231";
+
+    bool txn_has_been_recycled = false;
+
+    auto sp = SyncPoint::get_instance();
+
+    TxnInfoPB txn_info_pb;
+    sp->set_call_back("is_txn_finished::txn_has_been_recycled",
+                      [&](auto&& args) { txn_has_been_recycled = true; });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    // mock rowset and tablet
+    int64_t tablet_id_base = 2313324;
+    for (int i = 0; i < 10; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
+                                 tablet_id_base + i);
+    }
+    int txn_id = 0;
+    {
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label(label);
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+            ASSERT_GT(txn_id, 0);
+        }
+
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+        {
+            brpc::Controller cntl;
+            AbortTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            ASSERT_GT(txn_id, 0);
+            req.set_txn_id(txn_id);
+            req.set_reason("test");
+            AbortTxnResponse res;
+            
meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().status(), 
TxnStatusPB::TXN_STATUS_ABORTED);
+        }
+    }
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(mock_instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+
+    ASSERT_EQ(recycler.init(), 0);
+
+    ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        check_txn_not_exist(txn, db_id, txn_id, label);
+    }
+
+    ASSERT_FALSE(txn_has_been_recycled);
+    ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+    ASSERT_TRUE(txn_has_been_recycled);
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+}
+// Checks how recycle_partitions() behaves for a dropped partition that still
+// has a lazily committed txn on it: the first call is expected to fail (-1) and
+// report the txn as not finished; after abort_timeout_txn() the txn is expected
+// to be visible and a second recycle_partitions() call is expected to return 0.
+TEST(TxnLazyCommitTest, RecyclePartitions) {
+    // ===========================================================================
+    // threads concurrent execution flow:
+    //
+    //   meta-service             recycler
+    //         |                      |
+    //       begin                    |
+    //         |                      |
+    // prepare/commit rowset          |
+    //         |                      |
+    //   lazy commit submit           |
+    //         |                      |
+    //       dead                     |
+    //         |                      |
+    //   drop partition               |
+    //         |                      |
+    //         |                recycle_partitions
+    //         |                      |
+    //         |                      |
+    //         |                      |
+    //         |                      |
+    //         v                      v
+
+    auto txn_kv = get_mem_txn_kv();
+
+    int64_t db_id = 988032131;
+    int64_t table_id = 5145043;
+    int64_t index_id = 273456;
+    int64_t partition_id = 44576544;
+    std::string mock_instance = "test_instance";
+    const std::string label = "test_label_67ae2q1231";
+
+    bool commit_txn_eventullay_hit = false;
+    bool txn_not_finished_hit = false;
+
+    auto sp = SyncPoint::get_instance();
+
+    // Records that the submit sync point was reached and writes `true` through
+    // the trailing bool* argument. Presumably this makes the sync point return
+    // early so the lazy commit task is never submitted (the "dead" step above).
+    sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) {
+        commit_txn_eventullay_hit = true;
+        bool* pred = try_any_cast<bool*>(args.back());
+        *pred = true;
+    });
+
+    // Records that the recycler saw an unfinished lazy txn.
+    sp->set_call_back("check_lazy_txn_finished::txn_not_finished",
+                      [&](auto&& args) { txn_not_finished_hit = true; });
+
+    // Overrides the result at recycle_tablet::begin with (0, true); presumably
+    // this short-circuits the real tablet recycling with a success code.
+    sp->set_call_back("recycle_tablet::begin", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = 0;
+        ret->second = true;
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    // mock rowset and tablet
+    int64_t tablet_id_base = 2313324;
+    for (int i = 0; i < 10; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+                                 tablet_id_base + i);
+    }
+    // 64-bit like the other ids in this test, so the id returned by begin_txn
+    // is stored without narrowing.
+    int64_t txn_id = 0;
+    {
+        {
+            // begin txn
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label(label);
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+            ASSERT_GT(txn_id, 0);
+        }
+
+        {
+            // commit one tmp rowset per tablet
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+
+        {
+            // commit txn with lazy commit enabled
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            // The scenario is only meaningful if the lazy commit path was taken.
+            ASSERT_TRUE(commit_txn_eventullay_hit);
+        }
+
+        {
+            // drop partition
+            brpc::Controller cntl;
+            PartitionRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+
+            // NOTE(review): the request uses db_id 1 rather than the local
+            // db_id -- confirm this is intended.
+            req.set_db_id(1);
+            req.set_table_id(table_id);
+            req.add_index_ids(index_id);
+            req.add_partition_ids(partition_id);
+            PartitionResponse res;
+            meta_service->drop_partition(
+                    reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
+                    nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    }
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(mock_instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+
+    ASSERT_EQ(recycler.init(), 0);
+    // First attempt: expected to fail because the lazy txn is not finished.
+    ASSERT_FALSE(txn_not_finished_hit);
+    ASSERT_EQ(recycler.recycle_partitions(), -1);
+    ASSERT_TRUE(txn_not_finished_hit);
+
+    // After abort_timeout_txn() the txn is expected to be visible.
+    ASSERT_EQ(recycler.abort_timeout_txn(), 0);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        check_txn_visible(txn, db_id, txn_id, label);
+    }
+
+    // Second attempt: expected to succeed.
+    ASSERT_EQ(recycler.recycle_partitions(), 0);
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+}
+
+// Checks how recycle_indexes() behaves for a dropped index that still has a
+// lazily committed txn on it: the first call is expected to fail (-1) and
+// report the txn as not finished; after abort_timeout_txn() the txn is expected
+// to be visible and a second recycle_indexes() call is expected to return 0.
+TEST(TxnLazyCommitTest, RecycleIndexes) {
+    // ===========================================================================
+    // threads concurrent execution flow:
+    //
+    //   meta-service             recycler
+    //         |                      |
+    //       begin                    |
+    //         |                      |
+    // prepare/commit rowset          |
+    //         |                      |
+    //   lazy commit submit           |
+    //         |                      |
+    //       dead                     |
+    //         |                      |
+    //   drop indexes                 |
+    //         |                      |
+    //         |                recycle_indexes
+    //         |                      |
+    //         |                      |
+    //         |                      |
+    //         |                      |
+    //         v                      v
+
+    auto txn_kv = get_mem_txn_kv();
+
+    int64_t db_id = 4524364;
+    int64_t table_id = 65354;
+    int64_t index_id = 658432;
+    int64_t partition_id = 76553;
+    std::string mock_instance = "test_instance";
+    const std::string label = "test_label_67a34e2q1231";
+
+    bool commit_txn_eventullay_hit = false;
+    bool txn_not_finished_hit = false;
+
+    auto sp = SyncPoint::get_instance();
+
+    // Records that the submit sync point was reached and writes `true` through
+    // the trailing bool* argument. Presumably this makes the sync point return
+    // early so the lazy commit task is never submitted (the "dead" step above).
+    sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) {
+        commit_txn_eventullay_hit = true;
+        bool* pred = try_any_cast<bool*>(args.back());
+        *pred = true;
+    });
+
+    // Records that the recycler saw an unfinished lazy txn.
+    sp->set_call_back("check_lazy_txn_finished::txn_not_finished",
+                      [&](auto&& args) { txn_not_finished_hit = true; });
+
+    // Overrides the result at recycle_tablet::begin with (0, true); presumably
+    // this short-circuits the real tablet recycling with a success code.
+    sp->set_call_back("recycle_tablet::begin", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = 0;
+        ret->second = true;
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    // mock rowset and tablet
+    int64_t tablet_id_base = 2313324;
+    for (int i = 0; i < 10; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+                                 tablet_id_base + i);
+    }
+    // 64-bit like the other ids in this test, so the id returned by begin_txn
+    // is stored without narrowing.
+    int64_t txn_id = 0;
+    {
+        {
+            // begin txn
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label(label);
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+            ASSERT_GT(txn_id, 0);
+        }
+
+        {
+            // commit one tmp rowset per tablet
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+
+        {
+            // commit txn with lazy commit enabled
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            // The scenario is only meaningful if the lazy commit path was taken.
+            ASSERT_TRUE(commit_txn_eventullay_hit);
+        }
+
+        {
+            // drop index
+            brpc::Controller cntl;
+            IndexRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+
+            // NOTE(review): the request uses db_id 1 rather than the local
+            // db_id -- confirm this is intended.
+            req.set_db_id(1);
+            req.set_table_id(table_id);
+            req.add_index_ids(index_id);
+            IndexResponse res;
+            meta_service->drop_index(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    }
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(mock_instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+
+    ASSERT_EQ(recycler.init(), 0);
+    // First attempt: expected to fail because the lazy txn is not finished.
+    ASSERT_FALSE(txn_not_finished_hit);
+    ASSERT_EQ(recycler.recycle_indexes(), -1);
+    ASSERT_TRUE(txn_not_finished_hit);
+
+    // After abort_timeout_txn() the txn is expected to be visible.
+    ASSERT_EQ(recycler.abort_timeout_txn(), 0);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        check_txn_visible(txn, db_id, txn_id, label);
+    }
+
+    // Second attempt: expected to succeed.
+    ASSERT_EQ(recycler.recycle_indexes(), 0);
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to