This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 700b2c4017f [fix](cloud) ensure afterCommit/afterAbort will always be 
called when commit/abort transaction fails (#41267)
700b2c4017f is described below

commit 700b2c4017f7502e981ba1f9ba64363ff1a75da0
Author: hui lai <1353307...@qq.com>
AuthorDate: Sat Sep 28 11:14:37 2024 +0800

    [fix](cloud) ensure afterCommit/afterAbort will always be called when 
commit/abort transaction fails (#41267)
    
    Ensure that the afterCommitted/afterAborted callbacks are always invoked,
    even when committing or aborting a transaction fails. Otherwise, problems
    can occur, such as a routine load job getting stuck.
---
 .../transaction/CloudGlobalTransactionMgr.java     | 117 ++++++++++++++-------
 1 file changed, 79 insertions(+), 38 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 10baf4a135c..c1d7e95e744 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -512,22 +512,35 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
+        boolean txnOperated = false;
+        TransactionState txnState = null;
+        TxnStateChangeCallback cb = null;
+        long callbackId = 0L;
         try {
-            commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
-        } catch (UserException e) {
-            // For routine load, it is necessary to release the write lock 
when commit transaction fails,
-            // otherwise it will cause the lock added in beforeCommitted to 
not be released.
+            txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, 
tableList);
+            txnOperated = true;
+        } finally {
             if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
                 RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
-                
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+                callbackId = rlTaskTxnCommitAttachment.getJobId();
+            } else if (txnState != null) {
+                callbackId = txnState.getCallbackId();
+            }
+
+            cb = callbackFactory.getCallback(callbackId);
+            if (cb != null) {
+                LOG.info("commitTxn, run txn callback, transactionId:{} 
callbackId:{}, txnState:{}",
+                        transactionId, callbackId, txnState);
+                cb.afterCommitted(txnState, txnOperated);
+                cb.afterVisible(txnState, txnOperated);
             }
-            throw e;
         }
     }
 
-    private void commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC, long dbId,
+    private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC, long dbId,
             List<Table> tableList) throws UserException {
         CommitTxnResponse commitTxnResponse = null;
+        TransactionState txnState = null;
         int retryTime = 0;
 
         try {
@@ -578,19 +591,13 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             throw new UserException("internal error, " + 
internalMsgBuilder.toString());
         }
 
-        TransactionState txnState = 
TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
-        TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
-        if (cb != null) {
-            LOG.info("commitTxn, run txn callback, transactionId:{} 
callbackId:{}, txnState:{}",
-                    txnState.getTransactionId(), txnState.getCallbackId(), 
txnState);
-            cb.afterCommitted(txnState, true);
-            cb.afterVisible(txnState, true);
-        }
+        txnState = 
TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
         if (MetricRepo.isInit) {
             MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
             MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() 
- txnState.getPrepareTime());
         }
         afterCommitTxnResp(commitTxnResponse);
+        return txnState;
     }
 
     private List<OlapTable> getMowTableList(List<Table> tableList) {
@@ -990,9 +997,24 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        commitTxn(commitTxnRequest, transactionId, false, db.getId(),
-                
subTransactionStates.stream().map(SubTransactionState::getTable)
+        TransactionState txnState = null;
+        boolean txnOperated = false;
+        try {
+            txnState = commitTxn(commitTxnRequest, transactionId, false, 
db.getId(),
+                    
subTransactionStates.stream().map(SubTransactionState::getTable)
                         .collect(Collectors.toList()));
+            txnOperated = true;
+        } finally {
+            if (txnState != null) {
+                TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
+                if (cb != null) {
+                    LOG.info("commitTxn, run txn callback, transactionId:{} 
callbackId:{}, txnState:{}",
+                            txnState.getTransactionId(), 
txnState.getCallbackId(), txnState);
+                    cb.afterCommitted(txnState, txnOperated);
+                    cb.afterVisible(txnState, txnOperated);
+                }
+            }
+        }
         return true;
     }
 
@@ -1042,8 +1064,6 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     @Override
     public void abortTransaction(Long dbId, Long transactionId, String reason,
             TxnCommitAttachment txnCommitAttachment, List<Table> tableList) 
throws UserException {
-        LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, 
transactionId);
-
         if (txnCommitAttachment != null) {
             if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
                 RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
@@ -1058,6 +1078,18 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             }
         }
 
+        AbortTxnResponse abortTxnResponse = null;
+        try {
+            abortTxnResponse = abortTransactionImpl(dbId, transactionId, 
reason, null, null);
+        } finally {
+            handleAfterAbort(abortTxnResponse, txnCommitAttachment, 
transactionId);
+        }
+    }
+
+    private AbortTxnResponse abortTransactionImpl(Long dbId, Long 
transactionId, String reason,
+            TxnCommitAttachment txnCommitAttachment, List<Table> tableList) 
throws UserException {
+        LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, 
transactionId);
+
         AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder();
         builder.setDbId(dbId);
         builder.setTxnId(transactionId);
@@ -1089,27 +1121,43 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             Preconditions.checkNotNull(abortTxnResponse.getStatus());
         } catch (RpcException e) {
             LOG.warn("abortTxn failed, transactionId:{}, Exception", 
transactionId, e);
-            // For routine load, it is necessary to release the write lock 
when abort transaction fails,
-            // otherwise it will cause the lock added in beforeAborted to not 
be released.
-            if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
-                RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
-                
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
-            }
             throw new UserException("abortTxn failed, errMsg:" + 
e.getMessage());
         }
         afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), 
txnCommitAttachment);
+        return abortTxnResponse;
+    }
+
+    private void handleAfterAbort(AbortTxnResponse abortTxnResponse, 
TxnCommitAttachment txnCommitAttachment,
+                                long transactionId) throws UserException {
+        TransactionState txnState = new TransactionState();
+        boolean txnOperated = false;
+        long callbackId = 0L;
+        TxnStateChangeCallback cb = null;
+        String abortReason = "";
+
+        if (abortTxnResponse != null) {
+            txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
+            txnOperated = abortTxnResponse.getStatus().getCode() == 
MetaServiceCode.OK;
+            callbackId = txnState.getCallbackId();
+            abortReason = txnState.getReason();
+        }
+        if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
+            RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
+            callbackId = rlTaskTxnCommitAttachment.getJobId();
+        }
+
+        cb = callbackFactory.getCallback(callbackId);
+        if (cb != null) {
+            LOG.info("run txn callback, txnId:{} callbackId:{}, txnState:{}",
+                    transactionId, callbackId, txnState);
+            cb.afterAborted(txnState, txnOperated, abortReason);
+        }
     }
 
     private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String 
txnIdOrLabel,
             TxnCommitAttachment txnCommitAttachment) throws UserException {
         if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) {
             LOG.warn("abortTxn failed, transaction:{}, response:{}", 
txnIdOrLabel, abortTxnResponse);
-            // For routine load, it is necessary to release the write lock 
when abort transaction fails,
-            // otherwise it will cause the lock added in beforeAborted to not 
be released.
-            if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
-                RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
-                
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
-            }
             switch (abortTxnResponse.getStatus().getCode()) {
                 case TXN_ID_NOT_FOUND:
                 case TXN_LABEL_NOT_FOUND:
@@ -1125,13 +1173,6 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             }
         }
 
-        TransactionState txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
-        TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
-        if (cb != null) {
-            LOG.info("run txn callback, txnId:{} callbackId:{}", 
txnState.getTransactionId(),
-                    txnState.getCallbackId());
-            cb.afterAborted(txnState, true, txnState.getReason());
-        }
         if (MetricRepo.isInit) {
             MetricRepo.COUNTER_TXN_FAILED.increase(1L);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to