This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit e5c46b78d29717e5f125d3ac7e841b06b66f04e5 Author: jiafeng.zhang <[email protected]> AuthorDate: Thu May 12 13:31:22 2022 +0800 [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471) When the number of running transactions on a database exceeds the default limit (max_running_txn_num_per_db), resuming a binlog load job fails. Previously the resume was reported as successful and the job only failed some time later, which left users confused; now the failure is reported immediately with a clear error message. Issue Number: close #9468 --- .../doris/load/sync/canal/CanalSyncChannel.java | 94 +++++++++++++--------- .../doris/transaction/DatabaseTransactionMgr.java | 2 +- 2 files changed, 57 insertions(+), 39 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 2b71619dcf..5d0774b54a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.UserException; @@ -41,6 +42,7 @@ import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TTxnParams; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionState; @@ -121,53 +123,69 @@ public class CanalSyncChannel extends SyncChannel { + "_batch" + batchId + "_" + currentTime; 
String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN; GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr(); - TransactionEntry txnEntry = txnExecutor.getTxnEntry(); - TTxnParams txnConf = txnEntry.getTxnConf(); - TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING; - TStreamLoadPutRequest request = null; - try { - long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label, - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond); - String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState( + DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId()); + if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) { + TransactionEntry txnEntry = txnExecutor.getTxnEntry(); + TTxnParams txnConf = txnEntry.getTxnConf(); + TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING; + TStreamLoadPutRequest request = null; + try { + long txnId = globalTransactionMgr.beginTransaction(db.getId(), + Lists.newArrayList(tbl.getId()), label, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, + FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond); + String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState( db.getId(), txnId).getAuthCode(); - request = new TStreamLoadPutRequest() + request = new TStreamLoadPutRequest() .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl()) .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId()) .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION) .setColumns(targetColumn); - 
txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid); - txnEntry.setLabel(label); - txnExecutor.setTxnId(txnId); - } catch (DuplicatedRequestException e) { - LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}", + txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid); + txnEntry.setLabel(label); + txnExecutor.setTxnId (txnId); + } catch (DuplicatedRequestException e) { + LOG.warn ("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}", id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable); - txnExecutor.setTxnId(e.getTxnId()); - } catch (LabelAlreadyUsedException e) { - // this happens when channel re-consume same batch, we should just pass through it without begin a new txn - LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId); - return; - } catch (AnalysisException | BeginTransactionException e) { - LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable); - throw e; - } catch (UserException e) { - LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable); - throw e; - } - try { - // async exec begin transaction - long txnId = txnExecutor.getTxnId(); - if (txnId != -1L) { - this.txnExecutor.beginTransaction(request); - LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId()); + txnExecutor.setTxnId(e.getTxnId()); + } catch (LabelAlreadyUsedException e) { + // this happens when channel re-consume same batch, + // we should just pass through it without begin a new txn + LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}", + id, label, targetTable, batchId); + return; + } catch (AnalysisException | BeginTransactionException e) { + LOG.warn ("encounter an error when beginning txn in channel {}, table: {}", + id, targetTable); + throw e; + } catch (UserException e) { + LOG.warn 
("encounter an error when creating plan in channel {}, table: {}", + id, targetTable); + throw e; } - } catch (TException e) { - LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage()); - throw e; - } catch (TimeoutException | InterruptedException | ExecutionException e) { - LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}", + try { + // async exec begin transaction + long txnId = txnExecutor.getTxnId(); + if ( txnId != - 1L ) { + this.txnExecutor.beginTransaction (request); + LOG.info ("begin txn in channel {}, table: {}, label:{}, txn id: {}", + id, targetTable, label, txnExecutor.getTxnId()); + } + } catch ( TException e) { + LOG.warn ("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", + id, targetTable, txnExecutor.getTxnId(), e.getMessage()); + throw e; + } catch ( TimeoutException | InterruptedException | ExecutionException e) { + LOG.warn ("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage()); - throw e; + throw e; + } + } else { + String failMsg = "current running txns on db " + db.getId() + " is " + + databaseTransactionMgr.getRunningTxnNums() + ", larger than limit " + Config.max_running_txn_num_per_db; + LOG.warn(failMsg); + throw new BeginTransactionException(failMsg); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 9ce906fb79..bf530c5ef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -195,7 +195,7 @@ public class DatabaseTransactionMgr { return labelToTxnIds.get(label); } - protected int getRunningTxnNums() { + public int getRunningTxnNums() { return 
runningTxnNums; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
