This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1124808 [Enhancement] Add detail msg to show the reason of publish failure. (#3647) 1124808 is described below commit 1124808fbc22d1544cc4b83fd7cfee23af0c01fb Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Fri May 22 22:59:53 2020 +0800 [Enhancement] Add detail msg to show the reason of publish failure. (#3647) Add 2 new columns `PublishTime` and `ErrMsg` to show the publish version time and any errors that happen during the transaction process. They can be seen by executing: `SHOW PROC "/transactions/dbId/";` or `SHOW TRANSACTION WHERE ID=xx;` Currently only errors that happen in the publish phase are recorded, which helps us find out which txn is blocked. Fix #3646 --- .../org/apache/doris/common/proc/TransProcDir.java | 2 ++ .../doris/transaction/DatabaseTransactionMgr.java | 32 +++++++++++++++------- .../doris/transaction/PublishVersionDaemon.java | 4 ++- .../apache/doris/transaction/TransactionState.java | 19 ++++++++++++- .../transaction/DatabaseTransactionMgrTest.java | 9 +++--- 5 files changed, 50 insertions(+), 16 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java index 0cef447..118cc85 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java @@ -35,11 +35,13 @@ public class TransProcDir implements ProcDirInterface { .add("LoadJobSourceType") .add("PrepareTime") .add("CommitTime") + .add("PublishTime") .add("FinishTime") .add("Reason") .add("ErrorReplicasCount") .add("ListenerId") .add("TimeoutMs") + .add("ErrMsg") .build(); public static final int MAX_SHOW_ENTRIES = 2000; diff --git a/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index fef801f..ac2764e 100644 --- 
a/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -17,13 +17,6 @@ package org.apache.doris.transaction; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.collections.CollectionUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -44,9 +37,9 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -59,6 +52,15 @@ import org.apache.doris.task.ClearTransactionTask; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUniqueId; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -231,11 +233,13 @@ public class DatabaseTransactionMgr { info.add(txnState.getSourceType().name()); info.add(TimeUtils.longToTimeString(txnState.getPrepareTime())); 
info.add(TimeUtils.longToTimeString(txnState.getCommitTime())); + info.add(TimeUtils.longToTimeString(txnState.getPublishVersionTime())); info.add(TimeUtils.longToTimeString(txnState.getFinishTime())); info.add(txnState.getReason()); info.add(String.valueOf(txnState.getErrorReplicas().size())); info.add(String.valueOf(txnState.getCallbackId())); info.add(String.valueOf(txnState.getTimeoutMs())); + info.add(txnState.getErrMsg()); } public long beginTransaction(List<Long> tableIdList, String label, TUniqueId requestId, @@ -579,8 +583,8 @@ public class DatabaseTransactionMgr { } public List<TransactionState> getCommittedTxnList() { + readLock(); try { - readLock(); // only send task to committed transaction return idToRunningTransactionState.values().stream() .filter(transactionState -> (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED)) @@ -653,6 +657,9 @@ public class DatabaseTransactionMgr { transactionId, partitionCommitInfo.getVersion(), partition.getVisibleVersion()); + String errMsg = String.format("wait for publishing partition %d version %d. self version: %d. table %d", + partitionId, partition.getVisibleVersion() + 1, partitionCommitInfo.getVersion(), tableId); + transactionState.setErrorMsg(errMsg); return; } int quorumReplicaNum = partitionInfo.getReplicationNum(partitionId) / 2 + 1; @@ -721,8 +728,12 @@ public class DatabaseTransactionMgr { } if (healthReplicaNum < quorumReplicaNum) { - LOG.info("publish version failed for transaction {} on tablet {}, with only {} replicas less than quorum {}", + LOG.info("publish version failed for transaction {} on tablet {}, with only {} replicas less than quorum {}", transactionState, tablet, healthReplicaNum, quorumReplicaNum); + String errMsg = String.format("publish on tablet %d failed. succeed replica num %d less than quorum %d." 
+ + " table: %d, partition: %d, publish version: %d", + tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId, partitionId, partition.getVisibleVersion() + 1); + transactionState.setErrorMsg(errMsg); hasError = true; } } @@ -737,6 +748,7 @@ public class DatabaseTransactionMgr { try { transactionState.setErrorReplicas(errorReplicaIds); transactionState.setFinishTime(System.currentTimeMillis()); + transactionState.clearErrorMsg(); transactionState.setTransactionStatus(TransactionStatus.VISIBLE); unprotectUpsertTransactionState(transactionState, false); txnOperated = true; diff --git a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 1b637b0..70b0e52 100644 --- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -167,7 +167,7 @@ public class PublishVersionDaemon extends MasterDaemon { // transaction's publish is timeout, but there still has unfinished tasks. // we need to collect all error replicas, and try to finish this txn. for (PublishVersionTask unfinishedTask : unfinishedTasks) { - // set all replica in the backend to error state + // set all replicas in the backend to error state List<TPartitionVersionInfo> versionInfos = unfinishedTask.getPartitionVersionInfos(); Set<Long> errorPartitionIds = Sets.newHashSet(); for (TPartitionVersionInfo versionInfo : versionInfos) { @@ -177,6 +177,8 @@ public class PublishVersionDaemon extends MasterDaemon { continue; } + // get all tablets of these error partitions, and mark their replicas as error. + // current we don't have partition to tablet map in FE, so here we use an inefficient way. // TODO(cmy): this is inefficient, but just keep it simple. will change it later. 
List<Long> tabletIds = tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId()); for (long tabletId : tabletIds) { diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 4ddbfaf..dbccc14 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -187,7 +187,7 @@ public class TransactionState implements Writable { // this state need not to be serialized private Map<Long, PublishVersionTask> publishVersionTasks; private boolean hasSendTask; - private long publishVersionTime; + private long publishVersionTime = -1; private TransactionStatus preStatus = null; private long callbackId = -1; @@ -206,6 +206,11 @@ public class TransactionState implements Writable { private String errorLogUrl = null; + // record some error msgs during the transaction operation. + // this msg will be shown in show proc "/transactions/dbId/"; + // no need to persist. 
+ private String errMsg = ""; + public TransactionState() { this.dbId = -1; this.tableIdList = Lists.newArrayList(); @@ -646,4 +651,16 @@ public class TransactionState implements Writable { } } } + + public void setErrorMsg(String errMsg) { + this.errMsg = errMsg; + } + + public void clearErrorMsg() { + this.errMsg = ""; + } + + public String getErrMsg() { + return this.errMsg; + } } diff --git a/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 38399a1..78c0699 100644 --- a/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -214,10 +214,11 @@ public class DatabaseTransactionMgrTest { assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(5))); assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(6))); assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(7))); - assertEquals("", txnInfo.get(8)); - assertEquals("0", txnInfo.get(9)); - assertEquals("-1", txnInfo.get(10)); - assertEquals(String.valueOf(Config.stream_load_default_timeout_second * 1000), txnInfo.get(11)); + assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(8))); + assertEquals("", txnInfo.get(9)); + assertEquals("0", txnInfo.get(10)); + assertEquals("-1", txnInfo.get(11)); + assertEquals(String.valueOf(Config.stream_load_default_timeout_second * 1000), txnInfo.get(12)); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org