This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3426b30f5bc branch-3.0: [enhancement](txn lazy commit) Add more ut and
log for recycler about txn lazy commit #51310 (#51338)
3426b30f5bc is described below
commit 3426b30f5bc2f354305b9a8da39a2d04e6a87c73
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 29 09:39:36 2025 +0800
branch-3.0: [enhancement](txn lazy commit) Add more ut and log for recycler
about txn lazy commit #51310 (#51338)
Cherry-picked from #51310
Co-authored-by: Lei Zhang <[email protected]>
---
cloud/src/recycler/recycler.cpp | 62 +++-
cloud/test/txn_lazy_commit_test.cpp | 697 ++++++++++++++++++++++++++++++++++--
2 files changed, 733 insertions(+), 26 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 307beab63d4..f9197a91017 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -941,6 +941,11 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv>
txn_kv, const std::string in
}
if (!tablet_idx_pb.has_db_id()) {
+ // In the previous version, the db_id was not set in the index_pb.
+ // If updating to the version which enables txn lazy commit, the db_id
will be set.
+ LOG(INFO) << "txn index has no db_id, tablet_id=" << tablet_id
+ << " instance_id=" << instance_id
+ << " tablet_idx_pb=" << tablet_idx_pb.ShortDebugString();
return true;
}
@@ -951,6 +956,12 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv>
txn_kv, const std::string in
err = txn->get(ver_key, &ver_val);
if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+ LOG(INFO) << ""
+ "partition version not found, instance_id="
+ << instance_id << " db_id=" << tablet_idx_pb.db_id()
+ << " table_id=" << tablet_idx_pb.table_id()
+ << " partition_id=" << tablet_idx_pb.partition_id() << "
tablet_id=" << tablet_id
+ << " key=" << hex(ver_key);
return true;
}
@@ -974,6 +985,7 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv,
const std::string in
}
if (version_pb.pending_txn_ids_size() > 0) {
+ TEST_SYNC_POINT_CALLBACK("check_lazy_txn_finished::txn_not_finished");
DCHECK(version_pb.pending_txn_ids_size() == 1);
LOG(WARNING) << "lazy txn not finished, instance_id=" << instance_id
<< " db_id=" << tablet_idx_pb.db_id()
@@ -1298,7 +1310,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
int64_t tablet_id = tablet_meta_pb.tablet_id();
if (!check_lazy_txn_finished(txn_kv_, instance_id_,
tablet_meta_pb.tablet_id())) {
- LOG(WARNING) << "lazy txn not finished tablet_id=" <<
tablet_meta_pb.tablet_id();
+ LOG(WARNING) << "lazy txn not finished tablet_meta_pb=" <<
tablet_meta_pb.tablet_id();
return -1;
}
@@ -1672,6 +1684,8 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
int ret = 0;
auto start_time = steady_clock::now();
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("recycle_tablet::begin", (int)0);
+
// collect resource ids
std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
@@ -2099,7 +2113,8 @@ int InstanceRecycler::recycle_rowsets() {
return ret;
}
-bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, const std::string&
instance_id, int64_t txn_id) {
+bool is_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string&
instance_id,
+ int64_t txn_id) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -2112,7 +2127,10 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, const
std::string& instance_i
err = txn->get(index_key, &index_val);
if (err != TxnErrorCode::TXN_OK) {
if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+ TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_recycled");
// txn has been recycled;
+ LOG(INFO) << "txn index key has been recycled, txn_id=" << txn_id
+ << " instance_id=" << instance_id;
return true;
}
LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id
@@ -2129,13 +2147,29 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
}
DCHECK(index_pb.has_tablet_index() == true);
- DCHECK(index_pb.tablet_index().has_db_id() == true);
+ if (!index_pb.tablet_index().has_db_id()) {
+ // In the previous version, the db_id was not set in the index_pb.
+ // If updating to the version which enables txn lazy commit, the db_id
will be set.
+ LOG(INFO) << "txn index has no db_id, txn_id=" << txn_id << "
instance_id=" << instance_id
+ << " index=" << index_pb.ShortDebugString();
+ return true;
+ }
+
int64_t db_id = index_pb.tablet_index().db_id();
+ DCHECK_GT(db_id, 0) << "db_id=" << db_id << " txn_id=" << txn_id
+ << " instance_id=" << instance_id;
std::string info_val;
const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
err = txn->get(info_key, &info_val);
if (err != TxnErrorCode::TXN_OK) {
+ if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+ // txn info has been recycled;
+ LOG(INFO) << "txn info key has been recycled, db_id=" << db_id <<
" txn_id=" << txn_id
+ << " instance_id=" << instance_id;
+ return true;
+ }
+
DCHECK(err != TxnErrorCode::TXN_KEY_NOT_FOUND);
LOG(WARNING) << "failed to get txn info key, txn_id=" << txn_id
<< " instance_id=" << instance_id << " key=" <<
hex(info_key)
@@ -2149,10 +2183,17 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
<< " instance_id=" << instance_id;
return false;
}
- DCHECK(txn_info.txn_id() == txn_id);
- if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status()) {
+
+ DCHECK(txn_info.txn_id() == txn_id) << "txn_id=" << txn_id << "
instance_id=" << instance_id
+ << " txn_info=" <<
txn_info.ShortDebugString();
+
+ if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status() ||
+ TxnStatusPB::TXN_STATUS_VISIBLE == txn_info.status()) {
+ TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_aborted",
&txn_info);
return true;
}
+
+ TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_not_finished", &txn_info);
return false;
}
@@ -2233,7 +2274,16 @@ int InstanceRecycler::recycle_tmp_rowsets() {
return 0;
}
- if (!is_txn_aborted(txn_kv_, instance_id_, rowset.txn_id())) {
+ DCHECK_GT(rowset.txn_id(), 0)
+ << "txn_id=" << rowset.txn_id() << " rowset=" <<
rowset.ShortDebugString();
+ if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
+ LOG(INFO) << "txn is not finished, skip recycle tmp rowset,
instance_id="
+ << instance_id_ << " tablet_id=" << rowset.tablet_id()
+ << " rowset_id=" << rowset.rowset_id_v2() << " version=["
+ << rowset.start_version() << '-' << rowset.end_version()
+ << "] txn_id=" << rowset.txn_id()
+ << " creation_time=" << rowset.creation_time() << "
expiration=" << expiration
+ << " txn_expiration=" << rowset.txn_expiration();
return 0;
}
diff --git a/cloud/test/txn_lazy_commit_test.cpp
b/cloud/test/txn_lazy_commit_test.cpp
index 5737e6e9eef..81a5c8a2d99 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -75,6 +75,7 @@ int main(int argc, char** argv) {
config::enable_txn_store_retry = false;
config::label_keep_max_second = 0;
+ config::force_immediate_recycle = true;
if (!doris::cloud::init_glog("txn_lazy_commit_test")) {
std::cerr << "failed to init glog" << std::endl;
@@ -155,21 +156,23 @@ static void create_tablet_with_db_id(MetaServiceProxy*
meta_service, int64_t db_
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
}
-static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id, int partition_id,
- int64_t version = -1, int
num_rows = 100) {
+static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id, int index_id,
+ int partition_id, int64_t
version = -1,
+ int num_rows = 100) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
rowset.set_rowset_id_v2(next_rowset_id());
rowset.set_tablet_id(tablet_id);
rowset.set_partition_id(partition_id);
+ rowset.set_index_id(index_id);
rowset.set_txn_id(txn_id);
if (version > 0) {
rowset.set_start_version(version);
rowset.set_end_version(version);
}
- rowset.set_num_segments(1);
- rowset.set_num_rows(num_rows);
- rowset.set_data_disk_size(num_rows * 100);
+ rowset.set_num_segments(0);
+ rowset.set_num_rows(0);
+ rowset.set_data_disk_size(0);
rowset.mutable_tablet_schema()->set_schema_version(0);
rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK
return rowset;
@@ -422,7 +425,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
for (int i = 0; i < 5; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
tmp_rowsets_meta.push_back(std::make_pair("mock_tmp_rowset_key",
tmp_rowset));
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
@@ -510,7 +513,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest)
{
for (int i = 0; i < 5; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -591,7 +594,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest)
{
for (int i = 0; i < 5; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -700,7 +703,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
for (int i = 0; i < 5; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -778,7 +781,7 @@ TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) {
create_tablet_without_db_id(meta_service.get(), table_id, index_id,
partition_id,
tablet_id_base + i);
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -866,7 +869,7 @@ TEST(TxnLazyCommitTest,
NotFallThroughCommitTxnEventuallyTest) {
for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -954,7 +957,7 @@ TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest)
{
for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
tablet_id_base + i);
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1111,7 +1114,8 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
}
{
for (int i = 0; i < 10; ++i) {
- auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i,
partition_id);
+ auto tmp_rowset =
+ create_rowset(txn_id1, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1158,7 +1162,8 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase1Test) {
}
{
for (int i = 0; i < 10; ++i) {
- auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i,
partition_id);
+ auto tmp_rowset =
+ create_rowset(txn_id2, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1349,7 +1354,8 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase2Test) {
}
{
for (int i = 0; i < 10; ++i) {
- auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i,
partition_id);
+ auto tmp_rowset =
+ create_rowset(txn_id1, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1396,7 +1402,8 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase2Test) {
}
{
for (int i = 0; i < 10; ++i) {
- auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i,
partition_id);
+ auto tmp_rowset =
+ create_rowset(txn_id2, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1506,7 +1513,8 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase3Test) {
{
for (int i = 0; i < 10; ++i) {
- auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id_base +
i, partition_id);
+ auto tmp_rowset =
+ create_rowset(tmp_txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1615,7 +1623,8 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase3Test) {
{
for (int i = 0; i < 10; ++i) {
- auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i,
partition_id);
+ auto tmp_rowset =
+ create_rowset(txn_id1, tablet_id_base + i, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1766,7 +1775,7 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase4Test) {
{
for (int i = 0; i < 10; ++i) {
- auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
partition_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1851,7 +1860,7 @@ TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
ASSERT_GT(res.txn_id(), 0);
}
{
- auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id,
partition_id);
+ auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, index_id,
partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
@@ -1935,4 +1944,652 @@ TEST(TxnLazyCommitTest, ForceTxnLazyCommit) {
ASSERT_LT(counter, 70000);
config::enable_cloud_txn_lazy_commit_fuzzy_test = false;
}
+
+TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) {
+ //
===========================================================================
+ // threads concurrent execution flow:
+ //
+ // meta-service recycler
+ // | |
+ // begin |
+ // | |
+ // prepare/commit rowset |
+ // | |
+ // lazy commit submit |
+ // | |
+ // dead |
+ // | |
+ // | recycle_tmp_rowsets
+ // | |
+ // | abort_timeout_txn
+ // | |
+ // | |
+ // v v
+
+ auto txn_kv = get_mem_txn_kv();
+
+ int64_t db_id = 988032131;
+ int64_t table_id = 5145043;
+ int64_t index_id = 273456;
+ int64_t partition_id = 44576544;
+ std::string mock_instance = "test_instance";
+ const std::string label = "test_label_67ae2q1231";
+
+ bool commit_txn_eventullay_hit = false;
+ bool is_txn_finished_hit = false;
+ bool abort_timeout_txn_hit = false;
+
+ auto sp = SyncPoint::get_instance();
+
+ sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit",
[&](auto&& args) {
+ commit_txn_eventullay_hit = true;
+ bool* pred = try_any_cast<bool*>(args.back());
+ *pred = true;
+ });
+
+ TxnInfoPB txn_info_pb;
+ sp->set_call_back("is_txn_finished::txn_not_finished", [&](auto&& args) {
+ is_txn_finished_hit = true;
+ txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label);
+ }
+ });
+
+ txn_info_pb.Clear();
+ sp->set_call_back("abort_timeout_txn::advance_last_pending_txn_id",
[&](auto&& args) {
+ abort_timeout_txn_hit = true;
+ txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label);
+ }
+ });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ // mock rowset and tablet
+ int64_t tablet_id_base = 2313324;
+ for (int i = 0; i < 10; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
+ tablet_id_base + i);
+ }
+ int txn_id = 0;
+ {
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label(label);
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ ASSERT_GT(txn_id, 0);
+ }
+
+ {
+ for (int i = 0; i < 10; ++i) {
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+
+ {
+ brpc::Controller cntl;
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id);
+ req.set_is_2pc(false);
+ req.set_enable_txn_lazy_commit(true);
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(mock_instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+
+ ASSERT_EQ(recycler.init(), 0);
+ ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+ ASSERT_TRUE(is_txn_finished_hit);
+
+ ASSERT_EQ(recycler.abort_timeout_txn(), 0);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ check_txn_visible(txn, db_id, txn_id, label);
+ }
+ sleep(1);
+ ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ check_txn_not_exist(txn, db_id, txn_id, label);
+ }
+ ASSERT_TRUE(abort_timeout_txn_hit);
+
+ ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ ASSERT_TRUE(commit_txn_eventullay_hit);
+}
+
+TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) {
+ //
===========================================================================
+ // threads concurrent execution flow:
+ //
+ // meta-service recycler
+ // | |
+ // begin txn |
+ // | |
+ // abort txn |
+ // | |
+ // | recycle_tmp_rowsets
+ // | |
+ // v v
+
+ auto txn_kv = get_mem_txn_kv();
+
+ int64_t db_id = 41414;
+ int64_t table_id = 5454146;
+ int64_t index_id = 27656;
+ int64_t partition_id = 4423544;
+ std::string mock_instance = "test_instance";
+ const std::string label = "test_label_67ae2q1231";
+
+ bool txn_has_been_aborted = false;
+
+ auto sp = SyncPoint::get_instance();
+
+ TxnInfoPB txn_info_pb;
+ sp->set_call_back("is_txn_finished::txn_has_been_aborted", [&](auto&&
args) {
+ txn_has_been_aborted = true;
+ txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]);
+ ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_ABORTED);
+ });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ // mock rowset and tablet
+ int64_t tablet_id_base = 2313324;
+ for (int i = 0; i < 10; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
+ tablet_id_base + i);
+ }
+ int txn_id = 0;
+ {
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label(label);
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ ASSERT_GT(txn_id, 0);
+ }
+
+ {
+ for (int i = 0; i < 10; ++i) {
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+ {
+ brpc::Controller cntl;
+ AbortTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ ASSERT_GT(txn_id, 0);
+ req.set_txn_id(txn_id);
+ req.set_reason("test");
+ AbortTxnResponse res;
+
meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().status(),
TxnStatusPB::TXN_STATUS_ABORTED);
+ }
+ }
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(mock_instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+
+ ASSERT_EQ(recycler.init(), 0);
+ ASSERT_FALSE(txn_has_been_aborted);
+ ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+ ASSERT_TRUE(txn_has_been_aborted);
+
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+}
+
+TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) {
+ //
===========================================================================
+ // threads concurrent execution flow:
+ //
+ // meta-service recycler
+ // | |
+ // begin txn |
+ // | |
+ // abort txn |
+ // | |
+ // recycle_expired_txn_label |
+ // | |
+ // | recycle_tmp_rowsets
+ // | |
+ // v v
+
+ auto txn_kv = get_mem_txn_kv();
+
+ int64_t db_id = 42345236;
+ int64_t table_id = 3165524;
+ int64_t index_id = 89089;
+ int64_t partition_id = 434154;
+ std::string mock_instance = "test_instance";
+ const std::string label = "test_label_67ae2q1231";
+
+ bool txn_has_been_recycled = false;
+
+ auto sp = SyncPoint::get_instance();
+
+ TxnInfoPB txn_info_pb;
+ sp->set_call_back("is_txn_finished::txn_has_been_recycled",
+ [&](auto&& args) { txn_has_been_recycled = true; });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ // mock rowset and tablet
+ int64_t tablet_id_base = 2313324;
+ for (int i = 0; i < 10; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
+ tablet_id_base + i);
+ }
+ int txn_id = 0;
+ {
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label(label);
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ ASSERT_GT(txn_id, 0);
+ }
+
+ {
+ for (int i = 0; i < 10; ++i) {
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+ {
+ brpc::Controller cntl;
+ AbortTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ ASSERT_GT(txn_id, 0);
+ req.set_txn_id(txn_id);
+ req.set_reason("test");
+ AbortTxnResponse res;
+
meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().status(),
TxnStatusPB::TXN_STATUS_ABORTED);
+ }
+ }
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(mock_instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+
+ ASSERT_EQ(recycler.init(), 0);
+
+ ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ check_txn_not_exist(txn, db_id, txn_id, label);
+ }
+
+ ASSERT_FALSE(txn_has_been_recycled);
+ ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+ ASSERT_TRUE(txn_has_been_recycled);
+
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+}
+TEST(TxnLazyCommitTest, RecyclePartitions) {
+ //
===========================================================================
+ // threads concurrent execution flow:
+ //
+ // meta-service recycler
+ // | |
+ // begin |
+ // | |
+ // prepare/commit rowset |
+ // | |
+ // lazy commit submit |
+ // | |
+ // dead |
+ // | |
+ // drop partition |
+ // | |
+ // | recycle_partitions
+ // | |
+ // | |
+ // | |
+ // | |
+ // v v
+
+ auto txn_kv = get_mem_txn_kv();
+
+ int64_t db_id = 988032131;
+ int64_t table_id = 5145043;
+ int64_t index_id = 273456;
+ int64_t partition_id = 44576544;
+ std::string mock_instance = "test_instance";
+ const std::string label = "test_label_67ae2q1231";
+
+ bool commit_txn_eventullay_hit = false;
+ bool txn_not_finished_hit = false;
+
+ auto sp = SyncPoint::get_instance();
+
+ sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit",
[&](auto&& args) {
+ commit_txn_eventullay_hit = true;
+ bool* pred = try_any_cast<bool*>(args.back());
+ *pred = true;
+ });
+
+ sp->set_call_back("check_lazy_txn_finished::txn_not_finished",
+ [&](auto&& args) { txn_not_finished_hit = true; });
+
+ sp->set_call_back("recycle_tablet::begin", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = 0;
+ ret->second = true;
+ });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ // mock rowset and tablet
+ int64_t tablet_id_base = 2313324;
+ for (int i = 0; i < 10; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
+ tablet_id_base + i);
+ }
+ int txn_id = 0;
+ {
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label(label);
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ ASSERT_GT(txn_id, 0);
+ }
+
+ {
+ for (int i = 0; i < 10; ++i) {
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+
+ {
+ brpc::Controller cntl;
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id);
+ req.set_is_2pc(false);
+ req.set_enable_txn_lazy_commit(true);
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ {
+ // drop partition
+ brpc::Controller cntl;
+ PartitionRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+
+ req.set_db_id(1);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.add_partition_ids(partition_id);
+ PartitionResponse res;
+ meta_service->drop_partition(
+
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
+ nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(mock_instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+
+ ASSERT_EQ(recycler.init(), 0);
+ ASSERT_FALSE(txn_not_finished_hit);
+ ASSERT_EQ(recycler.recycle_partitions(), -1);
+ ASSERT_TRUE(txn_not_finished_hit);
+
+ ASSERT_EQ(recycler.abort_timeout_txn(), 0);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ check_txn_visible(txn, db_id, txn_id, label);
+ }
+
+ ASSERT_EQ(recycler.recycle_partitions(), 0);
+
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+}
+
+TEST(TxnLazyCommitTest, RecycleIndexes) {
+ //
===========================================================================
+ // threads concurrent execution flow:
+ //
+ // meta-service recycler
+ // | |
+ // begin |
+ // | |
+ // prepare/commit rowset |
+ // | |
+ // lazy commit submit |
+ // | |
+ // dead |
+ // | |
+ // drop indexes |
+ // | |
+ // | recycle_indexes
+ // | |
+ // | |
+ // | |
+ // | |
+ // v v
+
+ auto txn_kv = get_mem_txn_kv();
+
+ int64_t db_id = 4524364;
+ int64_t table_id = 65354;
+ int64_t index_id = 658432;
+ int64_t partition_id = 76553;
+ std::string mock_instance = "test_instance";
+ const std::string label = "test_label_67a34e2q1231";
+
+ bool commit_txn_eventullay_hit = false;
+ bool txn_not_finished_hit = false;
+
+ auto sp = SyncPoint::get_instance();
+
+ sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit",
[&](auto&& args) {
+ commit_txn_eventullay_hit = true;
+ bool* pred = try_any_cast<bool*>(args.back());
+ *pred = true;
+ });
+
+ sp->set_call_back("check_lazy_txn_finished::txn_not_finished",
+ [&](auto&& args) { txn_not_finished_hit = true; });
+
+ sp->set_call_back("recycle_tablet::begin", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = 0;
+ ret->second = true;
+ });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ // mock rowset and tablet
+ int64_t tablet_id_base = 2313324;
+ for (int i = 0; i < 10; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
+ tablet_id_base + i);
+ }
+ int txn_id = 0;
+ {
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label(label);
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ ASSERT_GT(txn_id, 0);
+ }
+
+ {
+ for (int i = 0; i < 10; ++i) {
+ auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i,
index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+
+ {
+ brpc::Controller cntl;
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id);
+ req.set_is_2pc(false);
+ req.set_enable_txn_lazy_commit(true);
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ {
+ // drop index
+ brpc::Controller cntl;
+ IndexRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+
+ req.set_db_id(1);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ IndexResponse res;
+
meta_service->drop_index(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(mock_instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+
+ ASSERT_EQ(recycler.init(), 0);
+ ASSERT_FALSE(txn_not_finished_hit);
+ ASSERT_EQ(recycler.recycle_indexes(), -1);
+ ASSERT_TRUE(txn_not_finished_hit);
+
+ ASSERT_EQ(recycler.abort_timeout_txn(), 0);
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ check_txn_visible(txn, db_id, txn_id, label);
+ }
+
+ ASSERT_EQ(recycler.recycle_indexes(), 0);
+
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]