This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 555d7822413 branch-3.0: [improve](binlog) Allow commit txn without
waiting txn publish #48961 (#49265)
555d7822413 is described below
commit 555d7822413114f3e6e783ad1b1225450568fb9a
Author: walter <[email protected]>
AuthorDate: Thu Mar 20 12:07:49 2025 +0800
branch-3.0: [improve](binlog) Allow commit txn without waiting txn publish
#48961 (#49265)
cherry pick from #48961
---
.../transaction/CloudGlobalTransactionMgr.java | 17 ++++--
.../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 2 +-
.../apache/doris/service/FrontendServiceImpl.java | 15 ++++--
.../doris/transaction/GlobalTransactionMgr.java | 31 ++++++++---
.../transaction/GlobalTransactionMgrIface.java | 9 +++-
.../transaction/CloudGlobalTransactionMgrTest.java | 10 ++--
.../org/apache/doris/load/DeleteHandlerTest.java | 3 +-
.../apache/doris/load/loadv2/SparkLoadJobTest.java | 3 +-
.../transaction/DatabaseTransactionMgrTest.java | 9 ++--
.../transaction/GlobalTransactionMgrTest.java | 61 ++++++++++++----------
gensrc/thrift/FrontendService.thrift | 1 +
12 files changed, 108 insertions(+), 55 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index e00a2211419..b526c5886e0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -338,14 +338,14 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
@Deprecated
@Override
- public void commitTransaction(long dbId, List<Table> tableList,
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos,
null);
+ commitTransactionWithoutLock(dbId, tableList, transactionId,
tabletCommitInfos, null);
}
@Override
- public void commitTransaction(long dbId, List<Table> tableList, long
transactionId,
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment)
throws UserException {
List<OlapTable> mowTableList = getMowTableList(tableList,
tabletCommitInfos);
@@ -1186,6 +1186,15 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
+ @Override
+ public void commitTransaction(DatabaseIf db, List<Table> tableList, long
transactionId,
+ List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ // There is no publish in cloud mode
+ commitAndPublishTransaction(
+ db, tableList, transactionId, tabletCommitInfos,
timeoutMillis, txnCommitAttachment);
+ }
+
@Override
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis,
@@ -1234,7 +1243,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
try {
- commitTransaction(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
} finally {
decreaseWaitingLockCount(tablesToLock);
MetaLockUtils.commitUnlockTables(tablesToLock);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 536a7267fdd..05d9d0c2acc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -361,7 +361,7 @@ public class BrokerLoadJob extends BulkLoadJob {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
getLoadJobFinalOperation());
afterLoadingTaskCommitTransaction(tableList);
afterCommit();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index f01f205e96d..1abc8b76b54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -671,7 +671,7 @@ public class SparkLoadJob extends BulkLoadJob {
try {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id",
transactionId)
.add("msg", "Load job try to commit txn").build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress,
loadStartTimestamp,
finishTimestamp, state, failMsg));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 38d81d64bfa..381ea3bbbf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1854,14 +1854,23 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
subTxnInfo.getTabletCommitInfos(), null));
}
transactionState.setSubTxnIds(subTxnIds);
- return
Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db,
request.getTxnId(),
- subTransactionStates, timeoutMs);
- } else {
+ return Env.getCurrentGlobalTransactionMgr()
+ .commitAndPublishTransaction(db, request.getTxnId(),
+ subTransactionStates, timeoutMs);
+ } else if (!request.isOnlyCommit()) {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ } else {
+ // single table commit, so don't need to wait for publish.
+ Env.getCurrentGlobalTransactionMgr()
+ .commitTransaction(db, tableList,
+ request.getTxnId(),
+
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ return true;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index ff1115bf0a6..6552c65d412 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -215,10 +215,11 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
@Deprecated
- public void commitTransaction(long dbId, List<Table> tableList,
+ @Override
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos,
null);
+ commitTransactionWithoutLock(dbId, tableList, transactionId,
tabletCommitInfos, null);
}
@Override
@@ -234,7 +235,8 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
* @note it is necessary to optimize the `lock` mechanism and `lock` scope
resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
- public void commitTransaction(long dbId, List<Table> tableList, long
transactionId,
+ @Override
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@@ -251,7 +253,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
/**
* @note callers should get all tables' write locks before call this api
*/
- public void commitTransaction(long dbId, List<Table> tableList, long
transactionId,
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId,
List<SubTransactionState> subTransactionStates, long timeout)
throws UserException {
if (Config.disable_load_job) {
throw new TransactionCommitFailedException("disable_load_job is
set to true, all load jobs are prevented");
@@ -264,6 +266,21 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
dbTransactionMgr.commitTransaction(transactionId, tableList,
subTransactionStates);
}
+ @Override
+ public void commitTransaction(DatabaseIf db, List<Table> tableList, long
transactionId,
+ List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList,
timeoutMillis, TimeUnit.MILLISECONDS)) {
+ throw new UserException("get tableList write lock timeout,
tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+ }
+
private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
@@ -291,7 +308,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
+ StringUtils.join(tableList, ",") + ")");
}
try {
- commitTransaction(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -320,7 +337,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
+ StringUtils.join(tableList, ",") + ")");
}
try {
- commitTransaction(db.getId(), tableList, transactionId,
subTransactionStates, timeoutMillis);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId,
subTransactionStates, timeoutMillis);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -351,7 +368,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time
cost: {} ms."
- + " data will be visable later.", transactionId,
stopWatch.getTime());
+ + " data will be visible later.", transactionId,
stopWatch.getTime());
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index c8d436e68d8..38c8cdd5a6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -78,14 +78,19 @@ public interface GlobalTransactionMgrIface extends Writable
{
throws UserException;
@Deprecated
- public void commitTransaction(long dbId, List<Table> tableList,
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException;
- public void commitTransaction(long dbId, List<Table> tableList, long
transactionId,
+ public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment)
throws UserException;
+ public void commitTransaction(DatabaseIf db, List<Table> tableList, long
transactionId,
+ List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
+ TxnCommitAttachment txnCommitAttachment)
+ throws UserException;
+
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
throws UserException;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
index 2522e2487ac..75648634e3b 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
@@ -194,7 +194,7 @@ public class CloudGlobalTransactionMgrTest {
long transactionId = 123533;
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
+ masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
transactionId, null, null);
}
@@ -219,7 +219,7 @@ public class CloudGlobalTransactionMgrTest {
long transactionId = 123533;
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
+ masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
transactionId, null, null);
}
@@ -246,8 +246,8 @@ public class CloudGlobalTransactionMgrTest {
long transactionId = 123533;
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
-
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
- transactionId, null, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId, null, null);
});
}
@@ -278,7 +278,7 @@ public class CloudGlobalTransactionMgrTest {
long transactionId = 123533;
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
+ masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
transactionId, null, null);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 18f29ac30d2..bf1b82019eb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -358,7 +358,8 @@ public class DeleteHandlerTest {
new Expectations(globalTransactionMgr) {
{
try {
- globalTransactionMgr.commitTransaction(anyLong,
(List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment)
any);
+ globalTransactionMgr.commitTransactionWithoutLock(
+ anyLong, (List<Table>) any, anyLong,
(List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e3916cfb18d..f1e9942c7e3 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -377,7 +377,8 @@ public class SparkLoadJobTest {
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
- transactionMgr.commitTransaction(dbId, (List<Table>) any,
transactionId, (List<TabletCommitInfo>) any,
+ transactionMgr.commitTransactionWithoutLock(
+ dbId, (List<Table>) any, transactionId,
(List<TabletCommitInfo>) any,
(LoadJobFinalOperation) any);
}
};
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 82c79e9f6f0..365cf8051d8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -144,7 +144,8 @@ public class DatabaseTransactionMgrTest {
CatalogTestUtil.testTabletId1, allBackends);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId1,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId1,
transTablets, null);
TransactionState transactionState1 =
fakeEditLog.getTransaction(transactionId1);
Map<String, Map<Long, Long>> keyToSuccessTablets = new HashMap<>();
@@ -541,7 +542,8 @@ public class DatabaseTransactionMgrTest {
setSuccessTablet(keyToSuccessTablets, allBackends, transactionId,
CatalogTestUtil.testTabletId1, 14);
setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId2,
CatalogTestUtil.testTabletId2, 13);
setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId4,
CatalogTestUtil.testTabletId1, 15);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2, table1),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1,
table2, table1),
transactionState6.getTransactionId(),
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr,
transactionState6,
subTransactionInfos), 300000);
@@ -577,7 +579,8 @@ public class DatabaseTransactionMgrTest {
new SubTransactionInfo(table1,
CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(0)),
new SubTransactionInfo(table2,
CatalogTestUtil.testTabletId2, allBackends, subTxnIds8.get(1)),
new SubTransactionInfo(table1,
CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(2)));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1,
table2),
transactionState8.getTransactionId(),
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr,
transactionState8,
subTransactionInfos), 300000);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index c4ec468c651..522021a9771 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -177,8 +177,8 @@ public class GlobalTransactionMgrTest {
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -209,8 +209,8 @@ public class GlobalTransactionMgrTest {
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2));
// commit txn
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, transTablets, null);
checkVersion(testTable1, CatalogTestUtil.testPartition1,
CatalogTestUtil.testIndexId1,
CatalogTestUtil.testTabletId1,
CatalogTestUtil.testStartVersion,
CatalogTestUtil.testStartVersion + 2,
@@ -240,8 +240,8 @@ public class GlobalTransactionMgrTest {
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId3));
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
- transactionId2, transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2, transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
TransactionState transactionState =
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -260,8 +260,8 @@ public class GlobalTransactionMgrTest {
// txn3: commit the second transaction with 1,2,3 success
if (true) {
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2, transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId2);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -342,7 +342,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
"idToRunningTransactionState", idToTransactionState);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1),
1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets,
txnCommitAttachment);
RoutineLoadStatistic jobStatistic =
Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(Long.valueOf(101),
Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@@ -407,7 +408,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
"idToRunningTransactionState", idToTransactionState);
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1),
1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets,
txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so
here they should be 0.
RoutineLoadStatistic jobStatistic =
Deencapsulation.getField(routineLoadJob, "jobStatistic");
@@ -429,8 +431,8 @@ public class GlobalTransactionMgrTest {
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
OlapTable testTable1 = (OlapTable) (masterEnv.getInternalCatalog()
.getDbOrMetaException(CatalogTestUtil.testDbId1).getTableOrMetaException(CatalogTestUtil.testTableId1));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
checkTableVersion(testTable1, 1, 2);
@@ -497,8 +499,8 @@ public class GlobalTransactionMgrTest {
LoadJobSourceType.FRONTEND,
Config.stream_load_default_timeout_second);
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, transTablets, null);
// follower catalog replay the transaction
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
@@ -562,8 +564,8 @@ public class GlobalTransactionMgrTest {
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId3));
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1),
- transactionId2, transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2, transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
TransactionState transactionState =
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1,
@@ -576,8 +578,8 @@ public class GlobalTransactionMgrTest {
// commit the second transaction with 1,2,3 success
if (true) {
List<TabletCommitInfo> transTablets =
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId2,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2, transTablets, null);
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -654,8 +656,8 @@ public class GlobalTransactionMgrTest {
subTransactionInfos);
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2), transactionId,
- subTransactionStates, 300000);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
transactionId, subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
// check partition version
@@ -724,8 +726,8 @@ public class GlobalTransactionMgrTest {
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
- transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1,
table2), transactionId,
subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -767,7 +769,8 @@ public class GlobalTransactionMgrTest {
try {
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1,
table2),
transactionId, subTransactionStates, 300000);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -803,7 +806,8 @@ public class GlobalTransactionMgrTest {
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1,
table2),
transactionId, subTransactionStates, 300000);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
// check partition version
@@ -881,7 +885,8 @@ public class GlobalTransactionMgrTest {
subTransactionInfos);
// commit txn
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1),
transactionId,
subTransactionStates, 300000);
} catch (TabletQuorumFailedException e) {
@@ -919,7 +924,8 @@ public class GlobalTransactionMgrTest {
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
transactionId,
subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -1022,7 +1028,8 @@ public class GlobalTransactionMgrTest {
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1,
table2),
transactionId, subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 556668c8ef8..6389e23bfa4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -867,6 +867,7 @@ struct TCommitTxnRequest {
// used for ccr
13: optional bool txn_insert
14: optional list<TSubTxnInfo> sub_txn_infos
+ 15: optional bool only_commit // only commit txn, without waiting txn
publish
}
struct TCommitTxnResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]