This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ad638f9fe [enhancement](transaction) Reduce hold writeLock time for
DatabaseTransactionMgr to clear transaction (#17414)
0ad638f9fe is described below
commit 0ad638f9fe5217182c9c10cfb118a1ac46191208
Author: caiconghui <[email protected]>
AuthorDate: Mon Mar 6 11:32:21 2023 +0800
[enhancement](transaction) Reduce hold writeLock time for
DatabaseTransactionMgr to clear transaction (#17414)
* [enhancement](transaction) Reduce hold writeLock time for
DatabaseTransactionMgr to clear transaction
* fix ut
* remove unnecessary field for remove txn bdbje log
---------
Co-authored-by: caiconghui1 <[email protected]>
---
.../org/apache/doris/journal/JournalEntity.java | 6 ++
.../BatchRemoveTransactionsOperationV2.java | 70 +++++++++++++++++++++
.../java/org/apache/doris/persist/EditLog.java | 10 ++-
.../org/apache/doris/persist/OperationType.java | 5 +-
.../doris/transaction/DatabaseTransactionMgr.java | 71 +++++++++++++++-------
.../doris/transaction/GlobalTransactionMgr.java | 10 +++
.../java/org/apache/doris/catalog/FakeEditLog.java | 4 +-
7 files changed, 148 insertions(+), 28 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 839071e241..1d9cba3b6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -71,6 +71,7 @@ import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.persist.ClusterInfo;
import org.apache.doris.persist.ColocatePersistInfo;
@@ -453,6 +454,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_BATCH_REMOVE_TXNS_V2: {
+ data = BatchRemoveTransactionsOperationV2.read(in);
+ isRead = true;
+ break;
+ }
case OperationType.OP_CREATE_REPOSITORY: {
data = Repository.read(in);
isRead = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java
new file mode 100644
index 0000000000..0371a61bc0
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// Persist the info when removing a batch of expired txns
+public class BatchRemoveTransactionsOperationV2 implements Writable {
+
+ @SerializedName(value = "dbId")
+ private long dbId;
+
+ @SerializedName(value = "latestTxnIdForShort")
+ private long latestTxnIdForShort;
+
+ @SerializedName(value = "latestTxnIdForLong")
+ private long latestTxnIdForLong;
+
+ public BatchRemoveTransactionsOperationV2(long dbId, long
latestTxnIdForShort, long latestTxnIdForLong) {
+ this.dbId = dbId;
+ this.latestTxnIdForShort = latestTxnIdForShort;
+ this.latestTxnIdForLong = latestTxnIdForLong;
+ }
+
+ public long getDbId() {
+ return dbId;
+ }
+
+ public long getLatestTxnIdForShort() {
+ return latestTxnIdForShort;
+ }
+
+ public long getLatestTxnIdForLong() {
+ return latestTxnIdForLong;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static BatchRemoveTransactionsOperationV2 read(DataInput in) throws
IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json,
BatchRemoveTransactionsOperationV2.class);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 7c27274530..e8cd29235a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -562,6 +562,12 @@ public class EditLog {
Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation);
break;
}
+ case OperationType.OP_BATCH_REMOVE_TXNS_V2: {
+ final BatchRemoveTransactionsOperationV2 operation =
+ (BatchRemoveTransactionsOperationV2)
journal.getData();
+
Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactionV2(operation);
+ break;
+ }
case OperationType.OP_CREATE_REPOSITORY: {
Repository repository = (Repository) journal.getData();
env.getBackupHandler().getRepoMgr().addAndInitRepoIfNotExist(repository, true);
@@ -1604,8 +1610,8 @@ public class EditLog {
logEdit(OperationType.OP_REPLACE_TABLE, log);
}
- public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation
op) {
- logEdit(OperationType.OP_BATCH_REMOVE_TXNS, op);
+ public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2
op) {
+ logEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op);
}
public void logModifyComment(ModifyCommentOperationLog op) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 102270a007..32434d08c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -163,12 +163,15 @@ public class OperationType {
//real time load 100 -108
public static final short OP_UPSERT_TRANSACTION_STATE = 100;
@Deprecated
- // use OP_BATCH_REMOVE_TXNS instead
+ // use OP_BATCH_REMOVE_TXNS_V2 instead
public static final short OP_DELETE_TRANSACTION_STATE = 101;
public static final short OP_FINISHING_ROLLUP = 102;
public static final short OP_FINISHING_SCHEMA_CHANGE = 103;
public static final short OP_SAVE_TRANSACTION_ID = 104;
+ @Deprecated
+ // use OP_BATCH_REMOVE_TXNS_V2 instead
public static final short OP_BATCH_REMOVE_TXNS = 105;
+ public static final short OP_BATCH_REMOVE_TXNS_V2 = 106;
// routine load 110~120
public static final short OP_ROUTINE_LOAD_JOB = 110;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index ed5c0cff7f..29fcf2b296 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -46,7 +46,7 @@ import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
@@ -742,6 +742,33 @@ public class DatabaseTransactionMgr {
}
}
+ public void
replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 operation) {
+ writeLock();
+ try {
+ if (operation.getLatestTxnIdForShort() != -1) {
+ while (!finalStatusTransactionStateDequeShort.isEmpty()) {
+ TransactionState transactionState =
finalStatusTransactionStateDequeShort.pop();
+ clearTransactionState(transactionState.getTransactionId());
+ if (operation.getLatestTxnIdForShort() ==
transactionState.getTransactionId()) {
+ break;
+ }
+ }
+ }
+
+ if (operation.getLatestTxnIdForLong() != -1) {
+ while (!finalStatusTransactionStateDequeLong.isEmpty()) {
+ TransactionState transactionState =
finalStatusTransactionStateDequeLong.pop();
+ clearTransactionState(transactionState.getTransactionId());
+ if (operation.getLatestTxnIdForLong() ==
transactionState.getTransactionId()) {
+ break;
+ }
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
public TransactionStatus getLabelState(String label) {
readLock();
try {
@@ -1368,23 +1395,21 @@ public class DatabaseTransactionMgr {
}
public void removeExpiredTxns(long currentMillis) {
- List<Long> expiredTxnIds = Lists.newArrayList();
// delete expired txns
- int leftNum = MAX_REMOVE_TXN_PER_ROUND;
writeLock();
try {
- leftNum = unprotectedRemoveExpiredTxns(currentMillis,
expiredTxnIds,
- finalStatusTransactionStateDequeShort, leftNum);
- leftNum = unprotectedRemoveExpiredTxns(currentMillis,
expiredTxnIds,
- finalStatusTransactionStateDequeLong, leftNum);
-
- if (!expiredTxnIds.isEmpty()) {
- Map<Long, List<Long>> dbExpiredTxnIds = Maps.newHashMap();
- dbExpiredTxnIds.put(dbId, expiredTxnIds);
- BatchRemoveTransactionsOperation op = new
BatchRemoveTransactionsOperation(dbExpiredTxnIds);
+ Pair<Long, Integer> expiredTxnsInfoForShort =
unprotectedRemoveExpiredTxns(currentMillis,
+ finalStatusTransactionStateDequeShort,
MAX_REMOVE_TXN_PER_ROUND);
+ Pair<Long, Integer> expiredTxnsInfoForLong =
unprotectedRemoveExpiredTxns(currentMillis,
+ finalStatusTransactionStateDequeLong,
+ MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second);
+ int numOfClearedTransaction = expiredTxnsInfoForShort.second +
expiredTxnsInfoForLong.second;
+ if (numOfClearedTransaction > 0) {
+ BatchRemoveTransactionsOperationV2 op = new
BatchRemoveTransactionsOperationV2(dbId,
+ expiredTxnsInfoForShort.first,
expiredTxnsInfoForLong.first);
editLog.logBatchRemoveTransactions(op);
if (LOG.isDebugEnabled()) {
- LOG.debug("Remove {} expired transactions",
MAX_REMOVE_TXN_PER_ROUND - leftNum);
+ LOG.debug("Remove {} expired transactions",
numOfClearedTransaction);
}
}
} finally {
@@ -1392,22 +1417,22 @@ public class DatabaseTransactionMgr {
}
}
- private int unprotectedRemoveExpiredTxns(long currentMillis, List<Long>
expiredTxnIds,
- ArrayDeque<TransactionState>
finalStatusTransactionStateDequeShort,
- int maxNumber) {
- int left = maxNumber;
- while (!finalStatusTransactionStateDequeShort.isEmpty() && left > 0) {
- TransactionState transactionState =
finalStatusTransactionStateDequeShort.getFirst();
+ private Pair<Long, Integer> unprotectedRemoveExpiredTxns(long
currentMillis,
+ ArrayDeque<TransactionState> finalStatusTransactionStateDeque, int
left) {
+ long latestTxnId = -1;
+ int numOfClearedTransaction = 0;
+ while (!finalStatusTransactionStateDeque.isEmpty() &&
numOfClearedTransaction < left) {
+ TransactionState transactionState =
finalStatusTransactionStateDeque.getFirst();
if (transactionState.isExpired(currentMillis)) {
- finalStatusTransactionStateDequeShort.pop();
+ finalStatusTransactionStateDeque.pop();
clearTransactionState(transactionState.getTransactionId());
- expiredTxnIds.add(transactionState.getTransactionId());
- left--;
+ latestTxnId = transactionState.getTransactionId();
+ numOfClearedTransaction++;
} else {
break;
}
}
- return left;
+ return Pair.of(latestTxnId, numOfClearedTransaction);
}
private void clearTransactionState(long txnId) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 3c84056789..18bf57bb19 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -35,6 +35,7 @@ import org.apache.doris.metric.AutoMappedMetric;
import org.apache.doris.metric.GaugeMetricImpl;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TUniqueId;
@@ -441,6 +442,15 @@ public class GlobalTransactionMgr implements Writable {
}
}
+ public void
replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) {
+ try {
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(operation.getDbId());
+ dbTransactionMgr.replayBatchRemoveTransaction(operation);
+ } catch (AnalysisException e) {
+ LOG.warn("replay batch remove transactions failed. db " +
operation.getDbId(), e);
+ }
+ }
+
public List<List<Comparable>> getDbInfo() {
List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
List<Long> dbIds =
Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index bffcc9c4f5..08800510a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -20,7 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.BatchAlterJobPersistInfo;
import org.apache.doris.cluster.Cluster;
-import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
@@ -91,7 +91,7 @@ public class FakeEditLog extends MockUp<EditLog> {
}
@Mock
- public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation
info) {
+ public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2
info) {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]