This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit dce84b290b45919bc99f5250471692f2c173518d
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Sep 26 14:06:00 2024 +0800

    [fix](cloud) abortTransaction does not handle response code (#41275)
---
 .../transaction/CloudGlobalTransactionMgr.java     | 42 ++++++++++++++--------
 1 file changed, 28 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index eee2faff6f4..55ca2c5945c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1077,6 +1077,33 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             }
             throw new UserException("abortTxn failed, errMsg:" + 
e.getMessage());
         }
+        afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), 
txnCommitAttachment);
+    }
+
+    private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String 
txnIdOrLabel,
+            TxnCommitAttachment txnCommitAttachment) throws UserException {
+        if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) {
+            LOG.warn("abortTxn failed, transaction:{}, response:{}", 
txnIdOrLabel, abortTxnResponse);
+            // For routine load, it is necessary to release the write lock 
when abort transaction fails,
+            // otherwise it will cause the lock added in beforeAborted to not 
be released.
+            if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
+                RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
+                
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+            }
+            switch (abortTxnResponse.getStatus().getCode()) {
+                case TXN_ID_NOT_FOUND:
+                case TXN_LABEL_NOT_FOUND:
+                case TXN_INVALID_STATUS:
+                    throw new TransactionNotFoundException("transaction [" + 
txnIdOrLabel + "] not found");
+                case TXN_ALREADY_ABORTED:
+                    throw new TransactionNotFoundException("transaction [" + 
txnIdOrLabel + "] is already aborted");
+                case TXN_ALREADY_VISIBLE:
+                    throw new UserException(
+                            "transaction [" + txnIdOrLabel + "] is already 
visible, " + ", could not abort");
+                default:
+                    throw new 
UserException(abortTxnResponse.getStatus().getMsg());
+            }
+        }
 
         TransactionState txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
         TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
@@ -1129,20 +1156,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             LOG.warn("abortTxn failed, label:{}, exception:", label, e);
             throw new UserException("abortTxn failed, errMsg:" + 
e.getMessage());
         }
-
-        TransactionState txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
-        TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
-        if (cb == null) {
-            LOG.info("no callback to run for this txn, txnId:{} 
callbackId:{}", txnState.getTransactionId(),
-                        txnState.getCallbackId());
-            return;
-        }
-
-        LOG.info("run txn callback, txnId:{} callbackId:{}", 
txnState.getTransactionId(), txnState.getCallbackId());
-        cb.afterAborted(txnState, true, txnState.getReason());
-        if (MetricRepo.isInit) {
-            MetricRepo.COUNTER_TXN_FAILED.increase(1L);
-        }
+        afterAbortTxnResp(abortTxnResponse, label, null);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to