This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b2a21e6533c [bugfix](external)Memory leak problem for external table 
with insert operation (#40440)
b2a21e6533c is described below

commit b2a21e6533cabdf800ea436ed33f9764c47445d6
Author: wuwenchi <[email protected]>
AuthorDate: Sat Sep 21 08:52:00 2024 +0800

    [bugfix](external)Memory leak problem for external table with insert 
operation (#40440)
    
    ## Proposed changes
    
    Look up the corresponding transaction by `txnId`, and then record the
    file list returned from the BE in that transaction.
---
 .../main/java/org/apache/doris/catalog/Env.java    |  8 ++++
 .../commands/insert/AbstractInsertExecutor.java    |  9 ++++-
 .../insert/BaseExternalTableInsertExecutor.java    | 13 ------
 .../plans/commands/insert/HiveInsertExecutor.java  |  8 +---
 .../commands/insert/IcebergInsertExecutor.java     |  8 +---
 .../commands/insert/InsertIntoTableCommand.java    |  1 +
 .../plans/commands/insert/JdbcInsertExecutor.java  |  5 ---
 .../plans/commands/insert/OlapInsertExecutor.java  |  7 ----
 .../main/java/org/apache/doris/qe/Coordinator.java | 32 ++++++---------
 ...ava => AbstractExternalTransactionManager.java} | 40 ++++++++++--------
 ....java => GlobalExternalTransactionInfoMgr.java} | 26 ++++++++----
 .../doris/transaction/HiveTransactionManager.java  | 47 ++--------------------
 .../transaction/IcebergTransactionManager.java     | 47 ++--------------------
 .../doris/transaction/TransactionManager.java      |  2 +-
 14 files changed, 82 insertions(+), 171 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1b55a966958..4c4c3089bab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -290,6 +290,7 @@ import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
+import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr;
 import org.apache.doris.transaction.GlobalTransactionMgrIface;
 import org.apache.doris.transaction.PublishVersionDaemon;
 
@@ -568,6 +569,8 @@ public class Env {
 
     private final SplitSourceManager splitSourceManager;
 
+    private final GlobalExternalTransactionInfoMgr 
globalExternalTransactionInfoMgr;
+
     private final List<String> forceSkipJournalIds = 
Arrays.asList(Config.force_skip_journal_ids);
 
     // if a config is relative to a daemon thread. record the relation here. 
we will proactively change interval of it.
@@ -816,6 +819,7 @@ public class Env {
         this.dnsCache = new DNSCache();
         this.sqlCacheManager = new NereidsSqlCacheManager();
         this.splitSourceManager = new SplitSourceManager();
+        this.globalExternalTransactionInfoMgr = new 
GlobalExternalTransactionInfoMgr();
     }
 
     public static void destroyCheckpoint() {
@@ -6591,6 +6595,10 @@ public class Env {
         return splitSourceManager;
     }
 
+    public GlobalExternalTransactionInfoMgr 
getGlobalExternalTransactionInfoMgr() {
+        return globalExternalTransactionInfoMgr;
+    }
+
     public StatisticsJobAppender getStatisticsJobAppender() {
         return statisticsJobAppender;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index cafffab295e..de3fc5eb953 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -48,7 +48,9 @@ import java.util.Optional;
  * The derived class should implement the abstract method for certain type of 
target table
  */
 public abstract class AbstractInsertExecutor {
+    protected static final long INVALID_TXN_ID = -1L;
     private static final Logger LOG = 
LogManager.getLogger(AbstractInsertExecutor.class);
+
     protected long jobId;
     protected final ConnectContext ctx;
     protected final Coordinator coordinator;
@@ -62,6 +64,7 @@ public abstract class AbstractInsertExecutor {
     protected String errMsg = "";
     protected Optional<InsertCommandContext> insertCtx;
     protected final boolean emptyInsert;
+    protected long txnId = INVALID_TXN_ID;
 
     /**
      * Constructor
@@ -93,7 +96,9 @@ public abstract class AbstractInsertExecutor {
         return labelName;
     }
 
-    public abstract long getTxnId();
+    public long getTxnId() {
+        return txnId;
+    }
 
     /**
      * begin transaction if necessary
@@ -108,7 +113,7 @@ public abstract class AbstractInsertExecutor {
     /**
      * Do something before exec
      */
-    protected abstract void beforeExec();
+    protected abstract void beforeExec() throws UserException;
 
     /**
      * Do something after exec finished
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index e456d171df5..082f1bab7d6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -46,9 +46,7 @@ import java.util.Optional;
  * Insert executor for base external table
  */
 public abstract class BaseExternalTableInsertExecutor extends 
AbstractInsertExecutor {
-    protected static final long INVALID_TXN_ID = -1L;
     private static final Logger LOG = 
LogManager.getLogger(BaseExternalTableInsertExecutor.class);
-    protected long txnId = INVALID_TXN_ID;
     protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
     protected final TransactionManager transactionManager;
     protected final String catalogName;
@@ -70,16 +68,6 @@ public abstract class BaseExternalTableInsertExecutor 
extends AbstractInsertExec
         }
     }
 
-    @Override
-    public long getTxnId() {
-        return txnId;
-    }
-
-    /**
-     * collect commit infos from BEs
-     */
-    protected abstract void setCollectCommitInfoFunc();
-
     /**
      * At this time, FE has successfully collected all commit information from 
BEs.
      * Before commit this txn, commit information need to be analyzed and 
processed.
@@ -94,7 +82,6 @@ public abstract class BaseExternalTableInsertExecutor extends 
AbstractInsertExec
     @Override
     public void beginTransaction() {
         txnId = transactionManager.begin();
-        setCollectCommitInfoFunc();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 10ff27add86..99464ccfc01 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -49,13 +49,7 @@ public class HiveInsertExecutor extends 
BaseExternalTableInsertExecutor {
     }
 
     @Override
-    public void setCollectCommitInfoFunc() {
-        HMSTransaction transaction = (HMSTransaction) 
transactionManager.getTransaction(txnId);
-        
coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
-    }
-
-    @Override
-    protected void beforeExec() {
+    protected void beforeExec() throws UserException {
         // check params
         HMSTransaction transaction = (HMSTransaction) 
transactionManager.getTransaction(txnId);
         Preconditions.checkArgument(insertCtx.isPresent(), "insert context 
must be present");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index 86b1f1ef0b7..fe8ff063571 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -47,13 +47,7 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
     }
 
     @Override
-    public void setCollectCommitInfoFunc() {
-        IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
-        
coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
-    }
-
-    @Override
-    protected void beforeExec() {
+    protected void beforeExec() throws UserException {
         String dbName = ((IcebergExternalTable) table).getDbName();
         String tbName = table.getName();
         SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 38d0d838630..74f75d2d7d5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -237,6 +237,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         executor.setProfileType(ProfileType.LOAD);
         // We exposed @StmtExecutor#cancel as a unified entry point for 
statement interruption,
         // so we need to set this here
+        insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId());
         executor.setCoord(insertExecutor.getCoordinator());
         return insertExecutor;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
index 928b17edf38..fb41f71083a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
@@ -90,11 +90,6 @@ public class JdbcInsertExecutor extends 
BaseExternalTableInsertExecutor {
         // do nothing
     }
 
-    @Override
-    protected void setCollectCommitInfoFunc() {
-        // do nothing
-    }
-
     @Override
     protected void doBeforeCommit() throws UserException {
         // do nothing
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index e38ee40bc9a..658b154b017 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -72,9 +72,7 @@ import java.util.stream.Collectors;
  * Insert executor for olap table
  */
 public class OlapInsertExecutor extends AbstractInsertExecutor {
-    protected static final long INVALID_TXN_ID = -1L;
     private static final Logger LOG = 
LogManager.getLogger(OlapInsertExecutor.class);
-    protected long txnId = INVALID_TXN_ID;
     protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
 
     /**
@@ -85,11 +83,6 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
         super(ctx, table, labelName, planner, insertCtx, emptyInsert);
     }
 
-    @Override
-    public long getTxnId() {
-        return txnId;
-    }
-
     @Override
     public void beginTransaction() {
         if (isGroupCommitHttpStream()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b1ea3772deb..39cdb051378 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -36,6 +36,8 @@ import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.iceberg.IcebergTransaction;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.MysqlCommand;
@@ -94,8 +96,6 @@ import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TFragmentInstanceReport;
-import org.apache.doris.thrift.THivePartitionUpdate;
-import org.apache.doris.thrift.TIcebergCommitData;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -251,12 +251,6 @@ public class Coordinator implements CoordInterface {
     private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
     private final List<TErrorTabletInfo> errorTabletInfos = 
Lists.newArrayList();
 
-    // Collect all hivePartitionUpdates obtained from be
-    Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;
-
-    // Collect all icebergCommitData obtained from be
-    Consumer<List<TIcebergCommitData>> icebergCommitDataFunc;
-
     // Input parameter
     private long jobId = -1; // job which this task belongs to
     private TUniqueId queryId;
@@ -484,6 +478,10 @@ public class Coordinator implements CoordInterface {
         return txnId;
     }
 
+    public void setTxnId(long txnId) {
+        this.txnId = txnId;
+    }
+
     public String getLabel() {
         return label;
     }
@@ -2381,14 +2379,6 @@ public class Coordinator implements CoordInterface {
         // TODO: more ranges?
     }
 
-    public void 
setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>> 
hivePartitionUpdateFunc) {
-        this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
-    }
-
-    public void setIcebergCommitDataFunc(Consumer<List<TIcebergCommitData>> 
icebergCommitDataFunc) {
-        this.icebergCommitDataFunc = icebergCommitDataFunc;
-    }
-
     // update job progress from BE
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         PipelineExecContext ctx = 
pipelineExecContexts.get(Pair.of(params.getFragmentId(), 
params.getBackendId()));
@@ -2441,11 +2431,13 @@ public class Coordinator implements CoordInterface {
         if (params.isSetErrorTabletInfos()) {
             updateErrorTabletInfos(params.getErrorTabletInfos());
         }
-        if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != 
null) {
-            hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
+        if (params.isSetHivePartitionUpdates()) {
+            ((HMSTransaction) 
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
+                .updateHivePartitionUpdates(params.getHivePartitionUpdates());
         }
-        if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) 
{
-            icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
+        if (params.isSetIcebergCommitDatas()) {
+            ((IcebergTransaction) 
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
+                .updateIcebergCommitData(params.getIcebergCommitDatas());
         }
 
         if (ctx.done) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java
similarity index 53%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java
index f373c133685..da80b8f77bd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java
@@ -17,29 +17,33 @@
 
 package org.apache.doris.transaction;
 
-
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergTransaction;
+import org.apache.doris.datasource.operations.ExternalMetadataOps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class IcebergTransactionManager implements TransactionManager {
+public abstract class AbstractExternalTransactionManager<T extends 
Transaction> implements TransactionManager {
+    private static final Logger LOG = 
LogManager.getLogger(AbstractExternalTransactionManager.class);
+    private final Map<Long, T> transactions = new ConcurrentHashMap<>();
+    protected final ExternalMetadataOps ops;
 
-    private final Map<Long, IcebergTransaction> transactions = new 
ConcurrentHashMap<>();
-    private final IcebergMetadataOps ops;
-
-    public IcebergTransactionManager(IcebergMetadataOps ops) {
+    public AbstractExternalTransactionManager(ExternalMetadataOps ops) {
         this.ops = ops;
     }
 
+    abstract T createTransaction();
+
     @Override
     public long begin() {
         long id = Env.getCurrentEnv().getNextId();
-        IcebergTransaction icebergTransaction = new IcebergTransaction(ops);
-        transactions.put(id, icebergTransaction);
+        T transaction = createTransaction();
+        transactions.put(id, transaction);
+        
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().putTxnById(id, 
transaction);
         return id;
     }
 
@@ -47,27 +51,31 @@ public class IcebergTransactionManager implements 
TransactionManager {
     public void commit(long id) throws UserException {
         getTransactionWithException(id).commit();
         transactions.remove(id);
+        
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
     }
 
     @Override
     public void rollback(long id) {
         try {
             getTransactionWithException(id).rollback();
+        } catch (TransactionNotFoundException e) {
+            LOG.warn(e.getMessage(), e);
         } finally {
             transactions.remove(id);
+            
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
         }
     }
 
     @Override
-    public IcebergTransaction getTransaction(long id) {
+    public Transaction getTransaction(long id) throws UserException {
         return getTransactionWithException(id);
     }
 
-    public IcebergTransaction getTransactionWithException(long id) {
-        IcebergTransaction icebergTransaction = transactions.get(id);
-        if (icebergTransaction == null) {
-            throw new RuntimeException("Can't find transaction for " + id);
+    private Transaction getTransactionWithException(long id) throws 
TransactionNotFoundException {
+        Transaction txn = transactions.get(id);
+        if (txn == null) {
+            throw new TransactionNotFoundException("Can't find transaction for 
" + id);
         }
-        return icebergTransaction;
+        return txn;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java
similarity index 54%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java
index ca9cbb917ec..e516c648dff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java
@@ -17,15 +17,27 @@
 
 package org.apache.doris.transaction;
 
-import org.apache.doris.common.UserException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-public interface TransactionManager {
+public class GlobalExternalTransactionInfoMgr {
+    public Map<Long, Transaction> idToTxn = new ConcurrentHashMap<>();
 
-    long begin();
+    public Transaction getTxnById(long txnId) {
+        if (idToTxn.containsKey(txnId)) {
+            return idToTxn.get(txnId);
+        }
+        throw new RuntimeException("Can't find txn for " + txnId);
+    }
 
-    void commit(long id) throws UserException;
+    public void putTxnById(long txnId, Transaction txn) {
+        if (idToTxn.containsKey(txnId)) {
+            throw new RuntimeException("Duplicate txnId for " + txnId);
+        }
+        idToTxn.put(txnId, txn);
+    }
 
-    void rollback(long id);
-
-    Transaction getTransaction(long id);
+    public void removeTxnById(long txnId) {
+        idToTxn.remove(txnId);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
index c48210ad452..65f0c2bd5e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
@@ -17,65 +17,26 @@
 
 package org.apache.doris.transaction;
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.hive.HMSTransaction;
 import org.apache.doris.datasource.hive.HiveMetadataOps;
 import org.apache.doris.fs.FileSystemProvider;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
-public class HiveTransactionManager implements TransactionManager {
-
-    private final Map<Long, HMSTransaction> transactions = new 
ConcurrentHashMap<>();
-    private final HiveMetadataOps ops;
+public class HiveTransactionManager extends 
AbstractExternalTransactionManager<HMSTransaction> {
 
     private final FileSystemProvider fileSystemProvider;
-
     private final Executor fileSystemExecutor;
 
     public HiveTransactionManager(HiveMetadataOps ops, FileSystemProvider 
fileSystemProvider,
             Executor fileSystemExecutor) {
-        this.ops = ops;
+        super(ops);
         this.fileSystemProvider = fileSystemProvider;
         this.fileSystemExecutor = fileSystemExecutor;
     }
 
     @Override
-    public long begin() {
-        long id = Env.getCurrentEnv().getNextId();
-        HMSTransaction hiveTransaction = new HMSTransaction(ops, 
fileSystemProvider, fileSystemExecutor);
-        transactions.put(id, hiveTransaction);
-        return id;
-    }
-
-    @Override
-    public void commit(long id) throws UserException {
-        getTransactionWithException(id).commit();
-        transactions.remove(id);
-    }
-
-    @Override
-    public void rollback(long id) {
-        try {
-            getTransactionWithException(id).rollback();
-        } finally {
-            transactions.remove(id);
-        }
-    }
-
-    @Override
-    public HMSTransaction getTransaction(long id) {
-        return getTransactionWithException(id);
-    }
-
-    public HMSTransaction getTransactionWithException(long id) {
-        HMSTransaction hiveTransaction = transactions.get(id);
-        if (hiveTransaction == null) {
-            throw new RuntimeException("Can't find transaction for " + id);
-        }
-        return hiveTransaction;
+    HMSTransaction createTransaction() {
+        return new HMSTransaction((HiveMetadataOps) ops, fileSystemProvider, 
fileSystemExecutor);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
index f373c133685..8f4d25a19b3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
@@ -18,56 +18,17 @@
 package org.apache.doris.transaction;
 
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
 import org.apache.doris.datasource.iceberg.IcebergTransaction;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class IcebergTransactionManager implements TransactionManager {
-
-    private final Map<Long, IcebergTransaction> transactions = new 
ConcurrentHashMap<>();
-    private final IcebergMetadataOps ops;
+public class IcebergTransactionManager extends 
AbstractExternalTransactionManager<IcebergTransaction> {
 
     public IcebergTransactionManager(IcebergMetadataOps ops) {
-        this.ops = ops;
+        super(ops);
     }
 
     @Override
-    public long begin() {
-        long id = Env.getCurrentEnv().getNextId();
-        IcebergTransaction icebergTransaction = new IcebergTransaction(ops);
-        transactions.put(id, icebergTransaction);
-        return id;
-    }
-
-    @Override
-    public void commit(long id) throws UserException {
-        getTransactionWithException(id).commit();
-        transactions.remove(id);
-    }
-
-    @Override
-    public void rollback(long id) {
-        try {
-            getTransactionWithException(id).rollback();
-        } finally {
-            transactions.remove(id);
-        }
-    }
-
-    @Override
-    public IcebergTransaction getTransaction(long id) {
-        return getTransactionWithException(id);
-    }
-
-    public IcebergTransaction getTransactionWithException(long id) {
-        IcebergTransaction icebergTransaction = transactions.get(id);
-        if (icebergTransaction == null) {
-            throw new RuntimeException("Can't find transaction for " + id);
-        }
-        return icebergTransaction;
+    IcebergTransaction createTransaction() {
+        return new IcebergTransaction((IcebergMetadataOps) ops);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
index ca9cbb917ec..fbff324ae91 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
@@ -27,5 +27,5 @@ public interface TransactionManager {
 
     void rollback(long id);
 
-    Transaction getTransaction(long id);
+    Transaction getTransaction(long id) throws UserException;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to