This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c050055a207 [improve](cloud txn insert) Add more ut; Remove duplicated
table_id to reduce fdb value length (#36135)
c050055a207 is described below
commit c050055a207756aaaa5d3993ed6757b056a15112
Author: meiyi <[email protected]>
AuthorDate: Thu Jun 13 09:57:13 2024 +0800
[improve](cloud txn insert) Add more ut; Remove duplicated table_id to
reduce fdb value length (#36135)
## Proposed changes
1. add more meta_service ut;
2. remove duplicated table_id to reduce fdb value length
---
cloud/test/meta_service_test.cpp | 337 ++++++++++++++++++---
.../transaction/CloudGlobalTransactionMgr.java | 4 +-
.../apache/doris/transaction/TransactionEntry.java | 14 +-
3 files changed, 298 insertions(+), 57 deletions(-)
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 013433239d3..e4b14779814 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -1332,6 +1332,16 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) {
}
}
+static void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t
table_id,
+ int64_t index_id, int64_t partition_id,
int64_t tablet_id,
+ int64_t txn_id) {
+ create_tablet(meta_service, table_id, index_id, partition_id, tablet_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service, tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+}
+
TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
auto meta_service = get_meta_service();
int64_t db_id = 98131;
@@ -1349,6 +1359,8 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
int64_t t2_p3_t1 = 18;
[[maybe_unused]] int64_t t2_p4 = 19;
[[maybe_unused]] int64_t t2_p4_t1 = 20;
+ std::string label = "test_label";
+ std::string label2 = "test_label_0";
// begin txn
{
brpc::Controller cntl;
@@ -1356,7 +1368,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(db_id);
- txn_info_pb.set_label("test_label");
+ txn_info_pb.set_label(label);
txn_info_pb.add_table_ids(t1);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
@@ -1369,27 +1381,9 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
// mock rowset and tablet: for sub_txn1
int64_t sub_txn_id1 = txn_id;
- {
- create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1);
- auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t1, t1_p1);
- CreateRowsetResponse res;
- commit_rowset(meta_service.get(), tmp_rowset, res);
- ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
- }
- {
- create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2);
- auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t2, t1_p1);
- CreateRowsetResponse res;
- commit_rowset(meta_service.get(), tmp_rowset, res);
- ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
- }
- {
- create_tablet(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1);
- auto tmp_rowset = create_rowset(sub_txn_id1, t1_p2_t1, t1_p2);
- CreateRowsetResponse res;
- commit_rowset(meta_service.get(), tmp_rowset, res);
- ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
- }
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t1, sub_txn_id1);
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t2, sub_txn_id1);
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2,
t1_p2_t1, sub_txn_id1);
// begin_sub_txn2
int64_t sub_txn_id2 = -1;
@@ -1400,7 +1394,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
req.set_txn_id(txn_id);
req.set_sub_txn_num(0);
req.set_db_id(db_id);
- req.set_label("test_label_0");
+ req.set_label(label2);
req.mutable_table_ids()->Add(t1);
req.mutable_table_ids()->Add(t2);
BeginSubTxnResponse res;
@@ -1414,13 +1408,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
}
// mock rowset and tablet: for sub_txn3
- {
- create_tablet(meta_service.get(), t2, t2_index, t2_p3, t2_p3_t1);
- auto tmp_rowset = create_rowset(sub_txn_id2, t2_p3_t1, t2_p3);
- CreateRowsetResponse res;
- commit_rowset(meta_service.get(), tmp_rowset, res);
- ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
- }
+ create_and_commit_rowset(meta_service.get(), t2, t2_index, t2_p3,
t2_p3_t1, sub_txn_id2);
// begin_sub_txn3
int64_t sub_txn_id3 = -1;
@@ -1446,20 +1434,8 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
}
// mock rowset and tablet: for sub_txn3
- {
- create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1);
- auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t1, t1_p1);
- CreateRowsetResponse res;
- commit_rowset(meta_service.get(), tmp_rowset, res);
- ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
- }
- {
- create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2);
- auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t2, t1_p1);
- CreateRowsetResponse res;
- commit_rowset(meta_service.get(), tmp_rowset, res);
- ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
- }
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t1, sub_txn_id3);
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t2, sub_txn_id3);
// commit txn
CommitTxnRequest req;
@@ -1522,6 +1498,275 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
fmt::format("transaction is already visible: db_id={}
txn_id={}", db_id, txn_id));
ASSERT_TRUE(found != std::string::npos);
}
+
+ // check kv
+ {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = meta_service->txn_kv()->create_txn(&txn);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+ // txn_info
+ std::string info_key = txn_info_key({mock_instance, db_id, txn_id});
+ std::string info_val;
+ ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK);
+ TxnInfoPB txn_info;
+ txn_info.ParseFromString(info_val);
+ ASSERT_EQ(txn_info.status(), TxnStatusPB::TXN_STATUS_VISIBLE);
+
+ info_key = txn_info_key({mock_instance, db_id, sub_txn_id2});
+ ASSERT_EQ(txn->get(info_key, &info_val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+ // txn_index
+ std::string index_key = txn_index_key({mock_instance, txn_id});
+ std::string index_val;
+ ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK);
+ TxnIndexPB txn_index;
+ txn_index.ParseFromString(index_val);
+ ASSERT_TRUE(txn_index.has_tablet_index());
+
+ index_key = txn_index_key({mock_instance, sub_txn_id3});
+ ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK);
+ txn_index.ParseFromString(index_val);
+ ASSERT_FALSE(txn_index.has_tablet_index());
+
+ // txn_label
+ std::string label_key = txn_label_key({mock_instance, db_id, label});
+ std::string label_val;
+ ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK);
+
+ label_key = txn_label_key({mock_instance, db_id, label2});
+ ASSERT_EQ(txn->get(label_key, &label_val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+ // txn_running
+ std::string running_key = txn_running_key({mock_instance, db_id,
txn_id});
+ std::string running_val;
+ ASSERT_EQ(txn->get(running_key, &running_val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+ running_key = txn_running_key({mock_instance, db_id, sub_txn_id3});
+ ASSERT_EQ(txn->get(running_key, &running_val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+ // tmp rowset
+ int64_t ids[] = {txn_id, sub_txn_id1, sub_txn_id2, sub_txn_id3};
+ for (auto id : ids) {
+ MetaRowsetTmpKeyInfo rs_tmp_key_info0 {mock_instance, id, 0};
+ MetaRowsetTmpKeyInfo rs_tmp_key_info1 {mock_instance, id + 1, 0};
+ std::string rs_tmp_key0;
+ std::string rs_tmp_key1;
+ meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
+ meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
+ std::unique_ptr<RangeGetIterator> it;
+ ASSERT_EQ(txn->get(rs_tmp_key0, rs_tmp_key1, &it, true),
TxnErrorCode::TXN_OK);
+ ASSERT_FALSE(it->has_next());
+ }
+
+ // partition version
+ std::string ver_key = partition_version_key({mock_instance, db_id, t2,
t2_p3});
+ std::string ver_val;
+ ASSERT_EQ(txn->get(ver_key, &ver_val), TxnErrorCode::TXN_OK);
+ VersionPB version;
+ version.ParseFromString(ver_val);
+ ASSERT_EQ(version.version(), 2);
+
+ ver_key = partition_version_key({mock_instance, db_id, t1, t1_p2});
+ ASSERT_EQ(txn->get(ver_key, &ver_val), TxnErrorCode::TXN_OK);
+ version.ParseFromString(ver_val);
+ ASSERT_EQ(version.version(), 2);
+
+ ver_key = partition_version_key({mock_instance, db_id, t1, t1_p1});
+ ASSERT_EQ(txn->get(ver_key, &ver_val), TxnErrorCode::TXN_OK);
+ version.ParseFromString(ver_val);
+ ASSERT_EQ(version.version(), 3);
+
+ // table version
+ std::string table_ver_key = table_version_key({mock_instance, db_id,
t1});
+ std::string table_ver_val;
+ ASSERT_EQ(txn->get(table_ver_key, &table_ver_val),
TxnErrorCode::TXN_OK);
+ auto val_int = *reinterpret_cast<const int64_t*>(table_ver_val.data());
+ ASSERT_EQ(val_int, 1);
+
+ table_version_key({mock_instance, db_id, t2});
+ ASSERT_EQ(txn->get(table_ver_key, &table_ver_val),
TxnErrorCode::TXN_OK);
+ val_int = *reinterpret_cast<const int64_t*>(table_ver_val.data());
+ ASSERT_EQ(val_int, 1);
+ }
+}
+
+TEST(MetaServiceTest, CommitTxnWithSubTxnTest2) {
+ auto meta_service = get_meta_service();
+ int64_t db_id = 99131;
+ int64_t txn_id = -1;
+ int64_t t1 = 20;
+ int64_t t1_index = 200;
+ int64_t t1_p1 = 21;
+ int64_t t1_p1_t1 = 22;
+ int64_t t1_p1_t2 = 23;
+ int64_t t1_p2 = 24;
+ int64_t t1_p2_t1 = 25;
+ int64_t t2 = 26;
+ int64_t t2_index = 201;
+ int64_t t2_p3 = 27;
+ int64_t t2_p3_t1 = 28;
+ [[maybe_unused]] int64_t t2_p4 = 29;
+ [[maybe_unused]] int64_t t2_p4_t1 = 30;
+ std::string label = "test_label_10";
+ std::string label2 = "test_label_11";
+ // begin txn
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label(label);
+ txn_info_pb.add_table_ids(t1);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ }
+
+ std::vector<SubTxnInfo> sub_txn_infos;
+ for (int i = 0; i < 500; i++) {
+ int64_t sub_txn_id1 = -1;
+ if (i == 0) {
+ sub_txn_id1 = txn_id;
+ } else {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(sub_txn_infos.size() - 1);
+ req.set_db_id(db_id);
+ req.set_label(label2);
+ req.mutable_table_ids()->Add(t1);
+ if (i > 0) {
+ req.mutable_table_ids()->Add(t2);
+ }
+ BeginSubTxnResponse res;
+
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), i == 0 ? 1 : 2);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(),
sub_txn_infos.size());
+ ASSERT_TRUE(res.has_sub_txn_id());
+ sub_txn_id1 = res.sub_txn_id();
+ ASSERT_EQ(sub_txn_id1,
+
res.txn_info().sub_txn_ids()[res.txn_info().sub_txn_ids().size() - 1]);
+ }
+ // mock rowset and tablet: for
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t1, sub_txn_id1);
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t2, sub_txn_id1);
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2,
t1_p2_t1, sub_txn_id1);
+ // generate sub_txn_info
+ {
+ SubTxnInfo sub_txn_info1;
+ sub_txn_info1.set_sub_txn_id(sub_txn_id1);
+ sub_txn_info1.set_table_id(t1);
+ sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1);
+ sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2);
+ sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1);
+ sub_txn_infos.push_back(sub_txn_info1);
+ }
+
+ // begin_sub_txn2
+ int64_t sub_txn_id2 = -1;
+ {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(sub_txn_infos.size() - 1);
+ req.set_db_id(db_id);
+ req.set_label(label2);
+ req.mutable_table_ids()->Add(t1);
+ req.mutable_table_ids()->Add(t2);
+ BeginSubTxnResponse res;
+
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(),
sub_txn_infos.size());
+ ASSERT_TRUE(res.has_sub_txn_id());
+ sub_txn_id2 = res.sub_txn_id();
+ }
+ // mock rowset and tablet: for sub_txn3
+ create_and_commit_rowset(meta_service.get(), t2, t2_index, t2_p3,
t2_p3_t1, sub_txn_id2);
+ {
+ SubTxnInfo sub_txn_info2;
+ sub_txn_info2.set_sub_txn_id(sub_txn_id2);
+ sub_txn_info2.set_table_id(t2);
+ sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1);
+ sub_txn_infos.push_back(sub_txn_info2);
+ }
+
+ // begin_sub_txn3
+ int64_t sub_txn_id3 = -1;
+ {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(sub_txn_infos.size() - 1);
+ req.set_db_id(db_id);
+ req.set_label(label2);
+ req.mutable_table_ids()->Add(t1);
+ req.mutable_table_ids()->Add(t2);
+ BeginSubTxnResponse res;
+
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(),
sub_txn_infos.size());
+ ASSERT_TRUE(res.has_sub_txn_id());
+ sub_txn_id3 = res.sub_txn_id();
+ }
+ // mock rowset and tablet: for sub_txn3
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t1, sub_txn_id3);
+ create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1,
t1_p1_t2, sub_txn_id3);
+ {
+ SubTxnInfo sub_txn_info3;
+ sub_txn_info3.set_sub_txn_id(sub_txn_id3);
+ sub_txn_info3.set_table_id(t1);
+ sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1);
+ sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2);
+ sub_txn_infos.push_back(sub_txn_info3);
+ }
+ }
+
+ // commit txn
+ CommitTxnRequest req;
+ {
+ brpc::Controller cntl;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(666);
+ req.set_txn_id(txn_id);
+ req.set_is_txn_load(true);
+
+ for (const auto& sub_txn_info : sub_txn_infos) {
+ req.add_sub_txn_infos()->CopyFrom(sub_txn_info);
+ }
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ std::cout << res.DebugString() << std::endl;
+ ASSERT_EQ(res.table_ids().size(), 3);
+
+ ASSERT_EQ(res.table_ids()[0], t2);
+ ASSERT_EQ(res.partition_ids()[0], t2_p3);
+ ASSERT_EQ(res.versions()[0], 501);
+
+ ASSERT_EQ(res.table_ids()[1], t1);
+ ASSERT_EQ(res.partition_ids()[1], t1_p2);
+ ASSERT_EQ(res.versions()[1], 501);
+
+ ASSERT_EQ(res.table_ids()[2], t1);
+ ASSERT_EQ(res.partition_ids()[2], t1_p1);
+ ASSERT_EQ(res.versions()[2], 1001);
+ }
}
TEST(MetaServiceTest, BeginAndAbortSubTxnTest) {
@@ -1596,9 +1841,9 @@ TEST(MetaServiceTest, BeginAndAbortSubTxnTest) {
}
}
}
- // case: abort sub txn
+ // case: abort sub txn2 twice
{
- {
+ for (int i = 0; i < 2; i++) {
brpc::Controller cntl;
AbortSubTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index ca9f6ca7bc0..0718bda3433 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1448,7 +1448,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
- public Pair<Long, TransactionState> beginSubTxn(long txnId, long dbId,
List<Long> tableIds, String label,
+ public Pair<Long, TransactionState> beginSubTxn(long txnId, long dbId,
Set<Long> tableIds, String label,
long subTxnNum) throws UserException {
LOG.info("try to begin sub transaction, txnId: {}, dbId: {}, tableIds:
{}, label: {}, subTxnNum: {}", txnId,
dbId, tableIds, label, subTxnNum);
@@ -1488,7 +1488,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
TxnUtil.transactionStateFromPb(response.getTxnInfo()));
}
- public TransactionState abortSubTxn(long txnId, long subTxnId, long dbId,
List<Long> tableIds, long subTxnNum)
+ public TransactionState abortSubTxn(long txnId, long subTxnId, long dbId,
Set<Long> tableIds, long subTxnNum)
throws UserException {
LOG.info("try to abort sub transaction, txnId: {}, subTxnId: {}, dbId:
{}, tableIds: {}, subTxnNum: {}", txnId,
subTxnId, dbId, tableIds, subTxnNum);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index f80782bbf5d..da28dd8250f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -60,6 +60,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
public class TransactionEntry {
@@ -229,7 +230,7 @@ public class TransactionEntry {
if (Config.isCloudMode()) {
TUniqueId queryId = ConnectContext.get().queryId();
String label = String.format("tl_%x_%x", queryId.hi,
queryId.lo);
- List<Long> tableIds = getTableIds();
+ Set<Long> tableIds = getTableIds();
tableIds.add(table.getId());
Pair<Long, TransactionState> pair
= ((CloudGlobalTransactionMgr)
Env.getCurrentGlobalTransactionMgr()).beginSubTxn(
@@ -369,7 +370,7 @@ public class TransactionEntry {
if (isTransactionBegan) {
if (Config.isCloudMode()) {
try {
- List<Long> tableIds = getTableIds();
+ Set<Long> tableIds = getTableIds();
this.transactionState
= ((CloudGlobalTransactionMgr)
Env.getCurrentGlobalTransactionMgr()).abortSubTxn(
transactionId, subTransactionId,
table.getDatabase().getId(), tableIds, allSubTxnNum);
@@ -393,11 +394,6 @@ public class TransactionEntry {
: transactionState.getSubTransactionStates();
subTransactionStatesPtr
.add(new SubTransactionState(subTxnId, table, commitInfos,
subTransactionType));
- Preconditions.checkState(transactionState.getTableIdList().size() ==
subTransactionStatesPtr.size(),
- "txn_id=" + transactionId
- + ", expect table_list="
- + subTransactionStatesPtr.stream().map(s ->
s.getTable().getId()).collect(Collectors.toList())
- + ", real table_list=" +
transactionState.getTableIdList());
}
public boolean isTransactionBegan() {
@@ -494,9 +490,9 @@ public class TransactionEntry {
label, transactionId, dbId, timeoutTimestamp);
}
- private List<Long> getTableIds() {
+ private Set<Long> getTableIds() {
List<SubTransactionState> subTransactionStatesPtr =
Config.isCloudMode() ? subTransactionStates
: transactionState.getSubTransactionStates();
- return subTransactionStatesPtr.stream().map(s ->
s.getTable().getId()).collect(Collectors.toList());
+ return subTransactionStatesPtr.stream().map(s ->
s.getTable().getId()).collect(Collectors.toSet());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]