This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new 2af3174326 temp save
2af3174326 is described below
commit 2af3174326d835400f7bda82fb5f48ad714cfda7
Author: Tian Jiang <[email protected]>
AuthorDate: Sun Sep 25 23:07:26 2022 +0800
temp save
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 5 +
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 ++
.../iotdb/cluster/config/ClusterDescriptor.java | 6 +
.../iotdb/cluster/log/FragmentedLogDispatcher.java | 10 +-
.../iotdb/cluster/log/IndirectLogDispatcher.java | 18 ++-
.../apache/iotdb/cluster/log/LogDispatcher.java | 136 ++++++++++-----------
.../org/apache/iotdb/cluster/log/VotingLog.java | 47 ++-----
.../apache/iotdb/cluster/log/VotingLogList.java | 83 +++++--------
.../cluster/log/appender/BlockingLogAppender.java | 51 ++++----
.../cluster/log/applier/AsyncDataLogApplier.java | 6 +
.../iotdb/cluster/log/applier/DataLogApplier.java | 3 +-
.../iotdb/cluster/log/manage/RaftLogManager.java | 5 +
.../log/sequencing/AsynchronousSequencer.java | 1 -
.../log/sequencing/SynchronousSequencer.java | 1 -
.../handlers/caller/AppendNodeEntryHandler.java | 9 +-
.../cluster/server/member/MetaGroupMember.java | 10 +-
.../iotdb/cluster/server/member/RaftMember.java | 103 +++++-----------
.../caller/AppendNodeEntryHandlerTest.java | 8 +-
18 files changed, 222 insertions(+), 290 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 5e94a945ca..dfa2427c7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeReport;
import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
import org.apache.iotdb.cluster.server.raft.DataRaftService;
import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -308,6 +309,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
logger.info("Send nums: {}", sendNums);
logger.info("Send latency sum: {}", sendLatencySums);
logger.info("Send latency avg: {}", sendLatencyAvg);
+ logger.info("Append time: {}", Statistic.RAFT_SENDER_SEND_LOG); // sender-side statistic, reported as "Append time"
+ logger.info("Index diff: {}", Statistic.RAFT_RECEIVER_INDEX_DIFF);
+ logger.info("Follower time: {}",
Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL);
+ logger.info("Window length: {}", Statistic.RAFT_WINDOW_LENGTH);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 51c0105d05..a9724b4280 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -219,6 +219,8 @@ public class ClusterConfig {
// VG-Raft related
private boolean useVGRaft = true;
+ private boolean useCRaft = false; // overridden by the "use_c_raft" property when present
+
/**
* create a clusterConfig class. The internalIP will be set according to the
server's hostname. If
* there is something error for getting the ip of the hostname, then set the
internalIp as
@@ -682,4 +684,12 @@ public class ClusterConfig {
public void setUseVGRaft(boolean useVGRaft) {
this.useVGRaft = useVGRaft;
}
+
+ public boolean isUseCRaft() {
+ return useCRaft;
+ }
+
+ public void setUseCRaft(boolean useCRaft) {
+ this.useCRaft = useCRaft;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d533a61347..b3af782f2b 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -390,6 +390,12 @@ public class ClusterDescriptor {
"use_vg_raft",
String.valueOf(config.isUseVGRaft()))));
+ config.setUseCRaft(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "use_c_raft", // when unset, keeps the current config value (false by default)
+ String.valueOf(config.isUseCRaft()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index f8954effdc..4a08476869 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,13 +46,16 @@ public class FragmentedLogDispatcher extends LogDispatcher {
long startTime =
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
- for (int i = 0; i < nodesLogQueues.size(); i++) {
- BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
+
+ int fragmentIndex = 0;
+ for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList) {
+ BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
SendLogRequest fragmentedRequest = new SendLogRequest(request);
fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
fragmentedRequest
.getVotingLog()
- .setLog(new FragmentedLog((FragmentedLog)
request.getVotingLog().getLog(), i));
+ .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), fragmentIndex));
+ fragmentIndex++;
try {
boolean addSucceeded;
if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 7102fd799a..1665edbb58 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -70,10 +70,13 @@ public class IndirectLogDispatcher extends LogDispatcher {
@Override
void createQueueAndBindingThreads() {
+ // every peer starts disabled; the queues and dispatcher threads are built by the base class
+ // so that the two implementations cannot drift apart
+ nodesEnabled = new HashMap<>();
for (Node node : member.getAllNodes()) {
if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
nodesEnabled.put(node, false);
- nodesLogQueues.put(node, createQueueAndBindingThread(node));
}
}
+ super.createQueueAndBindingThreads();
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ef587da6ce..7032ef6e52 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -55,7 +56,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -76,12 +76,10 @@ public class LogDispatcher {
RaftMember member;
private static final ClusterConfig clusterConfig =
ClusterDescriptor.getInstance().getConfig();
protected boolean useBatchInLogCatchUp =
clusterConfig.isUseBatchInLogCatchUp();
- Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueues = new HashMap<>();
- Map<Node, Boolean> nodesEnabled = new HashMap<>();
- ExecutorService executorService;
- private static ExecutorService serializationService =
- IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
- Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
+ Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueueMap = new HashMap<>();
+ List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new
ArrayList<>();
+ Map<Node, Boolean> nodesEnabled;
+ Map<Node, ExecutorService> executorServices = new HashMap<>();
public static int bindingThreadNum =
clusterConfig.getDispatcherBindingThreadNum();
public static int maxBatchSize = 10;
@@ -89,35 +87,37 @@ public class LogDispatcher {
public LogDispatcher(RaftMember member) {
this.member = member;
- executorService =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- bindingThreadNum * (member.getAllNodes().size() - 1),
- "LogDispatcher-" + member.getName());
createQueueAndBindingThreads();
}
void createQueueAndBindingThreads() {
for (Node node : member.getAllNodes()) {
if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
- nodesEnabled.put(node, true);
- nodesLogQueues.put(node, createQueueAndBindingThread(node));
+ BlockingQueue<SendLogRequest> logBlockingQueue;
+ logBlockingQueue =
+ new ArrayBlockingQueue<>(
+
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+ nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+ nodesLogQueueMap.put(node, logBlockingQueue);
+ }
+ }
+
+ for (int i = 0; i < bindingThreadNum; i++) {
+ for (Pair<Node, BlockingQueue<SendLogRequest>> pair :
nodesLogQueuesList) {
+ executorServices.computeIfAbsent(pair.left, n ->
IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-" + member.getName() + "-" +
ClusterUtils.nodeToString(pair.left)))
+ .submit(newDispatcherThread(pair.left, pair.right));
}
}
}
@TestOnly
public void close() throws InterruptedException {
- executorService.shutdownNow();
- executorService.awaitTermination(10, TimeUnit.SECONDS);
- }
-
- private ByteBuffer serializeTask(SendLogRequest request) {
- ByteBuffer byteBuffer = request.getVotingLog().getLog().serialize();
- request.getVotingLog().getLog().setByteSize(byteBuffer.capacity());
- if (clusterConfig.isUseVGRaft()) {
- request.getAppendEntryRequest().setEntryHash(byteBuffer.hashCode());
+ for (Entry<Node, ExecutorService> entry : executorServices.entrySet()) {
+ ExecutorService value = entry.getValue();
+ value.shutdownNow();
+ value.awaitTermination(10, TimeUnit.SECONDS);
}
- return byteBuffer;
}
protected SendLogRequest transformRequest(Node node, SendLogRequest request)
{
@@ -125,12 +125,33 @@ public class LogDispatcher {
return newRequest;
}
- public void offer(SendLogRequest request) {
- // do serialization here to avoid taking LogManager for too long
- if (!nodesLogQueues.isEmpty()) {
- SendLogRequest finalRequest = request;
- request.serializedLogFuture = serializationService.submit(() ->
serializeTask(finalRequest));
+
+ /**
+ * Puts a request into the queue of one follower.
+ *
+ * <p>When waitForSlowNode is enabled, blocks for at most the connection timeout while the queue
+ * is full; otherwise a full queue rejects the request immediately.
+ *
+ * @return true if the request was enqueued, false if the queue stayed full or the wait was
+ * interrupted
+ */
+ private boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
+ if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
+ try {
+ // offer(timeout) parks on the queue's own condition: Object.wait() needs the monitor of
+ // the object waited on, and add() throws instead of returning false on a full queue
+ return nodeLogQueue.offer(
+ request, clusterConfig.getConnectionTimeoutInMS(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ } else {
+ return nodeLogQueue.offer(request);
}
+ }
+
+ public void offer(SendLogRequest request) {
long startTime =
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
@@ -138,34 +159,28 @@ public class LogDispatcher {
if (clusterConfig.isUseVGRaft()) {
verifiers = member.getTrustValueHolder().chooseVerifiers();
}
- for (Entry<Node, BlockingQueue<SendLogRequest>> entry :
nodesLogQueues.entrySet()) {
- boolean nodeEnabled = this.nodesEnabled.getOrDefault(entry.getKey(),
false);
- if (!nodeEnabled) {
+
+ for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList)
{
+ if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.left,
false)) {
continue;
}
- request = transformRequest(entry.getKey(), request);
- if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.getKey(),
verifiers)) {
+ // a per-node reference keeps a verifier copy (and its verifier flag) from leaking into the
+ // requests of the nodes visited after it
+ SendLogRequest nodeRequest = request;
+ if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.left, verifiers)) {
+ nodeRequest = transformRequest(entry.left, request);
- request.setVerifier(true);
+ nodeRequest.setVerifier(true);
}
- BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
+ BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
try {
- boolean addSucceeded;
- if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
- addSucceeded =
- nodeLogQueue.offer(
- request,
-
ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
- TimeUnit.MILLISECONDS);
- } else {
- addSucceeded = nodeLogQueue.add(request);
- }
+ boolean addSucceeded = addToQueue(nodeLogQueue, nodeRequest);
if (!addSucceeded) {
logger.debug(
"Log queue[{}] of {} is full, ignore the request to this node",
- entry.getKey(),
+ entry.left,
member.getName());
} else {
- request.setEnqueueTime(System.nanoTime());
+ nodeRequest.setEnqueueTime(System.nanoTime());
@@ -173,10 +188,8 @@ public class LogDispatcher {
} catch (IllegalStateException e) {
logger.debug(
"Log queue[{}] of {} is full, ignore the request to this node",
- entry.getKey(),
+ entry.left,
member.getName());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
}
}
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
@@ -187,17 +200,6 @@ public class LogDispatcher {
}
}
- BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
- BlockingQueue<SendLogRequest> logBlockingQueue;
- logBlockingQueue =
- new ArrayBlockingQueue<>(
-
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
- for (int i = 0; i < bindingThreadNum; i++) {
- executorService.submit(newDispatcherThread(node, logBlockingQueue));
- }
- return logBlockingQueue;
- }
-
DispatcherThread newDispatcherThread(Node node,
BlockingQueue<SendLogRequest> logBlockingQueue) {
return new DispatcherThread(node, logBlockingQueue);
}
@@ -315,8 +317,6 @@ public class LogDispatcher {
baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
}
-
-
@Override
public void run() {
if (logger.isDebugEnabled()) {
@@ -347,14 +347,20 @@ public class LogDispatcher {
logger.info("Dispatcher exits");
}
- protected void serializeEntries() throws ExecutionException,
InterruptedException {
+ protected void serializeEntries() throws InterruptedException {
for (SendLogRequest request : currBatch) {
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
request.getVotingLog().getLog().getEnqueueTime());
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE.calOperationCostTimeFromStart(
request.getVotingLog().getLog().getCreateTime());
long start =
Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
- request.getAppendEntryRequest().entry =
request.serializedLogFuture.get();
+ // use a local buffer: the same request may be serialized by several dispatcher threads
+ ByteBuffer serializedLog = request.getVotingLog().getLog().serialize();
+ request.getAppendEntryRequest().entry = serializedLog;
+ request.getVotingLog().getLog().setByteSize(serializedLog.capacity());
+ if (clusterConfig.isUseVGRaft()) {
+ request.getAppendEntryRequest().setEntryHash(serializedLog.hashCode());
+ }
Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
}
}
@@ -376,17 +382,7 @@ public class LogDispatcher {
private void appendEntriesSync(
List<ByteBuffer> logList, AppendEntriesRequest request,
List<SendLogRequest> currBatch) {
- long startTime =
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- if (!member.waitForPrevLog(peerInfo,
currBatch.get(0).getVotingLog().getLog())) {
- logger.warn(
- "{}: node {} timed out when appending {}",
- member.getName(),
- receiver,
- currBatch.get(0).getVotingLog());
- return;
- }
-
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
-
+ long startTime;
AsyncMethodCallback<AppendEntryResult> handler = new
AppendEntriesHandler(currBatch);
startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index edc933664b..848a61c967 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -19,26 +19,20 @@
package org.apache.iotdb.cluster.log;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
public class VotingLog {
protected Log log;
- protected Set<Integer> stronglyAcceptedNodeIds;
protected Set<Integer> weaklyAcceptedNodeIds;
protected Set<Integer> failedNodeIds;
protected Set<byte[]> signatures;
public AtomicLong acceptedTime;
+ private boolean hasFailed; // NOTE(review): not volatile; confirm it is set and read under a common lock or by one thread
public VotingLog(Log log, int groupSize) {
this.log = log;
- stronglyAcceptedNodeIds = new HashSet<>(groupSize);
weaklyAcceptedNodeIds = new HashSet<>(groupSize);
acceptedTime = new AtomicLong();
failedNodeIds = new HashSet<>(groupSize);
@@ -47,7 +41,6 @@ public class VotingLog {
public VotingLog(VotingLog another) {
this.log = another.log;
- this.stronglyAcceptedNodeIds = another.stronglyAcceptedNodeIds;
this.weaklyAcceptedNodeIds = another.weaklyAcceptedNodeIds;
this.acceptedTime = another.acceptedTime;
this.failedNodeIds = another.failedNodeIds;
@@ -62,36 +55,6 @@ public class VotingLog {
this.log = log;
}
- public Set<Integer> getStronglyAcceptedNodeIds() {
- return stronglyAcceptedNodeIds;
- }
-
- public boolean onStronglyAccept(long index, long term, Node acceptingNode,
int quorumSize,
- ByteBuffer signature, int nodeNum) {
- if (getLog().getCurrLogIndex() <= index
- && getLog().getCurrLogTerm() == term) {
- synchronized (this) {
- getStronglyAcceptedNodeIds().add(acceptingNode.nodeIdentifier);
- if (signature != null) {
- signatures.add(Arrays.copyOfRange(signature.array(),
signature.arrayOffset(),
- signature.arrayOffset() + signature.remaining()));
- }
- }
-
- if (getStronglyAcceptedNodeIds().size()
- + getWeaklyAcceptedNodeIds().size()
- >= quorumSize) {
- acceptedTime.set(System.nanoTime());
- }
- if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
- return signatures.size() > TrustValueHolder.verifierGroupSize(nodeNum)
/ 2;
- } else {
- return getStronglyAcceptedNodeIds().size() >= quorumSize;
- }
- }
- return false;
- }
-
public Set<Integer> getWeaklyAcceptedNodeIds() {
return weaklyAcceptedNodeIds;
}
@@ -108,4 +71,12 @@ public class VotingLog {
public Set<byte[]> getSignatures() {
return signatures;
}
+
+ public boolean isHasFailed() {
+ return hasFailed;
+ }
+
+ public void setHasFailed(boolean hasFailed) {
+ this.hasFailed = hasFailed;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 71c1ec4135..29a72f1612 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -20,6 +20,10 @@
package org.apache.iotdb.cluster.log;
import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -36,30 +40,31 @@ public class VotingLogList {
private static final Logger logger =
LoggerFactory.getLogger(VotingLogList.class);
- private List<VotingLog> logList = new ArrayList<>();
private volatile long currTerm = -1;
private int quorumSize;
private RaftMember member;
+ // highest log index strongly accepted by each node, keyed by Node.nodeIdentifier
+ private Map<Integer, Long> stronglyAcceptedIndices = new
ConcurrentHashMap<>();
public VotingLogList(int quorumSize, RaftMember member) {
this.quorumSize = quorumSize;
this.member = member;
}
- /**
- * Insert a voting entry into the list. Notice the logs must be inserted in
order of index, as
- * they are inserted as soon as created
- *
- * @param log
- */
- public synchronized void insert(VotingLog log) {
- if (log.getLog().getCurrLogTerm() != currTerm) {
- clear();
- currTerm = log.getLog().getCurrLogTerm();
- }
- logList.add(log);
- }
+ /**
+ * Returns the largest log index that has been strongly accepted by at least quorumSize nodes,
+ * or -1 when fewer than quorumSize nodes have reported an accepted index yet.
+ */
+ private long computeNewCommitIndex() {
+ List<Long> acceptedIndices = new ArrayList<>(stronglyAcceptedIndices.values());
+ if (acceptedIndices.size() < quorumSize) {
+ return -1;
+ }
+ // in descending order, the quorumSize-th index is reached by at least quorumSize nodes
+ acceptedIndices.sort(Comparator.reverseOrder());
+ return acceptedIndices.get(quorumSize - 1);
+ }
/**
* When an entry of index-term is strongly accepted by a node of
acceptingNodeId, record the id in
* all entries whose index <= the accepted entry. If any entry is accepted
by a quorum, remove it
@@ -73,55 +78,31 @@ public class VotingLogList {
*/
public void onStronglyAccept(long index, long term, Node acceptingNode,
ByteBuffer signature) {
logger.debug("{}-{} is strongly accepted by {}", index, term,
acceptingNode);
- int lastEntryIndexToCommit = -1;
- List<VotingLog> acceptedLogs;
- synchronized (this) {
- for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
- VotingLog votingLog = logList.get(i);
- if (votingLog.onStronglyAccept(index, term, acceptingNode, quorumSize,
- signature, member.getAllNodes().size())) {
- lastEntryIndexToCommit = i;
- }
- if (votingLog.getLog().getCurrLogIndex() > index) {
- break;
- }
- }
+ // keep the highest index each node has strongly accepted, as acks may arrive out of order;
+ // NOTE(review): term and signature are not consulted here - confirm this is intended
+ stronglyAcceptedIndices.merge(acceptingNode.nodeIdentifier, index, Math::max);
- List<VotingLog> tmpAcceptedLogs = logList.subList(0,
lastEntryIndexToCommit + 1);
- acceptedLogs = new ArrayList<>(tmpAcceptedLogs);
- tmpAcceptedLogs.clear();
- }
-
- if (lastEntryIndexToCommit != -1) {
- Log lastLog = acceptedLogs.get(acceptedLogs.size() - 1).log;
+ long newCommitIndex = computeNewCommitIndex();
+ if (newCommitIndex > member.getCommitIndex()) {
synchronized (member.getLogManager()) {
try {
- member.getLogManager().commitTo(lastLog.getCurrLogIndex());
+ member.getLogManager().commitTo(newCommitIndex);
} catch (LogExecutionException e) {
- logger.error("Fail to commit {}", lastLog, e);
- }
- }
-
- for (VotingLog acceptedLog : acceptedLogs) {
- synchronized (acceptedLog) {
- acceptedLog.acceptedTime.set(System.nanoTime());
- acceptedLog.notifyAll();
- }
- if
(ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
- member.removeAppendLogHandler(
- new Pair<>(
- acceptedLog.getLog().getCurrLogIndex(),
acceptedLog.getLog().getCurrLogTerm()));
+ logger.error("Fail to commit {}", newCommitIndex, e);
}
}
}
}
- public synchronized void clear() {
- logList.clear();
- }
-
- public int size() {
- return logList.size();
+ public int totalAcceptedNodeNum(VotingLog log) {
+ long index = log.getLog().getCurrLogIndex();
+ int num = log.getWeaklyAcceptedNodeIds().size();
+ for (Entry<Integer, Long> entry : stronglyAcceptedIndices.entrySet()) {
+ if (entry.getValue() >= index) {
+ num ++;
+ }
+ }
+ return num;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
index 8b87c100fb..f60523a514 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -103,6 +103,10 @@ public class BlockingLogAppender implements LogAppender {
if (success != -1) {
logger.debug("{} append a new log {}", member.getName(), log);
result.status = Response.RESPONSE_STRONG_ACCEPT;
+ if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty())
{
+ request.entry.rewind();
+ member.getLogRelay().offer(request, request.subReceivers);
+ }
} else {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -118,12 +122,13 @@ public class BlockingLogAppender implements LogAppender {
Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
long lastLogIndex = logManager.getLastLogIndex();
Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
+ long waitTime = 10;
while (lastLogIndex < prevLogIndex
&& alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
try {
// each time new logs are appended, this will be notified
synchronized (logUpdateCondition) {
- logUpdateCondition.wait(1);
+ logUpdateCondition.wait(waitTime);
}
lastLogIndex = logManager.getLastLogIndex();
if (lastLogIndex >= prevLogIndex) {
@@ -133,6 +138,10 @@ public class BlockingLogAppender implements LogAppender {
Thread.currentThread().interrupt();
return false;
}
+ long remainingWaitMs =
+ ClusterConstant.getWriteOperationTimeoutMS() - (System.currentTimeMillis() - waitStart);
+ // exponential back-off capped by the remaining budget; wait(0) would block forever
+ waitTime = Math.max(1, Math.min(waitTime * 2, remainingWaitMs));
alreadyWait = System.currentTimeMillis() - waitStart;
}
@@ -190,29 +199,11 @@ public class BlockingLogAppender implements LogAppender {
logManager.maybeAppend(
request.prevLogIndex, request.prevLogTerm,
request.leaderCommit, logs);
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
- if (resp != -1) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{} append a new log list {}, commit to {}",
- member.getName(),
- logs,
- request.leaderCommit);
- }
- result.status = Response.RESPONSE_STRONG_ACCEPT;
- result.setLastLogIndex(logManager.getLastLogIndex());
- result.setLastLogTerm(logManager.getLastLogTerm());
-
- if (request.isSetSubReceivers()) {
- request.entries.forEach(Buffer::rewind);
- member.getLogRelay().offer(request, request.subReceivers);
- }
- } else {
- // the incoming log points to an illegal position, reject it
- result.status = Response.RESPONSE_LOG_MISMATCH;
- }
+
break;
}
}
+
try {
TimeUnit.MILLISECONDS.sleep(
IoTDBDescriptor.getInstance().getConfig().getCheckPeriodWhenInsertBlocked());
@@ -225,6 +216,27 @@ public class BlockingLogAppender implements LogAppender {
Thread.currentThread().interrupt();
}
}
+
+ if (resp != -1) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{} append a new log list {}, commit to {}",
+ member.getName(),
+ logs,
+ request.leaderCommit);
+ }
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
+
+ // relay only when there is someone to relay to, as in the single-entry path
+ if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) {
+ request.entries.forEach(Buffer::rewind);
+ member.getLogRelay().offer(request, request.subReceivers);
+ }
+ } else {
+ // the incoming log points to an illegal position, reject it
+ result.status = Response.RESPONSE_LOG_MISMATCH;
+ }
return result;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index bcd5e6c03c..a717bf4543 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.log.applier;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -33,6 +34,7 @@ import
org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.slf4j.Logger;
@@ -130,6 +132,8 @@ public class AsyncDataLogApplier implements LogApplier {
// unreachable
}
return partialPath;
+ } else if (log instanceof FragmentedLog) { // every FragmentedLog maps to the same placeholder path "dummy"
+ return new PartialPath("dummy", false);
}
return null;
}
@@ -164,6 +168,8 @@ public class AsyncDataLogApplier implements LogApplier {
} else if (plan instanceof CreateTimeSeriesPlan) {
PartialPath path = ((CreateTimeSeriesPlan) plan).getPath();
sgPath = IoTDB.schemaProcessor.getBelongedStorageGroup(path);
+ } else if (plan instanceof DummyPlan) { // DummyPlan has no real storage group; use the "dummy" placeholder
+ sgPath = new PartialPath("dummy", false);
}
return sgPath;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 7a4160a2c7..f17ce02b13 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
@@ -87,7 +88,7 @@ public class DataLogApplier extends BaseApplier {
closeFileLog.getPartitionId(),
closeFileLog.isSeq(),
false);
- } else {
+ } else if (!(log instanceof FragmentedLog)) { // a FragmentedLog is neither applied nor reported as unsupported
logger.error("Unsupported log: {}", log);
}
} catch (Exception e) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index a3ab0bb2be..1a22b26a5d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -661,6 +661,11 @@ public abstract class RaftLogManager {
// success or fail together approximately.
// TODO: make it real atomic
getCommittedEntryManager().append(entries);
+ for (Log entry : entries) { // wake threads waiting on each entry's monitor once it is in the committed manager
+ synchronized (entry) {
+ entry.notifyAll();
+ }
+ }
Log lastLog = entries.get(entries.size() - 1);
getUnCommittedEntryManager().stableTo(lastLog.getCurrLogIndex());
commitIndex = lastLog.getCurrLogIndex();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 465c4f0dd5..a6fda19576 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -124,7 +124,6 @@ public class AsynchronousSequencer implements LogSequencer {
log.getReceiveTime());
log.setCreateTime(System.nanoTime());
if (member.getAllNodes().size() > 1) {
- member.getVotingLogList().insert(sendLogRequest.getVotingLog());
member.getLogDispatcher().offer(sendLogRequest);
}
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 9a54c626d8..07ce3a1cc7 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -92,7 +92,6 @@ public class SynchronousSequencer implements LogSequencer {
startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
log.setCreateTime(System.nanoTime());
if (member.getAllNodes().size() > 1) {
- member.getVotingLogList().insert(sendLogRequest.getVotingLog());
member.getLogDispatcher().offer(sendLogRequest);
}
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 692d89c56e..c9a97388d4 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -73,8 +73,7 @@ public class AppendNodeEntryHandler implements
AsyncMethodCallback<AppendEntryRe
if (Timer.ENABLE_INSTRUMENTING) {
Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
}
- if (log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
- // the request already failed
+ if (log.isHasFailed()) {
return;
}
@@ -123,10 +122,6 @@ public class AppendNodeEntryHandler implements
AsyncMethodCallback<AppendEntryRe
} else if (resp == RESPONSE_WEAK_ACCEPT) {
synchronized (log) {
log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier);
- if (log.getWeaklyAcceptedNodeIds().size() +
log.getStronglyAcceptedNodeIds().size()
- >= quorumSize) {
- log.acceptedTime.set(System.nanoTime());
- }
log.notifyAll();
}
} else {
@@ -173,7 +168,7 @@ public class AppendNodeEntryHandler implements
AsyncMethodCallback<AppendEntryRe
log.getFailedNodeIds().add(trueReceiver.nodeIdentifier);
if (log.getFailedNodeIds().size() > quorumSize) {
// quorum members have failed, there is no need to wait for others
- log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
+ log.setHasFailed(true);
log.notifyAll();
if
(ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
member.removeAppendLogHandler(
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 670f86a068..1ba9f12415 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -894,9 +894,6 @@ public class MetaGroupMember extends RaftMember implements
IService, MetaGroupMe
addNodeLog.setNewNode(newNode);
logManager.append(addNodeLog);
- if (getAllNodes().size() > 1) {
- votingLogList.insert(votingLog);
- }
}
int retryTime = 0;
@@ -909,7 +906,7 @@ public class MetaGroupMember extends RaftMember implements
IService, MetaGroupMe
AppendLogResult result = sendLogToFollowers(votingLog);
switch (result) {
case OK:
- commitLog(addNodeLog);
+ waitApply(addNodeLog);
logger.info("{}: Join request of {} is accepted", name, newNode);
synchronized (partitionTable) {
@@ -1649,9 +1646,6 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
removeNodeLog.setRemovedNode(target);
logManager.append(removeNodeLog);
- if (getAllNodes().size() > 1) {
- votingLogList.insert(votingLog);
- }
}
int retryTime = 0;
@@ -1664,7 +1658,7 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
AppendLogResult result = sendLogToFollowers(votingLog);
switch (result) {
case OK:
- commitLog(removeNodeLog);
+ waitApply(removeNodeLog);
logger.info("{}: Removal request of {} is accepted", name, target);
return Response.RESPONSE_AGREE;
case TIME_OUT:
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 1bb143536f..027b706af0 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -151,7 +151,7 @@ public abstract class RaftMember implements RaftMemberMBean
{
public static boolean USE_LOG_DISPATCHER = true;
private static final boolean ENABLE_WEAK_ACCEPTANCE =
ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance();
- public static boolean USE_CRAFT = false;
+ public static boolean USE_CRAFT =
ClusterDescriptor.getInstance().getConfig().isUseCRaft();
protected LogAppenderFactory appenderFactory = new
BlockingLogAppender.Factory();
protected static final LogSequencerFactory SEQUENCER_FACTORY =
@@ -679,12 +679,10 @@ public abstract class RaftMember implements
RaftMemberMBean {
log.setByteSize(logByteSize);
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
- long appendStartTime =
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
AppendEntryResult result = getLogAppender().appendEntry(request, log);
if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft() &&
isVerifier) {
result.setSignature(KeyManager.INSTANCE.getNodeSignature());
}
-
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
logger.debug("{} AppendEntryRequest of {} completed with result {}", name,
log, result.status);
@@ -736,9 +734,7 @@ public abstract class RaftMember implements RaftMemberMBean
{
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
- long appendStartTime =
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
response = getLogAppender().appendEntries(request, logs);
-
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
if (logger.isDebugEnabled()) {
logger.debug(
@@ -1219,9 +1215,6 @@ public abstract class RaftMember implements
RaftMemberMBean {
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
logManager.append(log);
votingLog = buildVotingLog(log);
- if (getAllNodes().size() > 1) {
- votingLogList.insert(votingLog);
- }
break;
}
}
@@ -1307,9 +1300,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
return includeLogNumbersInStatus(
StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), log);
case OK:
- logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
startTime =
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
- commitLog(log);
+ waitApply(log);
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
log.getCreateTime());
@@ -1770,64 +1762,48 @@ public abstract class RaftMember implements
RaftMemberMBean {
*/
@SuppressWarnings({"java:S2445"}) // safe synchronized
private void waitAppendResultLoop(VotingLog log, int quorumSize) {
- int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
- int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
- int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+ int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
long nextTimeToPrint = 5000;
long waitStart = System.nanoTime();
long alreadyWait = 0;
String threadBaseName = Thread.currentThread().getName();
- synchronized (log) {
+ long waitTime = 1;
+ synchronized (log.getLog()) {
while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| (!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && stronglyAcceptedNodeNum < quorumSize
+ && getCommitIndex() < log.getLog().getCurrLogIndex()
|| ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
&& log.getSignatures().size() <
TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
&& (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
- || (totalAccepted < quorumSize)
- || votingLogList.size() > config.getMaxNumOfLogsInMem())
+ || (totalAccepted < quorumSize))
&& alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
- && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+ && !log.isHasFailed()) {
try {
- log.wait(1);
+ log.getLog().wait(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when sending a log", e);
}
- if (logger.isDebugEnabled()) {
- Thread.currentThread()
- .setName(
- threadBaseName
- + "-waiting-"
- + log.getLog().getCurrLogIndex()
- + "-"
- + log.getStronglyAcceptedNodeIds()
- + "-"
- + log.getWeaklyAcceptedNodeIds());
- }
+ waitTime = waitTime * 2;
alreadyWait = (System.nanoTime() - waitStart) / 1000000;
if (alreadyWait > nextTimeToPrint) {
logger.info(
- "Still not receive enough votes for {}, strongly accepted {},
weakly "
- + "accepted {}, voting logs {}, wait {}ms, wait to sequence
{}ms, wait to enqueue "
+ "Still not receive enough votes for {}, weakly "
+ + "accepted {}, wait {}ms, wait to sequence {}ms, wait to
enqueue "
+ "{}ms, wait to accept "
+ "{}ms",
log,
- log.getStronglyAcceptedNodeIds(),
log.getWeaklyAcceptedNodeIds(),
- votingLogList.size(),
alreadyWait,
(log.getLog().getSequenceStartTime() - waitStart) / 1000000,
(log.getLog().getEnqueueTime() - waitStart) / 1000000,
(log.acceptedTime.get() - waitStart) / 1000000);
nextTimeToPrint *= 2;
}
- stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
- weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
- totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+ totalAccepted = votingLogList.totalAcceptedNodeNum(log);
}
}
if (logger.isDebugEnabled()) {
@@ -1836,9 +1812,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
if (alreadyWait > 15000) {
logger.info(
- "Slow entry {}, strongly accepted {}, weakly " + "accepted {},
waited time {}ms",
+ "Slow entry {}, weakly " + "accepted {}, waited time {}ms",
log,
- log.getStronglyAcceptedNodeIds(),
log.getWeaklyAcceptedNodeIds(),
alreadyWait);
}
@@ -1852,26 +1827,20 @@ public abstract class RaftMember implements
RaftMemberMBean {
VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm,
int quorumSize) {
// wait for the followers to vote
long startTime =
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
-
- int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
- int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
- int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+ int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
if (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| ((!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && stronglyAcceptedNodeNum < quorumSize
+ && log.getLog().getCurrLogIndex() > getCommitIndex()
|| ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
&& log.getSignatures().size() <
TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
&& (!ENABLE_WEAK_ACCEPTANCE
- || (totalAccepted < quorumSize)
- || votingLogList.size() > config.getMaxNumOfLogsInMem())
- && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE))) {
+ || (totalAccepted < quorumSize))
+ && !log.isHasFailed())) {
waitAppendResultLoop(log, quorumSize);
}
- stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
- weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
- totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+ totalAccepted = votingLogList.totalAcceptedNodeNum(log);
if (log.acceptedTime.get() != 0) {
Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
@@ -1888,13 +1857,13 @@ public abstract class RaftMember implements
RaftMemberMBean {
return AppendLogResult.LEADERSHIP_STALE;
}
- // cannot get enough agreements within a certain amount of time
- if (stronglyAcceptedNodeNum < quorumSize && totalAccepted < quorumSize) {
- return AppendLogResult.TIME_OUT;
+ if (totalAccepted >= quorumSize && log.getLog().getCurrLogIndex() >
getCommitIndex()) {
+ return AppendLogResult.WEAK_ACCEPT;
}
- if (stronglyAcceptedNodeNum < quorumSize && totalAccepted >= quorumSize) {
- return AppendLogResult.WEAK_ACCEPT;
+ // cannot get enough agreements within a certain amount of time
+ if (log.getLog().getCurrLogIndex() > getCommitIndex()) {
+ return AppendLogResult.TIME_OUT;
}
// voteCounter has counted down to zero
@@ -1902,24 +1871,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
}
@SuppressWarnings("java:S2445")
- protected void commitLog(Log log) throws LogExecutionException {
+ protected void waitApply(Log log) throws LogExecutionException {
long startTime;
- // if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
- // startTime =
- //
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
- // synchronized (logManager) {
- //
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
- // startTime);
- // if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
- // startTime =
Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
- // logManager.commitTo(log.getCurrLogIndex());
- //
- //
Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
- // }
- // startTime =
Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
- // }
- //
Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
- // }
// when using async applier, the log here may not be applied. To return
the execution
// result, we must wait until the log is applied.
@@ -2078,7 +2031,7 @@ public abstract class RaftMember implements
RaftMemberMBean {
// single node group, no followers
long startTime =
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
- commitLog(log.getLog());
+ waitApply(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(),
log.getLog());
}
@@ -2105,7 +2058,7 @@ public abstract class RaftMember implements
RaftMemberMBean {
case OK:
startTime =
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
- commitLog(log.getLog());
+ waitApply(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(),
log.getLog());
@@ -2169,7 +2122,7 @@ public abstract class RaftMember implements
RaftMemberMBean {
AppendEntryRequest request = buildAppendEntryRequest(log.getLog(), true);
log.getFailedNodeIds().clear();
- log.getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
+ log.setHasFailed(false);
try {
if (allNodes.size() > 2) {
diff --git
a/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
b/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index f69f144054..538040ca72 100644
---
a/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++
b/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -70,7 +70,6 @@ public class AppendNodeEntryHandlerTest {
try {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
VotingLog votingLog = new VotingLog(log, 10);
- member.getVotingLogList().insert(votingLog);
PeerInfo peerInfo = new PeerInfo(1);
for (int i = 0; i < 10; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -85,11 +84,7 @@ public class AppendNodeEntryHandlerTest {
result.setStatus(resp);
new Thread(() -> handler.onComplete(result)).start();
}
- while (votingLog.getStronglyAcceptedNodeIds().size() < 5) {
- synchronized (votingLog) {
- votingLog.wait(1);
- }
- }
+
assertEquals(-1, receiverTerm.get());
assertFalse(leadershipStale.get());
assertEquals(5, votingLog.getStronglyAcceptedNodeIds().size());
@@ -104,7 +99,6 @@ public class AppendNodeEntryHandlerTest {
AtomicBoolean leadershipStale = new AtomicBoolean(false);
Log log = new TestLog();
VotingLog votingLog = new VotingLog(log, 10);
- member.getVotingLogList().insert(votingLog);
PeerInfo peerInfo = new PeerInfo(1);
for (int i = 0; i < 3; i++) {