This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 700b2c4017f [fix](cloud) ensure afterCommit/afterAbort will always be called when commit/abort transaction fails (#41267) 700b2c4017f is described below commit 700b2c4017f7502e981ba1f9ba64363ff1a75da0 Author: hui lai <1353307...@qq.com> AuthorDate: Sat Sep 28 11:14:37 2024 +0800 [fix](cloud) ensure afterCommit/afterAbort will always be called when commit/abort transaction fails (#41267) Ensure afterCommit/afterAbort will always be called when commit/abort transaction fails. Otherwise, it may cause some problems, such as the routine load getting stuck. --- .../transaction/CloudGlobalTransactionMgr.java | 117 ++++++++++++++------- 1 file changed, 79 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 10baf4a135c..c1d7e95e744 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -512,22 +512,35 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } final CommitTxnRequest commitTxnRequest = builder.build(); + boolean txnOperated = false; + TransactionState txnState = null; + TxnStateChangeCallback cb = null; + long callbackId = 0L; try { - commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList); - } catch (UserException e) { - // For routine load, it is necessary to release the write lock when commit transaction fails, - // otherwise it will cause the lock added in beforeCommitted to not be released. 
+ txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList); + txnOperated = true; + } finally { if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; - Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); + callbackId = rlTaskTxnCommitAttachment.getJobId(); + } else if (txnState != null) { + callbackId = txnState.getCallbackId(); + } + + cb = callbackFactory.getCallback(callbackId); + if (cb != null) { + LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}", + transactionId, callbackId, txnState); + cb.afterCommitted(txnState, txnOperated); + cb.afterVisible(txnState, txnOperated); } - throw e; } } - private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId, + private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId, List<Table> tableList) throws UserException { CommitTxnResponse commitTxnResponse = null; + TransactionState txnState = null; int retryTime = 0; try { @@ -578,19 +591,13 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { throw new UserException("internal error, " + internalMsgBuilder.toString()); } - TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo()); - TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); - if (cb != null) { - LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}", - txnState.getTransactionId(), txnState.getCallbackId(), txnState); - cb.afterCommitted(txnState, true); - cb.afterVisible(txnState, true); - } + txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo()); if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); 
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() - txnState.getPrepareTime()); } afterCommitTxnResp(commitTxnResponse); + return txnState; } private List<OlapTable> getMowTableList(List<Table> tableList) { @@ -990,9 +997,24 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } final CommitTxnRequest commitTxnRequest = builder.build(); - commitTxn(commitTxnRequest, transactionId, false, db.getId(), - subTransactionStates.stream().map(SubTransactionState::getTable) + TransactionState txnState = null; + boolean txnOperated = false; + try { + txnState = commitTxn(commitTxnRequest, transactionId, false, db.getId(), + subTransactionStates.stream().map(SubTransactionState::getTable) .collect(Collectors.toList())); + txnOperated = true; + } finally { + if (txnState != null) { + TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); + if (cb != null) { + LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}", + txnState.getTransactionId(), txnState.getCallbackId(), txnState); + cb.afterCommitted(txnState, txnOperated); + cb.afterVisible(txnState, txnOperated); + } + } + } return true; } @@ -1042,8 +1064,6 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { @Override public void abortTransaction(Long dbId, Long transactionId, String reason, TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException { - LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId); - if (txnCommitAttachment != null) { if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; @@ -1058,6 +1078,18 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } } + AbortTxnResponse abortTxnResponse = null; + try { + abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null, 
null); + } finally { + handleAfterAbort(abortTxnResponse, txnCommitAttachment, transactionId); + } + } + + private AbortTxnResponse abortTransactionImpl(Long dbId, Long transactionId, String reason, + TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException { + LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId); + AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder(); builder.setDbId(dbId); builder.setTxnId(transactionId); @@ -1089,27 +1121,43 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { Preconditions.checkNotNull(abortTxnResponse.getStatus()); } catch (RpcException e) { LOG.warn("abortTxn failed, transactionId:{}, Exception", transactionId, e); - // For routine load, it is necessary to release the write lock when abort transaction fails, - // otherwise it will cause the lock added in beforeAborted to not be released. - if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { - RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; - Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); - } throw new UserException("abortTxn failed, errMsg:" + e.getMessage()); } afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), txnCommitAttachment); + return abortTxnResponse; + } + + private void handleAfterAbort(AbortTxnResponse abortTxnResponse, TxnCommitAttachment txnCommitAttachment, + long transactionId) throws UserException { + TransactionState txnState = new TransactionState(); + boolean txnOperated = false; + long callbackId = 0L; + TxnStateChangeCallback cb = null; + String abortReason = ""; + + if (abortTxnResponse != null) { + txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); + txnOperated = abortTxnResponse.getStatus().getCode() == MetaServiceCode.OK; + callbackId = txnState.getCallbackId(); 
+ abortReason = txnState.getReason(); + } + if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + callbackId = rlTaskTxnCommitAttachment.getJobId(); + } + + cb = callbackFactory.getCallback(callbackId); + if (cb != null) { + LOG.info("run txn callback, txnId:{} callbackId:{}, txnState:{}", + transactionId, callbackId, txnState); + cb.afterAborted(txnState, txnOperated, abortReason); + } } private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOrLabel, TxnCommitAttachment txnCommitAttachment) throws UserException { if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) { LOG.warn("abortTxn failed, transaction:{}, response:{}", txnIdOrLabel, abortTxnResponse); - // For routine load, it is necessary to release the write lock when abort transaction fails, - // otherwise it will cause the lock added in beforeAborted to not be released. 
- if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { - RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; - Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock(); - } switch (abortTxnResponse.getStatus().getCode()) { case TXN_ID_NOT_FOUND: case TXN_LABEL_NOT_FOUND: @@ -1125,13 +1173,6 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } } - TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo()); - TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); - if (cb != null) { - LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), - txnState.getCallbackId()); - cb.afterAborted(txnState, true, txnState.getReason()); - } if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_FAILED.increase(1L); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org