This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1cdd04ce32e [opt](txn) Per-transaction locking and parallel publish for DatabaseTransactionMgr (#59990)
1cdd04ce32e is described below
commit 1cdd04ce32ea162dbd3135e8284ad6925e1cc739
Author: Yongqiang YANG <[email protected]>
AuthorDate: Wed Feb 11 07:51:05 2026 -0800
[opt](txn) Per-transaction locking and parallel publish for DatabaseTransactionMgr (#59990)
## Summary
- Replace database-wide write lock with per-transaction
`synchronized(transactionState)` for commit, preCommit, abort, and
finish paths, so independent transactions proceed concurrently
- Enable parallel publish within a single database by routing publish
executors by transactionId instead of dbId
- Move edit log writes outside the write lock to reduce lock hold time
- Add config flags for runtime control and safe fallback
Fixes: https://github.com/apache/doris/issues/53642
## Changes
### Per-transaction locking (DatabaseTransactionMgr.java)
- Extract `updateTxnLabels` from `unprotectUpdateInMemoryState` — only
needed at `beginTransaction` and replay time, not during state
transitions
- Convert `runningTxnNums` from `volatile int` to `AtomicInteger` for
thread-safe increment/decrement without db lock
- Replace `ArrayDeque` with `ConcurrentLinkedDeque` for final-status
transaction queues
- Replace `writeLock()`/`writeUnlock()` with
`synchronized(transactionState)` in:
- `preCommitTransaction2PC`
- `commitTransaction` (both overloads)
- `finishTransaction`
- `abortTransaction` / `abortTransaction2PC`
- DB write lock retained for: `beginTransaction` (label uniqueness),
`removeUselessTxns`/`cleanLabel` (touch `labelToTxnIds` HashMap), replay
paths
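A minimal sketch of the locking scheme described above (`TxnState` and `TxnMgrSketch` are illustrative stand-ins, not the real `TransactionState` / `DatabaseTransactionMgr` API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative per-transaction state: the status field stands in for the
// real TransactionStatus enum.
class TxnState {
    final long txnId;
    volatile String status = "PREPARE";
    TxnState(long txnId) { this.txnId = txnId; }
}

class TxnMgrSketch {
    // ConcurrentHashMap lookups and an AtomicInteger counter are safe
    // without the db lock (the real code decrements on final status).
    private final ConcurrentHashMap<Long, TxnState> idToTxn = new ConcurrentHashMap<>();
    private final AtomicInteger runningTxnNums = new AtomicInteger(0);

    void begin(long txnId) {
        // The real beginTransaction still takes the db write lock for label
        // uniqueness; that part is omitted here.
        idToTxn.put(txnId, new TxnState(txnId));
        runningTxnNums.incrementAndGet();
    }

    void commit(long txnId) {
        TxnState txn = idToTxn.get(txnId);
        synchronized (txn) {          // lock only THIS transaction
            if (!"PREPARE".equals(txn.status)) {
                return;               // a concurrent abort/commit got here first
            }
            txn.status = "COMMITTED";
        }
    }

    String status(long txnId) {
        return idToTxn.get(txnId).status;
    }
}
```

Because each `commit` synchronizes only on its own `TxnState`, commits of independent transactions never contend; only operations on the same transaction serialize.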
### Parallel publish within a database (PublishVersionDaemon.java)
- Route publish executor by `transactionId` instead of `dbId` when
`enable_per_txn_publish=true`, so different transactions in the same DB
finish in parallel
- Fix race condition: use local variables in `tryFinishTxnSync` instead
of shared instance fields for
`partitionVisibleVersions`/`backendPartitions`
- Rename `dbExecutors` to `publishExecutors`
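The routing change can be sketched as follows (pool size and names are hypothetical; the real logic lives in `PublishVersionDaemon` and reads `Config.enable_per_txn_publish`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of keyed publish-executor routing: all tasks for one key land on
// one single-thread executor (ordered), different keys spread across the
// pool (parallel).
class PublishRouterSketch {
    private final ExecutorService[] publishExecutors;

    PublishRouterSketch(int poolSize) {
        publishExecutors = new ExecutorService[poolSize];
        for (int i = 0; i < poolSize; i++) {
            publishExecutors[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Route by txnId when per-txn publish is on, by dbId otherwise. With
    // txnId routing, two txns in the same db usually hash to different
    // executors and so publish in parallel.
    int route(long dbId, long transactionId, boolean enablePerTxnPublish) {
        long key = enablePerTxnPublish ? transactionId : dbId;
        return (int) Long.remainderUnsigned(key, publishExecutors.length);
    }

    void submitPublish(long dbId, long txnId, boolean perTxn, Runnable task) {
        publishExecutors[route(dbId, txnId, perTxn)].execute(task);
    }

    void shutdown() {
        for (ExecutorService e : publishExecutors) {
            e.shutdown();
        }
    }
}
```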
### Edit log outside lock (DatabaseTransactionMgr.java, EditLog.java)
- Enqueue the edit log entry inside the lock (a nanosecond-scale operation),
then await persistence outside the lock (millisecond-scale)
- Add `enable_txn_log_outside_lock` config flag
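The enqueue-inside, await-outside pattern can be sketched like this (names are illustrative; the real counterparts are `EditLog.submitEdit` and `EditLogItem.await` in the diff below):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Sketch of "enqueue inside the lock, await outside the lock".
class EditLogSketch {
    static class Item {
        final String payload;
        final CountDownLatch flushed = new CountDownLatch(1);
        Item(String payload) { this.payload = payload; }
    }

    private final Queue<Item> queue = new ConcurrentLinkedQueue<>();
    private final Object txnLock = new Object();

    // Inside the lock we only enqueue (fast). Because the queue is FIFO,
    // entries stay ordered by lock acquisition order.
    Item submit(String payload) {
        Item item = new Item(payload);
        synchronized (txnLock) {
            // ... mutate in-memory transaction state here ...
            queue.add(item);
        }
        return item; // caller awaits item.flushed OUTSIDE the lock (slow part)
    }

    // A background writer drains the queue, persists, then signals completion.
    void flushOne() {
        Item item = queue.poll();
        if (item == null) {
            return;
        }
        // ... write item.payload to the durable edit log here ...
        item.flushed.countDown();
    }
}
```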
### Config flags (Config.java)
- `enable_per_txn_publish` (default `true`, `mutable=true`): controls
publish routing. Set to `false` to fall back to sequential-per-DB
publish
- `enable_txn_log_outside_lock` (default `true`, `mutable=true`):
controls edit log write location
## Correctness
**Two different transactions committing concurrently (the main win):**
- Thread A: `synchronized(txn1) { set COMMITTED, ConcurrentMap.put }`
- Thread B: `synchronized(txn2) { set COMMITTED, ConcurrentMap.put }`
- Different locks, different map entries → fully concurrent
**Same transaction: concurrent commit + abort:**
- `synchronized(txnState)` serializes them — second thread sees the
first's state change and handles accordingly
**No deadlock:** State transition paths only acquire per-txn locks.
Cleanup/replay paths only acquire db write lock. No path acquires both.
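A toy illustration of the same-transaction race (hypothetical names): whichever thread enters the per-txn monitor first performs the transition; the other observes the final status and handles it accordingly.

```java
// Both commit and abort synchronize on the same per-txn monitor, so the
// first to enter wins; the loser sees a non-PREPARE status and backs off.
class RaceSketch {
    static String resolve(Object txnMonitor, String[] status, String target) {
        synchronized (txnMonitor) {
            if (!"PREPARE".equals(status[0])) {
                return status[0]; // lost the race: report the winner's status
            }
            status[0] = target;
            return target;
        }
    }
}
```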
## Test plan
- [ ] Run DatabaseTransactionMgrTest unit tests
- [ ] Concurrent load testing with multiple tables in same database
- [ ] Verify lock hold time reduction via metrics
- [ ] Toggle `enable_per_txn_publish` at runtime — verify both parallel
and sequential modes work
- [ ] Toggle `enable_txn_log_outside_lock` at runtime — verify both
paths work
🤖 Generated with [Claude Code](https://claude.com/claude-code)
## Test results
**Scripts**
```shell
for i in $(seq 1 2000); do
  mysql -h127.0.0.1 -P9030 -uroot paralldb -e "drop table if exists sbtest${i}; create table sbtest${i} (
    id BIGINT NOT NULL AUTO_INCREMENT,
    k INTEGER DEFAULT 0,
    c CHAR(120) DEFAULT '',
    pad CHAR(60) DEFAULT ''
  )
  DISTRIBUTED BY RANDOM BUCKETS 1
  PROPERTIES(
    \"replication_num\" = \"1\"
  )"
done
```
```shell
sysbench --db-driver=mysql --mysql-host=127.0.0.1 --mysql-port=9030 \
  --mysql-user=root --mysql-db=paralldb --tables=1000 --threads=1000 \
  --time=180 --report-interval=10 oltp_insert run
```
**Environment**
3 FE, 3 BE
**enable_txn_log_outside_lock = true enable_per_txn_publish = true**
```
[ 10s ] thds: 1000 tps: 1345.63 qps: 1345.63 (r/w/o: 0.00/1345.63/0.00) lat (ms,95%): 1869.60 err/s: 0.00 reconn/s: 0.00
[ 20s ] thds: 1000 tps: 1669.04 qps: 1669.04 (r/w/o: 0.00/1669.04/0.00) lat (ms,95%): 1032.01 err/s: 0.00 reconn/s: 0.00
[ 30s ] thds: 1000 tps: 1780.93 qps: 1780.93 (r/w/o: 0.00/1780.93/0.00) lat (ms,95%): 893.56 err/s: 0.00 reconn/s: 0.00
[ 40s ] thds: 1000 tps: 1845.02 qps: 1845.02 (r/w/o: 0.00/1845.02/0.00) lat (ms,95%): 861.95 err/s: 0.00 reconn/s: 0.00
[ 50s ] thds: 1000 tps: 1576.76 qps: 1576.76 (r/w/o: 0.00/1576.76/0.00) lat (ms,95%): 2493.86 err/s: 0.00 reconn/s: 0.00
[ 60s ] thds: 1000 tps: 1911.56 qps: 1911.56 (r/w/o: 0.00/1911.56/0.00) lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00
[ 70s ] thds: 1000 tps: 2006.93 qps: 2006.93 (r/w/o: 0.00/2006.93/0.00) lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00
[ 80s ] thds: 1000 tps: 997.81 qps: 997.81 (r/w/o: 0.00/997.81/0.00) lat (ms,95%): 995.51 err/s: 0.00 reconn/s: 0.00
[ 90s ] thds: 1000 tps: 1462.80 qps: 1462.80 (r/w/o: 0.00/1462.80/0.00) lat (ms,95%): 4855.31 err/s: 0.00 reconn/s: 0.00
[ 100s ] thds: 1000 tps: 1661.09 qps: 1661.09 (r/w/o: 0.00/1661.09/0.00) lat (ms,95%): 943.16 err/s: 0.00 reconn/s: 0.00
[ 110s ] thds: 1000 tps: 1686.85 qps: 1686.85 (r/w/o: 0.00/1686.85/0.00) lat (ms,95%): 909.80 err/s: 0.00 reconn/s: 0.00
[ 120s ] thds: 1000 tps: 725.92 qps: 725.92 (r/w/o: 0.00/725.92/0.00) lat (ms,95%): 6360.91 err/s: 0.00 reconn/s: 0.00
[ 130s ] thds: 1000 tps: 1839.17 qps: 1839.17 (r/w/o: 0.00/1839.17/0.00) lat (ms,95%): 1013.60 err/s: 0.00 reconn/s: 0.00
[ 140s ] thds: 1000 tps: 1986.62 qps: 1986.62 (r/w/o: 0.00/1986.62/0.00) lat (ms,95%): 773.68 err/s: 0.00 reconn/s: 0.00
[ 150s ] thds: 1000 tps: 1173.42 qps: 1173.42 (r/w/o: 0.00/1173.42/0.00) lat (ms,95%): 1109.09 err/s: 0.00 reconn/s: 0.00
[ 160s ] thds: 1000 tps: 1057.97 qps: 1057.97 (r/w/o: 0.00/1057.97/0.00) lat (ms,95%): 9118.47 err/s: 0.00 reconn/s: 0.00
[ 170s ] thds: 1000 tps: 2023.63 qps: 2023.63 (r/w/o: 0.00/2023.63/0.00) lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00
[ 180s ] thds: 1000 tps: 2015.55 qps: 2015.55 (r/w/o: 0.00/2015.55/0.00) lat (ms,95%): 787.74 err/s: 0.00 reconn/s: 0.00
SQL statistics:
    queries performed:
        read:  0
        write: 288670
        other: 0
        total: 288670
    transactions: 288670 (1601.69 per sec.)
    queries:      288670 (1601.69 per sec.)
```
**enable_txn_log_outside_lock = false enable_per_txn_publish = false**
```
[ 10s ] thds: 1000 tps: 135.28 qps: 135.28 (r/w/o: 0.00/135.28/0.00) lat (ms,95%): 7346.49 err/s: 0.00 reconn/s: 0.00
[ 20s ] thds: 1000 tps: 162.20 qps: 162.20 (r/w/o: 0.00/162.20/0.00) lat (ms,95%): 6247.39 err/s: 0.00 reconn/s: 0.00
[ 30s ] thds: 1000 tps: 90.70 qps: 90.70 (r/w/o: 0.00/90.70/0.00) lat (ms,95%): 6247.39 err/s: 0.00 reconn/s: 0.00
[ 40s ] thds: 1000 tps: 150.60 qps: 150.60 (r/w/o: 0.00/150.60/0.00) lat (ms,95%): 11946.04 err/s: 0.00 reconn/s: 0.00
[ 50s ] thds: 1000 tps: 182.10 qps: 182.10 (r/w/o: 0.00/182.10/0.00) lat (ms,95%): 5709.50 err/s: 0.00 reconn/s: 0.00
[ 60s ] thds: 1000 tps: 185.10 qps: 185.10 (r/w/o: 0.00/185.10/0.00) lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00
[ 70s ] thds: 1000 tps: 80.70 qps: 80.70 (r/w/o: 0.00/80.70/0.00) lat (ms,95%): 11115.87 err/s: 0.00 reconn/s: 0.00
[ 80s ] thds: 1000 tps: 183.50 qps: 183.50 (r/w/o: 0.00/183.50/0.00) lat (ms,95%): 11115.87 err/s: 0.00 reconn/s: 0.00
[ 90s ] thds: 1000 tps: 182.40 qps: 182.40 (r/w/o: 0.00/182.40/0.00) lat (ms,95%): 5607.61 err/s: 0.00 reconn/s: 0.00
[ 100s ] thds: 1000 tps: 128.30 qps: 128.30 (r/w/o: 0.00/128.30/0.00) lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00
[ 110s ] thds: 1000 tps: 137.50 qps: 137.50 (r/w/o: 0.00/137.50/0.00) lat (ms,95%): 10917.50 err/s: 0.00 reconn/s: 0.00
[ 120s ] thds: 1000 tps: 182.60 qps: 182.60 (r/w/o: 0.00/182.60/0.00) lat (ms,95%): 5607.61 err/s: 0.00 reconn/s: 0.00
[ 130s ] thds: 1000 tps: 183.60 qps: 183.60 (r/w/o: 0.00/183.60/0.00) lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00
[ 140s ] thds: 1000 tps: 86.50 qps: 86.50 (r/w/o: 0.00/86.50/0.00) lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00
[ 150s ] thds: 1000 tps: 177.00 qps: 177.00 (r/w/o: 0.00/177.00/0.00) lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00
[ 160s ] thds: 1000 tps: 183.00 qps: 183.00 (r/w/o: 0.00/183.00/0.00) lat (ms,95%): 5709.50 err/s: 0.00 reconn/s: 0.00
[ 170s ] thds: 1000 tps: 142.30 qps: 142.30 (r/w/o: 0.00/142.30/0.00) lat (ms,95%): 5507.54 err/s: 0.00 reconn/s: 0.00
[ 180s ] thds: 1000 tps: 130.10 qps: 130.10 (r/w/o: 0.00/130.10/0.00) lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00
SQL statistics:
    queries performed:
        read:  0
        write: 28035
        other: 0
        total: 28035
    transactions: 28035 (152.91 per sec.)
    queries:      28035 (152.91 per sec.)
    ignored errors: 0 (0.00 per sec.)
    reconnects:   0 (0.00 per sec.)
```
**enable_txn_log_outside_lock = false enable_per_txn_publish = true**
```
[ 10s ] thds: 1000 tps: 965.23 qps: 965.23 (r/w/o: 0.00/965.23/0.00) lat (ms,95%): 2493.86 err/s: 0.00 reconn/s: 0.00
[ 20s ] thds: 1000 tps: 1464.63 qps: 1464.63 (r/w/o: 0.00/1464.63/0.00) lat (ms,95%): 1050.76 err/s: 0.00 reconn/s: 0.00
[ 30s ] thds: 1000 tps: 888.00 qps: 888.00 (r/w/o: 0.00/888.00/0.00) lat (ms,95%): 909.80 err/s: 0.00 reconn/s: 0.00
[ 40s ] thds: 1000 tps: 1632.37 qps: 1632.37 (r/w/o: 0.00/1632.37/0.00) lat (ms,95%): 5312.73 err/s: 0.00 reconn/s: 0.00
[ 50s ] thds: 1000 tps: 1810.11 qps: 1810.11 (r/w/o: 0.00/1810.11/0.00) lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00
[ 60s ] thds: 1000 tps: 1922.10 qps: 1922.10 (r/w/o: 0.00/1922.10/0.00) lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00
[ 70s ] thds: 1000 tps: 712.58 qps: 712.58 (r/w/o: 0.00/712.58/0.00) lat (ms,95%): 7215.39 err/s: 0.00 reconn/s: 0.00
[ 80s ] thds: 1000 tps: 2093.76 qps: 2093.76 (r/w/o: 0.00/2093.76/0.00) lat (ms,95%): 816.63 err/s: 0.00 reconn/s: 0.00
[ 90s ] thds: 1000 tps: 2115.99 qps: 2115.99 (r/w/o: 0.00/2115.99/0.00) lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00
[ 100s ] thds: 1000 tps: 1494.48 qps: 1494.48 (r/w/o: 0.00/1494.48/0.00) lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00
[ 110s ] thds: 1000 tps: 651.00 qps: 651.00 (r/w/o: 0.00/651.00/0.00) lat (ms,95%): 10531.32 err/s: 0.00 reconn/s: 0.00
[ 120s ] thds: 1000 tps: 1976.26 qps: 1976.26 (r/w/o: 0.00/1976.26/0.00) lat (ms,95%): 787.74 err/s: 0.00 reconn/s: 0.00
[ 130s ] thds: 1000 tps: 1779.35 qps: 1779.35 (r/w/o: 0.00/1779.35/0.00) lat (ms,95%): 861.95 err/s: 0.00 reconn/s: 0.00
[ 140s ] thds: 1000 tps: 1281.70 qps: 1281.70 (r/w/o: 0.00/1281.70/0.00) lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00
[ 150s ] thds: 1000 tps: 0.00 qps: 0.00 (r/w/o: 0.00/0.00/0.00) lat (ms,95%): 0.00 err/s: 0.00 reconn/s: 0.00
[ 160s ] thds: 1000 tps: 1250.21 qps: 1250.21 (r/w/o: 0.00/1250.21/0.00) lat (ms,95%): 14302.94 err/s: 0.00 reconn/s: 0.00
[ 170s ] thds: 1000 tps: 1775.39 qps: 1775.39 (r/w/o: 0.00/1775.39/0.00) lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00
[ 180s ] thds: 1000 tps: 1894.81 qps: 1894.81 (r/w/o: 0.00/1894.81/0.00) lat (ms,95%): 816.63 err/s: 0.00 reconn/s: 0.00
SQL statistics:
    queries performed:
        read:  0
        write: 258081
        other: 0
        total: 258081
    transactions: 258081 (1431.61 per sec.)
    queries:      258081 (1431.61 per sec.)
```
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 22 ++
.../src/main/java/org/apache/doris/DorisFE.java | 6 +
.../java/org/apache/doris/persist/EditLog.java | 68 +++-
.../doris/transaction/DatabaseTransactionMgr.java | 353 ++++++++++++++-------
.../doris/transaction/PublishVersionDaemon.java | 20 +-
.../java/org/apache/doris/catalog/FakeEditLog.java | 11 +
6 files changed, 356 insertions(+), 124 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5b3abc07f62..101c110d124 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -744,6 +744,28 @@ public class Config extends ConfigBase {
"Txn manager will reject coming txns."})
public static int max_running_txn_num_per_db = 10000;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "是否将事务的 edit log 写入移到写锁之外以减少锁竞争。"
+ + "开启后,edit log 条目在写锁内入队(FIFO 保证顺序),"
+ + "在写锁外等待持久化完成,从而降低写锁持有时间,提高并发事务吞吐量。"
+ + "默认开启。关闭后使用传统的锁内同步写入模式。",
+ "Whether to move transaction edit log writes outside the write
lock to reduce lock contention. "
+ + "When enabled, edit log entries are enqueued inside the
write lock (FIFO preserves ordering) "
+ + "and awaited outside the lock, reducing write lock hold
time "
+ + "and improving concurrent transaction throughput. "
+ + "Default is true. Set to false to use the traditional
in-lock synchronous write mode."})
+ public static boolean enable_txn_log_outside_lock = true;
+
+ @ConfField(mutable = true, description = {
+ "是否启用按事务级别并行发布。开启后,同一数据库内的不同事务可以在不同的执行器线程上并行完成发布,"
+ + "而不是按数据库顺序执行。关闭后回退到按数据库路由(旧行为),同一数据库内的事务顺序发布。",
+ "Whether to enable per-transaction parallel publish. When enabled,
different transactions "
+ + "in the same database can finish publishing in parallel
across executor threads, "
+ + "instead of being serialized per database. "
+ + "When disabled, falls back to per-database routing (old
behavior) "
+ + "where transactions within a DB are published
sequentially."})
+ public static boolean enable_per_txn_publish = true;
+
@ConfField(masterOnly = true, description = {"pending load task
执行线程数。这个配置可以限制当前等待的导入作业数。"
+ "并且应小于 `max_running_txn_num_per_db`。",
"The pending load task executor pool size. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index 05820ac6b44..026fd45462b 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -66,6 +66,7 @@ import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -596,6 +597,11 @@ public class DorisFE {
LOG.info("fuzzy set random_add_cluster_keys_for_mow={}",
Config.random_add_cluster_keys_for_mow);
}
+ Config.enable_txn_log_outside_lock = new Random().nextBoolean();
+ LOG.info("fuzzy set enable_txn_log_outside_lock={}",
Config.enable_txn_log_outside_lock);
+ Config.enable_batch_editlog = new Random().nextBoolean();
+ LOG.info("fuzzy set enable_batch_editlog={}",
Config.enable_batch_editlog);
+
setFuzzyForCatalog();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 4f126fcf214..d506b474ed7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -124,13 +124,14 @@ import java.util.concurrent.atomic.AtomicLong;
public class EditLog {
public static final Logger LOG = LogManager.getLogger(EditLog.class);
- // Helper class to hold log edit requests
- private static class EditLogItem {
+ // Helper class to hold log edit requests.
+ // Public so that callers can enqueue inside a lock and await outside it.
+ public static class EditLogItem {
static AtomicLong nextUid = new AtomicLong(0);
final short op;
final Writable writable;
final Object lock = new Object();
- boolean finished = false;
+ volatile boolean finished = false;
long logId = -1;
long uid = -1;
@@ -139,6 +140,24 @@ public class EditLog {
this.writable = writable;
uid = nextUid.getAndIncrement();
}
+
+ /**
+ * Wait for this edit log entry to be flushed to persistent storage.
+ * Returns the assigned log ID.
+ */
+ public long await() {
+ synchronized (lock) {
+ while (!finished) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ LOG.error("Fatal Error : write stream Exception");
+ System.exit(-1);
+ }
+ }
+ }
+ return logId;
+ }
}
private final BlockingQueue<EditLogItem> logEditQueue = new
LinkedBlockingQueue<>();
@@ -1534,6 +1553,49 @@ public class EditLog {
return req.logId;
}
+ /**
+ * Submit an edit log entry to the batch queue without waiting for it to
be flushed.
+ * The entry is enqueued in FIFO order, so calling this inside a write
lock guarantees
+ * that edit log entries are ordered by lock acquisition order.
+ *
+ * <p>The caller MUST call {@link EditLogItem#await()} after releasing the
lock to ensure
+ * the entry is persisted before proceeding.
+ *
+ * <p>If batch edit log is disabled, this falls back to a synchronous
direct write
+ * and the returned item is already completed.
+ *
+ * @return an {@link EditLogItem} handle to await completion
+ */
+ public EditLogItem submitEdit(short op, Writable writable) {
+ if (this.getNumEditStreams() == 0) {
+ LOG.error("Fatal Error : no editLog stream", new Exception());
+ throw new Error("Fatal Error : no editLog stream");
+ }
+
+ EditLogItem req = new EditLogItem(op, writable);
+ if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) {
+ while (true) {
+ try {
+ logEditQueue.put(req);
+ break;
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted during put, will sleep and retry.");
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ LOG.warn("interrupted during sleep, will retry.", ex);
+ }
+ }
+ }
+ } else {
+ // Non-batch mode: write directly (synchronous)
+ long logId = logEditDirectly(op, writable);
+ req.logId = logId;
+ req.finished = true;
+ }
+ return req;
+ }
+
private synchronized long logEditDirectly(short op, Writable writable) {
long logId = -1;
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index ade04e95b15..eb905f3eda1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -18,6 +18,7 @@
package org.apache.doris.transaction;
import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.binlog.UpsertRecord;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@@ -55,6 +56,7 @@ import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.persist.EditLog;
+import org.apache.doris.persist.OperationType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.task.AgentBatchTask;
@@ -78,7 +80,6 @@ import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -90,7 +91,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -134,8 +137,10 @@ public class DatabaseTransactionMgr {
// The "Short" queue is used to store the txns of the expire time
// controlled by Config.streaming_label_keep_max_second.
// The "Long" queue is used to store the txns of the expire time
controlled by Config.label_keep_max_second.
- private final ArrayDeque<TransactionState>
finalStatusTransactionStateDequeShort = new ArrayDeque<>();
- private final ArrayDeque<TransactionState>
finalStatusTransactionStateDequeLong = new ArrayDeque<>();
+ private final ConcurrentLinkedDeque<TransactionState>
finalStatusTransactionStateDequeShort
+ = new ConcurrentLinkedDeque<>();
+ private final ConcurrentLinkedDeque<TransactionState>
finalStatusTransactionStateDequeLong
+ = new ConcurrentLinkedDeque<>();
// label -> txn ids
// this is used for checking if label already used. a label may correspond
to multiple txns,
@@ -150,7 +155,7 @@ public class DatabaseTransactionMgr {
private Long lastCommittedTxnCountUpdateTime = 0L;
// count the number of running txns of database
- private volatile int runningTxnNums = 0;
+ private final AtomicInteger runningTxnNums = new AtomicInteger(0);
private final Env env;
@@ -222,7 +227,7 @@ public class DatabaseTransactionMgr {
}
protected int getRunningTxnNums() {
- return runningTxnNums;
+ return runningTxnNums.get();
}
@VisibleForTesting
@@ -331,6 +336,8 @@ public class DatabaseTransactionMgr {
FeNameFormat.checkLabel(label);
long tid = 0L;
+ TransactionState transactionState = null;
+ EditLog.EditLogItem logItem = null;
writeLock();
try {
/*
@@ -368,10 +375,16 @@ public class DatabaseTransactionMgr {
checkRunningTxnExceedLimit(tableIdList);
tid = idGenerator.getNextTransactionId();
- TransactionState transactionState = new TransactionState(dbId,
tableIdList,
+ transactionState = new TransactionState(dbId, tableIdList,
tid, label, requestId, sourceType, coordinator,
listenerId, timeoutSecond * 1000);
transactionState.setPrepareTime(System.currentTimeMillis());
- unprotectUpsertTransactionState(transactionState, false);
+ unprotectUpdateInMemoryState(transactionState, false);
+ updateTxnLabels(transactionState);
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = enqueueTransactionState(transactionState);
+ } else {
+ persistTransactionState(transactionState);
+ }
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_BEGIN.increase(1L);
@@ -379,6 +392,7 @@ public class DatabaseTransactionMgr {
} finally {
writeUnlock();
}
+ awaitTransactionState(logItem, transactionState);
LOG.info("begin transaction: txn id {} with label {} from coordinator
{}, listener id: {}",
tid, label, coordinator, listenerId);
return tid;
@@ -448,13 +462,17 @@ public class DatabaseTransactionMgr {
checkCommitStatus(tableList, transactionState, tabletCommitInfos,
txnCommitAttachment, errorReplicaIds,
tableToPartition, totalInvolvedBackends);
- writeLock();
- try {
+ EditLog.EditLogItem logItem = null;
+ synchronized (transactionState) {
unprotectedPreCommitTransaction2PC(transactionState,
errorReplicaIds, tableToPartition,
totalInvolvedBackends, db);
- } finally {
- writeUnlock();
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = enqueueTransactionState(transactionState);
+ } else {
+ persistTransactionState(transactionState);
+ }
}
+ awaitTransactionState(logItem, transactionState);
LOG.info("transaction:[{}] successfully pre-committed",
transactionState);
}
@@ -802,23 +820,29 @@ public class DatabaseTransactionMgr {
transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
// transaction state transform
boolean txnOperated = false;
- writeLock();
- try {
+ EditLog.EditLogItem logItem = null;
+ synchronized (transactionState) {
if (is2PC) {
unprotectedCommitTransaction2PC(transactionState, db);
} else {
unprotectedCommitTransaction(transactionState, errorReplicaIds,
tableToPartition, totalInvolvedBackends, db);
}
- txnOperated = true;
- } finally {
- writeUnlock();
- // after state transform
- try {
-
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated);
- } catch (Throwable e) {
- LOG.warn("afterStateTransform txn {} failed. exception: ",
transactionState, e);
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = enqueueTransactionState(transactionState);
+ } else {
+ persistTransactionState(transactionState);
}
+ txnOperated = true;
+ }
+ // after state transform
+ try {
+ transactionState.afterStateTransform(TransactionStatus.COMMITTED,
txnOperated);
+ } catch (Throwable e) {
+ LOG.warn("afterStateTransform txn {} failed. exception: ",
transactionState, e);
+ }
+ if (txnOperated) {
+ awaitTransactionState(logItem, transactionState);
}
// update nextVersion because of the failure of persistent transaction
resulting in error version
@@ -869,19 +893,25 @@ public class DatabaseTransactionMgr {
transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
// transaction state transform
boolean txnOperated = false;
- writeLock();
- try {
+ EditLog.EditLogItem logItem = null;
+ synchronized (transactionState) {
unprotectedCommitTransaction(transactionState, errorReplicaIds,
subTxnToPartition, totalInvolvedBackends,
subTransactionStates, db);
- txnOperated = true;
- } finally {
- writeUnlock();
- // after state transform
- try {
-
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated);
- } catch (Throwable e) {
- LOG.warn("afterStateTransform txn {} failed. exception: ",
transactionState, e);
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = enqueueTransactionState(transactionState);
+ } else {
+ persistTransactionState(transactionState);
}
+ txnOperated = true;
+ }
+ // after state transform
+ try {
+ transactionState.afterStateTransform(TransactionStatus.COMMITTED,
txnOperated);
+ } catch (Throwable e) {
+ LOG.warn("afterStateTransform txn {} failed. exception: ",
transactionState, e);
+ }
+ if (txnOperated) {
+ awaitTransactionState(logItem, transactionState);
}
// update nextVersion because of the failure of persistent transaction
resulting in error version
@@ -930,15 +960,15 @@ public class DatabaseTransactionMgr {
// here we only delete the oldest element, so if element exist in
finalStatusTransactionStateDeque,
// it must at the front of the finalStatusTransactionStateDeque.
// check both "short" and "long" queue.
- if (!finalStatusTransactionStateDequeShort.isEmpty()
- && transactionState.getTransactionId()
- ==
finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
- finalStatusTransactionStateDequeShort.pop();
+ TransactionState shortHead =
finalStatusTransactionStateDequeShort.peekFirst();
+ TransactionState longHead =
finalStatusTransactionStateDequeLong.peekFirst();
+ if (shortHead != null
+ && transactionState.getTransactionId() ==
shortHead.getTransactionId()) {
+ finalStatusTransactionStateDequeShort.pollFirst();
clearTransactionState(transactionState.getTransactionId());
- } else if (!finalStatusTransactionStateDequeLong.isEmpty()
- && transactionState.getTransactionId()
- ==
finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
- finalStatusTransactionStateDequeLong.pop();
+ } else if (longHead != null
+ && transactionState.getTransactionId() ==
longHead.getTransactionId()) {
+ finalStatusTransactionStateDequeLong.pollFirst();
clearTransactionState(transactionState.getTransactionId());
}
} finally {
@@ -953,13 +983,13 @@ public class DatabaseTransactionMgr {
// here we only delete the oldest element, so if element exist
in finalStatusTransactionStateDeque,
// it must at the front of the finalStatusTransactionStateDeque
// check both "short" and "long" queue.
- if (!finalStatusTransactionStateDequeShort.isEmpty()
- && txnId ==
finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
- finalStatusTransactionStateDequeShort.pop();
+ TransactionState shortHead =
finalStatusTransactionStateDequeShort.peekFirst();
+ TransactionState longHead =
finalStatusTransactionStateDequeLong.peekFirst();
+ if (shortHead != null && txnId ==
shortHead.getTransactionId()) {
+ finalStatusTransactionStateDequeShort.pollFirst();
clearTransactionState(txnId);
- } else if (!finalStatusTransactionStateDequeLong.isEmpty()
- && txnId ==
finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
- finalStatusTransactionStateDequeLong.pop();
+ } else if (longHead != null && txnId ==
longHead.getTransactionId()) {
+ finalStatusTransactionStateDequeLong.pollFirst();
clearTransactionState(txnId);
}
}
@@ -972,8 +1002,8 @@ public class DatabaseTransactionMgr {
writeLock();
try {
if (operation.getLatestTxnIdForShort() != -1) {
- while (!finalStatusTransactionStateDequeShort.isEmpty()) {
- TransactionState transactionState =
finalStatusTransactionStateDequeShort.pop();
+ TransactionState transactionState;
+ while ((transactionState =
finalStatusTransactionStateDequeShort.pollFirst()) != null) {
clearTransactionState(transactionState.getTransactionId());
if (operation.getLatestTxnIdForShort() ==
transactionState.getTransactionId()) {
break;
@@ -982,8 +1012,8 @@ public class DatabaseTransactionMgr {
}
if (operation.getLatestTxnIdForLong() != -1) {
- while (!finalStatusTransactionStateDequeLong.isEmpty()) {
- TransactionState transactionState =
finalStatusTransactionStateDequeLong.pop();
+ TransactionState transactionState;
+ while ((transactionState =
finalStatusTransactionStateDequeLong.pollFirst()) != null) {
clearTransactionState(transactionState.getTransactionId());
if (operation.getLatestTxnIdForLong() ==
transactionState.getTransactionId()) {
break;
@@ -1169,14 +1199,19 @@ public class DatabaseTransactionMgr {
}
}
boolean txnOperated = false;
- writeLock();
- try {
+ EditLog.EditLogItem logItem = null;
+ synchronized (transactionState) {
transactionState.setErrorReplicas(errorReplicaIds);
transactionState.setFinishTime(System.currentTimeMillis());
transactionState.clearErrorMsg();
transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
setTableVersion(transactionState, db);
- unprotectUpsertTransactionState(transactionState, false);
+ unprotectUpdateInMemoryState(transactionState, false);
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = enqueueTransactionState(transactionState);
+ } else {
+ persistTransactionState(transactionState);
+ }
txnOperated = true;
// TODO(cmy): We found a very strange problem. When
delete-related transactions are processed here,
// subsequent `updateCatalogAfterVisible()` is called, but it
does not seem to be executed here
@@ -1185,13 +1220,14 @@ public class DatabaseTransactionMgr {
if (LOG.isDebugEnabled()) {
LOG.debug("after set transaction {} to visible",
transactionState);
}
- } finally {
- writeUnlock();
- try {
-
transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated);
- } catch (Throwable e) {
- LOG.warn("afterStateTransform txn {} failed. exception: ",
transactionState, e);
- }
+ }
+ try {
+
transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated);
+ } catch (Throwable e) {
+ LOG.warn("afterStateTransform txn {} failed. exception: ",
transactionState, e);
+ }
+ if (txnOperated) {
+ awaitTransactionState(logItem, transactionState);
}
updateCatalogAfterVisible(transactionState, db,
partitionVisibleVersions, backendPartitions);
} finally {
@@ -1527,8 +1563,8 @@ public class DatabaseTransactionMgr {
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
}
- // persist transactionState
- unprotectUpsertTransactionState(transactionState, false);
+ // Update in-memory state only; caller handles edit log persistence
+ unprotectUpdateInMemoryState(transactionState, false);
transactionState.setInvolvedBackends(totalInvolvedBackends);
}
@@ -1565,8 +1601,8 @@ public class DatabaseTransactionMgr {
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
}
- // persist transactionState
- unprotectUpsertTransactionState(transactionState, false);
+ // Update in-memory state only; caller handles edit log persistence
+ unprotectUpdateInMemoryState(transactionState, false);
transactionState.setInvolvedBackends(totalInvolvedBackends);
}
@@ -1626,8 +1662,8 @@ public class DatabaseTransactionMgr {
transactionState.addSubTxnTableCommitInfo(subTransactionState,
tableCommitInfo);
}
}
- // persist transactionState
- unprotectUpsertTransactionState(transactionState, false);
+ // Update in-memory state only; caller handles edit log persistence
+ unprotectUpdateInMemoryState(transactionState, false);
transactionState.setInvolvedBackends(totalInvolvedBackends);
}
@@ -1674,26 +1710,34 @@ public class DatabaseTransactionMgr {
partitionCommitInfo.setVersionTime(System.currentTimeMillis());
}
}
- // persist transactionState
- editLog.logInsertTransactionState(transactionState);
+ // Update in-memory state only; caller handles edit log persistence
+ unprotectUpdateInMemoryState(transactionState, false);
}
- // for add/update/delete TransactionState
+ /**
+ * Replays a transaction state update from the edit log.
+ * Only called during replay (follower sync or restart), so no edit log persistence is needed.
+ *
+ * @param transactionState the transaction state to replay
+ * @param isReplay must be true (only used in replay path)
+ */
protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) {
- // if this is a replay operation, we should not log it
- if (!isReplay) {
- if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
- || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) {
- // if this is a prepare txn, and load source type is not FRONTEND
- // no need to persist it. if prepare txn lost, the following commit will just be failed.
- // user only need to retry this txn.
- // The FRONTEND type txn is committed and running asynchronously, so we have to persist it.
- editLog.logInsertTransactionState(transactionState);
- }
- }
+ unprotectUpdateInMemoryState(transactionState, isReplay);
+ }
+
+ /**
+ * Updates only in-memory transaction state without persisting to edit log.
+ * This method must be called while holding the write lock.
+ * Use this method in combination with {@link #persistTransactionState} to reduce lock contention
+ * by moving edit log writes outside the lock.
+ *
+ * @param transactionState the transaction state to update
+ * @param isReplay true if this is a replay operation (edit log already contains this state)
+ */
protected void unprotectUpdateInMemoryState(TransactionState transactionState, boolean isReplay) {
if (!transactionState.getTransactionStatus().isFinalStatus()) {
if (idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState) == null) {
- runningTxnNums++;
+ runningTxnNums.incrementAndGet();
}
if (isReplay && transactionState.getSubTxnIds() != null) {
LOG.info("add sub transactions for txn_id={}, status={}, sub_txn_ids={}",
@@ -1705,7 +1749,7 @@ public class DatabaseTransactionMgr {
}
} else {
if (idToRunningTransactionState.remove(transactionState.getTransactionId()) != null) {
- runningTxnNums--;
+ runningTxnNums.decrementAndGet();
}
idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState);
if (transactionState.isShortTxn()) {
@@ -1719,18 +1763,71 @@ public class DatabaseTransactionMgr {
cleanSubTransactions(transactionState.getTransactionId());
}
}
- updateTxnLabels(transactionState);
+ if (isReplay) {
+ updateTxnLabels(transactionState);
+ }
}
- public int getRunningTxnNumsWithLock() {
- readLock();
- try {
- return runningTxnNums;
- } finally {
- readUnlock();
+ /**
+ * Persists the transaction state to edit log synchronously.
+ * For PREPARE transactions with non-FRONTEND source type, persistence is skipped
+ * because losing them only requires the client to retry.
+ */
+ protected void persistTransactionState(TransactionState transactionState) {
+ if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
+ || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) {
+ // if this is a prepare txn, and load source type is not FRONTEND
+ // no need to persist it. if prepare txn lost, the following commit will just be failed.
+ // user only need to retry this txn.
+ // The FRONTEND type txn is committed and running asynchronously, so we have to persist it.
+ editLog.logInsertTransactionState(transactionState);
+ }
+ }
+
+ /**
+ * Enqueue a transaction state edit log entry without waiting for persistence.
+ * Must be called inside the write lock to preserve ordering via the FIFO queue.
+ *
+ * @return an {@link EditLog.EditLogItem} handle to await completion, or null if persistence is skipped
+ */
+ protected EditLog.EditLogItem enqueueTransactionState(TransactionState transactionState) {
+ if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
+ || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) {
+ return editLog.submitEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState);
+ }
+ return null;
+ }
+
+ /**
+ * Await completion of a previously enqueued transaction state edit log entry.
+ * Should be called outside the write lock. Handles binlog and timing logic.
+ *
+ * @param item the handle returned by {@link #enqueueTransactionState}, may be null
+ * @param transactionState the transaction state (for binlog and timing)
+ */
+ protected void awaitTransactionState(EditLog.EditLogItem item, TransactionState transactionState) {
+ if (item == null) {
+ return;
+ }
+ long start = System.currentTimeMillis();
+ long logId = item.await();
+ long logEditEnd = System.currentTimeMillis();
+ long end = logEditEnd;
+ if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+ UpsertRecord record = new UpsertRecord(logId, transactionState);
+ Env.getCurrentEnv().getBinlogManager().addUpsertRecord(record);
+ end = System.currentTimeMillis();
+ }
+ if (end - start > Config.lock_reporting_threshold_ms) {
+ LOG.warn("edit log insert transaction take a lot time, write bdb {} ms, write binlog {} ms",
+ logEditEnd - start, end - logEditEnd);
}
}
+ public int getRunningTxnNumsWithLock() {
+ return runningTxnNums.get();
+ }
+
private void updateTxnLabels(TransactionState transactionState) {
Set<Long> txnIds = labelToTxnIds.computeIfAbsent(transactionState.getLabel(), k -> Sets.newHashSet());
txnIds.add(transactionState.getTransactionId());
@@ -1770,12 +1867,20 @@ public class DatabaseTransactionMgr {
// before state transform
transactionState.beforeStateTransform(TransactionStatus.ABORTED);
boolean txnOperated = false;
- writeLock();
- try {
+ EditLog.EditLogItem logItem = null;
+ synchronized (transactionState) {
txnOperated = unprotectAbortTransaction(transactionId, reason);
- } finally {
- writeUnlock();
- transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
+ if (txnOperated) {
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = enqueueTransactionState(transactionState);
+ } else {
+ persistTransactionState(transactionState);
+ }
+ }
+ }
+ transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
+ if (txnOperated) {
+ awaitTransactionState(logItem, transactionState);
}
// send clear txn task to BE to clear the transactions on BE.
@@ -1812,12 +1917,20 @@ public class DatabaseTransactionMgr {
// before state transform
transactionState.beforeStateTransform(TransactionStatus.ABORTED);
boolean txnOperated = false;
- writeLock();
- try {
+ EditLog.EditLogItem logItem = null;
+ synchronized (transactionState) {
txnOperated = unprotectAbortTransaction(transactionId, "User Abort");
- } finally {
- writeUnlock();
- transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort");
+ if (txnOperated) {
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = enqueueTransactionState(transactionState);
+ } else {
+ persistTransactionState(transactionState);
+ }
+ }
+ }
+ transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort");
+ if (txnOperated) {
+ awaitTransactionState(logItem, transactionState);
}
// send clear txn task to BE to clear the transactions on BE.
@@ -1849,7 +1962,8 @@ public class DatabaseTransactionMgr {
transactionState.setFinishTime(System.currentTimeMillis());
transactionState.setReason(reason);
transactionState.setTransactionStatus(TransactionStatus.ABORTED);
- unprotectUpsertTransactionState(transactionState, false);
+ // Update in-memory state only; caller handles edit log persistence
+ unprotectUpdateInMemoryState(transactionState, false);
return true;
}
@@ -1953,6 +2067,9 @@ public class DatabaseTransactionMgr {
public void removeUselessTxns(long currentMillis) {
// delete expired txns
+ BatchRemoveTransactionsOperationV2 op = null;
+ EditLog.EditLogItem logItem = null;
+ int numOfClearedTransaction = 0;
writeLock();
try {
Pair<Long, Integer> expiredTxnsInfoForShort = unprotectedRemoveUselessTxns(currentMillis,
@@ -1960,28 +2077,36 @@ public class DatabaseTransactionMgr {
Pair<Long, Integer> expiredTxnsInfoForLong = unprotectedRemoveUselessTxns(currentMillis,
finalStatusTransactionStateDequeLong, MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second);
- int numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second;
+ numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second;
if (numOfClearedTransaction > 0) {
- BatchRemoveTransactionsOperationV2 op = new BatchRemoveTransactionsOperationV2(dbId,
+ op = new BatchRemoveTransactionsOperationV2(dbId,
expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first);
- editLog.logBatchRemoveTransactions(op);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Remove {} expired transactions", numOfClearedTransaction);
+ if (Config.enable_txn_log_outside_lock) {
+ logItem = editLog.submitEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op);
+ } else {
+ editLog.logBatchRemoveTransactions(op);
}
}
} finally {
writeUnlock();
}
+ if (logItem != null) {
+ logItem.await();
+ }
+ if (op != null && LOG.isDebugEnabled()) {
+ LOG.debug("Remove {} expired transactions", numOfClearedTransaction);
+ }
}
private Pair<Long, Integer> unprotectedRemoveUselessTxns(long currentMillis,
- ArrayDeque<TransactionState> finalStatusTransactionStateDeque, int left) {
+ ConcurrentLinkedDeque<TransactionState> finalStatusTransactionStateDeque, int left) {
long latestTxnId = -1;
int numOfClearedTransaction = 0;
- while (!finalStatusTransactionStateDeque.isEmpty() && numOfClearedTransaction < left) {
- TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
+ TransactionState transactionState;
+ while ((transactionState = finalStatusTransactionStateDeque.peekFirst()) != null
+ && numOfClearedTransaction < left) {
if (transactionState.isExpired(currentMillis)) {
- finalStatusTransactionStateDeque.pop();
+ finalStatusTransactionStateDeque.pollFirst();
clearTransactionState(transactionState.getTransactionId());
latestTxnId = transactionState.getTransactionId();
numOfClearedTransaction++;
@@ -1991,9 +2116,9 @@ public class DatabaseTransactionMgr {
}
while ((Config.label_num_threshold > 0 && finalStatusTransactionStateDeque.size() > Config.label_num_threshold)
&& numOfClearedTransaction < left) {
- TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
- if (transactionState.getFinishTime() != -1) {
- finalStatusTransactionStateDeque.pop();
+ transactionState = finalStatusTransactionStateDeque.peekFirst();
+ if (transactionState != null && transactionState.getFinishTime() != -1) {
+ finalStatusTransactionStateDeque.pollFirst();
clearTransactionState(transactionState.getTransactionId());
latestTxnId = transactionState.getTransactionId();
numOfClearedTransaction++;
@@ -2105,9 +2230,9 @@ public class DatabaseTransactionMgr {
throws BeginTransactionException, MetaNotFoundException {
long txnQuota = env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize();
- if (runningTxnNums >= txnQuota) {
+ if (runningTxnNums.get() >= txnQuota) {
throw new BeginTransactionException("current running txns on db " + dbId + " is "
- + runningTxnNums + ", larger than limit " + txnQuota);
+ + runningTxnNums.get() + ", larger than limit " + txnQuota);
}
// Check if committed txn count on any table exceeds the configured limit
@@ -2524,7 +2649,7 @@ public class DatabaseTransactionMgr {
readLock();
try {
infos.add(Lists.newArrayList("running", String.valueOf(
- runningTxnNums)));
+ runningTxnNums.get())));
long finishedNum = getFinishedTxnNums();
infos.add(Lists.newArrayList("finished", String.valueOf(finishedNum)));
} finally {
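[Editor's note: the enqueue-inside-lock / await-outside-lock flow that `enqueueTransactionState` and `awaitTransactionState` implement above can be sketched in isolation as follows. This is a minimal standalone illustration, not Doris code: `EditItem`, `submitEdit`, and the single-threaded writer are stand-ins for the real `EditLog.EditLogItem` and bdb-je-backed queue.]

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    // Stand-in for EditLog.EditLogItem: a handle the caller can await later.
    static final class EditItem {
        final CompletableFuture<Long> done = new CompletableFuture<>();
        long await() { return done.join(); }
    }

    private final ExecutorService logWriter = Executors.newSingleThreadExecutor();
    private long nextLogId = 0;
    private final Object txnLock = new Object(); // per-transaction monitor in the real code

    // Called under the lock: assigns the log id (fixing FIFO order) and
    // only enqueues the durable write, never blocks on I/O.
    EditItem submitEdit(String payload) {
        EditItem item = new EditItem();
        long id = ++nextLogId;
        logWriter.execute(() -> item.done.complete(id)); // real code: write to the edit log here
        return item;
    }

    long commit(String txn) {
        EditItem item;
        synchronized (txnLock) {
            // ... in-memory state transition happens here ...
            item = submitEdit(txn);
        }
        // Outside the lock: wait for durability before acknowledging.
        return item.await();
    }

    public static void main(String[] args) {
        Main mgr = new Main();
        long first = mgr.commit("txn-1");
        long second = mgr.commit("txn-2");
        mgr.logWriter.shutdown();
        // Ordering was fixed at enqueue time, under the lock.
        System.out.println(first == 1 && second == 2 ? "ok" : "broken");
    }
}
```

The point of the split is that the slow fsync no longer serializes unrelated transactions; only the cheap enqueue happens while the monitor is held.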
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 4200e0c5e89..fcbd6705ac2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -61,7 +61,7 @@ public class PublishVersionDaemon extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(PublishVersionDaemon.class);
- private static ArrayList<ExecutorService> dbExecutors = new ArrayList(Config.publish_thread_pool_num);
+ private static ArrayList<ExecutorService> publishExecutors = new ArrayList(Config.publish_thread_pool_num);
private Set<Long> publishingTxnIds = Sets.newConcurrentHashSet();
@@ -72,7 +72,7 @@ public class PublishVersionDaemon extends MasterDaemon {
public PublishVersionDaemon() {
super("PUBLISH_VERSION", Config.publish_version_interval_ms);
for (int i = 0; i < Config.publish_thread_pool_num; i++) {
- dbExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size,
+ publishExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size,
"PUBLISH_VERSION_EXEC-" + i, true));
}
}
@@ -245,7 +245,13 @@ public class PublishVersionDaemon extends MasterDaemon {
LOG.info("try to finish transaction {}, dbId: {}, txnId: {}", transactionState.getTransactionId(),
transactionState.getDbId(), transactionState.getTransactionId());
try {
- dbExecutors.get((int) (transactionState.getDbId() % Config.publish_thread_pool_num)).execute(() -> {
+ // When enable_per_txn_publish is true, route by transactionId so different
+ // transactions in the same database can finish in parallel across executor threads.
+ // When false, route by dbId (old behavior) so transactions within a DB are sequential.
+ long routingKey = Config.enable_per_txn_publish
+ ? transactionState.getTransactionId()
+ : transactionState.getDbId();
+ publishExecutors.get((int) (routingKey % Config.publish_thread_pool_num)).execute(() -> {
try {
tryFinishTxnSync(transactionState, globalTransactionMgr);
} catch (Throwable e) {
@@ -269,12 +275,12 @@ public class PublishVersionDaemon extends MasterDaemon {
}
try {
- partitionVisibleVersions = Maps.newHashMap();
- backendPartitions = Maps.newHashMap();
+ Map<Long, Long> txnPartitionVersions = Maps.newHashMap();
+ Map<Long, Set<Long>> txnBackendPartitions = Maps.newHashMap();
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
- transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
- addBackendVisibleVersions(partitionVisibleVersions, backendPartitions);
+ transactionState.getTransactionId(), txnPartitionVersions, txnBackendPartitions);
+ addBackendVisibleVersions(txnPartitionVersions, txnBackendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
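[Editor's note: a toy model of the routing change above. Only the modulo arithmetic mirrors the diff; the pool size constant and id values are illustrative, not Doris configuration.]

```java
public class Main {
    // Same expression as publishExecutors.get((int) (routingKey % poolSize)).
    static int route(long routingKey, int poolSize) {
        return (int) (routingKey % poolSize);
    }

    public static void main(String[] args) {
        int pool = 8;            // stands in for Config.publish_thread_pool_num
        long dbId = 10001L;      // illustrative ids
        long txnA = 5001L, txnB = 5002L;
        // dbId routing: every transaction of one database lands on the same
        // single-threaded executor, so publishes within a DB are sequential.
        boolean serial = route(dbId, pool) == route(dbId, pool);
        // txnId routing: adjacent transaction ids land on different
        // executors, so one database's publishes can proceed in parallel.
        boolean parallel = route(txnA, pool) != route(txnB, pool);
        System.out.println(serial && parallel ? "ok" : "broken");
    }
}
```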
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index e2b11395836..73ed9f255cd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -20,9 +20,11 @@ package org.apache.doris.catalog;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.BatchAlterJobPersistInfo;
import org.apache.doris.cluster.Cluster;
+import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
+import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.system.Backend;
@@ -110,6 +112,15 @@ public class FakeEditLog extends MockUp<EditLog> {
return 1; // fake that we have streams
}
+ @Mock
+ public EditLog.EditLogItem submitEdit(short op, Writable writable) {
+ if (op == OperationType.OP_UPSERT_TRANSACTION_STATE && writable instanceof TransactionState) {
+ TransactionState transactionState = (TransactionState) writable;
allTransactionState.put(transactionState.getTransactionId(), transactionState);
+ }
+ return null;
+ }
+
public TransactionState getTransaction(long transactionId) {
return allTransactionState.get(transactionId);
}
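[Editor's note: the per-transaction locking idea from the summary, reduced to a toy: each transaction synchronizes only on its own state object, so two independent commits never contend on a database-wide lock. Names are illustrative; only the `synchronized(transactionState)` and `AtomicInteger` shapes mirror the commit.]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    static final class TxnState { } // one monitor object per transaction

    private final Map<Long, TxnState> txns = new ConcurrentHashMap<>();
    private final AtomicInteger running = new AtomicInteger(); // like runningTxnNums

    void begin(long txnId) {
        txns.put(txnId, new TxnState());
        running.incrementAndGet();
    }

    void commit(long txnId) {
        synchronized (txns.get(txnId)) { // blocks only this transaction
            running.decrementAndGet();   // safe without a db-wide lock
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Main mgr = new Main();
        mgr.begin(1L);
        mgr.begin(2L);
        Thread t1 = new Thread(() -> mgr.commit(1L));
        Thread t2 = new Thread(() -> mgr.commit(2L));
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(mgr.running.get() == 0 ? "ok" : "broken");
    }
}
```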
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]