morningman commented on a change in pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#discussion_r799920692
##########
File path: docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
##########
@@ -179,6 +179,22 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的
3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode
对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict
对其不产生影响。
+ merge\_type
数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE
其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete
条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
+
++ two\_phase\_commit
+
+ Stream load 导入可以开启两阶段批量事务提交模式。开启方式为在 HEADER 中声明
```two_phase_commit=true``` 。默认的两阶段批量事务提交为关闭。
+ 两阶段批量事务提交模式的意思是:Stream
load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
+
+ 1. 用户可以调用如下接口对多个stream load事务批量触发commit操作:
+ ```
+ curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..."
http://fe_host:http_port/api/{db}/{table}/_stream_load_commit
Review comment:
No need `{table}` here
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long
usedQuotaDataBytes) {
this.usedQuotaDataBytes = usedQuotaDataBytes;
}
+ public void preCommitTransaction(List<Table> tableList, long
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+ TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ // 1. check status
+ // the caller method already own db lock, we do not obtain db lock here
+ Database db = catalog.getDbOrMetaException(dbId);
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState = unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+ if (transactionState == null
+ || transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
+ throw new TransactionCommitFailedException(
+ transactionState == null ? "transaction not found" :
transactionState.getReason());
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.debug("transaction is already visible: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
visible");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ LOG.debug("transaction is already committed: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.PRECOMMITTED) {
+ LOG.debug("transaction is already pre-committed: {}",
transactionId);
+ return;
+ }
+
+ if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ // update transaction state extra if exists
+ if (txnCommitAttachment != null) {
+ transactionState.setTxnCommitAttachment(txnCommitAttachment);
+ }
+
+ TabletInvertedIndex tabletInvertedIndex =
catalog.getTabletInvertedIndex();
+ Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+ Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+ Map<Long, Table> idToTable = new HashMap<>();
+ for (int i = 0; i < tableList.size(); i++) {
+ idToTable.put(tableList.get(i).getId(), tableList.get(i));
+ }
+ // 2. validate potential exists problem: db->table->partition
+ // guarantee exist exception during a transaction
+ // if index is dropped, it does not matter.
+ // if table or partition is dropped during load, just ignore that
tablet,
+ // because we should allow dropping rollup or partition during load
+ List<Long> tabletIds = tabletCommitInfos.stream().map(
+ tabletCommitInfo ->
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+ List<TabletMeta> tabletMetaList =
tabletInvertedIndex.getTabletMetaList(tabletIds);
+ for (int i = 0; i < tabletMetaList.size(); i++) {
+ TabletMeta tabletMeta = tabletMetaList.get(i);
+ if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+ continue;
+ }
+ long tabletId = tabletIds.get(i);
+ long tableId = tabletMeta.getTableId();
+ OlapTable tbl = (OlapTable) idToTable.get(tableId);
+ if (tbl == null) {
+ // this can happen when tableId == -1 (tablet being dropping)
+ // or table really not exist.
+ continue;
+ }
+
+ if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+ throw new LoadException("Table " + tbl.getName() + " is in
restore process. "
+ + "Can not load into it");
+ }
+
+ long partitionId = tabletMeta.getPartitionId();
+ if (tbl.getPartition(partitionId) == null) {
+ // this can happen when partitionId == -1 (tablet being
dropping)
+ // or partition really not exist.
+ continue;
+ }
+
+ if (!tableToPartition.containsKey(tableId)) {
+ tableToPartition.put(tableId, new HashSet<>());
+ }
+ tableToPartition.get(tableId).add(partitionId);
+ if (!tabletToBackends.containsKey(tabletId)) {
+ tabletToBackends.put(tabletId, new HashSet<>());
+ }
+
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+ }
+
+ if (tableToPartition.isEmpty()) {
+ // table or all partitions are being dropped
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ Set<Long> totalInvolvedBackends = Sets.newHashSet();
+ for (long tableId : tableToPartition.keySet()) {
+ OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+ for (Partition partition : table.getAllPartitions()) {
+ if
(!tableToPartition.get(tableId).contains(partition.getId())) {
+ continue;
+ }
+
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ int quorumReplicaNum =
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2 + 1;
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ int successReplicaNum = 0;
+ long tabletId = tablet.getId();
+ Set<Long> tabletBackends = tablet.getBackendIds();
+ totalInvolvedBackends.addAll(tabletBackends);
+ Set<Long> commitBackends =
tabletToBackends.get(tabletId);
+ // save the error replica ids for current tablet
+ // this param is used for log
+ Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+ for (long tabletBackend : tabletBackends) {
+ Replica replica =
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+ if (replica == null) {
+ throw new
TransactionCommitFailedException("could not find replica for tablet ["
+ + tabletId + "], backend [" +
tabletBackend + "]");
+ }
+ // if the tablet have no replica's to commit or
the tablet is a rolling up tablet, the commit backends maybe null
+ // if the commit backends is null, set all
replicas as error replicas
+ if (commitBackends != null &&
commitBackends.contains(tabletBackend)) {
+ // if the backend load success but the backend
has some errors previously, then it is not a normal replica
+ // ignore it but not log it
+ // for example, a replica is in clone state
+ if (replica.getLastFailedVersion() < 0) {
+ ++successReplicaNum;
+ }
+ } else {
+ errorBackendIdsForTablet.add(tabletBackend);
+ errorReplicaIds.add(replica.getId());
+ // not remove rollup task here, because the
commit maybe failed
+ // remove rollup task when commit successfully
+ }
+ }
+
+ if (successReplicaNum < quorumReplicaNum) {
+ LOG.warn("Failed to pre-commit txn [{}]. "
+ + "Tablet [{}] success replica num
is {} < quorum replica num {} "
+ + "while error backends {}",
+ transactionId, tablet.getId(),
successReplicaNum, quorumReplicaNum,
+
Joiner.on(",").join(errorBackendIdsForTablet));
+ throw new
TabletQuorumFailedException(transactionId, tablet.getId(),
+ successReplicaNum, quorumReplicaNum,
+ errorBackendIdsForTablet);
+ }
+ }
+ }
+ }
+ }
+
+ unprotectedPreCommitTransaction(transactionState, errorReplicaIds,
tableToPartition, totalInvolvedBackends, db);
+ LOG.info("transaction:[{}] successfully pre-committed",
transactionState);
+ }
+
+ public TransactionState checkPreCommitStatus(long transactionId) throws
UserException {
Review comment:
```suggestion
private TransactionState checkPreCommitStatus(long transactionId) throws
UserException {
```
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long
usedQuotaDataBytes) {
this.usedQuotaDataBytes = usedQuotaDataBytes;
}
+ public void preCommitTransaction(List<Table> tableList, long
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+ TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ // 1. check status
+ // the caller method already own db lock, we do not obtain db lock here
+ Database db = catalog.getDbOrMetaException(dbId);
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState = unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+ if (transactionState == null
+ || transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
+ throw new TransactionCommitFailedException(
+ transactionState == null ? "transaction not found" :
transactionState.getReason());
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.debug("transaction is already visible: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
visible");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ LOG.debug("transaction is already committed: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.PRECOMMITTED) {
+ LOG.debug("transaction is already pre-committed: {}",
transactionId);
+ return;
+ }
+
+ if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ // update transaction state extra if exists
+ if (txnCommitAttachment != null) {
+ transactionState.setTxnCommitAttachment(txnCommitAttachment);
+ }
+
+ TabletInvertedIndex tabletInvertedIndex =
catalog.getTabletInvertedIndex();
+ Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+ Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+ Map<Long, Table> idToTable = new HashMap<>();
+ for (int i = 0; i < tableList.size(); i++) {
+ idToTable.put(tableList.get(i).getId(), tableList.get(i));
+ }
+ // 2. validate potential exists problem: db->table->partition
+ // guarantee exist exception during a transaction
+ // if index is dropped, it does not matter.
+ // if table or partition is dropped during load, just ignore that
tablet,
+ // because we should allow dropping rollup or partition during load
+ List<Long> tabletIds = tabletCommitInfos.stream().map(
+ tabletCommitInfo ->
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+ List<TabletMeta> tabletMetaList =
tabletInvertedIndex.getTabletMetaList(tabletIds);
+ for (int i = 0; i < tabletMetaList.size(); i++) {
+ TabletMeta tabletMeta = tabletMetaList.get(i);
+ if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+ continue;
+ }
+ long tabletId = tabletIds.get(i);
+ long tableId = tabletMeta.getTableId();
+ OlapTable tbl = (OlapTable) idToTable.get(tableId);
+ if (tbl == null) {
+ // this can happen when tableId == -1 (tablet being dropping)
+ // or table really not exist.
+ continue;
+ }
+
+ if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+ throw new LoadException("Table " + tbl.getName() + " is in
restore process. "
+ + "Can not load into it");
+ }
+
+ long partitionId = tabletMeta.getPartitionId();
+ if (tbl.getPartition(partitionId) == null) {
+ // this can happen when partitionId == -1 (tablet being
dropping)
+ // or partition really not exist.
+ continue;
+ }
+
+ if (!tableToPartition.containsKey(tableId)) {
+ tableToPartition.put(tableId, new HashSet<>());
+ }
+ tableToPartition.get(tableId).add(partitionId);
+ if (!tabletToBackends.containsKey(tabletId)) {
+ tabletToBackends.put(tabletId, new HashSet<>());
+ }
+
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+ }
+
+ if (tableToPartition.isEmpty()) {
+ // table or all partitions are being dropped
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ Set<Long> totalInvolvedBackends = Sets.newHashSet();
+ for (long tableId : tableToPartition.keySet()) {
+ OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+ for (Partition partition : table.getAllPartitions()) {
+ if
(!tableToPartition.get(tableId).contains(partition.getId())) {
+ continue;
+ }
+
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ int quorumReplicaNum =
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2 + 1;
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ int successReplicaNum = 0;
+ long tabletId = tablet.getId();
+ Set<Long> tabletBackends = tablet.getBackendIds();
+ totalInvolvedBackends.addAll(tabletBackends);
+ Set<Long> commitBackends =
tabletToBackends.get(tabletId);
+ // save the error replica ids for current tablet
+ // this param is used for log
+ Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+ for (long tabletBackend : tabletBackends) {
+ Replica replica =
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+ if (replica == null) {
+ throw new
TransactionCommitFailedException("could not find replica for tablet ["
+ + tabletId + "], backend [" +
tabletBackend + "]");
+ }
+ // if the tablet have no replica's to commit or
the tablet is a rolling up tablet, the commit backends maybe null
+ // if the commit backends is null, set all
replicas as error replicas
+ if (commitBackends != null &&
commitBackends.contains(tabletBackend)) {
+ // if the backend load success but the backend
has some errors previously, then it is not a normal replica
+ // ignore it but not log it
+ // for example, a replica is in clone state
+ if (replica.getLastFailedVersion() < 0) {
+ ++successReplicaNum;
+ }
+ } else {
+ errorBackendIdsForTablet.add(tabletBackend);
+ errorReplicaIds.add(replica.getId());
+ // not remove rollup task here, because the
commit maybe failed
+ // remove rollup task when commit successfully
+ }
+ }
+
+ if (successReplicaNum < quorumReplicaNum) {
+ LOG.warn("Failed to pre-commit txn [{}]. "
+ + "Tablet [{}] success replica num
is {} < quorum replica num {} "
+ + "while error backends {}",
+ transactionId, tablet.getId(),
successReplicaNum, quorumReplicaNum,
+
Joiner.on(",").join(errorBackendIdsForTablet));
+ throw new
TabletQuorumFailedException(transactionId, tablet.getId(),
+ successReplicaNum, quorumReplicaNum,
+ errorBackendIdsForTablet);
+ }
+ }
+ }
+ }
+ }
+
+ unprotectedPreCommitTransaction(transactionState, errorReplicaIds,
tableToPartition, totalInvolvedBackends, db);
+ LOG.info("transaction:[{}] successfully pre-committed",
transactionState);
+ }
+
+ public TransactionState checkPreCommitStatus(long transactionId) throws
UserException {
+ Database db = catalog.getDbOrMetaException(dbId);
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState = unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+
+ if (transactionState == null) {
+ LOG.debug("transaction not found: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction {" +
transactionId + "} not found.");
+ }
+
+ transactionState.setCheckTimeout(false);
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
+ LOG.debug("transaction is already aborted: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already aborted, not pre-committed" +
transactionState.getReason());
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.debug("transaction is already visible: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already visible, not pre-committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ LOG.debug("transaction is already committed: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already committed, not pre-committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.PREPARE) {
+ LOG.debug("transaction is prepare, not pre-committed: {}",
transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is prepare, not pre-committed");
+ }
+
+ long currentTimeMillis = System.currentTimeMillis();
+ // Maybe new invisible version has not been reported to master FE
after the transaction is pre-committed
+ // or FE restart, we should wait a little while to commit transaction.
+ if (currentTimeMillis - transactionState.getPreCommitTime() <
Review comment:
Does it mean that we need to wait at least 60sec to commit the
precommitted the txn?
That is strange to me.
##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -154,4 +178,125 @@ private Object executeWithoutPassword(HttpServletRequest
request,
return new RestBaseResult(e.getMessage());
}
}
+
+ private Object executeStreamLoadCommit(HttpServletRequest request,
+ HttpServletResponse response, String
db, String table) {
+ try {
+ String dbName = db;
+ String tableName = table;
+
+ final String clusterName = ConnectContext.get().getClusterName();
+ if (Strings.isNullOrEmpty(clusterName)) {
+ return new RestBaseResult("No cluster selected.");
+ }
+
+ if (Strings.isNullOrEmpty(dbName)) {
+ return new RestBaseResult("No database selected.");
+ }
+
+ if (Strings.isNullOrEmpty(tableName)) {
+ return new RestBaseResult("No table selected.");
+ }
+
+ String fullDbName = ClusterNamespace.getFullName(clusterName,
dbName);
+
+ List<Long> transactionIds = Lists.newArrayList();
+ String txnIds = request.getHeader(TXN_KEY);
+ if (Strings.isNullOrEmpty(txnIds)) {
+ return new RestBaseResult("No transaction id selected.");
+ } else {
+ for (String txnId : txnIds.split(",")) {
Review comment:
trim the space?
##########
File path: docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
##########
@@ -179,6 +179,22 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的
3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode
对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict
对其不产生影响。
+ merge\_type
数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE
其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete
条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
+
++ two\_phase\_commit
+
+ Stream load 导入可以开启两阶段批量事务提交模式。开启方式为在 HEADER 中声明
```two_phase_commit=true``` 。默认的两阶段批量事务提交为关闭。
+ 两阶段批量事务提交模式的意思是:Stream
load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
+
+ 1. 用户可以调用如下接口对多个stream load事务批量触发commit操作:
+ ```
+ curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..."
http://fe_host:http_port/api/{db}/{table}/_stream_load_commit
+ ```
+ 当所有事务都能成功commit时,则本次批量触发commit的操作才成功,只要有一个事务commit失败,则本次批量触发commit的操作不会成功。
+
+ 2. 用户可以调用如下接口对多个stream load事务批量触发abort操作:
+ ```
+ curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..."
http://fe_host:http_port/api/{db}/{table}/_stream_load_abort
Review comment:
No need `{table}` here
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -198,6 +224,16 @@ public void commitTransaction(long dbId, List<Table>
tableList, long transaction
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
dbTransactionMgr.commitTransaction(tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
}
+
+ public void commitTransaction(long dbId, List<Long> transactionIds)
Review comment:
```suggestion
private void commitTransaction(long dbId, List<Long> transactionIds)
```
##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
return executeWithoutPassword(request, response, db, table);
}
+ @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY +
"}/_stream_load_commit", method = RequestMethod.GET)
+ public Object streamLoadCommit(HttpServletRequest request,
+ HttpServletResponse response,
+ @PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
+ this.isStreamLoad = true;
+ executeCheckPassword(request, response);
+ return executeStreamLoadCommit(request, response, db, table);
+ }
+
+ @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY +
"}/_stream_load_abort", method = RequestMethod.GET)
Review comment:
better to use `RequestMethod.PUT`?
##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -154,4 +178,125 @@ private Object executeWithoutPassword(HttpServletRequest
request,
return new RestBaseResult(e.getMessage());
}
}
+
+ private Object executeStreamLoadCommit(HttpServletRequest request,
+ HttpServletResponse response, String
db, String table) {
+ try {
+ String dbName = db;
+ String tableName = table;
+
+ final String clusterName = ConnectContext.get().getClusterName();
+ if (Strings.isNullOrEmpty(clusterName)) {
+ return new RestBaseResult("No cluster selected.");
+ }
+
+ if (Strings.isNullOrEmpty(dbName)) {
+ return new RestBaseResult("No database selected.");
+ }
+
+ if (Strings.isNullOrEmpty(tableName)) {
+ return new RestBaseResult("No table selected.");
+ }
+
+ String fullDbName = ClusterNamespace.getFullName(clusterName,
dbName);
+
+ List<Long> transactionIds = Lists.newArrayList();
+ String txnIds = request.getHeader(TXN_KEY);
+ if (Strings.isNullOrEmpty(txnIds)) {
+ return new RestBaseResult("No transaction id selected.");
+ } else {
+ for (String txnId : txnIds.split(",")) {
+ transactionIds.add(Long.parseLong(txnId));
+ }
+ }
+
+ // check auth
+ checkTblAuth(ConnectContext.get().getCurrentUserIdentity(),
fullDbName, tableName, PrivPredicate.LOAD);
+ LOG.info("redirect stream load commit request to master FE, txns:
{}", txnIds);
+
+ RedirectView redirectView = redirectToMaster(request, response);
+ if (redirectView != null) {
+ return redirectView;
+ }
+ {
+ LOG.info("Master FE received http request to commit txns: {}",
txnIds);
Review comment:
Use `debug` level, or there will be too many logs
##########
File path: gensrc/thrift/MasterService.thrift
##########
@@ -31,13 +31,14 @@ struct TTabletInfo {
5: required Types.TCount row_count
6: required Types.TSize data_size
7: optional Types.TStorageMedium storage_medium
- 8: optional list<Types.TTransactionId> transaction_ids
Review comment:
changing fields name or order may cause incompatibility.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long
usedQuotaDataBytes) {
this.usedQuotaDataBytes = usedQuotaDataBytes;
}
+ public void preCommitTransaction(List<Table> tableList, long
transactionId, List<TabletCommitInfo> tabletCommitInfos,
Review comment:
Most part of this method is same as `commitTransaction`. Can we merge
these 2 methods?
I think the only difference it to set the txn state to PRECOMMITTED or
COMMITTED.
##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
return executeWithoutPassword(request, response, db, table);
}
+ @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY +
"}/_stream_load_commit", method = RequestMethod.GET)
+ public Object streamLoadCommit(HttpServletRequest request,
+ HttpServletResponse response,
+ @PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
+ this.isStreamLoad = true;
+ executeCheckPassword(request, response);
+ return executeStreamLoadCommit(request, response, db, table);
+ }
+
+ @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY +
"}/_stream_load_abort", method = RequestMethod.GET)
+ public Object streamLoadAbort(HttpServletRequest request,
+ HttpServletResponse response,
+ @PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
+ this.isStreamLoad = true;
+ executeCheckPassword(request, response);
+ return executeStreamLoadAbort(request, response, db, table);
+ }
+
// Same as Multi load, to be compatible with http v1's response body,
// we return error by using RestBaseResult.
private Object executeWithoutPassword(HttpServletRequest request,
HttpServletResponse response, String
db, String table) {
+ LOG.info("received stream load request (precommit httpv2).");
Review comment:
This log can be removed.
##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
return executeWithoutPassword(request, response, db, table);
}
+ @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY +
"}/_stream_load_commit", method = RequestMethod.GET)
Review comment:
better to use `RequestMethod.PUT`?
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long
usedQuotaDataBytes) {
this.usedQuotaDataBytes = usedQuotaDataBytes;
}
+ public void preCommitTransaction(List<Table> tableList, long
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+ TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ // 1. check status
+ // the caller method already own db lock, we do not obtain db lock here
+ Database db = catalog.getDbOrMetaException(dbId);
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState = unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+ if (transactionState == null
+ || transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
+ throw new TransactionCommitFailedException(
+ transactionState == null ? "transaction not found" :
transactionState.getReason());
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.debug("transaction is already visible: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
visible");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ LOG.debug("transaction is already committed: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.PRECOMMITTED) {
+ LOG.debug("transaction is already pre-committed: {}",
transactionId);
+ return;
+ }
+
+ if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ // update transaction state extra if exists
+ if (txnCommitAttachment != null) {
+ transactionState.setTxnCommitAttachment(txnCommitAttachment);
+ }
+
+ TabletInvertedIndex tabletInvertedIndex =
catalog.getTabletInvertedIndex();
+ Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+ Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+ Map<Long, Table> idToTable = new HashMap<>();
+ for (int i = 0; i < tableList.size(); i++) {
+ idToTable.put(tableList.get(i).getId(), tableList.get(i));
+ }
+ // 2. validate potential exists problem: db->table->partition
+ // guarantee exist exception during a transaction
+ // if index is dropped, it does not matter.
+ // if table or partition is dropped during load, just ignore that
tablet,
+ // because we should allow dropping rollup or partition during load
+ List<Long> tabletIds = tabletCommitInfos.stream().map(
+ tabletCommitInfo ->
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+ List<TabletMeta> tabletMetaList =
tabletInvertedIndex.getTabletMetaList(tabletIds);
+ for (int i = 0; i < tabletMetaList.size(); i++) {
+ TabletMeta tabletMeta = tabletMetaList.get(i);
+ if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+ continue;
+ }
+ long tabletId = tabletIds.get(i);
+ long tableId = tabletMeta.getTableId();
+ OlapTable tbl = (OlapTable) idToTable.get(tableId);
+ if (tbl == null) {
+ // this can happen when tableId == -1 (tablet being dropping)
+ // or table really not exist.
+ continue;
+ }
+
+ if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+ throw new LoadException("Table " + tbl.getName() + " is in
restore process. "
+ + "Can not load into it");
+ }
+
+ long partitionId = tabletMeta.getPartitionId();
+ if (tbl.getPartition(partitionId) == null) {
+ // this can happen when partitionId == -1 (tablet being
dropping)
+ // or partition really not exist.
+ continue;
+ }
+
+ if (!tableToPartition.containsKey(tableId)) {
+ tableToPartition.put(tableId, new HashSet<>());
+ }
+ tableToPartition.get(tableId).add(partitionId);
+ if (!tabletToBackends.containsKey(tabletId)) {
+ tabletToBackends.put(tabletId, new HashSet<>());
+ }
+
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+ }
+
+ if (tableToPartition.isEmpty()) {
+ // table or all partitions are being dropped
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ Set<Long> totalInvolvedBackends = Sets.newHashSet();
+ for (long tableId : tableToPartition.keySet()) {
+ OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+ for (Partition partition : table.getAllPartitions()) {
+ if
(!tableToPartition.get(tableId).contains(partition.getId())) {
+ continue;
+ }
+
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ int quorumReplicaNum =
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2 + 1;
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ int successReplicaNum = 0;
+ long tabletId = tablet.getId();
+ Set<Long> tabletBackends = tablet.getBackendIds();
+ totalInvolvedBackends.addAll(tabletBackends);
+ Set<Long> commitBackends =
tabletToBackends.get(tabletId);
+ // save the error replica ids for current tablet
+ // this param is used for log
+ Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+ for (long tabletBackend : tabletBackends) {
+ Replica replica =
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+ if (replica == null) {
+ throw new
TransactionCommitFailedException("could not find replica for tablet ["
+ + tabletId + "], backend [" +
tabletBackend + "]");
+ }
+ // if the tablet have no replica's to commit or
the tablet is a rolling up tablet, the commit backends maybe null
+ // if the commit backends is null, set all
replicas as error replicas
+ if (commitBackends != null &&
commitBackends.contains(tabletBackend)) {
+ // if the backend load success but the backend
has some errors previously, then it is not a normal replica
+ // ignore it but not log it
+ // for example, a replica is in clone state
+ if (replica.getLastFailedVersion() < 0) {
+ ++successReplicaNum;
+ }
+ } else {
+ errorBackendIdsForTablet.add(tabletBackend);
+ errorReplicaIds.add(replica.getId());
+ // not remove rollup task here, because the
commit maybe failed
+ // remove rollup task when commit successfully
+ }
+ }
+
+ if (successReplicaNum < quorumReplicaNum) {
+ LOG.warn("Failed to pre-commit txn [{}]. "
+ + "Tablet [{}] success replica num
is {} < quorum replica num {} "
+ + "while error backends {}",
+ transactionId, tablet.getId(),
successReplicaNum, quorumReplicaNum,
+
Joiner.on(",").join(errorBackendIdsForTablet));
+ throw new
TabletQuorumFailedException(transactionId, tablet.getId(),
+ successReplicaNum, quorumReplicaNum,
+ errorBackendIdsForTablet);
+ }
+ }
+ }
+ }
+ }
+
+ unprotectedPreCommitTransaction(transactionState, errorReplicaIds,
tableToPartition, totalInvolvedBackends, db);
+ LOG.info("transaction:[{}] successfully pre-committed",
transactionState);
+ }
+
+ public TransactionState checkPreCommitStatus(long transactionId) throws
UserException {
+ Database db = catalog.getDbOrMetaException(dbId);
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState = unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+
+ if (transactionState == null) {
+ LOG.debug("transaction not found: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction {" +
transactionId + "} not found.");
+ }
+
+ transactionState.setCheckTimeout(false);
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
+ LOG.debug("transaction is already aborted: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already aborted, not pre-committed" +
transactionState.getReason());
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.debug("transaction is already visible: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already visible, not pre-committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ LOG.debug("transaction is already committed: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already committed, not pre-committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.PREPARE) {
+ LOG.debug("transaction is prepare, not pre-committed: {}",
transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is prepare, not pre-committed");
+ }
+
+ long currentTimeMillis = System.currentTimeMillis();
+ // Maybe new invisible version has not been reported to master FE
after the transaction is pre-committed
+ // or FE restart, we should wait a little while to commit transaction.
+ if (currentTimeMillis - transactionState.getPreCommitTime() <
+ Config.stream_load_default_wait_report_second * 1000) {
+ throw new TransactionCommitFailedException("The interval is too
short after txn [" + transactionId + "] " +
+ "was pre-committed. Please try again later.");
+ }
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ Iterator<TableCommitInfo> tableCommitInfoIterator =
transactionState.getIdToTableCommitInfos().values().iterator();
+ while (tableCommitInfoIterator.hasNext()) {
+ TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+ long tableId = tableCommitInfo.getTableId();
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ // table maybe dropped between pre-commit and commit, ignore this
error
+ if (table == null) {
+ tableCommitInfoIterator.remove();
+ LOG.warn("table {} is dropped, skip version check and remove
it from transaction state {}",
+ tableId,
+ transactionState);
+ continue;
+ }
+ PartitionInfo partitionInfo = table.getPartitionInfo();
+ Iterator<PartitionCommitInfo> partitionCommitInfoIterator =
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+ while (partitionCommitInfoIterator.hasNext()) {
+ PartitionCommitInfo partitionCommitInfo =
partitionCommitInfoIterator.next();
+ long partitionId = partitionCommitInfo.getPartitionId();
+ Partition partition = table.getPartition(partitionId);
+ // partition maybe dropped between pre-commit and commit,
ignore this error
+ if (partition == null) {
+ partitionCommitInfoIterator.remove();
+ LOG.warn("partition {} is dropped, skip version check and
remove it from transaction state {}",
+ partitionId,
+ transactionState);
+ continue;
+ }
+
+ int quorumReplicaNum =
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
+
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ int healthReplicaNum = 0;
+ for (Replica replica : tablet.getReplicas()) {
+ List<Long> committedTxnIds =
replica.getCommittedTxnIds();
+ if (committedTxnIds.contains(transactionId)) {
+ ++healthReplicaNum;
+ } else {
+ errorReplicaIds.add(replica.getId());
+ }
+ }
+
+ if (healthReplicaNum < quorumReplicaNum) {
+ LOG.warn("failed to commit transaction {} on
tablet {}, with only {} replicas less than quorum {}, txnId: {}",
+ transactionState, tablet,
healthReplicaNum, quorumReplicaNum, transactionId);
+ throw new TransactionCommitFailedException("tablet
[" + tablet.getId() +"], with only ["
+ +healthReplicaNum+ "] replicas less than
quorum [" +quorumReplicaNum+ "], txnId ["
+ + transactionId + "]");
+ }
+ }
+ }
+ }
+ }
+ transactionState.setErrorReplicas(errorReplicaIds);
+
+ return transactionState;
+ }
+
+ public void commitTransaction(List<Long> transactionIds) throws
UserException {
+ try {
+ List<TransactionState> transactionStates = Lists.newArrayList();
+ for (long transactionId : transactionIds) {
+ transactionStates.add(checkPreCommitStatus(transactionId));
+ }
+
+ for (TransactionState transactionState : transactionStates) {
+ commitTransaction(transactionState);
+ }
+ } finally {
+ for (long transactionId : transactionIds) {
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState =
unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+
+ if (transactionState != null) {
+ transactionState.setCheckTimeout(true);
+ }
+ }
+ }
+ }
+
+ public void commitTransaction(TransactionState transactionState) throws
UserException {
Review comment:
```suggestion
private void commitTransaction(TransactionState transactionState) throws
UserException {
```
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -198,6 +224,16 @@ public void commitTransaction(long dbId, List<Table>
tableList, long transaction
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
dbTransactionMgr.commitTransaction(tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
}
+
+ public void commitTransaction(long dbId, List<Long> transactionIds)
Review comment:
Better to change the method name, for example, adding a prefix of suffix
`2PC` to make it different from the normal `commitTransaction`. Or it is
confused for code reading.
All other methods which are related to `2PC` in both GlobalTransactionMgr
and DatabaseTransactionMgr need to be changed.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long
usedQuotaDataBytes) {
this.usedQuotaDataBytes = usedQuotaDataBytes;
}
+ public void preCommitTransaction(List<Table> tableList, long
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+ TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ // 1. check status
+ // the caller method already own db lock, we do not obtain db lock here
+ Database db = catalog.getDbOrMetaException(dbId);
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState = unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+ if (transactionState == null
+ || transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
+ throw new TransactionCommitFailedException(
+ transactionState == null ? "transaction not found" :
transactionState.getReason());
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.debug("transaction is already visible: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
visible");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ LOG.debug("transaction is already committed: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction is already
committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.PRECOMMITTED) {
+ LOG.debug("transaction is already pre-committed: {}",
transactionId);
+ return;
+ }
+
+ if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ // update transaction state extra if exists
+ if (txnCommitAttachment != null) {
+ transactionState.setTxnCommitAttachment(txnCommitAttachment);
+ }
+
+ TabletInvertedIndex tabletInvertedIndex =
catalog.getTabletInvertedIndex();
+ Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+ Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+ Map<Long, Table> idToTable = new HashMap<>();
+ for (int i = 0; i < tableList.size(); i++) {
+ idToTable.put(tableList.get(i).getId(), tableList.get(i));
+ }
+ // 2. validate potential exists problem: db->table->partition
+ // guarantee exist exception during a transaction
+ // if index is dropped, it does not matter.
+ // if table or partition is dropped during load, just ignore that
tablet,
+ // because we should allow dropping rollup or partition during load
+ List<Long> tabletIds = tabletCommitInfos.stream().map(
+ tabletCommitInfo ->
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+ List<TabletMeta> tabletMetaList =
tabletInvertedIndex.getTabletMetaList(tabletIds);
+ for (int i = 0; i < tabletMetaList.size(); i++) {
+ TabletMeta tabletMeta = tabletMetaList.get(i);
+ if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+ continue;
+ }
+ long tabletId = tabletIds.get(i);
+ long tableId = tabletMeta.getTableId();
+ OlapTable tbl = (OlapTable) idToTable.get(tableId);
+ if (tbl == null) {
+ // this can happen when tableId == -1 (tablet being dropping)
+ // or table really not exist.
+ continue;
+ }
+
+ if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+ throw new LoadException("Table " + tbl.getName() + " is in
restore process. "
+ + "Can not load into it");
+ }
+
+ long partitionId = tabletMeta.getPartitionId();
+ if (tbl.getPartition(partitionId) == null) {
+ // this can happen when partitionId == -1 (tablet being
dropping)
+ // or partition really not exist.
+ continue;
+ }
+
+ if (!tableToPartition.containsKey(tableId)) {
+ tableToPartition.put(tableId, new HashSet<>());
+ }
+ tableToPartition.get(tableId).add(partitionId);
+ if (!tabletToBackends.containsKey(tabletId)) {
+ tabletToBackends.put(tabletId, new HashSet<>());
+ }
+
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+ }
+
+ if (tableToPartition.isEmpty()) {
+ // table or all partitions are being dropped
+ throw new
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ }
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ Set<Long> totalInvolvedBackends = Sets.newHashSet();
+ for (long tableId : tableToPartition.keySet()) {
+ OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+ for (Partition partition : table.getAllPartitions()) {
+ if
(!tableToPartition.get(tableId).contains(partition.getId())) {
+ continue;
+ }
+
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ int quorumReplicaNum =
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2 + 1;
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ int successReplicaNum = 0;
+ long tabletId = tablet.getId();
+ Set<Long> tabletBackends = tablet.getBackendIds();
+ totalInvolvedBackends.addAll(tabletBackends);
+ Set<Long> commitBackends =
tabletToBackends.get(tabletId);
+ // save the error replica ids for current tablet
+ // this param is used for log
+ Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+ for (long tabletBackend : tabletBackends) {
+ Replica replica =
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+ if (replica == null) {
+ throw new
TransactionCommitFailedException("could not find replica for tablet ["
+ + tabletId + "], backend [" +
tabletBackend + "]");
+ }
+ // if the tablet have no replica's to commit or
the tablet is a rolling up tablet, the commit backends maybe null
+ // if the commit backends is null, set all
replicas as error replicas
+ if (commitBackends != null &&
commitBackends.contains(tabletBackend)) {
+ // if the backend load success but the backend
has some errors previously, then it is not a normal replica
+ // ignore it but not log it
+ // for example, a replica is in clone state
+ if (replica.getLastFailedVersion() < 0) {
+ ++successReplicaNum;
+ }
+ } else {
+ errorBackendIdsForTablet.add(tabletBackend);
+ errorReplicaIds.add(replica.getId());
+ // not remove rollup task here, because the
commit maybe failed
+ // remove rollup task when commit successfully
+ }
+ }
+
+ if (successReplicaNum < quorumReplicaNum) {
+ LOG.warn("Failed to pre-commit txn [{}]. "
+ + "Tablet [{}] success replica num
is {} < quorum replica num {} "
+ + "while error backends {}",
+ transactionId, tablet.getId(),
successReplicaNum, quorumReplicaNum,
+
Joiner.on(",").join(errorBackendIdsForTablet));
+ throw new
TabletQuorumFailedException(transactionId, tablet.getId(),
+ successReplicaNum, quorumReplicaNum,
+ errorBackendIdsForTablet);
+ }
+ }
+ }
+ }
+ }
+
+ unprotectedPreCommitTransaction(transactionState, errorReplicaIds,
tableToPartition, totalInvolvedBackends, db);
+ LOG.info("transaction:[{}] successfully pre-committed",
transactionState);
+ }
+
+ public TransactionState checkPreCommitStatus(long transactionId) throws
UserException {
+ Database db = catalog.getDbOrMetaException(dbId);
+ TransactionState transactionState;
+ readLock();
+ try {
+ transactionState = unprotectedGetTransactionState(transactionId);
+ } finally {
+ readUnlock();
+ }
+
+ if (transactionState == null) {
+ LOG.debug("transaction not found: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction {" +
transactionId + "} not found.");
+ }
+
+ transactionState.setCheckTimeout(false);
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
+ LOG.debug("transaction is already aborted: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already aborted, not pre-committed" +
transactionState.getReason());
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.debug("transaction is already visible: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already visible, not pre-committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ LOG.debug("transaction is already committed: {}", transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is already committed, not pre-committed");
+ }
+
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.PREPARE) {
+ LOG.debug("transaction is prepare, not pre-committed: {}",
transactionId);
+ throw new TransactionCommitFailedException("transaction [" +
transactionId
+ + "] is prepare, not pre-committed");
+ }
+
+ long currentTimeMillis = System.currentTimeMillis();
+ // Maybe new invisible version has not been reported to master FE
after the transaction is pre-committed
+ // or FE restart, we should wait a little while to commit transaction.
+ if (currentTimeMillis - transactionState.getPreCommitTime() <
+ Config.stream_load_default_wait_report_second * 1000) {
+ throw new TransactionCommitFailedException("The interval is too
short after txn [" + transactionId + "] " +
+ "was pre-committed. Please try again later.");
+ }
+
+ Set<Long> errorReplicaIds = Sets.newHashSet();
+ Iterator<TableCommitInfo> tableCommitInfoIterator =
transactionState.getIdToTableCommitInfos().values().iterator();
+ while (tableCommitInfoIterator.hasNext()) {
+ TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+ long tableId = tableCommitInfo.getTableId();
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ // table maybe dropped between pre-commit and commit, ignore this
error
+ if (table == null) {
+ tableCommitInfoIterator.remove();
+ LOG.warn("table {} is dropped, skip version check and remove
it from transaction state {}",
+ tableId,
+ transactionState);
+ continue;
+ }
+ PartitionInfo partitionInfo = table.getPartitionInfo();
+ Iterator<PartitionCommitInfo> partitionCommitInfoIterator =
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+ while (partitionCommitInfoIterator.hasNext()) {
+ PartitionCommitInfo partitionCommitInfo =
partitionCommitInfoIterator.next();
+ long partitionId = partitionCommitInfo.getPartitionId();
+ Partition partition = table.getPartition(partitionId);
+ // partition maybe dropped between pre-commit and commit,
ignore this error
+ if (partition == null) {
+ partitionCommitInfoIterator.remove();
+ LOG.warn("partition {} is dropped, skip version check and
remove it from transaction state {}",
+ partitionId,
+ transactionState);
+ continue;
+ }
+
+ int quorumReplicaNum =
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
+
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ int healthReplicaNum = 0;
+ for (Replica replica : tablet.getReplicas()) {
+ List<Long> committedTxnIds =
replica.getCommittedTxnIds();
+ if (committedTxnIds.contains(transactionId)) {
+ ++healthReplicaNum;
+ } else {
+ errorReplicaIds.add(replica.getId());
+ }
+ }
+
+ if (healthReplicaNum < quorumReplicaNum) {
+ LOG.warn("failed to commit transaction {} on
tablet {}, with only {} replicas less than quorum {}, txnId: {}",
+ transactionState, tablet,
healthReplicaNum, quorumReplicaNum, transactionId);
+ throw new TransactionCommitFailedException("tablet
[" + tablet.getId() +"], with only ["
+ +healthReplicaNum+ "] replicas less than
quorum [" +quorumReplicaNum+ "], txnId ["
+ + transactionId + "]");
+ }
+ }
+ }
+ }
+ }
+ transactionState.setErrorReplicas(errorReplicaIds);
+
+ return transactionState;
+ }
+
+ public void commitTransaction(List<Long> transactionIds) throws
UserException {
+ try {
+ List<TransactionState> transactionStates = Lists.newArrayList();
+ for (long transactionId : transactionIds) {
+ transactionStates.add(checkPreCommitStatus(transactionId));
Review comment:
If the txn is in PRECOMMITTED, it should be able to be COMMITTED, why we
still need to call `checkPreCommitStatus()` here?
The only thing need to be done in `checkPreCommitStatus()` is to check
whether the txn state is PRECOMMITTED, but I saw that there other checking list
in `checkPreCommitStatus()`, why?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]