This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 81e8182a44e [fix](cloud-mow) FE should release delete bitmap lock when
calculating delete bitmap failed (#45673)
81e8182a44e is described below
commit 81e8182a44ec656f7b3d891ae97e5fe68fbf7d33
Author: huanghaibin <[email protected]>
AuthorDate: Thu Jan 2 22:04:35 2025 +0800
[fix](cloud-mow) FE should release delete bitmap lock when calculating
delete bitmap failed (#45673)
When calculating the delete bitmap fails, the delete bitmap lock is still held
by the failed txn; FE should release the lock so that other txns can commit.
---
.../transaction/CloudGlobalTransactionMgr.java | 221 +++++++++++----------
.../transaction/DeleteBitmapUpdateLockContext.java | 82 ++++++++
...test_cloud_mow_stream_load_with_commit_fail.out | 7 +
...t_cloud_mow_stream_load_with_commit_fail.groovy | 142 +++++++++++++
4 files changed, 346 insertions(+), 106 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index b9425245f42..ffe32348bd0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -346,7 +346,29 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
public void commitTransaction(long dbId, List<Table> tableList, long
transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos,
txnCommitAttachment, false);
+ List<OlapTable> mowTableList = getMowTableList(tableList,
tabletCommitInfos);
+ try {
+ LOG.info("try to commit transaction, transactionId: {}",
transactionId);
+ Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos = null;
+ if (!mowTableList.isEmpty()) {
+ DeleteBitmapUpdateLockContext lockContext = new
DeleteBitmapUpdateLockContext();
+ getDeleteBitmapUpdateLock(transactionId, mowTableList,
tabletCommitInfos, lockContext);
+ if (lockContext.getBackendToPartitionTablets().isEmpty()) {
+ throw new UserException(
+ "The partition info is empty, table may be
dropped, txnid=" + transactionId);
+ }
+ backendToPartitionInfos = getCalcDeleteBitmapInfo(lockContext,
null);
+ }
+ commitTransactionWithoutLock(dbId, tableList, transactionId,
tabletCommitInfos, txnCommitAttachment, false,
+ mowTableList, backendToPartitionInfos);
+ } catch (Exception e) {
+ if (!mowTableList.isEmpty()) {
+ LOG.warn("commit txn {} failed, release delete bitmap lock,
catch exception {}", transactionId,
+ e.getMessage());
+ removeDeleteBitmapUpdateLock(mowTableList, transactionId);
+ }
+ throw e;
+ }
}
/**
@@ -464,17 +486,15 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
return baseTabletIds;
}
- private void commitTransaction(long dbId, List<Table> tableList, long
transactionId,
- List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment, boolean is2PC)
+ private void commitTransactionWithoutLock(long dbId, List<Table>
tableList, long transactionId,
+ List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment
txnCommitAttachment, boolean is2PC,
+ List<OlapTable> mowTableList, Map<Long,
List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos)
throws UserException {
-
- LOG.info("try to commit transaction, transactionId: {}",
transactionId);
if (Config.disable_load_job) {
throw new TransactionCommitFailedException(
"disable_load_job is set to true, all load jobs are not
allowed");
}
- List<OlapTable> mowTableList = getMowTableList(tableList,
tabletCommitInfos);
if (!mowTableList.isEmpty()) {
// may be this txn has been calculated by previously task but
commit rpc is timeout,
// and be will send another commit request to fe, so need to check
txn status first
@@ -493,7 +513,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
transactionState.getTransactionStatus().toString());
}
}
- calcDeleteBitmapForMow(dbId, mowTableList, transactionId,
tabletCommitInfos, null);
+ sendCalcDeleteBitmaptask(dbId, transactionId,
backendToPartitionInfos,
+ Config.calculate_delete_bitmap_task_timeout_seconds);
}
CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
@@ -535,6 +556,10 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest,
long transactionId, boolean is2PC,
TxnCommitAttachment txnCommitAttachment) throws UserException {
+ if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
+ LOG.info("debug point FE.mow.commit.exception, throw e");
+ throw new UserException("debug point FE.mow.commit.exception");
+ }
boolean txnOperated = false;
TransactionState txnState = null;
TxnStateChangeCallback cb = null;
@@ -653,43 +678,6 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
return mowTableList;
}
- private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList,
long transactionId,
- List<TabletCommitInfo> tabletCommitInfos,
List<SubTransactionState> subTransactionStates)
- throws UserException {
- Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets =
Maps.newHashMap();
- Map<Long, Partition> partitions = Maps.newHashMap();
- Map<Long, Set<Long>> tableToPartitions = Maps.newHashMap();
- Map<Long, List<Long>> tableToTabletList = Maps.newHashMap();
- Map<Long, TabletMeta> tabletToTabletMeta = Maps.newHashMap();
- getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions,
partitions, backendToPartitionTablets,
- tableToTabletList, tabletToTabletMeta);
- if (backendToPartitionTablets.isEmpty()) {
- throw new UserException("The partition info is empty, table may be
dropped, txnid=" + transactionId);
- }
-
- Map<Long, List<Long>> partitionToSubTxnIds =
getPartitionSubTxnIds(subTransactionStates, tableToTabletList,
- tabletToTabletMeta);
- Map<Long, Long> baseCompactionCnts = Maps.newHashMap();
- Map<Long, Long> cumulativeCompactionCnts = Maps.newHashMap();
- Map<Long, Long> cumulativePoints = Maps.newHashMap();
- getDeleteBitmapUpdateLock(tableToPartitions, transactionId,
tableToTabletList, tabletToTabletMeta,
- baseCompactionCnts, cumulativeCompactionCnts,
cumulativePoints);
- Map<Long, Long> partitionVersions = getPartitionVersions(partitions);
-
- Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos = getCalcDeleteBitmapInfo(
- backendToPartitionTablets, partitionVersions,
baseCompactionCnts, cumulativeCompactionCnts,
- cumulativePoints, partitionToSubTxnIds);
- try {
- sendCalcDeleteBitmaptask(dbId, transactionId,
backendToPartitionInfos,
- subTransactionStates == null ?
Config.calculate_delete_bitmap_task_timeout_seconds
- :
Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
- } catch (UserException e) {
- LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" +
transactionId + ",exception=" + e.getMessage());
- removeDeleteBitmapUpdateLock(tableToPartitions, transactionId);
- throw e;
- }
- }
-
private Map<Long, List<Long>>
getPartitionSubTxnIds(List<SubTransactionState> subTransactionStates,
Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta>
tabletToTabletMeta) {
if (subTransactionStates == null) {
@@ -715,11 +703,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
private void getPartitionInfo(List<OlapTable> tableList,
List<TabletCommitInfo> tabletCommitInfos,
- Map<Long, Set<Long>> tableToParttions,
- Map<Long, Partition> partitions,
- Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets,
- Map<Long, List<Long>> tableToTabletList,
- Map<Long, TabletMeta> tabletToTabletMeta) {
+ DeleteBitmapUpdateLockContext lockContext) {
Map<Long, OlapTable> tableMap = Maps.newHashMap();
for (OlapTable olapTable : tableList) {
tableMap.put(olapTable.getId(), olapTable);
@@ -731,7 +715,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
List<TabletMeta> tabletMetaList =
tabletInvertedIndex.getTabletMetaList(tabletIds);
for (int i = 0; i < tabletMetaList.size(); i++) {
long tabletId = tabletIds.get(i);
- if (tabletToTabletMeta.containsKey(tabletId)) {
+ if (lockContext.getTabletToTabletMeta().containsKey(tabletId)) {
continue;
}
TabletMeta tabletMeta = tabletMetaList.get(i);
@@ -740,9 +724,10 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
continue;
}
- tabletToTabletMeta.put(tabletId, tabletMeta);
+ lockContext.getTabletToTabletMeta().put(tabletId, tabletMeta);
- List<Long> tableTabletIds =
tableToTabletList.computeIfAbsent(tableId, k -> Lists.newArrayList());
+ List<Long> tableTabletIds = lockContext.getTableToTabletList()
+ .computeIfAbsent(tableId, k -> Lists.newArrayList());
if (!tableTabletIds.contains(tabletId)) {
tableTabletIds.add(tabletId);
}
@@ -750,20 +735,20 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
long partitionId = tabletMeta.getPartitionId();
long backendId = tabletCommitInfos.get(i).getBackendId();
- if (!tableToParttions.containsKey(tableId)) {
- tableToParttions.put(tableId, Sets.newHashSet());
+ if (!lockContext.getTableToPartitions().containsKey(tableId)) {
+ lockContext.getTableToPartitions().put(tableId,
Sets.newHashSet());
}
- tableToParttions.get(tableId).add(partitionId);
+ lockContext.getTableToPartitions().get(tableId).add(partitionId);
- if (!backendToPartitionTablets.containsKey(backendId)) {
- backendToPartitionTablets.put(backendId, Maps.newHashMap());
+ if
(!lockContext.getBackendToPartitionTablets().containsKey(backendId)) {
+ lockContext.getBackendToPartitionTablets().put(backendId,
Maps.newHashMap());
}
- Map<Long, Set<Long>> partitionToTablets =
backendToPartitionTablets.get(backendId);
+ Map<Long, Set<Long>> partitionToTablets =
lockContext.getBackendToPartitionTablets().get(backendId);
if (!partitionToTablets.containsKey(partitionId)) {
partitionToTablets.put(partitionId, Sets.newHashSet());
}
partitionToTablets.get(partitionId).add(tabletId);
- partitions.putIfAbsent(partitionId,
tableMap.get(tableId).getPartition(partitionId));
+ lockContext.getPartitions().putIfAbsent(partitionId,
tableMap.get(tableId).getPartition(partitionId));
}
}
@@ -778,11 +763,10 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
private Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
getCalcDeleteBitmapInfo(
- Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets,
Map<Long, Long> partitionVersions,
- Map<Long, Long> baseCompactionCnts, Map<Long, Long>
cumulativeCompactionCnts,
- Map<Long, Long> cumulativePoints, Map<Long,
List<Long>> partitionToSubTxnIds) {
+ DeleteBitmapUpdateLockContext lockContext, Map<Long, List<Long>>
partitionToSubTxnIds) {
Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos = Maps.newHashMap();
- for (Map.Entry<Long, Map<Long, Set<Long>>> entry :
backendToPartitionTablets.entrySet()) {
+ Map<Long, Long> partitionVersions =
getPartitionVersions(lockContext.getPartitions());
+ for (Map.Entry<Long, Map<Long, Set<Long>>> entry :
lockContext.getBackendToPartitionTablets().entrySet()) {
List<TCalcDeleteBitmapPartitionInfo> partitionInfos =
Lists.newArrayList();
for (Map.Entry<Long, Set<Long>> partitionToTablets :
entry.getValue().entrySet()) {
Long partitionId = partitionToTablets.getKey();
@@ -790,15 +774,16 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
TCalcDeleteBitmapPartitionInfo partitionInfo = new
TCalcDeleteBitmapPartitionInfo(partitionId,
partitionVersions.get(partitionId),
Lists.newArrayList(tabletList));
- if (!baseCompactionCnts.isEmpty() &&
!cumulativeCompactionCnts.isEmpty()
- && !cumulativePoints.isEmpty()) {
+ if (!lockContext.getBaseCompactionCnts().isEmpty()
+ && !lockContext.getCumulativeCompactionCnts().isEmpty()
+ && !lockContext.getCumulativePoints().isEmpty()) {
List<Long> reqBaseCompactionCnts = Lists.newArrayList();
List<Long> reqCumulativeCompactionCnts =
Lists.newArrayList();
List<Long> reqCumulativePoints = Lists.newArrayList();
for (long tabletId : tabletList) {
-
reqBaseCompactionCnts.add(baseCompactionCnts.get(tabletId));
-
reqCumulativeCompactionCnts.add(cumulativeCompactionCnts.get(tabletId));
-
reqCumulativePoints.add(cumulativePoints.get(tabletId));
+
reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId));
+
reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId));
+
reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId));
}
partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts);
partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts);
@@ -818,10 +803,9 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
return backendToPartitionInfos;
}
- private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>>
tableToParttions, long transactionId,
- Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta>
tabletToTabletMeta,
- Map<Long, Long> baseCompactionCnts, Map<Long, Long>
cumulativeCompactionCnts,
- Map<Long, Long> cumulativePoints) throws
UserException {
+ private void getDeleteBitmapUpdateLock(long transactionId, List<OlapTable>
mowTableList,
+ List<TabletCommitInfo> tabletCommitInfos,
DeleteBitmapUpdateLockContext lockContext)
+ throws UserException {
if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep"))
{
DebugPoint debugPoint = DebugPointUtil.getDebugPoint(
"CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep");
@@ -854,17 +838,15 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
+ getPartitionInfo(mowTableList, tabletCommitInfos, lockContext);
int totalRetryTime = 0;
- for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
lockContext.getTableToPartitions().entrySet()) {
GetDeleteBitmapUpdateLockRequest.Builder builder =
GetDeleteBitmapUpdateLockRequest.newBuilder();
- builder.setTableId(entry.getKey())
- .setLockId(transactionId)
- .setInitiator(-1)
-
.setExpiration(Config.delete_bitmap_lock_expiration_seconds)
- .setRequireCompactionStats(true);
- List<Long> tabletList = tableToTabletList.get(entry.getKey());
+
builder.setTableId(entry.getKey()).setLockId(transactionId).setInitiator(-1)
+
.setExpiration(Config.delete_bitmap_lock_expiration_seconds).setRequireCompactionStats(true);
+ List<Long> tabletList =
lockContext.getTableToTabletList().get(entry.getKey());
for (Long tabletId : tabletList) {
- TabletMeta tabletMeta = tabletToTabletMeta.get(tabletId);
+ TabletMeta tabletMeta =
lockContext.getTabletToTabletMeta().get(tabletId);
TabletIndexPB.Builder tabletIndexBuilder =
TabletIndexPB.newBuilder();
tabletIndexBuilder.setDbId(tabletMeta.getDbId());
tabletIndexBuilder.setTableId(tabletMeta.getTableId());
@@ -881,16 +863,16 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
try {
response =
MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request);
if (LOG.isDebugEnabled()) {
- LOG.debug("get delete bitmap lock, transactionId={},
Request: {}, Response: {}",
- transactionId, request, response);
+ LOG.debug("get delete bitmap lock, transactionId={},
Request: {}, Response: {}", transactionId,
+ request, response);
}
if (response.getStatus().getCode() !=
MetaServiceCode.LOCK_CONFLICT
&& response.getStatus().getCode() !=
MetaServiceCode.KV_TXN_CONFLICT) {
break;
}
} catch (Exception e) {
- LOG.warn("ignore get delete bitmap lock exception,
transactionId={}, retryTime={}",
- transactionId, retryTime, e);
+ LOG.warn("ignore get delete bitmap lock exception,
transactionId={}, retryTime={}", transactionId,
+ retryTime, e);
}
// sleep random millis [20, 300] ms, avoid txn conflict
int randomMillis = 20 + (int) (Math.random() * (300 - 20));
@@ -906,8 +888,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
Preconditions.checkNotNull(response);
Preconditions.checkNotNull(response.getStatus());
if (response.getStatus().getCode() != MetaServiceCode.OK) {
- LOG.warn("get delete bitmap lock failed, transactionId={}, for
{} times, response:{}",
- transactionId, retryTime, response);
+ LOG.warn("get delete bitmap lock failed, transactionId={}, for
{} times, response:{}", transactionId,
+ retryTime, response);
if (response.getStatus().getCode() ==
MetaServiceCode.LOCK_CONFLICT
|| response.getStatus().getCode() ==
MetaServiceCode.KV_TXN_CONFLICT) {
// DELETE_BITMAP_LOCK_ERR will be retried on be
@@ -928,30 +910,28 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
if (size1 != tabletList.size() || size2 != tabletList.size() ||
size3 != tabletList.size()) {
throw new UserException("The size of returned compaction cnts
can't match the size of tabletList, "
+ "tabletList.size()=" + tabletList.size() + ",
respBaseCompactionCnts.size()=" + size1
- + ", respCumulativeCompactionCnts.size()=" +
size2 + ", respCumulativePoints.size()="
- + size3);
+ + ", respCumulativeCompactionCnts.size()=" + size2 +
", respCumulativePoints.size()=" + size3);
}
for (int i = 0; i < tabletList.size(); i++) {
long tabletId = tabletList.get(i);
- baseCompactionCnts.put(tabletId,
respBaseCompactionCnts.get(i));
- cumulativeCompactionCnts.put(tabletId,
respCumulativeCompactionCnts.get(i));
- cumulativePoints.put(tabletId, respCumulativePoints.get(i));
+ lockContext.getBaseCompactionCnts().put(tabletId,
respBaseCompactionCnts.get(i));
+ lockContext.getCumulativeCompactionCnts().put(tabletId,
respCumulativeCompactionCnts.get(i));
+ lockContext.getCumulativePoints().put(tabletId,
respCumulativePoints.get(i));
}
totalRetryTime += retryTime;
}
stopWatch.stop();
if (totalRetryTime > 0 || stopWatch.getTime() > 20) {
- LOG.info(
- "get delete bitmap lock successfully. txns: {}.
totalRetryTime: {}. "
- + "partitionSize: {}. time cost: {} ms.",
- transactionId, totalRetryTime, tableToParttions.size(),
stopWatch.getTime());
+ LOG.info("get delete bitmap lock successfully. txns: {}.
totalRetryTime: {}. "
+ + "partitionSize: {}. time cost: {} ms.",
transactionId, totalRetryTime,
+ lockContext.getTableToPartitions().size(),
stopWatch.getTime());
}
}
- private void removeDeleteBitmapUpdateLock(Map<Long, Set<Long>>
tableToParttions, long transactionId) {
- for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
+ private void removeDeleteBitmapUpdateLock(List<OlapTable> tableList, long
transactionId) {
+ for (OlapTable table : tableList) {
RemoveDeleteBitmapUpdateLockRequest.Builder builder =
RemoveDeleteBitmapUpdateLockRequest.newBuilder();
- builder.setTableId(entry.getKey())
+ builder.setTableId(table.getId())
.setLockId(transactionId)
.setInitiator(-1);
final RemoveDeleteBitmapUpdateLockRequest request =
builder.build();
@@ -978,6 +958,10 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos,
long calculateDeleteBitmapTaskTimeoutSeconds) throws UserException
{
+ if (backendToPartitionInfos == null) {
+ throw new UserException("failed to send calculate delete bitmap
task to be,transactionId=" + transactionId
+ + ",but backendToPartitionInfos is null");
+ }
if (backendToPartitionInfos.isEmpty()) {
return;
}
@@ -1100,8 +1084,34 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
.collect(Collectors.toList());
List<Table> tableList = ((Database)
db).getTablesOnIdOrderOrThrowException(tableIdList);
beforeCommitTransaction(tableList, transactionId, timeoutMillis);
+ List<TabletCommitInfo> tabletCommitInfos =
subTransactionStates.stream().map(
+
SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
+ .map(c -> new TabletCommitInfo(c.getTabletId(),
c.getBackendId())).collect(Collectors.toList());
+ List<OlapTable> mowTableList = getMowTableList(tableList,
tabletCommitInfos);
try {
- commitTransactionWithSubTxns(db.getId(), tableList, transactionId,
subTransactionStates);
+ Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos = null;
+ if (!mowTableList.isEmpty()) {
+ DeleteBitmapUpdateLockContext lockContext = new
DeleteBitmapUpdateLockContext();
+ getDeleteBitmapUpdateLock(transactionId, mowTableList,
tabletCommitInfos, lockContext);
+ if (lockContext.getBackendToPartitionTablets().isEmpty()) {
+ throw new UserException(
+ "The partition info is empty, table may be
dropped, txnid=" + transactionId);
+ }
+ Map<Long, List<Long>> partitionToSubTxnIds =
getPartitionSubTxnIds(subTransactionStates,
+ lockContext.getTableToTabletList(),
+ lockContext.getTabletToTabletMeta());
+ backendToPartitionInfos = getCalcDeleteBitmapInfo(
+ lockContext, partitionToSubTxnIds);
+ }
+ commitTransactionWithSubTxns(db.getId(), tableList, transactionId,
subTransactionStates, mowTableList,
+ backendToPartitionInfos);
+ } catch (Exception e) {
+ if (!mowTableList.isEmpty()) {
+ LOG.warn("commit txn {} failed, release delete bitmap lock,
catch exception {}", transactionId,
+ e.getMessage());
+ removeDeleteBitmapUpdateLock(mowTableList, transactionId);
+ }
+ throw e;
} finally {
afterCommitTransaction(tableList);
}
@@ -1109,13 +1119,11 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
private void commitTransactionWithSubTxns(long dbId, List<Table>
tableList, long transactionId,
- List<SubTransactionState> subTransactionStates) throws
UserException {
- List<TabletCommitInfo> tabletCommitInfos =
subTransactionStates.stream().map(
-
SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
- .map(c -> new TabletCommitInfo(c.getTabletId(),
c.getBackendId())).collect(Collectors.toList());
- List<OlapTable> mowTableList = getMowTableList(tableList,
tabletCommitInfos);
+ List<SubTransactionState> subTransactionStates, List<OlapTable>
mowTableList,
+ Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos) throws UserException {
if (!mowTableList.isEmpty()) {
- calcDeleteBitmapForMow(dbId, mowTableList, transactionId,
tabletCommitInfos, subTransactionStates);
+ sendCalcDeleteBitmaptask(dbId, transactionId,
backendToPartitionInfos,
+
Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
}
cleanSubTransactions(transactionId);
@@ -1196,7 +1204,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
@Override
public void commitTransaction2PC(Database db, List<Table> tableList, long
transactionId, long timeoutMillis)
throws UserException {
- commitTransaction(db.getId(), tableList, transactionId, null, null,
true);
+ List<OlapTable> mowTableList = getMowTableList(tableList, null);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId,
null, null, true, mowTableList, null);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
new file mode 100644
index 00000000000..fcc84b9ca18
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.transaction;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TabletMeta;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DeleteBitmapUpdateLockContext {
+ private Map<Long, Long> baseCompactionCnts;
+ private Map<Long, Long> cumulativeCompactionCnts;
+ private Map<Long, Long> cumulativePoints;
+ private Map<Long, Set<Long>> tableToPartitions;
+ private Map<Long, Partition> partitions;
+ private Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets;
+ private Map<Long, List<Long>> tableToTabletList;
+ private Map<Long, TabletMeta> tabletToTabletMeta;
+
+ public DeleteBitmapUpdateLockContext() {
+ baseCompactionCnts = Maps.newHashMap();
+ cumulativeCompactionCnts = Maps.newHashMap();
+ cumulativePoints = Maps.newHashMap();
+ tableToPartitions = Maps.newHashMap();
+ partitions = Maps.newHashMap();
+ backendToPartitionTablets = Maps.newHashMap();
+ tableToTabletList = Maps.newHashMap();
+ tabletToTabletMeta = Maps.newHashMap();
+ }
+
+ public Map<Long, List<Long>> getTableToTabletList() {
+ return tableToTabletList;
+ }
+
+ public Map<Long, Long> getBaseCompactionCnts() {
+ return baseCompactionCnts;
+ }
+
+ public Map<Long, Long> getCumulativeCompactionCnts() {
+ return cumulativeCompactionCnts;
+ }
+
+ public Map<Long, Long> getCumulativePoints() {
+ return cumulativePoints;
+ }
+
+ public Map<Long, Map<Long, Set<Long>>> getBackendToPartitionTablets() {
+ return backendToPartitionTablets;
+ }
+
+ public Map<Long, Partition> getPartitions() {
+ return partitions;
+ }
+
+ public Map<Long, Set<Long>> getTableToPartitions() {
+ return tableToPartitions;
+ }
+
+ public Map<Long, TabletMeta> getTabletToTabletMeta() {
+ return tabletToTabletMeta;
+ }
+
+}
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
new file mode 100644
index 00000000000..b8b3ea3ecca
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+
+-- !sql --
+5 e 90
+6 f 100
+
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
new file mode 100644
index 00000000000..fa71c3644f2
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string: [:]]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_param = { paramName, paramValue ->
+ // for eache be node, set paramName=paramValue
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def reset_be_param = { paramName ->
+ // for eache be node, reset paramName to default
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def original_value = backendId_to_params.get(id).get(paramName)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
original_value))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def get_be_param = { paramName ->
+ // for eache be node, get param value by default
+ def paramValue = ""
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ // get the config value from be
+ def (code, out, err) = curl("GET",
String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort,
paramName))
+ assertTrue(code == 0)
+ assertTrue(out.contains(paramName))
+ // parsing
+ def resultList = parseJson(out)[0]
+ assertTrue(resultList.size() == 4)
+ // get original value
+ paramValue = resultList[2]
+ backendId_to_params.get(id, [:]).put(paramName, paramValue)
+ }
+ }
+
+ def customFeConfig = [
+ calculate_delete_bitmap_task_timeout_seconds: 2,
+ meta_service_rpc_retry_times : 5
+ ]
+
+ // store the original value
+ get_be_param("mow_stream_load_commit_retry_times")
+ // disable retry to make this problem more clear
+ set_be_param("mow_stream_load_commit_retry_times", "1")
+
+
+ def tableName = "tbl_basic"
+ setFeConfigTemporary(customFeConfig) {
+ try {
+ // create table
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"
+ );
+ """
+ // this streamLoad will fail on fe commit phase
+
GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null)
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+
assertTrue(json.Message.contains("FE.mow.commit.exception"))
+ }
+ }
+ qt_sql """ select * from ${tableName} order by id"""
+
+ // this streamLoad will success because of removing exception
injection
+
GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ qt_sql """ select * from ${tableName} order by id"""
+ } finally {
+ reset_be_param("mow_stream_load_commit_retry_times")
+
GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
+ sql "DROP TABLE IF EXISTS ${tableName};"
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]