This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5c3fed216dc [fix](transaction) Fix publish txn wait too long when not
meet quorum (#26659)
5c3fed216dc is described below
commit 5c3fed216dcf137d83279f68ba7bccf65e83464d
Author: yujun <[email protected]>
AuthorDate: Fri Nov 10 14:55:26 2023 +0800
[fix](transaction) Fix publish txn wait too long when not meet quorum
(#26659)
---
.../doris/transaction/DatabaseTransactionMgr.java | 16 ++------
.../doris/transaction/PublishVersionDaemon.java | 4 +-
.../apache/doris/transaction/TransactionState.java | 36 ++++++++---------
.../data/load/insert/test_publish_one_succ.out | 10 +++++
.../load/insert/test_publish_one_succ.groovy | 45 ++++++++++++++++++++++
5 files changed, 79 insertions(+), 32 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9b123030465..42cac0c9690 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -294,7 +294,7 @@ public class DatabaseTransactionMgr {
info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
info.add(TimeUtils.longToTimeString(txnState.getPreCommitTime()));
info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
- info.add(TimeUtils.longToTimeString(txnState.getPublishVersionTime()));
+
info.add(TimeUtils.longToTimeString(txnState.getLastPublishVersionTime()));
info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
info.add(txnState.getReason());
info.add(String.valueOf(txnState.getErrorReplicas().size()));
@@ -944,10 +944,10 @@ public class DatabaseTransactionMgr {
Map<Long, PublishVersionTask> publishTasks =
transactionState.getPublishVersionTasks();
long now = System.currentTimeMillis();
- long firstPublishOneSuccTime =
transactionState.getFirstPublishOneSuccTime();
+ long firstPublishVersionTime =
transactionState.getFirstPublishVersionTime();
boolean allowPublishOneSucc = false;
- if (Config.publish_wait_time_second > 0 && firstPublishOneSuccTime > 0
- && now >= firstPublishOneSuccTime +
Config.publish_wait_time_second * 1000L) {
+ if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
+ && now >= firstPublishVersionTime +
Config.publish_wait_time_second * 1000L) {
allowPublishOneSucc = true;
}
@@ -970,7 +970,6 @@ public class DatabaseTransactionMgr {
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
PublishResult publishResult = PublishResult.QUORUM_SUCC;
try {
- boolean allTabletsLeastOneSucc = true;
Iterator<TableCommitInfo> tableCommitInfoIterator
=
transactionState.getIdToTableCommitInfos().values().iterator();
while (tableCommitInfoIterator.hasNext()) {
@@ -1058,10 +1057,6 @@ public class DatabaseTransactionMgr {
continue;
}
- if (healthReplicaNum == 0) {
- allTabletsLeastOneSucc = false;
- }
-
String writeDetail =
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
if (allowPublishOneSucc && healthReplicaNum > 0) {
@@ -1100,9 +1095,6 @@ public class DatabaseTransactionMgr {
}
}
}
- if (allTabletsLeastOneSucc && firstPublishOneSuccTime <= 0) {
- transactionState.setFirstPublishOneSuccTime(now);
- }
if (publishResult == PublishResult.FAILED) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 922e8645d9f..06fa71303c1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -121,7 +121,7 @@ public class PublishVersionDaemon extends MasterDaemon {
batchTask.addTask(task);
transactionState.addPublishVersionTask(backendId, task);
}
- transactionState.setHasSendTask(true);
+ transactionState.setSendedTask();
LOG.info("send publish tasks for transaction: {}, db: {}",
transactionState.getTransactionId(),
transactionState.getDbId());
}
@@ -174,7 +174,7 @@ public class PublishVersionDaemon extends MasterDaemon {
AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
}
if (MetricRepo.isInit) {
- long publishTime =
transactionState.getPublishVersionTime() - transactionState.getCommitTime();
+ long publishTime =
transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 2b01a612534..17e9f53d609 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -220,13 +220,14 @@ public class TransactionState implements Writable {
// this state need not be serialized
private Map<Long, PublishVersionTask> publishVersionTasks;
private boolean hasSendTask;
- private long publishVersionTime = -1;
private TransactionStatus preStatus = null;
// When publish txn, if every tablet has at least 1 replica published
succ, but not quorum replicas succ,
- // and time since firstPublishOneSuccTime has exceeds
Config.publish_wait_time_second,
+ // and time since firstPublishVersionTime has exceeded
Config.publish_wait_time_second,
// then this transaction will become visible.
- private long firstPublishOneSuccTime = -1;
+ private long firstPublishVersionTime = -1;
+
+ private long lastPublishVersionTime = -1;
@SerializedName(value = "callbackId")
private long callbackId = -1;
@@ -339,17 +340,24 @@ public class TransactionState implements Writable {
this.publishVersionTasks.put(backendId, task);
}
- public void setHasSendTask(boolean hasSendTask) {
- this.hasSendTask = hasSendTask;
- this.publishVersionTime = System.currentTimeMillis();
+ public void setSendedTask() {
+ this.hasSendTask = true;
+ updateSendTaskTime();
}
public void updateSendTaskTime() {
- this.publishVersionTime = System.currentTimeMillis();
+ this.lastPublishVersionTime = System.currentTimeMillis();
+ if (this.firstPublishVersionTime <= 0) {
+ this.firstPublishVersionTime = lastPublishVersionTime;
+ }
+ }
+
+ public long getFirstPublishVersionTime() {
+ return firstPublishVersionTime;
}
- public long getPublishVersionTime() {
- return this.publishVersionTime;
+ public long getLastPublishVersionTime() {
+ return this.lastPublishVersionTime;
}
public boolean hasSendTask() {
@@ -420,14 +428,6 @@ public class TransactionState implements Writable {
return errorLogUrl;
}
- public long getFirstPublishOneSuccTime() {
- return firstPublishOneSuccTime;
- }
-
- public void setFirstPublishOneSuccTime(long firstPublishOneSuccTime) {
- this.firstPublishOneSuccTime = firstPublishOneSuccTime;
- }
-
public void setTransactionStatus(TransactionStatus transactionStatus) {
// status changed
this.preStatus = this.transactionStatus;
@@ -646,7 +646,7 @@ public class TransactionState implements Writable {
if (prolongPublishTimeout) {
timeoutMillis *= 2;
}
- return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
+ return System.currentTimeMillis() - lastPublishVersionTime >
timeoutMillis;
}
public void prolongPublishTimeout() {
diff --git a/regression-test/data/load/insert/test_publish_one_succ.out
b/regression-test/data/load/insert/test_publish_one_succ.out
new file mode 100644
index 00000000000..c82f0c7f1da
--- /dev/null
+++ b/regression-test/data/load/insert/test_publish_one_succ.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_1 --
+
+-- !select_2 --
+1 10
+2 20
+3 30
+4 40
+5 50
+
diff --git a/regression-test/suites/load/insert/test_publish_one_succ.groovy
b/regression-test/suites/load/insert/test_publish_one_succ.groovy
new file mode 100644
index 00000000000..4e331b5c8d2
--- /dev/null
+++ b/regression-test/suites/load/insert/test_publish_one_succ.groovy
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_publish_one_succ') {  // regression test for #26659: rows must become visible once publish_wait_time_second elapses even though publish cannot reach a quorum
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ docker(options) {
+ cluster.injectDebugPoints(NodeType.FE,
['PublishVersionDaemon.stop_publish':null])  // NOTE(review): presumably holds back publish so the inserts below stay unpublished for now — confirm
+
+ sql 'SET GLOBAL insert_visible_timeout_ms = 1000'  // NOTE(review): presumably lets each INSERT return after ~1s instead of waiting for visibility
+ sql "ADMIN SET FRONTEND CONFIG ('publish_wait_time_second' =
'1000000')"  // huge wait, so the "publish one replica succ" fallback cannot trigger during the first check
+ sql 'CREATE TABLE tbl (k1 int, k2 int)'
+ for (def i = 1; i <= 5; i++) {
+ sql "INSERT INTO tbl VALUES (${i}, ${10 * i})"
+ }
+
+ cluster.stopBackends(2, 3)  // stop two BEs so publish cannot reach a quorum (assumes 3 BEs with 3 replicas per tablet — TODO confirm)
+ cluster.checkBeIsAlive(2, false)
+ cluster.checkBeIsAlive(3, false)
+ cluster.clearFrontendDebugPoints()  // presumably lets the FE publish daemon resume sending publish tasks
+
+ sleep(1000)
+ order_qt_select_1 'SELECT * FROM tbl'  // expected empty (see .out): no quorum, and the wait time since first publish has not elapsed
+ sql "ADMIN SET FRONTEND CONFIG ('publish_wait_time_second' = '2')"  // shrink the wait so allowPublishOneSucc can become true
+ sleep(2000)
+ order_qt_select_2 'SELECT * FROM tbl'  // expected all 5 rows (see .out) now that the one-replica-success path applies
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]