This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cdd04ce32e [opt](txn) Per-transaction locking and parallel publish 
for DatabaseTransactionMgr (#59990)
1cdd04ce32e is described below

commit 1cdd04ce32ea162dbd3135e8284ad6925e1cc739
Author: Yongqiang YANG <[email protected]>
AuthorDate: Wed Feb 11 07:51:05 2026 -0800

    [opt](txn) Per-transaction locking and parallel publish for 
DatabaseTransactionMgr (#59990)
    
    ## Summary
    - Replace database-wide write lock with per-transaction
    `synchronized(transactionState)` for commit, preCommit, abort, and
    finish paths, so independent transactions proceed concurrently
    - Enable parallel publish within a single database by routing publish
    executors by transactionId instead of dbId
    - Move edit log writes outside the write lock to reduce lock hold time
    - Add config flags for runtime control and safe fallback
    
    Fixes: https://github.com/apache/doris/issues/53642
    
    ## Changes
    
    ### Per-transaction locking (DatabaseTransactionMgr.java)
    - Extract `updateTxnLabels` from `unprotectUpdateInMemoryState` — only
    needed at `beginTransaction` and replay time, not during state
    transitions
    - Convert `runningTxnNums` from `volatile int` to `AtomicInteger` for
    thread-safe increment/decrement without db lock
    - Replace `ArrayDeque` with `ConcurrentLinkedDeque` for final-status
    transaction queues
    - Replace `writeLock()`/`writeUnlock()` with
    `synchronized(transactionState)` (sketched after this list) in:
      - `preCommitTransaction2PC`
      - `commitTransaction` (both overloads)
      - `finishTransaction`
      - `abortTransaction` / `abortTransaction2PC`
    - The DB write lock is retained for `beginTransaction` (label uniqueness),
    `removeUselessTxns`/`cleanLabel` (they mutate the `labelToTxnIds` HashMap),
    and replay paths
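
    A condensed sketch of the new commit-path shape, adapted from the patch
    (`afterStateTransform` and error handling omitted):

    ```java
    // Each transaction synchronizes on its own TransactionState, so commits of
    // independent transactions no longer contend on a database-wide write lock.
    EditLog.EditLogItem logItem = null;
    synchronized (transactionState) {
        unprotectedCommitTransaction(transactionState, errorReplicaIds,
                tableToPartition, totalInvolvedBackends, db);
        if (Config.enable_txn_log_outside_lock) {
            // Fast path: only enqueue the edit log entry while holding the monitor.
            logItem = enqueueTransactionState(transactionState);
        } else {
            // Legacy mode: synchronous edit log write while holding the monitor.
            persistTransactionState(transactionState);
        }
    }
    // The slow flush is awaited only after the monitor is released.
    awaitTransactionState(logItem, transactionState);
    ```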
    
    ### Parallel publish within a database (PublishVersionDaemon.java)
    - Route the publish executor by `transactionId` instead of `dbId` when
    `enable_per_txn_publish=true`, so different transactions in the same DB
    can finish in parallel (see the snippet after this list)
    - Fix race condition: use local variables in `tryFinishTxnSync` instead
    of shared instance fields for
    `partitionVisibleVersions`/`backendPartitions`
    - Rename `dbExecutors` to `publishExecutors`
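
    The routing change itself is small; condensed from the patch, with the
    catch body abbreviated:

    ```java
    // Pick the executor by transactionId when per-txn publish is enabled,
    // otherwise keep the old per-DB routing (txns within a DB stay sequential).
    long routingKey = Config.enable_per_txn_publish
            ? transactionState.getTransactionId()
            : transactionState.getDbId();
    publishExecutors.get((int) (routingKey % Config.publish_thread_pool_num)).execute(() -> {
        try {
            tryFinishTxnSync(transactionState, globalTransactionMgr);
        } catch (Throwable e) {
            // One transaction's failure must not block the others; log and move on.
            LOG.warn("error happens when finish transaction {}",
                    transactionState.getTransactionId(), e);
        }
    });
    ```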
    
    ### Edit log outside lock (DatabaseTransactionMgr.java, EditLog.java)
    - Enqueue the edit log entry inside the lock (nanoseconds), then await
    persistence outside the lock (milliseconds); see the snippet after this
    list
    - Add `enable_txn_log_outside_lock` config flag
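
    In code terms (condensed from the `EditLog` changes in this patch), the
    caller enqueues while still holding its lock and waits for the flush only
    after releasing it:

    ```java
    // Inside the lock: cheap, just a queue put; the FIFO queue preserves the
    // ordering of edit log entries.
    EditLog.EditLogItem item = editLog.submitEdit(
            OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState);

    // ... release the lock ...

    // Outside the lock: block until the batch writer has flushed the entry.
    long logId = item.await();
    ```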
    
    ### Config flags (Config.java)
    - `enable_per_txn_publish` (default `true`, `mutable=true`): controls
    publish routing. Set it to `false` to fall back to sequential per-DB
    publishing
    - `enable_txn_log_outside_lock` (default `true`, `mutable=true`):
    controls whether the transaction edit log is written inside or outside
    the write lock
    
    ## Correctness
    
    **Two different transactions committing concurrently (the main win):**
    - Thread A: `synchronized(txn1) { set COMMITTED, ConcurrentMap.put }`
    - Thread B: `synchronized(txn2) { set COMMITTED, ConcurrentMap.put }`
    - Different locks, different map entries → fully concurrent
    
    **Same transaction: concurrent commit + abort:**
    - `synchronized(txnState)` serializes them; the second thread sees the
    first thread's state change and handles it accordingly (illustrated
    below)
    
    **No deadlock:** State transition paths only acquire per-txn locks.
    Cleanup/replay paths only acquire db write lock. No path acquires both.
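
    For intuition only, a hypothetical abort-side guard (the real checks live
    in `beforeStateTransform` and `unprotectAbortTransaction`, not in this
    exact form):

    ```java
    // Hypothetical illustration: the second thread to enter the monitor observes
    // the status set by the first and backs off instead of double-transitioning.
    private boolean tryAbort(TransactionState txnState, String reason) {
        synchronized (txnState) {
            TransactionStatus status = txnState.getTransactionStatus();
            if (status == TransactionStatus.COMMITTED || status.isFinalStatus()) {
                return false; // a concurrent commit (or abort) already won the race
            }
            txnState.setTransactionStatus(TransactionStatus.ABORTED);
            txnState.setReason(reason);
            return true;
        }
    }
    ```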
    
    ## Test plan
    - [ ] Run DatabaseTransactionMgrTest unit tests
    - [ ] Concurrent load testing with multiple tables in same database
    - [ ] Verify lock hold time reduction via metrics
    - [ ] Toggle `enable_per_txn_publish` at runtime — verify both parallel
    and sequential modes work
    - [ ] Toggle `enable_txn_log_outside_lock` at runtime — verify both
    paths work
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ## Test

    **Scripts**
    
    ```bash
    for i in `seq 1 2000`; do
        mysql -h127.0.0.1 -P9030 -uroot paralldb -e "drop table if exists sbtest${i};
        create table sbtest${i} (
            id BIGINT NOT NULL AUTO_INCREMENT,
            k INTEGER DEFAULT 0,
            c CHAR(120) DEFAULT '',
            pad CHAR(60) DEFAULT ''
        )
        DISTRIBUTED BY RANDOM
        BUCKETS 1
        PROPERTIES(
            \"replication_num\" = \"1\"
        )"
    done
    ```
    
    ```bash
    sysbench --db-driver=mysql --mysql-host=127.0.0.1 --mysql-port=9030 \
        --mysql-user=root --mysql-db=paralldb --tables=1000 --threads=1000 \
        --time=180 --report-interval=10 oltp_insert run
    ```
    
    
    **Environment**

    3 FE, 3 BE
    
    **enable_txn_log_outside_lock = true, enable_per_txn_publish = true**
    
    [ 10s ] thds: 1000 tps: 1345.63 qps: 1345.63 (r/w/o: 0.00/1345.63/0.00)
    lat (ms,95%): 1869.60 err/s: 0.00 reconn/s: 0.00
    [ 20s ] thds: 1000 tps: 1669.04 qps: 1669.04 (r/w/o: 0.00/1669.04/0.00)
    lat (ms,95%): 1032.01 err/s: 0.00 reconn/s: 0.00
    [ 30s ] thds: 1000 tps: 1780.93 qps: 1780.93 (r/w/o: 0.00/1780.93/0.00)
    lat (ms,95%): 893.56 err/s: 0.00 reconn/s: 0.00
    [ 40s ] thds: 1000 tps: 1845.02 qps: 1845.02 (r/w/o: 0.00/1845.02/0.00)
    lat (ms,95%): 861.95 err/s: 0.00 reconn/s: 0.00
    [ 50s ] thds: 1000 tps: 1576.76 qps: 1576.76 (r/w/o: 0.00/1576.76/0.00)
    lat (ms,95%): 2493.86 err/s: 0.00 reconn/s: 0.00
    [ 60s ] thds: 1000 tps: 1911.56 qps: 1911.56 (r/w/o: 0.00/1911.56/0.00)
    lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00
    [ 70s ] thds: 1000 tps: 2006.93 qps: 2006.93 (r/w/o: 0.00/2006.93/0.00)
    lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00
    [ 80s ] thds: 1000 tps: 997.81 qps: 997.81 (r/w/o: 0.00/997.81/0.00) lat
    (ms,95%): 995.51 err/s: 0.00 reconn/s: 0.00
    [ 90s ] thds: 1000 tps: 1462.80 qps: 1462.80 (r/w/o: 0.00/1462.80/0.00)
    lat (ms,95%): 4855.31 err/s: 0.00 reconn/s: 0.00
    [ 100s ] thds: 1000 tps: 1661.09 qps: 1661.09 (r/w/o: 0.00/1661.09/0.00)
    lat (ms,95%): 943.16 err/s: 0.00 reconn/s: 0.00
    [ 110s ] thds: 1000 tps: 1686.85 qps: 1686.85 (r/w/o: 0.00/1686.85/0.00)
    lat (ms,95%): 909.80 err/s: 0.00 reconn/s: 0.00
    [ 120s ] thds: 1000 tps: 725.92 qps: 725.92 (r/w/o: 0.00/725.92/0.00)
    lat (ms,95%): 6360.91 err/s: 0.00 reconn/s: 0.00
    [ 130s ] thds: 1000 tps: 1839.17 qps: 1839.17 (r/w/o: 0.00/1839.17/0.00)
    lat (ms,95%): 1013.60 err/s: 0.00 reconn/s: 0.00
    [ 140s ] thds: 1000 tps: 1986.62 qps: 1986.62 (r/w/o: 0.00/1986.62/0.00)
    lat (ms,95%): 773.68 err/s: 0.00 reconn/s: 0.00
    [ 150s ] thds: 1000 tps: 1173.42 qps: 1173.42 (r/w/o: 0.00/1173.42/0.00)
    lat (ms,95%): 1109.09 err/s: 0.00 reconn/s: 0.00
    [ 160s ] thds: 1000 tps: 1057.97 qps: 1057.97 (r/w/o: 0.00/1057.97/0.00)
    lat (ms,95%): 9118.47 err/s: 0.00 reconn/s: 0.00
    [ 170s ] thds: 1000 tps: 2023.63 qps: 2023.63 (r/w/o: 0.00/2023.63/0.00)
    lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00
    [ 180s ] thds: 1000 tps: 2015.55 qps: 2015.55 (r/w/o: 0.00/2015.55/0.00)
    lat (ms,95%): 787.74 err/s: 0.00 reconn/s: 0.00
    SQL statistics:
    queries performed:
    read:                            0
    write:                           288670
    other:                           0
    total:                           288670
    transactions:                        288670 (1601.69 per sec.)
    queries:                             288670 (1601.69 per sec.)
    
    
    **enable_txn_log_outside_lock = false, enable_per_txn_publish = false**
    
    [ 10s ] thds: 1000 tps: 135.28 qps: 135.28 (r/w/o: 0.00/135.28/0.00) lat
    (ms,95%): 7346.49 err/s: 0.00 reconn/s: 0.00
    [ 20s ] thds: 1000 tps: 162.20 qps: 162.20 (r/w/o: 0.00/162.20/0.00) lat
    (ms,95%): 6247.39 err/s: 0.00 reconn/s: 0.00
    [ 30s ] thds: 1000 tps: 90.70 qps: 90.70 (r/w/o: 0.00/90.70/0.00) lat
    (ms,95%): 6247.39 err/s: 0.00 reconn/s: 0.00
    [ 40s ] thds: 1000 tps: 150.60 qps: 150.60 (r/w/o: 0.00/150.60/0.00) lat
    (ms,95%): 11946.04 err/s: 0.00 reconn/s: 0.00
    [ 50s ] thds: 1000 tps: 182.10 qps: 182.10 (r/w/o: 0.00/182.10/0.00) lat
    (ms,95%): 5709.50 err/s: 0.00 reconn/s: 0.00
    [ 60s ] thds: 1000 tps: 185.10 qps: 185.10 (r/w/o: 0.00/185.10/0.00) lat
    (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00
    [ 70s ] thds: 1000 tps: 80.70 qps: 80.70 (r/w/o: 0.00/80.70/0.00) lat
    (ms,95%): 11115.87 err/s: 0.00 reconn/s: 0.00
    [ 80s ] thds: 1000 tps: 183.50 qps: 183.50 (r/w/o: 0.00/183.50/0.00) lat
    (ms,95%): 11115.87 err/s: 0.00 reconn/s: 0.00
    [ 90s ] thds: 1000 tps: 182.40 qps: 182.40 (r/w/o: 0.00/182.40/0.00) lat
    (ms,95%): 5607.61 err/s: 0.00 reconn/s: 0.00
    [ 100s ] thds: 1000 tps: 128.30 qps: 128.30 (r/w/o: 0.00/128.30/0.00)
    lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00
    [ 110s ] thds: 1000 tps: 137.50 qps: 137.50 (r/w/o: 0.00/137.50/0.00)
    lat (ms,95%): 10917.50 err/s: 0.00 reconn/s: 0.00
    [ 120s ] thds: 1000 tps: 182.60 qps: 182.60 (r/w/o: 0.00/182.60/0.00)
    lat (ms,95%): 5607.61 err/s: 0.00 reconn/s: 0.00
    [ 130s ] thds: 1000 tps: 183.60 qps: 183.60 (r/w/o: 0.00/183.60/0.00)
    lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00
    [ 140s ] thds: 1000 tps: 86.50 qps: 86.50 (r/w/o: 0.00/86.50/0.00) lat
    (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00
    [ 150s ] thds: 1000 tps: 177.00 qps: 177.00 (r/w/o: 0.00/177.00/0.00)
    lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00
    [ 160s ] thds: 1000 tps: 183.00 qps: 183.00 (r/w/o: 0.00/183.00/0.00)
    lat (ms,95%): 5709.50 err/s: 0.00 reconn/s: 0.00
    [ 170s ] thds: 1000 tps: 142.30 qps: 142.30 (r/w/o: 0.00/142.30/0.00)
    lat (ms,95%): 5507.54 err/s: 0.00 reconn/s: 0.00
    [ 180s ] thds: 1000 tps: 130.10 qps: 130.10 (r/w/o: 0.00/130.10/0.00)
    lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00
    SQL statistics:
    queries performed:
    read:                            0
    write:                           28035
    other:                           0
    total:                           28035
    transactions:                        28035  (152.91 per sec.)
    queries:                             28035  (152.91 per sec.)
    ignored errors:                      0      (0.00 per sec.)
    reconnects:                          0      (0.00 per sec.)
    
    
    **enable_txn_log_outside_lock = false, enable_per_txn_publish = true**
    
    [ 10s ] thds: 1000 tps: 965.23 qps: 965.23 (r/w/o: 0.00/965.23/0.00) lat
    (ms,95%): 2493.86 err/s: 0.00 reconn/s: 0.00
    [ 20s ] thds: 1000 tps: 1464.63 qps: 1464.63 (r/w/o: 0.00/1464.63/0.00)
    lat (ms,95%): 1050.76 err/s: 0.00 reconn/s: 0.00
    [ 30s ] thds: 1000 tps: 888.00 qps: 888.00 (r/w/o: 0.00/888.00/0.00) lat
    (ms,95%): 909.80 err/s: 0.00 reconn/s: 0.00
    [ 40s ] thds: 1000 tps: 1632.37 qps: 1632.37 (r/w/o: 0.00/1632.37/0.00)
    lat (ms,95%): 5312.73 err/s: 0.00 reconn/s: 0.00
    [ 50s ] thds: 1000 tps: 1810.11 qps: 1810.11 (r/w/o: 0.00/1810.11/0.00)
    lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00
    [ 60s ] thds: 1000 tps: 1922.10 qps: 1922.10 (r/w/o: 0.00/1922.10/0.00)
    lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00
    [ 70s ] thds: 1000 tps: 712.58 qps: 712.58 (r/w/o: 0.00/712.58/0.00) lat
    (ms,95%): 7215.39 err/s: 0.00 reconn/s: 0.00
    [ 80s ] thds: 1000 tps: 2093.76 qps: 2093.76 (r/w/o: 0.00/2093.76/0.00)
    lat (ms,95%): 816.63 err/s: 0.00 reconn/s: 0.00
    [ 90s ] thds: 1000 tps: 2115.99 qps: 2115.99 (r/w/o: 0.00/2115.99/0.00)
    lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00
    [ 100s ] thds: 1000 tps: 1494.48 qps: 1494.48 (r/w/o: 0.00/1494.48/0.00)
    lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00
    [ 110s ] thds: 1000 tps: 651.00 qps: 651.00 (r/w/o: 0.00/651.00/0.00)
    lat (ms,95%): 10531.32 err/s: 0.00 reconn/s: 0.00
    [ 120s ] thds: 1000 tps: 1976.26 qps: 1976.26 (r/w/o: 0.00/1976.26/0.00)
    lat (ms,95%): 787.74 err/s: 0.00 reconn/s: 0.00
    [ 130s ] thds: 1000 tps: 1779.35 qps: 1779.35 (r/w/o: 0.00/1779.35/0.00)
    lat (ms,95%): 861.95 err/s: 0.00 reconn/s: 0.00
    [ 140s ] thds: 1000 tps: 1281.70 qps: 1281.70 (r/w/o: 0.00/1281.70/0.00)
    lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00
    [ 150s ] thds: 1000 tps: 0.00 qps: 0.00 (r/w/o: 0.00/0.00/0.00) lat
    (ms,95%): 0.00 err/s: 0.00 reconn/s: 0.00
    [ 160s ] thds: 1000 tps: 1250.21 qps: 1250.21 (r/w/o: 0.00/1250.21/0.00)
    lat (ms,95%): 14302.94 err/s: 0.00 reconn/s: 0.00
    [ 170s ] thds: 1000 tps: 1775.39 qps: 1775.39 (r/w/o: 0.00/1775.39/0.00)
    lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00
    [ 180s ] thds: 1000 tps: 1894.81 qps: 1894.81 (r/w/o: 0.00/1894.81/0.00)
    lat (ms,95%): 816.63 err/s: 0.00 reconn/s: 0.00
    SQL statistics:
    queries performed:
    read:                            0
    write:                           258081
    other:                           0
    total:                           258081
    transactions:                        258081 (1431.61 per sec.)
    queries:                             258081 (1431.61 per sec.)
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  22 ++
 .../src/main/java/org/apache/doris/DorisFE.java    |   6 +
 .../java/org/apache/doris/persist/EditLog.java     |  68 +++-
 .../doris/transaction/DatabaseTransactionMgr.java  | 353 ++++++++++++++-------
 .../doris/transaction/PublishVersionDaemon.java    |  20 +-
 .../java/org/apache/doris/catalog/FakeEditLog.java |  11 +
 6 files changed, 356 insertions(+), 124 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5b3abc07f62..101c110d124 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -744,6 +744,28 @@ public class Config extends ConfigBase {
             "Txn manager will reject coming txns."})
     public static int max_running_txn_num_per_db = 10000;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "是否将事务的 edit log 写入移到写锁之外以减少锁竞争。"
+                    + "开启后,edit log 条目在写锁内入队(FIFO 保证顺序),"
+                    + "在写锁外等待持久化完成,从而降低写锁持有时间,提高并发事务吞吐量。"
+                    + "默认开启。关闭后使用传统的锁内同步写入模式。",
+            "Whether to move transaction edit log writes outside the write 
lock to reduce lock contention. "
+                    + "When enabled, edit log entries are enqueued inside the 
write lock (FIFO preserves ordering) "
+                    + "and awaited outside the lock, reducing write lock hold 
time "
+                    + "and improving concurrent transaction throughput. "
+                    + "Default is true. Set to false to use the traditional 
in-lock synchronous write mode."})
+    public static boolean enable_txn_log_outside_lock = true;
+
+    @ConfField(mutable = true, description = {
+            "是否启用按事务级别并行发布。开启后,同一数据库内的不同事务可以在不同的执行器线程上并行完成发布,"
+                    + "而不是按数据库顺序执行。关闭后回退到按数据库路由(旧行为),同一数据库内的事务顺序发布。",
+            "Whether to enable per-transaction parallel publish. When enabled, 
different transactions "
+                    + "in the same database can finish publishing in parallel 
across executor threads, "
+                    + "instead of being serialized per database. "
+                    + "When disabled, falls back to per-database routing (old 
behavior) "
+                    + "where transactions within a DB are published 
sequentially."})
+    public static boolean enable_per_txn_publish = true;
+
     @ConfField(masterOnly = true, description = {"pending load task 
执行线程数。这个配置可以限制当前等待的导入作业数。"
             + "并且应小于 `max_running_txn_num_per_db`。",
             "The pending load task executor pool size. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java 
b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index 05820ac6b44..026fd45462b 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -66,6 +66,7 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.time.LocalDate;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -596,6 +597,11 @@ public class DorisFE {
             LOG.info("fuzzy set random_add_cluster_keys_for_mow={}", 
Config.random_add_cluster_keys_for_mow);
         }
 
+        Config.enable_txn_log_outside_lock = new Random().nextBoolean();
+        LOG.info("fuzzy set enable_txn_log_outside_lock={}", 
Config.enable_txn_log_outside_lock);
+        Config.enable_batch_editlog = new Random().nextBoolean();
+        LOG.info("fuzzy set enable_batch_editlog={}", 
Config.enable_batch_editlog);
+
         setFuzzyForCatalog();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 4f126fcf214..d506b474ed7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -124,13 +124,14 @@ import java.util.concurrent.atomic.AtomicLong;
 public class EditLog {
     public static final Logger LOG = LogManager.getLogger(EditLog.class);
 
-    // Helper class to hold log edit requests
-    private static class EditLogItem {
+    // Helper class to hold log edit requests.
+    // Public so that callers can enqueue inside a lock and await outside it.
+    public static class EditLogItem {
         static AtomicLong nextUid = new AtomicLong(0);
         final short op;
         final Writable writable;
         final Object lock = new Object();
-        boolean finished = false;
+        volatile boolean finished = false;
         long logId = -1;
         long uid = -1;
 
@@ -139,6 +140,24 @@ public class EditLog {
             this.writable = writable;
             uid = nextUid.getAndIncrement();
         }
+
+        /**
+         * Wait for this edit log entry to be flushed to persistent storage.
+         * Returns the assigned log ID.
+         */
+        public long await() {
+            synchronized (lock) {
+                while (!finished) {
+                    try {
+                        lock.wait();
+                    } catch (InterruptedException e) {
+                        LOG.error("Fatal Error : write stream Exception");
+                        System.exit(-1);
+                    }
+                }
+            }
+            return logId;
+        }
     }
 
     private final BlockingQueue<EditLogItem> logEditQueue = new 
LinkedBlockingQueue<>();
@@ -1534,6 +1553,49 @@ public class EditLog {
         return req.logId;
     }
 
+    /**
+     * Submit an edit log entry to the batch queue without waiting for it to 
be flushed.
+     * The entry is enqueued in FIFO order, so calling this inside a write 
lock guarantees
+     * that edit log entries are ordered by lock acquisition order.
+     *
+     * <p>The caller MUST call {@link EditLogItem#await()} after releasing the 
lock to ensure
+     * the entry is persisted before proceeding.
+     *
+     * <p>If batch edit log is disabled, this falls back to a synchronous 
direct write
+     * and the returned item is already completed.
+     *
+     * @return an {@link EditLogItem} handle to await completion
+     */
+    public EditLogItem submitEdit(short op, Writable writable) {
+        if (this.getNumEditStreams() == 0) {
+            LOG.error("Fatal Error : no editLog stream", new Exception());
+            throw new Error("Fatal Error : no editLog stream");
+        }
+
+        EditLogItem req = new EditLogItem(op, writable);
+        if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) {
+            while (true) {
+                try {
+                    logEditQueue.put(req);
+                    break;
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted during put, will sleep and retry.");
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ex) {
+                        LOG.warn("interrupted during sleep, will retry.", ex);
+                    }
+                }
+            }
+        } else {
+            // Non-batch mode: write directly (synchronous)
+            long logId = logEditDirectly(op, writable);
+            req.logId = logId;
+            req.finished = true;
+        }
+        return req;
+    }
+
     private synchronized long logEditDirectly(short op, Writable writable) {
         long logId = -1;
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index ade04e95b15..eb905f3eda1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -18,6 +18,7 @@
 package org.apache.doris.transaction;
 
 import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.binlog.UpsertRecord;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -55,6 +56,7 @@ import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.CleanLabelOperationLog;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.persist.OperationType;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.AnalysisManager;
 import org.apache.doris.task.AgentBatchTask;
@@ -78,7 +80,6 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -90,7 +91,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -134,8 +137,10 @@ public class DatabaseTransactionMgr {
     // The "Short" queue is used to store the txns of the expire time
     // controlled by Config.streaming_label_keep_max_second.
     // The "Long" queue is used to store the txns of the expire time 
controlled by Config.label_keep_max_second.
-    private final ArrayDeque<TransactionState> 
finalStatusTransactionStateDequeShort = new ArrayDeque<>();
-    private final ArrayDeque<TransactionState> 
finalStatusTransactionStateDequeLong = new ArrayDeque<>();
+    private final ConcurrentLinkedDeque<TransactionState> 
finalStatusTransactionStateDequeShort
+            = new ConcurrentLinkedDeque<>();
+    private final ConcurrentLinkedDeque<TransactionState> 
finalStatusTransactionStateDequeLong
+            = new ConcurrentLinkedDeque<>();
 
     // label -> txn ids
     // this is used for checking if label already used. a label may correspond 
to multiple txns,
@@ -150,7 +155,7 @@ public class DatabaseTransactionMgr {
     private Long lastCommittedTxnCountUpdateTime = 0L;
 
     // count the number of running txns of database
-    private volatile int runningTxnNums = 0;
+    private final AtomicInteger runningTxnNums = new AtomicInteger(0);
 
     private final Env env;
 
@@ -222,7 +227,7 @@ public class DatabaseTransactionMgr {
     }
 
     protected int getRunningTxnNums() {
-        return runningTxnNums;
+        return runningTxnNums.get();
     }
 
     @VisibleForTesting
@@ -331,6 +336,8 @@ public class DatabaseTransactionMgr {
         FeNameFormat.checkLabel(label);
 
         long tid = 0L;
+        TransactionState transactionState = null;
+        EditLog.EditLogItem logItem = null;
         writeLock();
         try {
             /*
@@ -368,10 +375,16 @@ public class DatabaseTransactionMgr {
             checkRunningTxnExceedLimit(tableIdList);
 
             tid = idGenerator.getNextTransactionId();
-            TransactionState transactionState = new TransactionState(dbId, 
tableIdList,
+            transactionState = new TransactionState(dbId, tableIdList,
                     tid, label, requestId, sourceType, coordinator, 
listenerId, timeoutSecond * 1000);
             transactionState.setPrepareTime(System.currentTimeMillis());
-            unprotectUpsertTransactionState(transactionState, false);
+            unprotectUpdateInMemoryState(transactionState, false);
+            updateTxnLabels(transactionState);
+            if (Config.enable_txn_log_outside_lock) {
+                logItem = enqueueTransactionState(transactionState);
+            } else {
+                persistTransactionState(transactionState);
+            }
 
             if (MetricRepo.isInit) {
                 MetricRepo.COUNTER_TXN_BEGIN.increase(1L);
@@ -379,6 +392,7 @@ public class DatabaseTransactionMgr {
         } finally {
             writeUnlock();
         }
+        awaitTransactionState(logItem, transactionState);
         LOG.info("begin transaction: txn id {} with label {} from coordinator 
{}, listener id: {}",
                     tid, label, coordinator, listenerId);
         return tid;
@@ -448,13 +462,17 @@ public class DatabaseTransactionMgr {
         checkCommitStatus(tableList, transactionState, tabletCommitInfos, 
txnCommitAttachment, errorReplicaIds,
                           tableToPartition, totalInvolvedBackends);
 
-        writeLock();
-        try {
+        EditLog.EditLogItem logItem = null;
+        synchronized (transactionState) {
             unprotectedPreCommitTransaction2PC(transactionState, 
errorReplicaIds, tableToPartition,
                     totalInvolvedBackends, db);
-        } finally {
-            writeUnlock();
+            if (Config.enable_txn_log_outside_lock) {
+                logItem = enqueueTransactionState(transactionState);
+            } else {
+                persistTransactionState(transactionState);
+            }
         }
+        awaitTransactionState(logItem, transactionState);
         LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
     }
 
@@ -802,23 +820,29 @@ public class DatabaseTransactionMgr {
         transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
         // transaction state transform
         boolean txnOperated = false;
-        writeLock();
-        try {
+        EditLog.EditLogItem logItem = null;
+        synchronized (transactionState) {
             if (is2PC) {
                 unprotectedCommitTransaction2PC(transactionState, db);
             } else {
                 unprotectedCommitTransaction(transactionState, errorReplicaIds,
                         tableToPartition, totalInvolvedBackends, db);
             }
-            txnOperated = true;
-        } finally {
-            writeUnlock();
-            // after state transform
-            try {
-                
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated);
-            } catch (Throwable e) {
-                LOG.warn("afterStateTransform txn {} failed. exception: ", 
transactionState, e);
+            if (Config.enable_txn_log_outside_lock) {
+                logItem = enqueueTransactionState(transactionState);
+            } else {
+                persistTransactionState(transactionState);
             }
+            txnOperated = true;
+        }
+        // after state transform
+        try {
+            transactionState.afterStateTransform(TransactionStatus.COMMITTED, 
txnOperated);
+        } catch (Throwable e) {
+            LOG.warn("afterStateTransform txn {} failed. exception: ", 
transactionState, e);
+        }
+        if (txnOperated) {
+            awaitTransactionState(logItem, transactionState);
         }
 
         // update nextVersion because of the failure of persistent transaction 
resulting in error version
@@ -869,19 +893,25 @@ public class DatabaseTransactionMgr {
         transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
         // transaction state transform
         boolean txnOperated = false;
-        writeLock();
-        try {
+        EditLog.EditLogItem logItem = null;
+        synchronized (transactionState) {
             unprotectedCommitTransaction(transactionState, errorReplicaIds, 
subTxnToPartition, totalInvolvedBackends,
                     subTransactionStates, db);
-            txnOperated = true;
-        } finally {
-            writeUnlock();
-            // after state transform
-            try {
-                
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated);
-            } catch (Throwable e) {
-                LOG.warn("afterStateTransform txn {} failed. exception: ", 
transactionState, e);
+            if (Config.enable_txn_log_outside_lock) {
+                logItem = enqueueTransactionState(transactionState);
+            } else {
+                persistTransactionState(transactionState);
             }
+            txnOperated = true;
+        }
+        // after state transform
+        try {
+            transactionState.afterStateTransform(TransactionStatus.COMMITTED, 
txnOperated);
+        } catch (Throwable e) {
+            LOG.warn("afterStateTransform txn {} failed. exception: ", 
transactionState, e);
+        }
+        if (txnOperated) {
+            awaitTransactionState(logItem, transactionState);
         }
 
         // update nextVersion because of the failure of persistent transaction 
resulting in error version
@@ -930,15 +960,15 @@ public class DatabaseTransactionMgr {
             // here we only delete the oldest element, so if element exist in 
finalStatusTransactionStateDeque,
             // it must at the front of the finalStatusTransactionStateDeque.
             // check both "short" and "long" queue.
-            if (!finalStatusTransactionStateDequeShort.isEmpty()
-                    && transactionState.getTransactionId()
-                    == 
finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
-                finalStatusTransactionStateDequeShort.pop();
+            TransactionState shortHead = 
finalStatusTransactionStateDequeShort.peekFirst();
+            TransactionState longHead = 
finalStatusTransactionStateDequeLong.peekFirst();
+            if (shortHead != null
+                    && transactionState.getTransactionId() == 
shortHead.getTransactionId()) {
+                finalStatusTransactionStateDequeShort.pollFirst();
                 clearTransactionState(transactionState.getTransactionId());
-            } else if (!finalStatusTransactionStateDequeLong.isEmpty()
-                    && transactionState.getTransactionId()
-                    == 
finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
-                finalStatusTransactionStateDequeLong.pop();
+            } else if (longHead != null
+                    && transactionState.getTransactionId() == 
longHead.getTransactionId()) {
+                finalStatusTransactionStateDequeLong.pollFirst();
                 clearTransactionState(transactionState.getTransactionId());
             }
         } finally {
@@ -953,13 +983,13 @@ public class DatabaseTransactionMgr {
                 // here we only delete the oldest element, so if element exist 
in finalStatusTransactionStateDeque,
                 // it must at the front of the finalStatusTransactionStateDeque
                 // check both "short" and "long" queue.
-                if (!finalStatusTransactionStateDequeShort.isEmpty()
-                        && txnId == 
finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
-                    finalStatusTransactionStateDequeShort.pop();
+                TransactionState shortHead = 
finalStatusTransactionStateDequeShort.peekFirst();
+                TransactionState longHead = 
finalStatusTransactionStateDequeLong.peekFirst();
+                if (shortHead != null && txnId == 
shortHead.getTransactionId()) {
+                    finalStatusTransactionStateDequeShort.pollFirst();
                     clearTransactionState(txnId);
-                } else if (!finalStatusTransactionStateDequeLong.isEmpty()
-                        && txnId == 
finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
-                    finalStatusTransactionStateDequeLong.pop();
+                } else if (longHead != null && txnId == 
longHead.getTransactionId()) {
+                    finalStatusTransactionStateDequeLong.pollFirst();
                     clearTransactionState(txnId);
                 }
             }
@@ -972,8 +1002,8 @@ public class DatabaseTransactionMgr {
         writeLock();
         try {
             if (operation.getLatestTxnIdForShort() != -1) {
-                while (!finalStatusTransactionStateDequeShort.isEmpty()) {
-                    TransactionState transactionState = 
finalStatusTransactionStateDequeShort.pop();
+                TransactionState transactionState;
+                while ((transactionState = 
finalStatusTransactionStateDequeShort.pollFirst()) != null) {
                     clearTransactionState(transactionState.getTransactionId());
                     if (operation.getLatestTxnIdForShort() == 
transactionState.getTransactionId()) {
                         break;
@@ -982,8 +1012,8 @@ public class DatabaseTransactionMgr {
             }
 
             if (operation.getLatestTxnIdForLong() != -1) {
-                while (!finalStatusTransactionStateDequeLong.isEmpty()) {
-                    TransactionState transactionState = 
finalStatusTransactionStateDequeLong.pop();
+                TransactionState transactionState;
+                while ((transactionState = 
finalStatusTransactionStateDequeLong.pollFirst()) != null) {
                     clearTransactionState(transactionState.getTransactionId());
                     if (operation.getLatestTxnIdForLong() == 
transactionState.getTransactionId()) {
                         break;
@@ -1169,14 +1199,19 @@ public class DatabaseTransactionMgr {
                 }
             }
             boolean txnOperated = false;
-            writeLock();
-            try {
+            EditLog.EditLogItem logItem = null;
+            synchronized (transactionState) {
                 transactionState.setErrorReplicas(errorReplicaIds);
                 transactionState.setFinishTime(System.currentTimeMillis());
                 transactionState.clearErrorMsg();
                 
transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
                 setTableVersion(transactionState, db);
-                unprotectUpsertTransactionState(transactionState, false);
+                unprotectUpdateInMemoryState(transactionState, false);
+                if (Config.enable_txn_log_outside_lock) {
+                    logItem = enqueueTransactionState(transactionState);
+                } else {
+                    persistTransactionState(transactionState);
+                }
                 txnOperated = true;
                 // TODO(cmy): We found a very strange problem. When 
delete-related transactions are processed here,
                 // subsequent `updateCatalogAfterVisible()` is called, but it 
does not seem to be executed here
@@ -1185,13 +1220,14 @@ public class DatabaseTransactionMgr {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("after set transaction {} to visible", 
transactionState);
                 }
-            } finally {
-                writeUnlock();
-                try {
-                    
transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated);
-                } catch (Throwable e) {
-                    LOG.warn("afterStateTransform txn {} failed. exception: ", 
transactionState, e);
-                }
+            }
+            try {
+                
transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated);
+            } catch (Throwable e) {
+                LOG.warn("afterStateTransform txn {} failed. exception: ", 
transactionState, e);
+            }
+            if (txnOperated) {
+                awaitTransactionState(logItem, transactionState);
             }
             updateCatalogAfterVisible(transactionState, db, 
partitionVisibleVersions, backendPartitions);
         } finally {
@@ -1527,8 +1563,8 @@ public class DatabaseTransactionMgr {
             }
             transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
         }
-        // persist transactionState
-        unprotectUpsertTransactionState(transactionState, false);
+        // Update in-memory state only; caller handles edit log persistence
+        unprotectUpdateInMemoryState(transactionState, false);
         transactionState.setInvolvedBackends(totalInvolvedBackends);
     }
 
@@ -1565,8 +1601,8 @@ public class DatabaseTransactionMgr {
             }
             transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
         }
-        // persist transactionState
-        unprotectUpsertTransactionState(transactionState, false);
+        // Update in-memory state only; caller handles edit log persistence
+        unprotectUpdateInMemoryState(transactionState, false);
         transactionState.setInvolvedBackends(totalInvolvedBackends);
     }
 
@@ -1626,8 +1662,8 @@ public class DatabaseTransactionMgr {
                 transactionState.addSubTxnTableCommitInfo(subTransactionState, 
tableCommitInfo);
             }
         }
-        // persist transactionState
-        unprotectUpsertTransactionState(transactionState, false);
+        // Update in-memory state only; caller handles edit log persistence
+        unprotectUpdateInMemoryState(transactionState, false);
         transactionState.setInvolvedBackends(totalInvolvedBackends);
     }
 
@@ -1674,26 +1710,34 @@ public class DatabaseTransactionMgr {
                 partitionCommitInfo.setVersionTime(System.currentTimeMillis());
             }
         }
-        // persist transactionState
-        editLog.logInsertTransactionState(transactionState);
+        // Update in-memory state only; caller handles edit log persistence
+        unprotectUpdateInMemoryState(transactionState, false);
     }
 
-    // for add/update/delete TransactionState
+    /**
+     * Replays a transaction state update from the edit log.
+     * Only called during replay (follower sync or restart), so no edit log 
persistence is needed.
+     *
+     * @param transactionState the transaction state to replay
+     * @param isReplay must be true (only used in replay path)
+     */
     protected void unprotectUpsertTransactionState(TransactionState 
transactionState, boolean isReplay) {
-        // if this is a replay operation, we should not log it
-        if (!isReplay) {
-            if (transactionState.getTransactionStatus() != 
TransactionStatus.PREPARE
-                    || transactionState.getSourceType() == 
TransactionState.LoadJobSourceType.FRONTEND) {
-                // if this is a prepare txn, and load source type is not 
FRONTEND
-                // no need to persist it. if prepare txn lost, the following 
commit will just be failed.
-                // user only need to retry this txn.
-                // The FRONTEND type txn is committed and running 
asynchronously, so we have to persist it.
-                editLog.logInsertTransactionState(transactionState);
-            }
-        }
+        unprotectUpdateInMemoryState(transactionState, isReplay);
+    }
+
+    /**
+     * Updates only in-memory transaction state without persisting to edit log.
+     * This method must be called while holding the write lock.
+     * Use this method in combination with {@link #persistTransactionState} to 
reduce lock contention
+     * by moving edit log writes outside the lock.
+     *
+     * @param transactionState the transaction state to update
+     * @param isReplay true if this is a replay operation (edit log already 
contains this state)
+     */
+    protected void unprotectUpdateInMemoryState(TransactionState 
transactionState, boolean isReplay) {
         if (!transactionState.getTransactionStatus().isFinalStatus()) {
             if 
(idToRunningTransactionState.put(transactionState.getTransactionId(), 
transactionState) == null) {
-                runningTxnNums++;
+                runningTxnNums.incrementAndGet();
             }
             if (isReplay && transactionState.getSubTxnIds() != null) {
                 LOG.info("add sub transactions for txn_id={}, status={}, 
sub_txn_ids={}",
@@ -1705,7 +1749,7 @@ public class DatabaseTransactionMgr {
             }
         } else {
             if 
(idToRunningTransactionState.remove(transactionState.getTransactionId()) != 
null) {
-                runningTxnNums--;
+                runningTxnNums.decrementAndGet();
             }
             
idToFinalStatusTransactionState.put(transactionState.getTransactionId(), 
transactionState);
             if (transactionState.isShortTxn()) {
@@ -1719,18 +1763,71 @@ public class DatabaseTransactionMgr {
                 cleanSubTransactions(transactionState.getTransactionId());
             }
         }
-        updateTxnLabels(transactionState);
+        if (isReplay) {
+            updateTxnLabels(transactionState);
+        }
     }
 
-    public int getRunningTxnNumsWithLock() {
-        readLock();
-        try {
-            return runningTxnNums;
-        } finally {
-            readUnlock();
+    /**
+     * Persists the transaction state to edit log synchronously.
+     * For PREPARE transactions with non-FRONTEND source type, persistence is 
skipped
+     * because losing them only requires the client to retry.
+     */
+    protected void persistTransactionState(TransactionState transactionState) {
+        if (transactionState.getTransactionStatus() != 
TransactionStatus.PREPARE
+                || transactionState.getSourceType() == 
TransactionState.LoadJobSourceType.FRONTEND) {
+            // if this is a prepare txn, and load source type is not FRONTEND
+            // no need to persist it. if prepare txn lost, the following 
commit will just be failed.
+            // user only need to retry this txn.
+            // The FRONTEND type txn is committed and running asynchronously, 
so we have to persist it.
+            editLog.logInsertTransactionState(transactionState);
+        }
+    }
+
+    /**
+     * Enqueue a transaction state edit log entry without waiting for 
persistence.
+     * Must be called inside the write lock to preserve ordering via the FIFO 
queue.
+     *
+     * @return an {@link EditLog.EditLogItem} handle to await completion, or 
null if persistence is skipped
+     */
+    protected EditLog.EditLogItem enqueueTransactionState(TransactionState 
transactionState) {
+        if (transactionState.getTransactionStatus() != 
TransactionStatus.PREPARE
+                || transactionState.getSourceType() == 
TransactionState.LoadJobSourceType.FRONTEND) {
+            return 
editLog.submitEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState);
+        }
+        return null;
+    }
+
+    /**
+     * Await completion of a previously enqueued transaction state edit log 
entry.
+     * Should be called outside the write lock. Handles binlog and timing 
logic.
+     *
+     * @param item the handle returned by {@link #enqueueTransactionState}, 
may be null
+     * @param transactionState the transaction state (for binlog and timing)
+     */
+    protected void awaitTransactionState(EditLog.EditLogItem item, 
TransactionState transactionState) {
+        if (item == null) {
+            return;
+        }
+        long start = System.currentTimeMillis();
+        long logId = item.await();
+        long logEditEnd = System.currentTimeMillis();
+        long end = logEditEnd;
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            UpsertRecord record = new UpsertRecord(logId, transactionState);
+            Env.getCurrentEnv().getBinlogManager().addUpsertRecord(record);
+            end = System.currentTimeMillis();
+        }
+        if (end - start > Config.lock_reporting_threshold_ms) {
+            LOG.warn("edit log insert transaction take a lot time, write bdb 
{} ms, write binlog {} ms",
+                    logEditEnd - start, end - logEditEnd);
         }
     }
 
+    public int getRunningTxnNumsWithLock() {
+        return runningTxnNums.get();
+    }
+
     private void updateTxnLabels(TransactionState transactionState) {
         Set<Long> txnIds = 
labelToTxnIds.computeIfAbsent(transactionState.getLabel(), k -> 
Sets.newHashSet());
         txnIds.add(transactionState.getTransactionId());
@@ -1770,12 +1867,20 @@ public class DatabaseTransactionMgr {
         // before state transform
         transactionState.beforeStateTransform(TransactionStatus.ABORTED);
         boolean txnOperated = false;
-        writeLock();
-        try {
+        EditLog.EditLogItem logItem = null;
+        synchronized (transactionState) {
             txnOperated = unprotectAbortTransaction(transactionId, reason);
-        } finally {
-            writeUnlock();
-            transactionState.afterStateTransform(TransactionStatus.ABORTED, 
txnOperated, reason);
+            if (txnOperated) {
+                if (Config.enable_txn_log_outside_lock) {
+                    logItem = enqueueTransactionState(transactionState);
+                } else {
+                    persistTransactionState(transactionState);
+                }
+            }
+        }
+        transactionState.afterStateTransform(TransactionStatus.ABORTED, 
txnOperated, reason);
+        if (txnOperated) {
+            awaitTransactionState(logItem, transactionState);
         }
 
         // send clear txn task to BE to clear the transactions on BE.
@@ -1812,12 +1917,20 @@ public class DatabaseTransactionMgr {
         // before state transform
         transactionState.beforeStateTransform(TransactionStatus.ABORTED);
         boolean txnOperated = false;
-        writeLock();
-        try {
+        EditLog.EditLogItem logItem = null;
+        synchronized (transactionState) {
             txnOperated = unprotectAbortTransaction(transactionId, "User 
Abort");
-        } finally {
-            writeUnlock();
-            transactionState.afterStateTransform(TransactionStatus.ABORTED, 
txnOperated, "User Abort");
+            if (txnOperated) {
+                if (Config.enable_txn_log_outside_lock) {
+                    logItem = enqueueTransactionState(transactionState);
+                } else {
+                    persistTransactionState(transactionState);
+                }
+            }
+        }
+        transactionState.afterStateTransform(TransactionStatus.ABORTED, 
txnOperated, "User Abort");
+        if (txnOperated) {
+            awaitTransactionState(logItem, transactionState);
         }
 
         // send clear txn task to BE to clear the transactions on BE.
@@ -1849,7 +1962,8 @@ public class DatabaseTransactionMgr {
         transactionState.setFinishTime(System.currentTimeMillis());
         transactionState.setReason(reason);
         transactionState.setTransactionStatus(TransactionStatus.ABORTED);
-        unprotectUpsertTransactionState(transactionState, false);
+        // Update in-memory state only; caller handles edit log persistence
+        unprotectUpdateInMemoryState(transactionState, false);
         return true;
     }
 
@@ -1953,6 +2067,9 @@ public class DatabaseTransactionMgr {
 
     public void removeUselessTxns(long currentMillis) {
         // delete expired txns
+        BatchRemoveTransactionsOperationV2 op = null;
+        EditLog.EditLogItem logItem = null;
+        int numOfClearedTransaction = 0;
         writeLock();
         try {
             Pair<Long, Integer> expiredTxnsInfoForShort = 
unprotectedRemoveUselessTxns(currentMillis,
@@ -1960,28 +2077,36 @@ public class DatabaseTransactionMgr {
             Pair<Long, Integer> expiredTxnsInfoForLong = 
unprotectedRemoveUselessTxns(currentMillis,
                     finalStatusTransactionStateDequeLong,
                     MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second);
-            int numOfClearedTransaction = expiredTxnsInfoForShort.second + 
expiredTxnsInfoForLong.second;
+            numOfClearedTransaction = expiredTxnsInfoForShort.second + 
expiredTxnsInfoForLong.second;
             if (numOfClearedTransaction > 0) {
-                BatchRemoveTransactionsOperationV2 op = new 
BatchRemoveTransactionsOperationV2(dbId,
+                op = new BatchRemoveTransactionsOperationV2(dbId,
                         expiredTxnsInfoForShort.first, 
expiredTxnsInfoForLong.first);
-                editLog.logBatchRemoveTransactions(op);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Remove {} expired transactions", 
numOfClearedTransaction);
+                if (Config.enable_txn_log_outside_lock) {
+                    logItem = 
editLog.submitEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op);
+                } else {
+                    editLog.logBatchRemoveTransactions(op);
                 }
             }
         } finally {
             writeUnlock();
         }
+        if (logItem != null) {
+            logItem.await();
+        }
+        if (op != null && LOG.isDebugEnabled()) {
+            LOG.debug("Remove {} expired transactions", 
numOfClearedTransaction);
+        }
     }
 
     private Pair<Long, Integer> unprotectedRemoveUselessTxns(long 
currentMillis,
-            ArrayDeque<TransactionState> finalStatusTransactionStateDeque, int 
left) {
+            ConcurrentLinkedDeque<TransactionState> 
finalStatusTransactionStateDeque, int left) {
         long latestTxnId = -1;
         int numOfClearedTransaction = 0;
-        while (!finalStatusTransactionStateDeque.isEmpty() && 
numOfClearedTransaction < left) {
-            TransactionState transactionState = 
finalStatusTransactionStateDeque.getFirst();
+        TransactionState transactionState;
+        while ((transactionState = 
finalStatusTransactionStateDeque.peekFirst()) != null
+                && numOfClearedTransaction < left) {
             if (transactionState.isExpired(currentMillis)) {
-                finalStatusTransactionStateDeque.pop();
+                finalStatusTransactionStateDeque.pollFirst();
                 clearTransactionState(transactionState.getTransactionId());
                 latestTxnId = transactionState.getTransactionId();
                 numOfClearedTransaction++;
@@ -1991,9 +2116,9 @@ public class DatabaseTransactionMgr {
         }
         while ((Config.label_num_threshold > 0 && 
finalStatusTransactionStateDeque.size() > Config.label_num_threshold)
                 && numOfClearedTransaction < left) {
-            TransactionState transactionState = 
finalStatusTransactionStateDeque.getFirst();
-            if (transactionState.getFinishTime() != -1) {
-                finalStatusTransactionStateDeque.pop();
+            transactionState = finalStatusTransactionStateDeque.peekFirst();
+            if (transactionState != null && transactionState.getFinishTime() 
!= -1) {
+                finalStatusTransactionStateDeque.pollFirst();
                 clearTransactionState(transactionState.getTransactionId());
                 latestTxnId = transactionState.getTransactionId();
                 numOfClearedTransaction++;
@@ -2105,9 +2230,9 @@ public class DatabaseTransactionMgr {
             throws BeginTransactionException, MetaNotFoundException {
         long txnQuota = 
env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize();
 
-        if (runningTxnNums >= txnQuota) {
+        if (runningTxnNums.get() >= txnQuota) {
             throw new BeginTransactionException("current running txns on db " 
+ dbId + " is "
-                    + runningTxnNums + ", larger than limit " + txnQuota);
+                    + runningTxnNums.get() + ", larger than limit " + 
txnQuota);
         }
 
         // Check if committed txn count on any table exceeds the configured 
limit
@@ -2524,7 +2649,7 @@ public class DatabaseTransactionMgr {
         readLock();
         try {
             infos.add(Lists.newArrayList("running", String.valueOf(
-                    runningTxnNums)));
+                    runningTxnNums.get())));
             long finishedNum = getFinishedTxnNums();
             infos.add(Lists.newArrayList("finished", 
String.valueOf(finishedNum)));
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 4200e0c5e89..fcbd6705ac2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -61,7 +61,7 @@ public class PublishVersionDaemon extends MasterDaemon {
 
     private static final Logger LOG = 
LogManager.getLogger(PublishVersionDaemon.class);
 
-    private static ArrayList<ExecutorService> dbExecutors = new 
ArrayList(Config.publish_thread_pool_num);
+    private static ArrayList<ExecutorService> publishExecutors = new 
ArrayList(Config.publish_thread_pool_num);
 
     private Set<Long> publishingTxnIds = Sets.newConcurrentHashSet();
 
@@ -72,7 +72,7 @@ public class PublishVersionDaemon extends MasterDaemon {
     public PublishVersionDaemon() {
         super("PUBLISH_VERSION", Config.publish_version_interval_ms);
         for (int i = 0; i < Config.publish_thread_pool_num; i++) {
-            dbExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, 
Config.publish_queue_size,
+            publishExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, 
Config.publish_queue_size,
                     "PUBLISH_VERSION_EXEC-" + i, true));
         }
     }
@@ -245,7 +245,13 @@ public class PublishVersionDaemon extends MasterDaemon {
         LOG.info("try to finish transaction {}, dbId: {}, txnId: {}",
                 transactionState.getTransactionId(), 
transactionState.getDbId(), transactionState.getTransactionId());
         try {
-            dbExecutors.get((int) (transactionState.getDbId() % 
Config.publish_thread_pool_num)).execute(() -> {
+            // When enable_per_txn_publish is true, route by transactionId so 
different
+            // transactions in the same database can finish in parallel across 
executor threads.
+            // When false, route by dbId (old behavior) so transactions within 
a DB are sequential.
+            long routingKey = Config.enable_per_txn_publish
+                    ? transactionState.getTransactionId()
+                    : transactionState.getDbId();
+            publishExecutors.get((int) (routingKey % 
Config.publish_thread_pool_num)).execute(() -> {
                 try {
                     tryFinishTxnSync(transactionState, globalTransactionMgr);
                 } catch (Throwable e) {
@@ -269,12 +275,12 @@ public class PublishVersionDaemon extends MasterDaemon {
         }
 
         try {
-            partitionVisibleVersions = Maps.newHashMap();
-            backendPartitions = Maps.newHashMap();
+            Map<Long, Long> txnPartitionVersions = Maps.newHashMap();
+            Map<Long, Set<Long>> txnBackendPartitions = Maps.newHashMap();
             // one transaction exception should not affect other transaction
             globalTransactionMgr.finishTransaction(transactionState.getDbId(),
-                    transactionState.getTransactionId(), 
partitionVisibleVersions, backendPartitions);
-            addBackendVisibleVersions(partitionVisibleVersions, 
backendPartitions);
+                    transactionState.getTransactionId(), txnPartitionVersions, 
txnBackendPartitions);
+            addBackendVisibleVersions(txnPartitionVersions, 
txnBackendPartitions);
         } catch (Exception e) {
             LOG.warn("error happens when finish transaction {}", 
transactionState.getTransactionId(), e);
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index e2b11395836..73ed9f255cd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -20,9 +20,11 @@ package org.apache.doris.catalog;
 import org.apache.doris.alter.AlterJobV2;
 import org.apache.doris.alter.BatchAlterJobPersistInfo;
 import org.apache.doris.cluster.Cluster;
+import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
+import org.apache.doris.persist.OperationType;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.persist.TableInfo;
 import org.apache.doris.system.Backend;
@@ -110,6 +112,15 @@ public class FakeEditLog extends MockUp<EditLog> {
         return 1; // fake that we have streams
     }
 
+    @Mock
+    public EditLog.EditLogItem submitEdit(short op, Writable writable) {
+        if (op == OperationType.OP_UPSERT_TRANSACTION_STATE && writable 
instanceof TransactionState) {
+            TransactionState transactionState = (TransactionState) writable;
+            allTransactionState.put(transactionState.getTransactionId(), 
transactionState);
+        }
+        return null;
+    }
+
     public TransactionState getTransaction(long transactionId) {
         return allTransactionState.get(transactionId);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
