This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit dce84b290b45919bc99f5250471692f2c173518d Author: meiyi <myime...@gmail.com> AuthorDate: Thu Sep 26 14:06:00 2024 +0800 [fix](cloud) abortTransaction does not handle response code (#41275) --- .../transaction/CloudGlobalTransactionMgr.java | 42 ++++++++++++++-------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index eee2faff6f4..55ca2c5945c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -1077,6 +1077,33 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); } + afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), txnCommitAttachment); + } + + private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOrLabel, + TxnCommitAttachment txnCommitAttachment) throws UserException { + if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { + LOG.warn("abortTxn failed, transaction:{}, response:{}", txnIdOrLabel, abortTxnResponse); + // For routine load, it is necessary to release the write lock when abort transaction fails, + // otherwise it will cause the lock added in beforeAborted to not be released. + if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); + } + switch (abortTxnResponse.getStatus().getCode()) { + case TXN_ID_NOT_FOUND: + case TXN_LABEL_NOT_FOUND: + case TXN_INVALID_STATUS: + throw new TransactionNotFoundException("transaction [" + txnIdOrLabel + "] not found"); + case TXN_ALREADY_ABORTED: + throw new TransactionNotFoundException("transaction [" + txnIdOrLabel + "] is already aborted"); + case TXN_ALREADY_VISIBLE: + throw new UserException( + "transaction [" + txnIdOrLabel + "] is already visible, " + ", could not abort"); + default: + throw new UserException(abortTxnResponse.getStatus().getMsg()); + } + } TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); @@ -1129,20 +1156,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { LOG.warn("abortTxn failed, label:{}, exception:", label, e); throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); } - - TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); - TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); - if (cb == null) { - LOG.info("no callback to run for this txn, txnId:{} callbackId:{}", txnState.getTransactionId(), - txnState.getCallbackId()); - return; - } - - LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), txnState.getCallbackId()); - cb.afterAborted(txnState, true, txnState.getReason()); - if (MetricRepo.isInit) { - MetricRepo.COUNTER_TXN_FAILED.increase(1L); - } + afterAbortTxnResp(abortTxnResponse, label, null); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org