This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 555d7822413 branch-3.0: [improve](binlog) Allow commit txn without waiting txn publish #48961 (#49265)
555d7822413 is described below

commit 555d7822413114f3e6e783ad1b1225450568fb9a
Author: walter <[email protected]>
AuthorDate: Thu Mar 20 12:07:49 2025 +0800

    branch-3.0: [improve](binlog) Allow commit txn without waiting txn publish #48961 (#49265)
    
    cherry pick from #48961
---
 .../transaction/CloudGlobalTransactionMgr.java     | 17 ++++--
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  2 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  2 +-
 .../apache/doris/service/FrontendServiceImpl.java  | 15 ++++--
 .../doris/transaction/GlobalTransactionMgr.java    | 31 ++++++++---
 .../transaction/GlobalTransactionMgrIface.java     |  9 +++-
 .../transaction/CloudGlobalTransactionMgrTest.java | 10 ++--
 .../org/apache/doris/load/DeleteHandlerTest.java   |  3 +-
 .../apache/doris/load/loadv2/SparkLoadJobTest.java |  3 +-
 .../transaction/DatabaseTransactionMgrTest.java    |  9 ++--
 .../transaction/GlobalTransactionMgrTest.java      | 61 ++++++++++++----------
 gensrc/thrift/FrontendService.thrift               |  1 +
 12 files changed, 108 insertions(+), 55 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index e00a2211419..b526c5886e0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -338,14 +338,14 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
 
     @Deprecated
     @Override
-    public void commitTransaction(long dbId, List<Table> tableList,
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
             long transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException {
-        commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, 
null);
+        commitTransactionWithoutLock(dbId, tableList, transactionId, 
tabletCommitInfos, null);
     }
 
     @Override
-    public void commitTransaction(long dbId, List<Table> tableList, long 
transactionId,
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList, 
long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment)
             throws UserException {
         List<OlapTable> mowTableList = getMowTableList(tableList, 
tabletCommitInfos);
@@ -1186,6 +1186,15 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
+    @Override
+    public void commitTransaction(DatabaseIf db, List<Table> tableList, long 
transactionId,
+            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, 
TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // There is no publish in cloud mode
+        commitAndPublishTransaction(
+                db, tableList, transactionId, tabletCommitInfos, 
timeoutMillis, txnCommitAttachment);
+    }
+
     @Override
     public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
                                                List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis,
@@ -1234,7 +1243,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         try {
-            commitTransaction(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+            commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
         } finally {
             decreaseWaitingLockCount(tablesToLock);
             MetaLockUtils.commitUnlockTables(tablesToLock);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 536a7267fdd..05d9d0c2acc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -361,7 +361,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                         .add("txn_id", transactionId)
                         .add("msg", "Load job try to commit txn")
                         .build());
-                Env.getCurrentGlobalTransactionMgr().commitTransaction(
+                
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
                         dbId, tableList, transactionId, commitInfos, 
getLoadJobFinalOperation());
                 afterLoadingTaskCommitTransaction(tableList);
                 afterCommit();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index f01f205e96d..1abc8b76b54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -671,7 +671,7 @@ public class SparkLoadJob extends BulkLoadJob {
             try {
                 LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", 
transactionId)
                         .add("msg", "Load job try to commit txn").build());
-                Env.getCurrentGlobalTransactionMgr().commitTransaction(
+                
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
                         dbId, tableList, transactionId, commitInfos,
                         new LoadJobFinalOperation(id, loadingStatus, progress, 
loadStartTimestamp,
                                 finishTimestamp, state, failMsg));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 38d81d64bfa..381ea3bbbf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1854,14 +1854,23 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                                 subTxnInfo.getTabletCommitInfos(), null));
             }
             transactionState.setSubTxnIds(subTxnIds);
-            return 
Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db, 
request.getTxnId(),
-                    subTransactionStates, timeoutMs);
-        } else {
+            return Env.getCurrentGlobalTransactionMgr()
+                    .commitAndPublishTransaction(db, request.getTxnId(),
+                            subTransactionStates, timeoutMs);
+        } else if (!request.isOnlyCommit()) {
             return Env.getCurrentGlobalTransactionMgr()
                     .commitAndPublishTransaction(db, tableList,
                             request.getTxnId(),
                             
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
                             
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+        } else {
+            // single table commit, so don't need to wait for publish.
+            Env.getCurrentGlobalTransactionMgr()
+                    .commitTransaction(db, tableList,
+                            request.getTxnId(),
+                            
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+                            
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+            return true;
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index ff1115bf0a6..6552c65d412 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -215,10 +215,11 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     @Deprecated
-    public void commitTransaction(long dbId, List<Table> tableList,
+    @Override
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
             long transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException {
-        commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, 
null);
+        commitTransactionWithoutLock(dbId, tableList, transactionId, 
tabletCommitInfos, null);
     }
 
     @Override
@@ -234,7 +235,8 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
      * @note it is necessary to optimize the `lock` mechanism and `lock` scope 
resulting from wait lock long time
      * @note callers should get all tables' write locks before call this api
      */
-    public void commitTransaction(long dbId, List<Table> tableList, long 
transactionId,
+    @Override
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList, 
long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment)
             throws UserException {
         if (Config.disable_load_job) {
@@ -251,7 +253,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     /**
      * @note callers should get all tables' write locks before call this api
      */
-    public void commitTransaction(long dbId, List<Table> tableList, long 
transactionId,
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList, 
long transactionId,
             List<SubTransactionState> subTransactionStates, long timeout) 
throws UserException {
         if (Config.disable_load_job) {
             throw new TransactionCommitFailedException("disable_load_job is 
set to true, all load jobs are prevented");
@@ -264,6 +266,21 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         dbTransactionMgr.commitTransaction(transactionId, tableList, 
subTransactionStates);
     }
 
+    @Override
+    public void commitTransaction(DatabaseIf db, List<Table> tableList, long 
transactionId,
+            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, 
TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 
timeoutMillis, TimeUnit.MILLISECONDS)) {
+            throw new UserException("get tableList write lock timeout, 
tableList=("
+                    + StringUtils.join(tableList, ",") + ")");
+        }
+        try {
+            commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+        } finally {
+            MetaLockUtils.writeUnlockTables(tableList);
+        }
+    }
+
     private void commitTransaction2PC(long dbId, long transactionId)
             throws UserException {
         if (Config.disable_load_job) {
@@ -291,7 +308,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     + StringUtils.join(tableList, ",") + ")");
         }
         try {
-            commitTransaction(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+            commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
         } finally {
             MetaLockUtils.writeUnlockTables(tableList);
         }
@@ -320,7 +337,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     + StringUtils.join(tableList, ",") + ")");
         }
         try {
-            commitTransaction(db.getId(), tableList, transactionId, 
subTransactionStates, timeoutMillis);
+            commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
subTransactionStates, timeoutMillis);
         } finally {
             MetaLockUtils.writeUnlockTables(tableList);
         }
@@ -351,7 +368,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
         stopWatch.stop();
         LOG.info("stream load tasks are committed successfully. txns: {}. time 
cost: {} ms."
-                + " data will be visable later.", transactionId, 
stopWatch.getTime());
+                + " data will be visible later.", transactionId, 
stopWatch.getTime());
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index c8d436e68d8..38c8cdd5a6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -78,14 +78,19 @@ public interface GlobalTransactionMgrIface extends Writable 
{
             throws UserException;
 
     @Deprecated
-    public void commitTransaction(long dbId, List<Table> tableList,
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
             long transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException;
 
-    public void commitTransaction(long dbId, List<Table> tableList, long 
transactionId,
+    public void commitTransactionWithoutLock(long dbId, List<Table> tableList, 
long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment)
             throws UserException;
 
+    public void commitTransaction(DatabaseIf db, List<Table> tableList, long 
transactionId,
+            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
+            TxnCommitAttachment txnCommitAttachment)
+            throws UserException;
+
     public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
             throws UserException;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
index 2522e2487ac..75648634e3b 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
@@ -194,7 +194,7 @@ public class CloudGlobalTransactionMgrTest {
         long transactionId = 123533;
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
+        masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
                 transactionId, null, null);
     }
 
@@ -219,7 +219,7 @@ public class CloudGlobalTransactionMgrTest {
         long transactionId = 123533;
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
+        masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
                 transactionId, null, null);
     }
 
@@ -246,8 +246,8 @@ public class CloudGlobalTransactionMgrTest {
                         long transactionId = 123533;
                         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                                 
.getTableOrMetaException(CatalogTestUtil.testTableId1);
-                        
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
-                                transactionId, null, null);
+                        masterTransMgr.commitTransactionWithoutLock(
+                                CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, null, null);
             });
     }
 
@@ -278,7 +278,7 @@ public class CloudGlobalTransactionMgrTest {
         long transactionId = 123533;
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
+        masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
                 transactionId, null, null);
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 18f29ac30d2..bf1b82019eb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -358,7 +358,8 @@ public class DeleteHandlerTest {
         new Expectations(globalTransactionMgr) {
             {
                 try {
-                    globalTransactionMgr.commitTransaction(anyLong, 
(List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) 
any);
+                    globalTransactionMgr.commitTransactionWithoutLock(
+                            anyLong, (List<Table>) any, anyLong, 
(List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
                 } catch (UserException e) {
                     // CHECKSTYLE IGNORE THIS LINE
                 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e3916cfb18d..f1e9942c7e3 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -377,7 +377,8 @@ public class SparkLoadJobTest {
                 AgentTaskExecutor.submit((AgentBatchTask) any);
                 Env.getCurrentGlobalTransactionMgr();
                 result = transactionMgr;
-                transactionMgr.commitTransaction(dbId, (List<Table>) any, 
transactionId, (List<TabletCommitInfo>) any,
+                transactionMgr.commitTransactionWithoutLock(
+                        dbId, (List<Table>) any, transactionId, 
(List<TabletCommitInfo>) any,
                         (LoadJobFinalOperation) any);
             }
         };
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 82c79e9f6f0..365cf8051d8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -144,7 +144,8 @@ public class DatabaseTransactionMgrTest {
                 CatalogTestUtil.testTabletId1, allBackends);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId1,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId1,
                 transTablets, null);
         TransactionState transactionState1 = 
fakeEditLog.getTransaction(transactionId1);
         Map<String, Map<Long, Long>> keyToSuccessTablets = new HashMap<>();
@@ -541,7 +542,8 @@ public class DatabaseTransactionMgrTest {
             setSuccessTablet(keyToSuccessTablets, allBackends, transactionId, 
CatalogTestUtil.testTabletId1, 14);
             setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId2, 
CatalogTestUtil.testTabletId2, 13);
             setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId4, 
CatalogTestUtil.testTabletId1, 15);
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2, table1),
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(table1, 
table2, table1),
                     transactionState6.getTransactionId(),
                     
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, 
transactionState6,
                             subTransactionInfos), 300000);
@@ -577,7 +579,8 @@ public class DatabaseTransactionMgrTest {
                     new SubTransactionInfo(table1, 
CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(0)),
                     new SubTransactionInfo(table2, 
CatalogTestUtil.testTabletId2, allBackends, subTxnIds8.get(1)),
                     new SubTransactionInfo(table1, 
CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(2)));
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(table1, 
table2),
                     transactionState8.getTransactionId(),
                     
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, 
transactionState8,
                             subTransactionInfos), 300000);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index c4ec468c651..522021a9771 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -177,8 +177,8 @@ public class GlobalTransactionMgrTest {
         List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
-                transTablets, null);
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId, transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         // check status is committed
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -209,8 +209,8 @@ public class GlobalTransactionMgrTest {
             List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
                     Lists.newArrayList(CatalogTestUtil.testBackendId1, 
CatalogTestUtil.testBackendId2));
             // commit txn
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
-                    transTablets, null);
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId, transTablets, null);
             checkVersion(testTable1, CatalogTestUtil.testPartition1, 
CatalogTestUtil.testIndexId1,
                     CatalogTestUtil.testTabletId1, 
CatalogTestUtil.testStartVersion,
                     CatalogTestUtil.testStartVersion + 2,
@@ -240,8 +240,8 @@ public class GlobalTransactionMgrTest {
             List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
                     Lists.newArrayList(CatalogTestUtil.testBackendId1, 
CatalogTestUtil.testBackendId3));
             try {
-                masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
-                        transactionId2, transTablets, null);
+                masterTransMgr.commitTransactionWithoutLock(
+                        CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets, null);
                 Assert.fail();
             } catch (TabletQuorumFailedException e) {
                 TransactionState transactionState = 
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -260,8 +260,8 @@ public class GlobalTransactionMgrTest {
         // txn3: commit the second transaction with 1,2,3 success
         if (true) {
             List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
-                    transTablets, null);
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId2, transTablets, null);
             TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId2);
             // check status is committed
             Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -342,7 +342,8 @@ public class GlobalTransactionMgrTest {
         
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
 "idToRunningTransactionState", idToTransactionState);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 
1L, transTablets, txnCommitAttachment);
+        masterTransMgr.commitTransactionWithoutLock(
+                1L, Lists.newArrayList(testTable1), 1L, transTablets, 
txnCommitAttachment);
         RoutineLoadStatistic jobStatistic =  
Deencapsulation.getField(routineLoadJob, "jobStatistic");
 
         Assert.assertEquals(Long.valueOf(101), 
Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@@ -407,7 +408,8 @@ public class GlobalTransactionMgrTest {
         
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
 "idToRunningTransactionState", idToTransactionState);
         Table testTable1 = 
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
                 .getTableOrMetaException(CatalogTestUtil.testTableId1);
-        masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 
1L, transTablets, txnCommitAttachment);
+        masterTransMgr.commitTransactionWithoutLock(
+                1L, Lists.newArrayList(testTable1), 1L, transTablets, 
txnCommitAttachment);
 
         // current total rows and error rows will be reset after job pause, so 
here they should be 0.
         RoutineLoadStatistic jobStatistic =  
Deencapsulation.getField(routineLoadJob, "jobStatistic");
@@ -429,8 +431,8 @@ public class GlobalTransactionMgrTest {
         List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
         OlapTable testTable1 = (OlapTable) (masterEnv.getInternalCatalog()
                 
.getDbOrMetaException(CatalogTestUtil.testDbId1).getTableOrMetaException(CatalogTestUtil.testTableId1));
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
-                transTablets, null);
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId, transTablets, null);
         TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
         checkTableVersion(testTable1, 1, 2);
@@ -497,8 +499,8 @@ public class GlobalTransactionMgrTest {
                     LoadJobSourceType.FRONTEND, 
Config.stream_load_default_timeout_second);
             List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
                     Lists.newArrayList(CatalogTestUtil.testBackendId1, 
CatalogTestUtil.testBackendId2));
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId,
-                    transTablets, null);
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId, transTablets, null);
 
             // follower catalog replay the transaction
             TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId);
@@ -562,8 +564,8 @@ public class GlobalTransactionMgrTest {
             List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
                     Lists.newArrayList(CatalogTestUtil.testBackendId1, 
CatalogTestUtil.testBackendId3));
             try {
-                masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1),
-                        transactionId2, transTablets, null);
+                masterTransMgr.commitTransactionWithoutLock(
+                        CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, transTablets, null);
                 Assert.fail();
             } catch (TabletQuorumFailedException e) {
                 TransactionState transactionState = 
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1,
@@ -576,8 +578,8 @@ public class GlobalTransactionMgrTest {
         // commit the second transaction with 1,2,3 success
         if (true) {
             List<TabletCommitInfo> transTablets = 
generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2,
-                    transTablets, null);
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), 
transactionId2, transTablets, null);
             TransactionState transactionState = 
fakeEditLog.getTransaction(transactionId2);
             // check status is commit
             Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -654,8 +656,8 @@ public class GlobalTransactionMgrTest {
                 subTransactionInfos);
         
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                 .collect(Collectors.toList()));
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2), transactionId,
-                subTransactionStates, 300000);
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), 
transactionId, subTransactionStates, 300000);
         // check status is committed
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
         // check partition version
@@ -724,8 +726,8 @@ public class GlobalTransactionMgrTest {
             // commit txn
             
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                     .collect(Collectors.toList()));
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
-                    transactionId,
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(table1, 
table2), transactionId,
                     subTransactionStates, 300000);
             // check status is committed
             Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -767,7 +769,8 @@ public class GlobalTransactionMgrTest {
             try {
                 
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                         .collect(Collectors.toList()));
-                masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
+                masterTransMgr.commitTransactionWithoutLock(
+                        CatalogTestUtil.testDbId1, Lists.newArrayList(table1, 
table2),
                         transactionId, subTransactionStates, 300000);
                 Assert.fail();
             } catch (TabletQuorumFailedException e) {
@@ -803,7 +806,8 @@ public class GlobalTransactionMgrTest {
             // commit txn
             
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                     .collect(Collectors.toList()));
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(table1, 
table2),
                     transactionId, subTransactionStates, 300000);
             Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
             // check partition version
@@ -881,7 +885,8 @@ public class GlobalTransactionMgrTest {
                 subTransactionInfos);
         // commit txn
         try {
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1),
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(table1),
                     transactionId,
                     subTransactionStates, 300000);
         } catch (TabletQuorumFailedException e) {
@@ -919,7 +924,8 @@ public class GlobalTransactionMgrTest {
         // commit txn
         
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                 .collect(Collectors.toList()));
-        masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2), transactionId,
+        masterTransMgr.commitTransactionWithoutLock(
+                CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), 
transactionId,
                 subTransactionStates, 300000);
         // check status is committed
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -1022,7 +1028,8 @@ public class GlobalTransactionMgrTest {
             // commit txn
             
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                     .collect(Collectors.toList()));
-            masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
+            masterTransMgr.commitTransactionWithoutLock(
+                    CatalogTestUtil.testDbId1, Lists.newArrayList(table1, 
table2),
                     transactionId, subTransactionStates, 300000);
             // check status is committed
             Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 556668c8ef8..6389e23bfa4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -867,6 +867,7 @@ struct TCommitTxnRequest {
     // used for ccr
     13: optional bool txn_insert
     14: optional list<TSubTxnInfo> sub_txn_infos
+    15: optional bool only_commit   // only commit txn, without waiting txn publish
 }
 
 struct TCommitTxnResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to