morningman commented on a change in pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#discussion_r799920692



##########
File path: docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
##########
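@@ -179,6 +179,22 @@ Stream load uses the HTTP protocol, so all load-job-related
     3. For an imported column whose type includes a range restriction, if the raw data passes type conversion but fails the range restriction, strict mode 
does not affect it either. For example, if the type is decimal(1,0) and the raw value is 10, the value passes type conversion but falls outside the declared range of the column. Strict mode 
has no effect on such data.
 + merge\_type
     The merge type of the data. Three types are supported: APPEND, DELETE, and MERGE. 
APPEND is the default and means that the whole batch is appended to the existing data. DELETE means that all rows whose keys match this batch are deleted. MERGE must be used together with a delete 
condition: rows that satisfy the delete condition are handled with DELETE semantics, and the rest are handled with APPEND semantics.
+    
++ two\_phase\_commit
+
+    Stream load can enable a two-phase batch transaction commit mode. To enable it, declare 
```two_phase_commit=true``` in the HEADER. Two-phase batch transaction commit is disabled by default.
+    In two-phase batch transaction commit mode, Stream load returns information to the user as soon as the data 
has been written. At that point the data is not visible and the transaction status is PRECOMMITTED. The data becomes visible only after the user manually triggers a commit.
+    
+    1. The following interface triggers a commit for multiple stream load transactions in one batch: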
+    ```
+    curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." 
http://fe_host:http_port/api/{db}/{table}/_stream_load_commit

Review comment:
       `{table}` is not needed here.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long 
usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
visible");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", 
transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = 
catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that 
tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> 
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = 
tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in 
restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being 
dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if 
(!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = 
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = 
tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = 
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new 
TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + 
tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or 
the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all 
replicas as error replicas
+                            if (commitBackends != null && 
commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend 
has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the 
commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num 
is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), 
successReplicaNum, quorumReplicaNum,
+                                    
Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new 
TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, 
tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws 
UserException {

Review comment:
       ```suggestion
       private TransactionState checkPreCommitStatus(long transactionId) throws 
UserException {
   ```

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long 
usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
visible");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", 
transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = 
catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that 
tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> 
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = 
tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in 
restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being 
dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if 
(!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = 
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = 
tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = 
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new 
TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + 
tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or 
the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all 
replicas as error replicas
+                            if (commitBackends != null && 
commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend 
has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the 
commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num 
is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), 
successReplicaNum, quorumReplicaNum,
+                                    
Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new 
TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, 
tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws 
UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction {" + 
transactionId + "} not found.");
+        }
+
+        transactionState.setCheckTimeout(false);
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            LOG.debug("transaction is already aborted: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already aborted, not pre-committed" + 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already visible, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already committed, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PREPARE) {
+            LOG.debug("transaction is prepare, not pre-committed: {}", 
transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is prepare, not pre-committed");
+        }
+
+        long currentTimeMillis = System.currentTimeMillis();
+        // Maybe new  invisible version has not been reported to master FE 
after the transaction is pre-committed
+        // or FE restart, we should wait a little while to commit transaction.
+        if (currentTimeMillis - transactionState.getPreCommitTime() <

Review comment:
       Does this mean we have to wait at least 60 seconds before committing a 
pre-committed txn?
   That seems strange to me.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -154,4 +178,125 @@ private Object executeWithoutPassword(HttpServletRequest 
request,
             return new RestBaseResult(e.getMessage());
         }
     }
+
+    private Object executeStreamLoadCommit(HttpServletRequest request,
+                                          HttpServletResponse response, String 
db, String table) {
+        try {
+            String dbName = db;
+            String tableName = table;
+
+            final String clusterName = ConnectContext.get().getClusterName();
+            if (Strings.isNullOrEmpty(clusterName)) {
+                return new RestBaseResult("No cluster selected.");
+            }
+
+            if (Strings.isNullOrEmpty(dbName)) {
+                return new RestBaseResult("No database selected.");
+            }
+
+            if (Strings.isNullOrEmpty(tableName)) {
+                return new RestBaseResult("No table selected.");
+            }
+
+            String fullDbName = ClusterNamespace.getFullName(clusterName, 
dbName);
+
+            List<Long> transactionIds = Lists.newArrayList();
+            String txnIds = request.getHeader(TXN_KEY);
+            if (Strings.isNullOrEmpty(txnIds)) {
+                return new RestBaseResult("No transaction id selected.");
+            } else {
+                for (String txnId : txnIds.split(",")) {

Review comment:
       Should each txn ID be trimmed of surrounding whitespace before it is parsed?
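
A minimal sketch of what trimming could look like, assuming the header value is a plain comma-separated list. `TxnIdParser` and its `parse` method are hypothetical names used only for illustration and are not part of the PR. Note that `Long.parseLong` rejects input with leading or trailing spaces, so a header such as `1, 2 ,3` would fail without the `trim()` call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from the PR): parses a comma-separated txn header value.
final class TxnIdParser {
    private TxnIdParser() {
    }

    static List<Long> parse(String header) {
        if (header == null || header.trim().isEmpty()) {
            throw new IllegalArgumentException("No transaction id selected.");
        }
        List<Long> ids = new ArrayList<>();
        for (String part : header.split(",")) {
            // Strip surrounding whitespace so that "1, 2 ,3" is accepted.
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate empty segments such as "1,,2"
            }
            try {
                ids.add(Long.parseLong(trimmed));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid transaction id: '" + trimmed + "'", e);
            }
        }
        if (ids.isEmpty()) {
            throw new IllegalArgumentException("No transaction id selected.");
        }
        return ids;
    }
}
```

Skipping empty segments is a design choice of this sketch; rejecting them outright would be equally reasonable.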

##########
File path: docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
##########
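@@ -179,6 +179,22 @@ Stream load uses the HTTP protocol, so all load-job-related
     3. For an imported column whose type includes a range restriction, if the raw data passes type conversion but fails the range restriction, strict mode 
does not affect it either. For example, if the type is decimal(1,0) and the raw value is 10, the value passes type conversion but falls outside the declared range of the column. Strict mode 
has no effect on such data.
 + merge\_type
     The merge type of the data. Three types are supported: APPEND, DELETE, and MERGE. 
APPEND is the default and means that the whole batch is appended to the existing data. DELETE means that all rows whose keys match this batch are deleted. MERGE must be used together with a delete 
condition: rows that satisfy the delete condition are handled with DELETE semantics, and the rest are handled with APPEND semantics.
+    
++ two\_phase\_commit
+
+    Stream load can enable a two-phase batch transaction commit mode. To enable it, declare 
```two_phase_commit=true``` in the HEADER. Two-phase batch transaction commit is disabled by default.
+    In two-phase batch transaction commit mode, Stream load returns information to the user as soon as the data 
has been written. At that point the data is not visible and the transaction status is PRECOMMITTED. The data becomes visible only after the user manually triggers a commit.
+    
+    1. The following interface triggers a commit for multiple stream load transactions in one batch: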
+    ```
+    curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." 
http://fe_host:http_port/api/{db}/{table}/_stream_load_commit
+    ```
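+    The batch commit succeeds only if every transaction commits successfully; if any one transaction fails to commit, the batch commit does not succeed.
+    
+    2. The following interface triggers an abort for multiple stream load transactions in one batch: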
+    ```
+    curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." 
http://fe_host:http_port/api/{db}/{table}/_stream_load_abort

Review comment:
       `{table}` is not needed here.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -198,6 +224,16 @@ public void commitTransaction(long dbId, List<Table> 
tableList, long transaction
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
         dbTransactionMgr.commitTransaction(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
     }
+
+    public void commitTransaction(long dbId, List<Long> transactionIds)

Review comment:
       ```suggestion
       private void commitTransaction(long dbId, List<Long> transactionIds)
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
         return executeWithoutPassword(request, response, db, table);
     }
 
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load_commit", method = RequestMethod.GET)
+    public Object streamLoadCommit(HttpServletRequest request,
+                             HttpServletResponse response,
+                             @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
+        this.isStreamLoad = true;
+        executeCheckPassword(request, response);
+        return executeStreamLoadCommit(request, response, db, table);
+    }
+
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load_abort", method = RequestMethod.GET)

Review comment:
       Would it be better to use `RequestMethod.PUT`?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -154,4 +178,125 @@ private Object executeWithoutPassword(HttpServletRequest 
request,
             return new RestBaseResult(e.getMessage());
         }
     }
+
+    private Object executeStreamLoadCommit(HttpServletRequest request,
+                                          HttpServletResponse response, String 
db, String table) {
+        try {
+            String dbName = db;
+            String tableName = table;
+
+            final String clusterName = ConnectContext.get().getClusterName();
+            if (Strings.isNullOrEmpty(clusterName)) {
+                return new RestBaseResult("No cluster selected.");
+            }
+
+            if (Strings.isNullOrEmpty(dbName)) {
+                return new RestBaseResult("No database selected.");
+            }
+
+            if (Strings.isNullOrEmpty(tableName)) {
+                return new RestBaseResult("No table selected.");
+            }
+
+            String fullDbName = ClusterNamespace.getFullName(clusterName, 
dbName);
+
+            List<Long> transactionIds = Lists.newArrayList();
+            String txnIds = request.getHeader(TXN_KEY);
+            if (Strings.isNullOrEmpty(txnIds)) {
+                return new RestBaseResult("No transaction id selected.");
+            } else {
+                for (String txnId : txnIds.split(",")) {
+                    transactionIds.add(Long.parseLong(txnId));
+                }
+            }
+
+            // check auth
+            checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), 
fullDbName, tableName, PrivPredicate.LOAD);
+            LOG.info("redirect stream load commit request to master FE, txns: 
{}", txnIds);
+
+            RedirectView redirectView = redirectToMaster(request, response);
+            if (redirectView != null) {
+                return redirectView;
+            }
+            {
+                LOG.info("Master FE received http request to commit txns: {}", 
txnIds);

Review comment:
       Use the `debug` level here; otherwise there will be too many log entries.

##########
File path: gensrc/thrift/MasterService.thrift
##########
@@ -31,13 +31,14 @@ struct TTabletInfo {
     5: required Types.TCount row_count
     6: required Types.TSize data_size
     7: optional Types.TStorageMedium storage_medium
-    8: optional list<Types.TTransactionId> transaction_ids

Review comment:
       Changing a field's name or order may cause incompatibility.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long 
usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos,

Review comment:
       Most of this method is the same as `commitTransaction`. Can we merge 
these two methods?
   I think the only difference is whether the txn state is set to PRECOMMITTED or 
COMMITTED.
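
One possible shape for such a merge, sketched under heavy simplification: both public entry points delegate to a single private method that takes the target status as a parameter. `TxnStateSketch`, `advance`, and the in-memory map are hypothetical stand-ins, not the real `DatabaseTransactionMgr`; the tablet, partition, and quorum validation shared by both paths is reduced to a comment, and the handling of an already-COMMITTED txn in the real `commitTransaction` is not shown in this diff, so the idempotent return below is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the transaction manager (not the PR's code).
final class TxnStateSketch {
    enum Status { PREPARE, PRECOMMITTED, COMMITTED, VISIBLE, ABORTED }

    private final Map<Long, Status> txns = new HashMap<>();

    void begin(long txnId) {
        txns.put(txnId, Status.PREPARE);
    }

    Status statusOf(long txnId) {
        return txns.get(txnId);
    }

    void preCommit(long txnId) {
        advance(txnId, Status.PRECOMMITTED);
    }

    void commit(long txnId) {
        advance(txnId, Status.COMMITTED);
    }

    // Shared path: only the target status differs between the two callers.
    private void advance(long txnId, Status target) {
        Status current = txns.get(txnId);
        if (current == null) {
            throw new IllegalStateException("transaction not found");
        }
        if (current == Status.ABORTED || current == Status.VISIBLE) {
            throw new IllegalStateException("transaction is already " + current);
        }
        if (current == target) {
            return; // already in the requested state (assumed idempotent)
        }
        if (current == Status.COMMITTED) {
            // Only reachable when target is PRECOMMITTED: cannot move backwards.
            throw new IllegalStateException("transaction is already committed");
        }
        // The shared tablet/partition/quorum validation would run here.
        txns.put(txnId, target);
    }
}
```

With this layout, a pre-committed txn can later be passed to `commit`, while calling `preCommit` on a committed txn fails, which mirrors the status checks in the quoted `preCommitTransaction`.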

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
         return executeWithoutPassword(request, response, db, table);
     }
 
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load_commit", method = RequestMethod.GET)
+    public Object streamLoadCommit(HttpServletRequest request,
+                             HttpServletResponse response,
+                             @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
+        this.isStreamLoad = true;
+        executeCheckPassword(request, response);
+        return executeStreamLoadCommit(request, response, db, table);
+    }
+
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load_abort", method = RequestMethod.GET)
+    public Object streamLoadAbort(HttpServletRequest request,
+                                   HttpServletResponse response,
+                                   @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
+        this.isStreamLoad = true;
+        executeCheckPassword(request, response);
+        return executeStreamLoadAbort(request, response, db, table);
+    }
+
     // Same as Multi load, to be compatible with http v1's response body,
     // we return error by using RestBaseResult.
     private Object executeWithoutPassword(HttpServletRequest request,
                                           HttpServletResponse response, String 
db, String table) {
+        LOG.info("received stream load request (precommit  httpv2).");

Review comment:
       This log can be removed.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
         return executeWithoutPassword(request, response, db, table);
     }
 
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load_commit", method = RequestMethod.GET)

Review comment:
       Would it be better to use `RequestMethod.PUT` here?

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long 
usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
visible");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", 
transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = 
catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that 
tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> 
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = 
tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in 
restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being 
dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if 
(!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = 
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = 
tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = 
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new 
TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + 
tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or 
the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all 
replicas as error replicas
+                            if (commitBackends != null && 
commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend 
has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the 
commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num 
is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), 
successReplicaNum, quorumReplicaNum,
+                                    
Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new 
TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, 
tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws 
UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction {" + 
transactionId + "} not found.");
+        }
+
+        transactionState.setCheckTimeout(false);
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            LOG.debug("transaction is already aborted: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already aborted, not pre-committed" + 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already visible, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already committed, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PREPARE) {
+            LOG.debug("transaction is prepare, not pre-committed: {}", 
transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is prepare, not pre-committed");
+        }
+
+        long currentTimeMillis = System.currentTimeMillis();
+        // Maybe new  invisible version has not been reported to master FE 
after the transaction is pre-committed
+        // or FE restart, we should wait a little while to commit transaction.
+        if (currentTimeMillis - transactionState.getPreCommitTime() <
+                Config.stream_load_default_wait_report_second * 1000) {
+            throw new TransactionCommitFailedException("The interval is too 
short after txn [" + transactionId + "] " +
+                    "was pre-committed. Please try again later.");
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Iterator<TableCommitInfo> tableCommitInfoIterator = 
transactionState.getIdToTableCommitInfos().values().iterator();
+        while (tableCommitInfoIterator.hasNext()) {
+            TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+            long tableId = tableCommitInfo.getTableId();
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            // table maybe dropped between pre-commit and commit, ignore this 
error
+            if (table == null) {
+                tableCommitInfoIterator.remove();
+                LOG.warn("table {} is dropped, skip version check and remove 
it from transaction state {}",
+                        tableId,
+                        transactionState);
+                continue;
+            }
+            PartitionInfo partitionInfo = table.getPartitionInfo();
+            Iterator<PartitionCommitInfo> partitionCommitInfoIterator = 
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+            while (partitionCommitInfoIterator.hasNext()) {
+                PartitionCommitInfo partitionCommitInfo = 
partitionCommitInfoIterator.next();
+                long partitionId = partitionCommitInfo.getPartitionId();
+                Partition partition = table.getPartition(partitionId);
+                // partition maybe dropped between pre-commit and commit, 
ignore this error
+                if (partition == null) {
+                    partitionCommitInfoIterator.remove();
+                    LOG.warn("partition {} is dropped, skip version check and 
remove it from transaction state {}",
+                            partitionId,
+                            transactionState);
+                    continue;
+                }
+
+                int quorumReplicaNum = 
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int healthReplicaNum = 0;
+                        for (Replica replica : tablet.getReplicas()) {
+                            List<Long> committedTxnIds = 
replica.getCommittedTxnIds();
+                            if (committedTxnIds.contains(transactionId)) {
+                                ++healthReplicaNum;
+                            } else {
+                                errorReplicaIds.add(replica.getId());
+                            }
+                        }
+
+                        if (healthReplicaNum < quorumReplicaNum) {
+                            LOG.warn("failed to commit transaction {} on 
tablet {}, with only {} replicas less than quorum {}, txnId: {}",
+                                    transactionState, tablet, 
healthReplicaNum, quorumReplicaNum, transactionId);
+                            throw new TransactionCommitFailedException("tablet 
[" + tablet.getId() +"], with only ["
+                                    +healthReplicaNum+ "] replicas less than 
quorum [" +quorumReplicaNum+ "], txnId ["
+                                    + transactionId + "]");
+                        }
+                    }
+                }
+            }
+        }
+        transactionState.setErrorReplicas(errorReplicaIds);
+
+        return transactionState;
+    }
+
+    public void commitTransaction(List<Long> transactionIds) throws 
UserException {
+        try {
+            List<TransactionState> transactionStates = Lists.newArrayList();
+            for (long transactionId : transactionIds) {
+                transactionStates.add(checkPreCommitStatus(transactionId));
+            }
+
+            for (TransactionState transactionState : transactionStates) {
+                commitTransaction(transactionState);
+            }
+        } finally {
+            for (long transactionId : transactionIds) {
+                TransactionState transactionState;
+                readLock();
+                try {
+                    transactionState = 
unprotectedGetTransactionState(transactionId);
+                } finally {
+                    readUnlock();
+                }
+
+                if (transactionState != null) {
+                    transactionState.setCheckTimeout(true);
+                }
+            }
+        }
+    }
+
+    public void commitTransaction(TransactionState transactionState) throws 
UserException {

Review comment:
       ```suggestion
       private void commitTransaction(TransactionState transactionState) throws 
UserException {
   ```

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -198,6 +224,16 @@ public void commitTransaction(long dbId, List<Table> 
tableList, long transaction
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
         dbTransactionMgr.commitTransaction(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
     }
+
+    public void commitTransaction(long dbId, List<Long> transactionIds)

Review comment:
       Better to change the method name, for example by adding a `2PC` prefix or 
suffix, to distinguish it from the normal `commitTransaction`. Otherwise it is 
confusing when reading the code.
   
   All other `2PC`-related methods in both GlobalTransactionMgr 
and DatabaseTransactionMgr need to be renamed as well.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long 
usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
visible");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", 
transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = 
catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that 
tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> 
tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = 
tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in 
restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being 
dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new 
TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if 
(!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = 
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = 
tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = 
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new 
TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + 
tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or 
the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all 
replicas as error replicas
+                            if (commitBackends != null && 
commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend 
has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the 
commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num 
is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), 
successReplicaNum, quorumReplicaNum,
+                                    
Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new 
TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, 
tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws 
UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction {" + 
transactionId + "} not found.");
+        }
+
+        transactionState.setCheckTimeout(false);
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            LOG.debug("transaction is already aborted: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already aborted, not pre-committed" + 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already visible, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already committed, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PREPARE) {
+            LOG.debug("transaction is prepare, not pre-committed: {}", 
transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is prepare, not pre-committed");
+        }
+
+        long currentTimeMillis = System.currentTimeMillis();
+        // Maybe new  invisible version has not been reported to master FE after the transaction is pre-committed
+        // or FE restart, we should wait a little while to commit transaction.
+        if (currentTimeMillis - transactionState.getPreCommitTime() <
+                Config.stream_load_default_wait_report_second * 1000) {
+            throw new TransactionCommitFailedException("The interval is too short after txn [" + transactionId + "] " +
+                    "was pre-committed. Please try again later.");
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Iterator<TableCommitInfo> tableCommitInfoIterator = transactionState.getIdToTableCommitInfos().values().iterator();
+        while (tableCommitInfoIterator.hasNext()) {
+            TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+            long tableId = tableCommitInfo.getTableId();
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            // table maybe dropped between pre-commit and commit, ignore this error
+            if (table == null) {
+                tableCommitInfoIterator.remove();
+                LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
+                        tableId,
+                        transactionState);
+                continue;
+            }
+            PartitionInfo partitionInfo = table.getPartitionInfo();
+            Iterator<PartitionCommitInfo> partitionCommitInfoIterator = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+            while (partitionCommitInfoIterator.hasNext()) {
+                PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
+                long partitionId = partitionCommitInfo.getPartitionId();
+                Partition partition = table.getPartition(partitionId);
+                // partition maybe dropped between pre-commit and commit, ignore this error
+                if (partition == null) {
+                    partitionCommitInfoIterator.remove();
+                    LOG.warn("partition {} is dropped, skip version check and remove it from transaction state {}",
+                            partitionId,
+                            transactionState);
+                    continue;
+                }
+
+                int quorumReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int healthReplicaNum = 0;
+                        for (Replica replica : tablet.getReplicas()) {
+                            List<Long> committedTxnIds = replica.getCommittedTxnIds();
+                            if (committedTxnIds.contains(transactionId)) {
+                                ++healthReplicaNum;
+                            } else {
+                                errorReplicaIds.add(replica.getId());
+                            }
+                        }
+
+                        if (healthReplicaNum < quorumReplicaNum) {
+                            LOG.warn("failed to commit transaction {} on tablet {}, with only {} replicas less than quorum {}, txnId: {}",
+                                    transactionState, tablet, healthReplicaNum, quorumReplicaNum, transactionId);
+                            throw new TransactionCommitFailedException("tablet [" + tablet.getId() +"], with only ["
+                                    +healthReplicaNum+ "] replicas less than quorum [" +quorumReplicaNum+ "], txnId ["
+                                    + transactionId + "]");
+                        }
+                    }
+                }
+            }
+        }
+        transactionState.setErrorReplicas(errorReplicaIds);
+
+        return transactionState;
+    }
+
+    public void commitTransaction(List<Long> transactionIds) throws UserException {
+        try {
+            List<TransactionState> transactionStates = Lists.newArrayList();
+            for (long transactionId : transactionIds) {
+                transactionStates.add(checkPreCommitStatus(transactionId));

Review comment:
       If the txn is already PRECOMMITTED, it should be possible to move it to COMMITTED directly, so why do we still need to call `checkPreCommitStatus()` here?
       The only thing `checkPreCommitStatus()` should need to do is verify that the txn state is PRECOMMITTED, but I see that it also runs several other checks (the wait interval after pre-commit, dropped tables and partitions, and the replica quorum). Why are those needed?
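
To make the question concrete, here is a minimal sketch of the narrower check described above: a single positive test for PRECOMMITTED instead of one branch per rejected state. It is only an illustration, not the PR's code. `TxnStatus`, `TxnCommitFailedException`, and `requirePreCommitted` are hypothetical stand-ins for the Doris types, and the sketch deliberately leaves out the wait-interval and quorum checks that the question is about.

```java
public class PreCommitCheckSketch {
    // Hypothetical stand-in for Doris's TransactionStatus, limited to the
    // states mentioned in this diff plus PRECOMMITTED.
    enum TxnStatus { PREPARE, PRECOMMITTED, COMMITTED, VISIBLE, ABORTED }

    // Hypothetical stand-in for TransactionCommitFailedException; unchecked
    // here only to keep the sketch short.
    static class TxnCommitFailedException extends RuntimeException {
        TxnCommitFailedException(String msg) {
            super(msg);
        }
    }

    // Accept the transaction only if it exists and is PRECOMMITTED.
    // A null status models "transaction not found".
    static void requirePreCommitted(long txnId, TxnStatus status) {
        if (status == null) {
            throw new TxnCommitFailedException("transaction [" + txnId + "] not found");
        }
        if (status != TxnStatus.PRECOMMITTED) {
            throw new TxnCommitFailedException(
                    "transaction [" + txnId + "] is " + status + ", not pre-committed");
        }
    }

    public static void main(String[] args) {
        // Passes silently: the transaction is in the expected state.
        requirePreCommitted(1L, TxnStatus.PRECOMMITTED);

        // Every other state is rejected by the same branch.
        try {
            requirePreCommitted(2L, TxnStatus.ABORTED);
        } catch (TxnCommitFailedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a positive check like this, a state added to the enum later is rejected by default, whereas the per-state branches in the diff would let it through.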



