This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9a2a98701c6 [improvement](log) add txn log (#28875)
9a2a98701c6 is described below
commit 9a2a98701c6b16847963094fc8b241d5679ce996
Author: yujun <[email protected]>
AuthorDate: Sun Jan 14 10:27:35 2024 +0800
[improvement](log) add txn log (#28875)
---
be/src/olap/task/engine_publish_version_task.cpp | 11 ++++--
be/src/olap/txn_manager.cpp | 41 ++++++++++++++++-----
.../doris/transaction/DatabaseTransactionMgr.java | 42 +++++++++++++++++-----
3 files changed, 73 insertions(+), 21 deletions(-)
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 78f5cb9c328..ce5ba534fd1 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -94,13 +94,16 @@ Status EnginePublishVersionTask::execute() {
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
- LOG_WARNING("EnginePublishVersionTask.finish.random random failed");
+ LOG_WARNING("EnginePublishVersionTask.finish.random random failed")
+ .tag("txn_id", transaction_id);
return Status::InternalError("debug engine publish version task random failed");
}
});
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
- LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait);
+ LOG_WARNING("EnginePublishVersionTask.finish.wait wait")
+ .tag("txn_id", transaction_id)
+ .tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
@@ -204,7 +207,9 @@ Status EnginePublishVersionTask::execute() {
partition_id, tablet_info.tablet_id,
version.first);
}
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
- "check_version_exist failed");
+ "version not continuous for mow, tablet_id={}, "
+ "tablet_max_version={}, txn_version={}",
+ tablet_info.tablet_id, max_version, version.first);
int64_t missed_version = max_version + 1;
int64_t missed_txn_id =
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 161c39fdd20..7a82ef4c6ae 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -142,13 +142,18 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
- LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
+ LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id);
return Status::InternalError("debug prepare txn random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
- LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
+ LOG_WARNING("TxnManager.prepare_txn.wait")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id)
+ .tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
@@ -313,13 +318,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
- LOG_WARNING("TxnManager.commit_txn.random_failed");
+ LOG_WARNING("TxnManager.commit_txn.random_failed")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id);
return Status::InternalError("debug commit txn random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
- LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
+ LOG_WARNING("TxnManager.commit_txn.wait")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id)
+ .tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
@@ -388,7 +398,10 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
rowset_ptr->rowset_meta()->get_rowset_pb(), false);
DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
- LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
+ LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id)
+ .tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
@@ -466,13 +479,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
}
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
- LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
+ LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id);
return Status::InternalError("debug publish txn before save rs meta random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
- LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait);
+ LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id)
+ .tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
@@ -486,13 +504,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
- LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
+ LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id);
return Status::InternalError("debug publish txn after save rs meta random failed");
}
});
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
- LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait);
+ LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
+ .tag("txn_id", transaction_id)
+ .tag("tablet_id", tablet_id)
+ .tag("wait ms", wait);
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9b611425cc9..57088cbd7a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -589,9 +589,11 @@ public class DatabaseTransactionMgr {
tabletVersionFailedReplicas);
String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica num %s"
- + " < load required replica num %s. table %s, partition %s, this tablet detail: %s",
+ + " < load required replica num %s. table %s, partition: [ id=%s, commit version %s"
+ + ", visible version %s ], this tablet detail: %s",
transactionId, tablet.getId(), successReplicaNum, loadRequiredReplicaNum, tableId,
- partition.getId(), writeDetail);
+ partition.getId(), partition.getCommittedVersion(), partition.getVisibleVersion(),
+ writeDetail);
LOG.info(errMsg);
throw new TabletQuorumFailedException(transactionId, errMsg);
@@ -746,7 +748,7 @@ public class DatabaseTransactionMgr {
}
// update nextVersion because of the failure of persistent transaction resulting in error version
- updateCatalogAfterCommitted(transactionState, db);
+ updateCatalogAfterCommitted(transactionState, db, false);
LOG.info("transaction:[{}] successfully committed", transactionState);
}
@@ -1128,9 +1130,9 @@ public class DatabaseTransactionMgr {
tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
+ " with version %s, and has failed replicas, load require replica num %s. "
- + "table %s, partition %s, tablet detail: %s",
- transactionState, tablet.getId(), newVersion,
- loadRequiredReplicaNum, tableId, partitionId, writeDetail));
+ + "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s",
+ transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId,
+ partitionId, partition.getCommittedVersion(), writeDetail));
}
continue;
}
@@ -1178,7 +1180,8 @@ public class DatabaseTransactionMgr {
if (needLog) {
transactionState.setLastPublishLogTime(now);
for (String log : logs) {
- LOG.info("{}. publish times {}", log, transactionState.getPublishCount());
+ LOG.info("{}. publish times {}, whole txn publish result {}",
+ log, transactionState.getPublishCount(), publishResult.name());
}
}
@@ -1793,8 +1796,10 @@ public class DatabaseTransactionMgr {
}
}
- private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) {
+ private void updateCatalogAfterCommitted(TransactionState transactionState, Database db, boolean isReplay) {
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
+ List<Replica> tabletSuccReplicas = Lists.newArrayList();
+ List<Replica> tabletFailedReplicas = Lists.newArrayList();
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
@@ -1817,13 +1822,32 @@ public class DatabaseTransactionMgr {
for (MaterializedIndex index : allIndices) {
List<Tablet> tablets = index.getTablets();
for (Tablet tablet : tablets) {
+ tabletFailedReplicas.clear();
+ tabletSuccReplicas.clear();
for (Replica replica : tablet.getReplicas()) {
if (errorReplicaIds.contains(replica.getId())) {
// TODO(cmy): do we need to update last failed version here?
// because in updateCatalogAfterVisible, it will be updated again.
replica.updateLastFailedVersion(partitionCommitInfo.getVersion());
+ tabletFailedReplicas.add(replica);
+ } else {
+ tabletSuccReplicas.add(replica);
}
}
+ if (!isReplay && !tabletFailedReplicas.isEmpty()) {
+ LOG.info("some replicas load data failed for committed txn {} on version {}, table {}, "
+ + "partition {}, tablet {}, {} replicas load data succ: {}, {} replicas load "
+ + "data fail: {}",
+ transactionState.getTransactionId(), partitionCommitInfo.getVersion(),
+ tableId, partitionId, tablet.getId(), tabletSuccReplicas.size(),
+ Joiner.on(", ").join(tabletSuccReplicas.stream()
+ .map(replica -> replica.toStringSimple(true))
+ .collect(Collectors.toList())),
+ tabletFailedReplicas.size(),
+ Joiner.on(", ").join(tabletFailedReplicas.stream()
+ .map(replica -> replica.toStringSimple(true))
+ .collect(Collectors.toList())));
+ }
}
}
partition.setNextVersion(partition.getNextVersion() + 1);
@@ -2036,7 +2060,7 @@ public class DatabaseTransactionMgr {
// set transaction status will call txn state change listener
transactionState.replaySetTransactionStatus();
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
- updateCatalogAfterCommitted(transactionState, db);
+ updateCatalogAfterCommitted(transactionState, db, true);
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
updateCatalogAfterVisible(transactionState, db);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]