This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d20365cdcf [fix](transaction) fix publish txn fake succ (#24273)
d20365cdcf is described below
commit d20365cdcf31786531dc87c059ffe280113778df
Author: yujun <[email protected]>
AuthorDate: Thu Sep 14 21:04:59 2023 +0800
[fix](transaction) fix publish txn fake succ (#24273)
---
be/src/agent/task_worker_pool.cpp | 24 ++--
be/src/common/status.h | 1 +
be/src/olap/data_dir.cpp | 2 +-
be/src/olap/task/engine_publish_version_task.cpp | 52 +++++----
be/src/olap/task/engine_publish_version_task.h | 11 +-
.../java/org/apache/doris/master/MasterImpl.java | 6 +
.../org/apache/doris/task/PublishVersionTask.java | 13 +++
.../doris/transaction/DatabaseTransactionMgr.java | 79 ++++++++-----
.../doris/transaction/GlobalTransactionMgr.java | 6 +-
.../doris/transaction/PublishVersionDaemon.java | 126 ++-------------------
.../transaction/DatabaseTransactionMgrTest.java | 21 +++-
.../transaction/GlobalTransactionMgrTest.java | 36 +++---
gensrc/thrift/MasterService.thrift | 1 +
13 files changed, 174 insertions(+), 204 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 637c92af5a..a62d603fe0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1525,17 +1525,18 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
DorisMetrics::instance()->publish_task_request_total->increment(1);
VLOG_NOTICE << "get publish version task. signature=" <<
agent_task_req.signature;
- std::vector<TTabletId> error_tablet_ids;
- std::vector<TTabletId> succ_tablet_ids;
+ std::set<TTabletId> error_tablet_ids;
+ std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>>
discontinuous_version_tablets;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+ succ_tablets.clear();
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req,
&error_tablet_ids,
- &succ_tablet_ids,
&discontinuous_version_tablets);
+ &succ_tablets,
&discontinuous_version_tablets);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
@@ -1584,25 +1585,22 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
.tag("transaction_id", publish_version_req.transaction_id)
.tag("error_tablets_num", error_tablet_ids.size())
.error(status);
- finish_task_request.__set_error_tablet_ids(error_tablet_ids);
} else {
if (!config::disable_auto_compaction &&
!MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
- for (int i = 0; i < succ_tablet_ids.size(); i++) {
+ for (auto [tablet_id, _] : succ_tablets) {
TabletSharedPtr tablet =
-
StorageEngine::instance()->tablet_manager()->get_tablet(
- succ_tablet_ids[i]);
+
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet != nullptr) {
tablet->publised_count++;
if (tablet->publised_count % 10 == 0) {
StorageEngine::instance()->submit_compaction_task(
tablet,
CompactionType::CUMULATIVE_COMPACTION, true);
- LOG(INFO) << "trigger compaction succ, tabletid:"
<< succ_tablet_ids[i]
+ LOG(INFO) << "trigger compaction succ, tablet_id:"
<< tablet_id
<< ", publised:" <<
tablet->publised_count;
}
} else {
- LOG(WARNING)
- << "trigger compaction failed, tabletid:" <<
succ_tablet_ids[i];
+ LOG(WARNING) << "trigger compaction failed,
tablet_id:" << tablet_id;
}
}
}
@@ -1611,7 +1609,7 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
LOG_INFO("successfully publish version")
.tag("signature", agent_task_req.signature)
.tag("transaction_id", publish_version_req.transaction_id)
- .tag("tablets_num", succ_tablet_ids.size())
+ .tag("tablets_num", succ_tablets.size())
.tag("cost(s)", cost_second);
}
@@ -1620,7 +1618,9 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_report_version(_s_report_version);
- finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+ finish_task_request.__set_succ_tablets(succ_tablets);
+ finish_task_request.__set_error_tablet_ids(
+ std::vector<TTabletId>(error_tablet_ids.begin(),
error_tablet_ids.end()));
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
diff --git a/be/src/common/status.h b/be/src/common/status.h
index e49bffa70f..14c81c6e2c 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -306,6 +306,7 @@ constexpr bool capture_stacktrace(int code) {
&& code != ErrorCode::INVERTED_INDEX_BUILD_WAITTING
&& code != ErrorCode::META_KEY_NOT_FOUND
&& code != ErrorCode::PUSH_VERSION_ALREADY_EXIST
+ && code != ErrorCode::VERSION_NOT_EXIST
&& code != ErrorCode::TABLE_ALREADY_DELETED_ERROR
&& code != ErrorCode::TRANSACTION_NOT_EXIST
&& code != ErrorCode::TRANSACTION_ALREADY_VISIBLE
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 104e549f6a..301c463cbb 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -490,7 +490,7 @@ Status DataDir::load() {
PendingPublishInfoPB pending_publish_info_pb;
bool parsed = pending_publish_info_pb.ParseFromString(info);
if (!parsed) {
- LOG(WARNING) << "parse pending publish info failed, tablt_id: " <<
tablet_id
+ LOG(WARNING) << "parse pending publish info failed, tablet_id: "
<< tablet_id
<< " publish_version: " << publish_version;
}
StorageEngine::instance()->add_async_publish_task(
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 1353e66915..a56430fbb1 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -69,22 +69,17 @@ void TabletPublishStatistics::record_in_bvar() {
}
EnginePublishVersionTask::EnginePublishVersionTask(
- const TPublishVersionRequest& publish_version_req,
std::vector<TTabletId>* error_tablet_ids,
- std::vector<TTabletId>* succ_tablet_ids,
+ const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids,
+ std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinuous_version_tablets)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
- _succ_tablet_ids(succ_tablet_ids),
+ _succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets) {}
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
- _error_tablet_ids->push_back(tablet_id);
-}
-
-void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
- std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
- _succ_tablet_ids->push_back(tablet_id);
+ _error_tablet_ids->insert(tablet_id);
}
Status EnginePublishVersionTask::finish() {
@@ -126,7 +121,7 @@ Status EnginePublishVersionTask::finish() {
// and receive fe's publish version task
// this be must return as an error tablet
if (rowset == nullptr) {
- _error_tablet_ids->push_back(tablet_info.tablet_id);
+ add_error_tablet_id(tablet_info.tablet_id);
res = Status::Error<PUSH_ROWSET_NOT_FOUND>(
"could not find related rowset for tablet {}, txn id
{}",
tablet_info.tablet_id, transaction_id);
@@ -135,7 +130,7 @@ Status EnginePublishVersionTask::finish() {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
tablet_info.tablet_id, tablet_info.tablet_uid);
if (tablet == nullptr) {
- _error_tablet_ids->push_back(tablet_info.tablet_id);
+ add_error_tablet_id(tablet_info.tablet_id);
res = Status::Error<PUSH_TABLE_NOT_EXIST>(
"can't get tablet when publish version. tablet_id={}",
tablet_info.tablet_id);
@@ -199,6 +194,7 @@ Status EnginePublishVersionTask::finish() {
}
token->wait();
+ _succ_tablets->clear();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
@@ -209,18 +205,36 @@ Status EnginePublishVersionTask::finish() {
Version version(par_ver_info.version, par_ver_info.version);
for (auto& tablet_info : partition_related_tablet_infos) {
- // has to use strict mode to check if check all tablets
- if (!_publish_version_req.strict_mode) {
- break;
- }
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id);
+ auto tablet_id = tablet_info.tablet_id;
if (tablet == nullptr) {
- add_error_tablet_id(tablet_info.tablet_id);
+ add_error_tablet_id(tablet_id);
+ _succ_tablets->erase(tablet_id);
+ LOG(WARNING) << "publish version failed on transaction, not
found tablet. "
+ << "transaction_id=" << transaction_id << ",
tablet_id=" << tablet_id
+ << ", version=" << par_ver_info.version;
} else {
// check if the version exist, if not exist, then set publish
failed
- if (!tablet->check_version_exist(version)) {
- add_error_tablet_id(tablet_info.tablet_id);
+ if (_error_tablet_ids->find(tablet_id) ==
_error_tablet_ids->end()) {
+ if (tablet->check_version_exist(version)) {
+ // it's better to report the max continous succ
version,
+ // but it maybe time cost now.
+ // current just report 0
+ (*_succ_tablets)[tablet_id] = 0;
+ } else {
+ add_error_tablet_id(tablet_id);
+ if (res.ok()) {
+ res = Status::Error<VERSION_NOT_EXIST>(
+ "tablet {} not exists version {}",
tablet_id,
+ par_ver_info.version);
+ }
+ LOG(WARNING) << "publish version failed on
transaction, tablet version not "
+ "exists. "
+ << "transaction_id=" << transaction_id
+ << ", tablet_id=" << tablet_id
+ << ", version=" << par_ver_info.version;
+ }
}
}
}
@@ -280,9 +294,7 @@ void TabletPublishTxnTask::handle() {
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
- _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id);
int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
- // print stats if publish cost > 500ms
g_tablet_publish_latency << cost_us;
_stats.record_in_bvar();
LOG(INFO) << "publish version successfully on tablet"
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 8acf8099ca..0a270c93d2 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -23,7 +23,9 @@
#include <atomic>
#include <condition_variable>
+#include <map>
#include <mutex>
+#include <set>
#include <vector>
#include "common/status.h"
@@ -83,23 +85,22 @@ private:
class EnginePublishVersionTask : public EngineTask {
public:
EnginePublishVersionTask(
- const TPublishVersionRequest& publish_version_req,
vector<TTabletId>* error_tablet_ids,
- std::vector<TTabletId>* succ_tablet_ids,
+ const TPublishVersionRequest& publish_version_req,
+ std::set<TTabletId>* error_tablet_ids, std::map<TTabletId,
TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinous_version_tablets);
~EnginePublishVersionTask() {}
virtual Status finish() override;
void add_error_tablet_id(int64_t tablet_id);
- void add_succ_tablet_id(int64_t tablet_id);
int64_t finish_task();
private:
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
- vector<TTabletId>* _error_tablet_ids;
- vector<TTabletId>* _succ_tablet_ids;
+ std::set<TTabletId>* _error_tablet_ids;
+ std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>*
_discontinuous_version_tablets;
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 4e031c0dac..23118647cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -67,6 +67,7 @@ import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class MasterImpl {
@@ -465,6 +466,10 @@ public class MasterImpl {
}
private void finishPublishVersion(AgentTask task, TFinishTaskRequest
request) {
+ Map<Long, Long> succTablets = null;
+ if (request.isSetSuccTablets()) {
+ succTablets = request.getSuccTablets();
+ }
List<Long> errorTabletIds = null;
if (request.isSetErrorTabletIds()) {
errorTabletIds = request.getErrorTabletIds();
@@ -478,6 +483,7 @@ public class MasterImpl {
}
PublishVersionTask publishVersionTask = (PublishVersionTask) task;
+ publishVersionTask.setSuccTablets(succTablets);
publishVersionTask.addErrorTablets(errorTabletIds);
publishVersionTask.setFinished(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 1ce9d866b8..8461b1db4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
public class PublishVersionTask extends AgentTask {
private static final Logger LOG =
LogManager.getLogger(PublishVersionTask.class);
@@ -34,11 +35,15 @@ public class PublishVersionTask extends AgentTask {
private List<TPartitionVersionInfo> partitionVersionInfos;
private List<Long> errorTablets;
+ // tabletId => version, current version = 0
+ private Map<Long, Long> succTablets;
+
public PublishVersionTask(long backendId, long transactionId, long dbId,
List<TPartitionVersionInfo> partitionVersionInfos, long
createTime) {
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L,
-1L, transactionId, createTime);
this.transactionId = transactionId;
this.partitionVersionInfos = partitionVersionInfos;
+ this.succTablets = null;
this.errorTablets = new ArrayList<Long>();
this.isFinished = false;
}
@@ -57,6 +62,14 @@ public class PublishVersionTask extends AgentTask {
return partitionVersionInfos;
}
+ public Map<Long, Long> getSuccTablets() {
+ return succTablets;
+ }
+
+ public void setSuccTablets(Map<Long, Long> succTablets) {
+ this.succTablets = succTablets;
+ }
+
public synchronized List<Long> getErrorTablets() {
return errorTablets;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 8397499ded..7974cb6a89 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -54,6 +54,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.ClearTransactionTask;
+import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.annotations.VisibleForTesting;
@@ -868,7 +869,7 @@ public class DatabaseTransactionMgr {
}
}
- public void finishTransaction(long transactionId, Set<Long>
errorReplicaIds) throws UserException {
+ public void finishTransaction(long transactionId) throws UserException {
TransactionState transactionState = null;
readLock();
try {
@@ -876,14 +877,10 @@ public class DatabaseTransactionMgr {
} finally {
readUnlock();
}
+
// add all commit errors and publish errors to a single set
- if (errorReplicaIds == null) {
- errorReplicaIds = Sets.newHashSet();
- }
- Set<Long> originalErrorReplicas = transactionState.getErrorReplicas();
- if (originalErrorReplicas != null) {
- errorReplicaIds.addAll(originalErrorReplicas);
- }
+ Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
+ Map<Long, PublishVersionTask> publishTasks =
transactionState.getPublishVersionTasks();
long now = System.currentTimeMillis();
long firstPublishOneSuccTime =
transactionState.getFirstPublishOneSuccTime();
@@ -980,21 +977,10 @@ public class DatabaseTransactionMgr {
tabletWriteFailedReplicas.clear();
tabletVersionFailedReplicas.clear();
for (Replica replica : tablet.getReplicas()) {
- if
(!errorReplicaIds.contains(replica.getId())) {
- if
(replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
- tabletSuccReplicas.add(replica);
- } else {
-
tabletVersionFailedReplicas.add(replica);
- }
- } else if (replica.getVersion() >=
partitionCommitInfo.getVersion()) {
- // the replica's version is larger than or
equal to current transaction
- // partition's version the replica is
normal, then remove it from error replica ids
- // TODO(cmy): actually I have no idea why
we need this check
- tabletSuccReplicas.add(replica);
- errorReplicaIds.remove(replica.getId());
- } else {
- tabletWriteFailedReplicas.add(replica);
- }
+
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
+ partitionCommitInfo.getVersion(),
publishTasks.get(replica.getBackendId()),
+ errorReplicaIds, tabletSuccReplicas,
tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
}
int healthReplicaNum = tabletSuccReplicas.size();
@@ -1005,7 +991,7 @@ public class DatabaseTransactionMgr {
LOG.info("publish version quorum succ for
transaction {} on tablet {} with version"
+ " {}, and has failed replicas,
quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
- transactionState, tablet,
partitionCommitInfo.getVersion(),
+ transactionState, tablet.getId(),
partitionCommitInfo.getVersion(),
quorumReplicaNum, tableId,
partitionId, writeDetail);
}
continue;
@@ -1033,8 +1019,8 @@ public class DatabaseTransactionMgr {
LOG.info("publish version timeout succ for
transaction {} on tablet {} with version"
+ " {}, and has failed replicas,
quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
- transactionState, tablet,
partitionCommitInfo.getVersion(), quorumReplicaNum,
- tableId, partitionId, writeDetail);
+ transactionState, tablet.getId(),
partitionCommitInfo.getVersion(),
+ quorumReplicaNum, tableId,
partitionId, writeDetail);
} else {
publishResult = PublishResult.FAILED;
String errMsg = String.format("publish on
tablet %d failed."
@@ -1046,8 +1032,8 @@ public class DatabaseTransactionMgr {
LOG.info("publish version failed for
transaction {} on tablet {} with version"
+ " {}, and has failed replicas,
quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
- transactionState, tablet,
partitionCommitInfo.getVersion(), quorumReplicaNum,
- tableId, partitionId, writeDetail);
+ transactionState, tablet.getId(),
partitionCommitInfo.getVersion(),
+ quorumReplicaNum, tableId,
partitionId, writeDetail);
}
}
}
@@ -1093,6 +1079,43 @@ public class DatabaseTransactionMgr {
LOG.info("finish transaction {} successfully, publish result: {}",
transactionState, publishResult.name());
}
+ private void checkReplicaContinuousVersionSucc(long tabletId, Replica
replica, long version,
+ PublishVersionTask backendPublishTask, Set<Long> errorReplicaIds,
List<Replica> tabletSuccReplicas,
+ List<Replica> tabletWriteFailedReplicas, List<Replica>
tabletVersionFailedReplicas) {
+ if (backendPublishTask == null || !backendPublishTask.isFinished()) {
+ errorReplicaIds.add(replica.getId());
+ } else {
+ Map<Long, Long> backendSuccTablets =
backendPublishTask.getSuccTablets();
+ // new doris BE will report succ tablets
+ if (backendSuccTablets != null) {
+ if (backendSuccTablets.containsKey(tabletId)) {
+ errorReplicaIds.remove(replica.getId());
+ } else {
+ errorReplicaIds.add(replica.getId());
+ }
+ } else {
+ // for compatibility, old doris BE report only error tablets
+ List<Long> backendErrorTablets =
backendPublishTask.getErrorTablets();
+ if (backendErrorTablets != null &&
backendErrorTablets.contains(tabletId)) {
+ errorReplicaIds.add(replica.getId());
+ }
+ }
+ }
+
+ if (!errorReplicaIds.contains(replica.getId())) {
+ if (replica.checkVersionCatchUp(version - 1, true)) {
+ tabletSuccReplicas.add(replica);
+ } else {
+ tabletVersionFailedReplicas.add(replica);
+ }
+ } else if (replica.getVersion() >= version) {
+ tabletSuccReplicas.add(replica);
+ errorReplicaIds.remove(replica.getId());
+ } else {
+ tabletWriteFailedReplicas.add(replica);
+ }
+ }
+
protected void unprotectedPreCommitTransaction2PC(TransactionState
transactionState, Set<Long> errorReplicaIds,
Map<Long, Set<Long>>
tableToPartition, Set<Long> totalInvolvedBackends,
Database db) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 1ad8d2deb8..22a019c4c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -402,13 +402,13 @@ public class GlobalTransactionMgr implements Writable {
/**
* if the table is deleted between commit and publish version, then should
ignore the partition
*
+ * @param dbId
* @param transactionId
- * @param errorReplicaIds
* @return
*/
- public void finishTransaction(long dbId, long transactionId, Set<Long>
errorReplicaIds) throws UserException {
+ public void finishTransaction(long dbId, long transactionId) throws
UserException {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
- dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds);
+ dbTransactionMgr.finishTransaction(transactionId);
}
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 4cb3ee5e9e..33ea8de07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -17,18 +17,11 @@
package org.apache.doris.transaction;
-import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
@@ -36,14 +29,12 @@ import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TTaskType;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public class PublishVersionDaemon extends MasterDaemon {
@@ -63,15 +54,6 @@ public class PublishVersionDaemon extends MasterDaemon {
}
}
- private boolean
isAllBackendsOfUnfinishedTasksDead(List<PublishVersionTask> unfinishedTasks) {
- for (PublishVersionTask unfinishedTask : unfinishedTasks) {
- if
(Env.getCurrentSystemInfo().checkBackendAlive(unfinishedTask.getBackendId())) {
- return false;
- }
- }
- return true;
- }
-
private void publishVersion() {
GlobalTransactionMgr globalTransactionMgr =
Env.getCurrentGlobalTransactionMgr();
List<TransactionState> readyTransactionStates =
globalTransactionMgr.getReadyToPublishTransactions();
@@ -81,7 +63,8 @@ public class PublishVersionDaemon extends MasterDaemon {
// ATTN, we publish transaction state to all backends including dead
backend, if not publish to dead backend
// then transaction manager will treat it as success
- List<Long> allBackends =
Env.getCurrentSystemInfo().getAllBackendIds(false);
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+ List<Long> allBackends = infoService.getAllBackendIds(false);
if (allBackends.isEmpty()) {
LOG.warn("some transaction state need to publish, but no backend
exists");
return;
@@ -138,109 +121,17 @@ public class PublishVersionDaemon extends MasterDaemon {
AgentTaskExecutor.submit(batchTask);
}
- TabletInvertedIndex tabletInvertedIndex =
Env.getCurrentInvertedIndex();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
- Map<Long, PublishVersionTask> transTasks =
transactionState.getPublishVersionTasks();
- Set<Long> publishErrorReplicaIds = Sets.newHashSet();
- List<PublishVersionTask> unfinishedTasks = Lists.newArrayList();
- for (PublishVersionTask publishVersionTask : transTasks.values()) {
- if (publishVersionTask.isFinished()) {
- // sometimes backend finish publish version task,
- // but it maybe failed to change transactionid to version
for some tablets
- // and it will upload the failed tabletinfo to fe and fe
will deal with them
- List<Long> errorTablets =
publishVersionTask.getErrorTablets();
- if (errorTablets == null || errorTablets.isEmpty()) {
- continue;
- } else {
- for (long tabletId : errorTablets) {
- // tablet inverted index also contains rollingup
index
- // if tablet meta not contains the tablet, skip
this tablet because this tablet is dropped
- // from fe
- if (tabletInvertedIndex.getTabletMeta(tabletId) ==
null) {
- continue;
- }
- Replica replica = tabletInvertedIndex.getReplica(
- tabletId,
publishVersionTask.getBackendId());
- if (replica != null) {
- publishErrorReplicaIds.add(replica.getId());
- } else {
- LOG.info("could not find related replica with
tabletid={}, backendid={}",
- tabletId,
publishVersionTask.getBackendId());
- }
- }
- }
- } else {
- unfinishedTasks.add(publishVersionTask);
- }
- }
-
- boolean shouldFinishTxn = false;
- if (!unfinishedTasks.isEmpty()) {
- shouldFinishTxn =
isAllBackendsOfUnfinishedTasksDead(unfinishedTasks);
- if (transactionState.isPublishTimeout() || shouldFinishTxn) {
- // transaction's publish is timeout, but there still has
unfinished tasks.
- // we need to collect all error replicas, and try to
finish this txn.
- for (PublishVersionTask unfinishedTask : unfinishedTasks) {
- // set all replicas in the backend to error state
- List<TPartitionVersionInfo> versionInfos =
unfinishedTask.getPartitionVersionInfos();
- Set<Long> errorPartitionIds = Sets.newHashSet();
- for (TPartitionVersionInfo versionInfo : versionInfos)
{
-
errorPartitionIds.add(versionInfo.getPartitionId());
- }
- if (errorPartitionIds.isEmpty()) {
- continue;
- }
-
- Database db = Env.getCurrentInternalCatalog()
- .getDbNullable(transactionState.getDbId());
- if (db == null) {
- LOG.warn("Database [{}] has been dropped.",
transactionState.getDbId());
- continue;
- }
-
- for (long tableId : transactionState.getTableIdList())
{
- Table table = db.getTableNullable(tableId);
- if (table == null || table.getType() !=
Table.TableType.OLAP) {
- LOG.warn("Table [{}] in database [{}] has been
dropped.", tableId, db.getFullName());
- continue;
- }
- OlapTable olapTable = (OlapTable) table;
- olapTable.readLock();
- try {
- for (Long errorPartitionId :
errorPartitionIds) {
- Partition partition =
olapTable.getPartition(errorPartitionId);
- if (partition != null) {
- List<MaterializedIndex>
materializedIndexList
- =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
- for (MaterializedIndex
materializedIndex : materializedIndexList) {
- for (Tablet tablet :
materializedIndex.getTablets()) {
- Replica replica =
tablet.getReplicaByBackendId(
-
unfinishedTask.getBackendId());
- if (replica != null) {
-
publishErrorReplicaIds.add(replica.getId());
- }
- }
- }
- }
- }
- } finally {
- olapTable.readUnlock();
- }
- }
- }
- shouldFinishTxn = true;
- }
- } else {
- // all publish tasks are finished, try to finish this txn.
- shouldFinishTxn = true;
- }
+ boolean hasBackendAliveAndUnfinishTask =
transactionState.getPublishVersionTasks().values().stream()
+ .anyMatch(task -> !task.isFinished() &&
infoService.checkBackendAlive(task.getBackendId()));
+ boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask ||
transactionState.isPublishTimeout();
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other
transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
- transactionState.getTransactionId(),
publishErrorReplicaIds);
+ transactionState.getTransactionId());
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
}
@@ -248,8 +139,7 @@ public class PublishVersionDaemon extends MasterDaemon {
// if finish transaction state failed, then update publish
version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
- LOG.debug("publish version for transaction {} failed, has
{} error replicas during publish",
- transactionState, publishErrorReplicaIds.size());
+ LOG.debug("publish version for transaction {} failed",
transactionState);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 0e89e4c86e..b988650fa2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.meta.MetaContext;
+import org.apache.doris.task.PublishVersionTask;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -56,7 +57,17 @@ public class DatabaseTransactionMgrTest {
private static Env slaveEnv;
private static Map<String, Long> LabelToTxnId;
- private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+ private TransactionState.TxnCoordinator transactionSource =
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+
+ public static void setTransactionFinishPublish(TransactionState
transactionState, List<Long> backendIds) {
+ for (long backendId : backendIds) {
+ PublishVersionTask task = new PublishVersionTask(backendId,
transactionState.getTransactionId(),
+ transactionState.getDbId(), null,
System.currentTimeMillis());
+ task.setFinished(true);
+ transactionState.addPublishVersionTask(backendId, task);
+ }
+ }
@Before
public void setUp() throws InstantiationException, IllegalAccessException,
IllegalArgumentException,
@@ -100,7 +111,11 @@ public class DatabaseTransactionMgrTest {
Table testTable1 =
masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(testTable1), transactionId1, transTablets);
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId1, null);
+ TransactionState transactionState1 =
fakeEditLog.getTransaction(transactionId1);
+ setTransactionFinishPublish(transactionState1,
+ Lists.newArrayList(CatalogTestUtil.testBackendId1,
+ CatalogTestUtil.testBackendId2,
CatalogTestUtil.testBackendId3));
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId1);
labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
TransactionState.TxnCoordinator beTransactionSource = new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1");
@@ -120,8 +135,6 @@ public class DatabaseTransactionMgrTest {
labelToTxnId.put(CatalogTestUtil.testTxnLabel3, transactionId3);
labelToTxnId.put(CatalogTestUtil.testTxnLabel4, transactionId4);
- TransactionState transactionState1 =
fakeEditLog.getTransaction(transactionId1);
-
FakeEnv.setEnv(slaveEnv);
slaveTransMgr.replayUpsertTransactionState(transactionState1);
return labelToTxnId;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 953ea3d879..a819c4f030 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -55,7 +55,6 @@ import
org.apache.doris.transaction.TransactionState.TxnSourceType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -66,10 +65,8 @@ import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
-
public class GlobalTransactionMgrTest {
private static FakeEditLog fakeEditLog;
@@ -467,9 +464,12 @@ public class GlobalTransactionMgrTest {
TransactionState transactionState =
fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
slaveTransMgr.replayUpsertTransactionState(transactionState);
- Set<Long> errorReplicaIds = Sets.newHashSet();
- errorReplicaIds.add(CatalogTestUtil.testReplicaId1);
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId, errorReplicaIds);
+
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
+ Lists.newArrayList(CatalogTestUtil.testBackendId1,
+ CatalogTestUtil.testBackendId2,
CatalogTestUtil.testBackendId3));
+ transactionState.getPublishVersionTasks()
+
.get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1);
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId);
transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.VISIBLE,
transactionState.getTransactionStatus());
// check replica version
@@ -524,9 +524,13 @@ public class GlobalTransactionMgrTest {
// master finish the transaction failed
FakeEnv.setEnv(masterEnv);
- Set<Long> errorReplicaIds = Sets.newHashSet();
- errorReplicaIds.add(CatalogTestUtil.testReplicaId2);
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId, errorReplicaIds);
+
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
+ Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2));
+
+ // backend2 publish failed
+ transactionState.getPublishVersionTasks()
+
.get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1);
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
Replica replica1 =
tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
Replica replica2 =
tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
@@ -540,8 +544,12 @@ public class GlobalTransactionMgrTest {
Assert.assertEquals(-1, replica2.getLastFailedVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1,
replica3.getLastFailedVersion());
- errorReplicaIds = Sets.newHashSet();
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId, errorReplicaIds);
+ // backend2 publish success
+ Map<Long, Long> backend2SuccTablets = Maps.newHashMap();
+ backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L);
+ transactionState.getPublishVersionTasks()
+
.get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets);
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId);
Assert.assertEquals(TransactionStatus.VISIBLE,
transactionState.getTransactionStatus());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1,
replica1.getVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1,
replica2.getVersion());
@@ -603,8 +611,10 @@ public class GlobalTransactionMgrTest {
Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv));
// master finish the transaction2
- errorReplicaIds = Sets.newHashSet();
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId2, errorReplicaIds);
+
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
+ Lists.newArrayList(CatalogTestUtil.testBackendId1,
+ CatalogTestUtil.testBackendId2,
CatalogTestUtil.testBackendId3));
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId2);
Assert.assertEquals(TransactionStatus.VISIBLE,
transactionState.getTransactionStatus());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 2,
replica1.getVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 2,
replica2.getVersion());
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index 3eda812e1b..dedc454d33 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -65,6 +65,7 @@ struct TFinishTaskRequest {
14: optional list<Types.TTabletId> downloaded_tablet_ids
15: optional i64 copy_size
16: optional i64 copy_time_ms
+ 17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
}
struct TTablet {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]