This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new 2af3174326 temp save
2af3174326 is described below

commit 2af3174326d835400f7bda82fb5f48ad714cfda7
Author: Tian Jiang <[email protected]>
AuthorDate: Sun Sep 25 23:07:26 2022 +0800

    temp save
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |   5 +
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 ++
 .../iotdb/cluster/config/ClusterDescriptor.java    |   6 +
 .../iotdb/cluster/log/FragmentedLogDispatcher.java |  10 +-
 .../iotdb/cluster/log/IndirectLogDispatcher.java   |  18 ++-
 .../apache/iotdb/cluster/log/LogDispatcher.java    | 136 ++++++++++-----------
 .../org/apache/iotdb/cluster/log/VotingLog.java    |  47 ++-----
 .../apache/iotdb/cluster/log/VotingLogList.java    |  83 +++++--------
 .../cluster/log/appender/BlockingLogAppender.java  |  51 ++++----
 .../cluster/log/applier/AsyncDataLogApplier.java   |   6 +
 .../iotdb/cluster/log/applier/DataLogApplier.java  |   3 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   5 +
 .../log/sequencing/AsynchronousSequencer.java      |   1 -
 .../log/sequencing/SynchronousSequencer.java       |   1 -
 .../handlers/caller/AppendNodeEntryHandler.java    |   9 +-
 .../cluster/server/member/MetaGroupMember.java     |  10 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 103 +++++-----------
 .../caller/AppendNodeEntryHandlerTest.java         |   8 +-
 18 files changed, 222 insertions(+), 290 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 5e94a945ca..dfa2427c7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeReport;
 import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
 import org.apache.iotdb.cluster.server.raft.DataRaftService;
 import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -308,6 +309,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       logger.info("Send nums: {}", sendNums);
       logger.info("Send latency sum: {}", sendLatencySums);
       logger.info("Send latency avg: {}", sendLatencyAvg);
+      logger.info("Append time: {}", Statistic.RAFT_SENDER_SEND_LOG);
+      logger.info("Index diff: {}", Statistic.RAFT_RECEIVER_INDEX_DIFF);
+      logger.info("Follower time: {}", 
Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL);
+      logger.info("Window length: {}", Statistic.RAFT_WINDOW_LENGTH);
     }
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 51c0105d05..a9724b4280 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -219,6 +219,8 @@ public class ClusterConfig {
   // VG-Raft related
   private boolean useVGRaft = true;
 
+  private boolean useCRaft = false;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the 
server's hostname. If
    * there is something error for getting the ip of the hostname, then set the 
internalIp as
@@ -682,4 +684,12 @@ public class ClusterConfig {
   public void setUseVGRaft(boolean useVGRaft) {
     this.useVGRaft = useVGRaft;
   }
+
+  public boolean isUseCRaft() {
+    return useCRaft;
+  }
+
+  public void setUseCRaft(boolean useCRaft) {
+    this.useCRaft = useCRaft;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d533a61347..b3af782f2b 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -390,6 +390,12 @@ public class ClusterDescriptor {
                 "use_vg_raft",
                 String.valueOf(config.isUseVGRaft()))));
 
+    config.setUseCRaft(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "use_c_raft",
+                String.valueOf(config.isUseCRaft()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index f8954effdc..4a08476869 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.log;
 
+import java.util.Map.Entry;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -26,6 +27,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,13 +47,15 @@ public class FragmentedLogDispatcher extends LogDispatcher {
 
     long startTime = 
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
-    for (int i = 0; i < nodesLogQueues.size(); i++) {
-      BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
+
+    int i = 0;
+    for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList) 
{
+      BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
       SendLogRequest fragmentedRequest = new SendLogRequest(request);
       fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
       fragmentedRequest
           .getVotingLog()
-          .setLog(new FragmentedLog((FragmentedLog) 
request.getVotingLog().getLog(), i));
+          .setLog(new FragmentedLog((FragmentedLog) 
request.getVotingLog().getLog(), i++));
       try {
         boolean addSucceeded;
         if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 7102fd799a..1665edbb58 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.log;
 
+import java.util.concurrent.ArrayBlockingQueue;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -29,6 +30,8 @@ import 
org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.WeightedList;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,10 +73,23 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
   @Override
   void createQueueAndBindingThreads() {
+    nodesEnabled = new HashMap<>();
     for (Node node : member.getAllNodes()) {
       if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
         nodesEnabled.put(node, false);
-        nodesLogQueues.put(node, createQueueAndBindingThread(node));
+        BlockingQueue<SendLogRequest> logBlockingQueue;
+        logBlockingQueue =
+            new ArrayBlockingQueue<>(
+                
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+        nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+      }
+    }
+
+    for (int i = 0; i < bindingThreadNum; i++) {
+      for (Pair<Node, BlockingQueue<SendLogRequest>> pair : 
nodesLogQueuesList) {
+        executorServices.computeIfAbsent(pair.left, n -> 
IoTDBThreadPoolFactory.newCachedThreadPool(
+                "LogDispatcher-" + member.getName() + "-" + 
ClusterUtils.nodeToString(pair.left)))
+            .submit(newDispatcherThread(pair.left, pair.right));
       }
     }
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ef587da6ce..7032ef6e52 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -55,7 +56,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -76,12 +76,10 @@ public class LogDispatcher {
   RaftMember member;
   private static final ClusterConfig clusterConfig = 
ClusterDescriptor.getInstance().getConfig();
   protected boolean useBatchInLogCatchUp = 
clusterConfig.isUseBatchInLogCatchUp();
-  Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueues = new HashMap<>();
-  Map<Node, Boolean> nodesEnabled = new HashMap<>();
-  ExecutorService executorService;
-  private static ExecutorService serializationService =
-      IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
-          Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
+  Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueueMap = new HashMap<>();
+  List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new 
ArrayList<>();
+  Map<Node, Boolean> nodesEnabled;
+  Map<Node, ExecutorService> executorServices = new HashMap<>();
 
   public static int bindingThreadNum = 
clusterConfig.getDispatcherBindingThreadNum();
   public static int maxBatchSize = 10;
@@ -89,35 +87,36 @@ public class LogDispatcher {
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
-    executorService =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            bindingThreadNum * (member.getAllNodes().size() - 1),
-            "LogDispatcher-" + member.getName());
     createQueueAndBindingThreads();
   }
 
   void createQueueAndBindingThreads() {
     for (Node node : member.getAllNodes()) {
       if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
-        nodesEnabled.put(node, true);
-        nodesLogQueues.put(node, createQueueAndBindingThread(node));
+        BlockingQueue<SendLogRequest> logBlockingQueue;
+        logBlockingQueue =
+            new ArrayBlockingQueue<>(
+                
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+        nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
+      }
+    }
+
+    for (int i = 0; i < bindingThreadNum; i++) {
+      for (Pair<Node, BlockingQueue<SendLogRequest>> pair : 
nodesLogQueuesList) {
+        executorServices.computeIfAbsent(pair.left, n -> 
IoTDBThreadPoolFactory.newCachedThreadPool(
+                "LogDispatcher-" + member.getName() + "-" + 
ClusterUtils.nodeToString(pair.left)))
+            .submit(newDispatcherThread(pair.left, pair.right));
       }
     }
   }
 
   @TestOnly
   public void close() throws InterruptedException {
-    executorService.shutdownNow();
-    executorService.awaitTermination(10, TimeUnit.SECONDS);
-  }
-
-  private ByteBuffer serializeTask(SendLogRequest request) {
-    ByteBuffer byteBuffer = request.getVotingLog().getLog().serialize();
-    request.getVotingLog().getLog().setByteSize(byteBuffer.capacity());
-    if (clusterConfig.isUseVGRaft()) {
-      request.getAppendEntryRequest().setEntryHash(byteBuffer.hashCode());
+    for (Entry<Node, ExecutorService> entry : executorServices.entrySet()) {
+      ExecutorService value = entry.getValue();
+      value.shutdownNow();
+      value.awaitTermination(10, TimeUnit.SECONDS);
     }
-    return byteBuffer;
   }
 
   protected SendLogRequest transformRequest(Node node, SendLogRequest request) 
{
@@ -125,12 +124,29 @@ public class LogDispatcher {
     return newRequest;
   }
 
-  public void offer(SendLogRequest request) {
-    // do serialization here to avoid taking LogManager for too long
-    if (!nodesLogQueues.isEmpty()) {
-      SendLogRequest finalRequest = request;
-      request.serializedLogFuture = serializationService.submit(() -> 
serializeTask(finalRequest));
+
+  private boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, 
SendLogRequest request) {
+    if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
+      long waitStart = System.currentTimeMillis();
+      long waitTime = 1;
+      while (System.currentTimeMillis() - waitStart < 
clusterConfig.getConnectionTimeoutInMS()) {
+        if (nodeLogQueue.add(request)) {
+          return true;
+        } else {
+          try {
+            member.getLogManager().wait(waitTime);
+            waitTime *= 2;
+          } catch (InterruptedException e) {
+            logger.warn("Unexpected interruption");
+          }
+        }
+      }
+      return false;
+    } else {
+      return nodeLogQueue.add(request);
     }
+  }
+  public void offer(SendLogRequest request) {
 
     long startTime = 
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
@@ -138,34 +154,25 @@ public class LogDispatcher {
     if (clusterConfig.isUseVGRaft()) {
       verifiers = member.getTrustValueHolder().chooseVerifiers();
     }
-    for (Entry<Node, BlockingQueue<SendLogRequest>> entry : 
nodesLogQueues.entrySet()) {
-      boolean nodeEnabled = this.nodesEnabled.getOrDefault(entry.getKey(), 
false);
-      if (!nodeEnabled) {
+
+    for (Pair<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueuesList) 
{
+      if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.left, 
false)) {
         continue;
       }
 
-      request = transformRequest(entry.getKey(), request);
-      if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.getKey(), 
verifiers)) {
+      if (clusterConfig.isUseVGRaft() && ClusterUtils.isNodeIn(entry.left, 
verifiers)) {
+        request = transformRequest(entry.left, request);
         request.setVerifier(true);
       }
 
-      BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
+      BlockingQueue<SendLogRequest> nodeLogQueue = entry.right;
       try {
-        boolean addSucceeded;
-        if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
-          addSucceeded =
-              nodeLogQueue.offer(
-                  request,
-                  
ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
-                  TimeUnit.MILLISECONDS);
-        } else {
-          addSucceeded = nodeLogQueue.add(request);
-        }
+        boolean addSucceeded = addToQueue(nodeLogQueue, request);
 
         if (!addSucceeded) {
           logger.debug(
               "Log queue[{}] of {} is full, ignore the request to this node",
-              entry.getKey(),
+              entry.left,
               member.getName());
         } else {
           request.setEnqueueTime(System.nanoTime());
@@ -173,10 +180,8 @@ public class LogDispatcher {
       } catch (IllegalStateException e) {
         logger.debug(
             "Log queue[{}] of {} is full, ignore the request to this node",
-            entry.getKey(),
+            entry.left,
             member.getName());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
       }
     }
     
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
@@ -187,17 +192,6 @@ public class LogDispatcher {
     }
   }
 
-  BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
-    BlockingQueue<SendLogRequest> logBlockingQueue;
-    logBlockingQueue =
-        new ArrayBlockingQueue<>(
-            
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
-    for (int i = 0; i < bindingThreadNum; i++) {
-      executorService.submit(newDispatcherThread(node, logBlockingQueue));
-    }
-    return logBlockingQueue;
-  }
-
   DispatcherThread newDispatcherThread(Node node, 
BlockingQueue<SendLogRequest> logBlockingQueue) {
     return new DispatcherThread(node, logBlockingQueue);
   }
@@ -315,8 +309,6 @@ public class LogDispatcher {
       baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
     }
 
-
-
     @Override
     public void run() {
       if (logger.isDebugEnabled()) {
@@ -347,14 +339,20 @@ public class LogDispatcher {
       logger.info("Dispatcher exits");
     }
 
-    protected void serializeEntries() throws ExecutionException, 
InterruptedException {
+    protected void serializeEntries() throws InterruptedException {
       for (SendLogRequest request : currBatch) {
         
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
             request.getVotingLog().getLog().getEnqueueTime());
         
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE.calOperationCostTimeFromStart(
             request.getVotingLog().getLog().getCreateTime());
         long start = 
Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
-        request.getAppendEntryRequest().entry = 
request.serializedLogFuture.get();
+        request.getAppendEntryRequest().entry = 
request.getVotingLog().getLog().serialize();
+        request.getVotingLog().getLog()
+            .setByteSize(request.getAppendEntryRequest().entry.capacity());
+        if (clusterConfig.isUseVGRaft()) {
+          request.getAppendEntryRequest()
+              .setEntryHash(request.getAppendEntryRequest().entry.hashCode());
+        }
         
Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
       }
     }
@@ -376,17 +374,7 @@ public class LogDispatcher {
     private void appendEntriesSync(
         List<ByteBuffer> logList, AppendEntriesRequest request, 
List<SendLogRequest> currBatch) {
 
-      long startTime = 
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
-      if (!member.waitForPrevLog(peerInfo, 
currBatch.get(0).getVotingLog().getLog())) {
-        logger.warn(
-            "{}: node {} timed out when appending {}",
-            member.getName(),
-            receiver,
-            currBatch.get(0).getVotingLog());
-        return;
-      }
-      
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
-
+      long startTime;
       AsyncMethodCallback<AppendEntryResult> handler = new 
AppendEntriesHandler(currBatch);
       startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
       try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index edc933664b..848a61c967 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -19,26 +19,20 @@
 
 package org.apache.iotdb.cluster.log;
 
-import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
 
 public class VotingLog {
   protected Log log;
-  protected Set<Integer> stronglyAcceptedNodeIds;
   protected Set<Integer> weaklyAcceptedNodeIds;
   protected Set<Integer> failedNodeIds;
   protected Set<byte[]> signatures;
   public AtomicLong acceptedTime;
+  private boolean hasFailed;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
-    stronglyAcceptedNodeIds = new HashSet<>(groupSize);
     weaklyAcceptedNodeIds = new HashSet<>(groupSize);
     acceptedTime = new AtomicLong();
     failedNodeIds = new HashSet<>(groupSize);
@@ -47,7 +41,6 @@ public class VotingLog {
 
   public VotingLog(VotingLog another) {
     this.log = another.log;
-    this.stronglyAcceptedNodeIds = another.stronglyAcceptedNodeIds;
     this.weaklyAcceptedNodeIds = another.weaklyAcceptedNodeIds;
     this.acceptedTime = another.acceptedTime;
     this.failedNodeIds = another.failedNodeIds;
@@ -62,36 +55,6 @@ public class VotingLog {
     this.log = log;
   }
 
-  public Set<Integer> getStronglyAcceptedNodeIds() {
-    return stronglyAcceptedNodeIds;
-  }
-
-  public boolean onStronglyAccept(long index, long term, Node acceptingNode, 
int quorumSize,
-      ByteBuffer signature, int nodeNum) {
-    if (getLog().getCurrLogIndex() <= index
-        && getLog().getCurrLogTerm() == term) {
-      synchronized (this) {
-        getStronglyAcceptedNodeIds().add(acceptingNode.nodeIdentifier);
-        if (signature != null) {
-          signatures.add(Arrays.copyOfRange(signature.array(), 
signature.arrayOffset(),
-              signature.arrayOffset() + signature.remaining()));
-        }
-      }
-
-      if (getStronglyAcceptedNodeIds().size()
-          + getWeaklyAcceptedNodeIds().size()
-          >= quorumSize) {
-        acceptedTime.set(System.nanoTime());
-      }
-      if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
-        return signatures.size() > TrustValueHolder.verifierGroupSize(nodeNum) 
/ 2;
-      } else {
-        return getStronglyAcceptedNodeIds().size() >= quorumSize;
-      }
-    }
-    return false;
-  }
-
   public Set<Integer> getWeaklyAcceptedNodeIds() {
     return weaklyAcceptedNodeIds;
   }
@@ -108,4 +71,12 @@ public class VotingLog {
   public Set<byte[]> getSignatures() {
     return signatures;
   }
+
+  public boolean isHasFailed() {
+    return hasFailed;
+  }
+
+  public void setHasFailed(boolean hasFailed) {
+    this.hasFailed = hasFailed;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 71c1ec4135..29a72f1612 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -20,6 +20,11 @@
 package org.apache.iotdb.cluster.log;
 
 import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -36,30 +41,22 @@ public class VotingLogList {
 
   private static final Logger logger = 
LoggerFactory.getLogger(VotingLogList.class);
 
-  private List<VotingLog> logList = new ArrayList<>();
   private volatile long currTerm = -1;
   private int quorumSize;
   private RaftMember member;
+  private Map<Integer, Long> stronglyAcceptedIndices = new 
ConcurrentHashMap<>();
 
   public VotingLogList(int quorumSize, RaftMember member) {
     this.quorumSize = quorumSize;
     this.member = member;
   }
 
-  /**
-   * Insert a voting entry into the list. Notice the logs must be inserted in 
order of index, as
-   * they are inserted as soon as created
-   *
-   * @param log
-   */
-  public synchronized void insert(VotingLog log) {
-    if (log.getLog().getCurrLogTerm() != currTerm) {
-      clear();
-      currTerm = log.getLog().getCurrLogTerm();
-    }
-    logList.add(log);
-  }
 
+  private long computeNewCommitIndex() {
+    List<Entry<Integer, Long>> nodeIndices = new 
ArrayList<>(stronglyAcceptedIndices.entrySet());
+    nodeIndices.sort(Entry.comparingByValue());
+    return nodeIndices.get(quorumSize - 1).getValue();
+  }
   /**
    * When an entry of index-term is strongly accepted by a node of 
acceptingNodeId, record the id in
    * all entries whose index <= the accepted entry. If any entry is accepted 
by a quorum, remove it
@@ -73,55 +70,35 @@ public class VotingLogList {
    */
   public void onStronglyAccept(long index, long term, Node acceptingNode, 
ByteBuffer signature) {
     logger.debug("{}-{} is strongly accepted by {}", index, term, 
acceptingNode);
-    int lastEntryIndexToCommit = -1;
 
-    List<VotingLog> acceptedLogs;
-    synchronized (this) {
-      for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
-        VotingLog votingLog = logList.get(i);
-        if (votingLog.onStronglyAccept(index, term, acceptingNode, quorumSize,
-            signature, member.getAllNodes().size())) {
-          lastEntryIndexToCommit = i;
-        }
-        if (votingLog.getLog().getCurrLogIndex() > index) {
-          break;
-        }
+    stronglyAcceptedIndices.compute(acceptingNode.nodeIdentifier, (nid, idx) 
-> {
+      if (idx == null) {
+        return index;
+      } else {
+        return Math.max(index, idx);
       }
+    });
 
-      List<VotingLog> tmpAcceptedLogs = logList.subList(0, 
lastEntryIndexToCommit + 1);
-      acceptedLogs = new ArrayList<>(tmpAcceptedLogs);
-      tmpAcceptedLogs.clear();
-    }
-
-    if (lastEntryIndexToCommit != -1) {
-      Log lastLog = acceptedLogs.get(acceptedLogs.size() - 1).log;
+    long newCommitIndex = computeNewCommitIndex();
+    if (newCommitIndex > member.getCommitIndex()) {
       synchronized (member.getLogManager()) {
         try {
-          member.getLogManager().commitTo(lastLog.getCurrLogIndex());
+          member.getLogManager().commitTo(newCommitIndex);
         } catch (LogExecutionException e) {
-          logger.error("Fail to commit {}", lastLog, e);
-        }
-      }
-
-      for (VotingLog acceptedLog : acceptedLogs) {
-        synchronized (acceptedLog) {
-          acceptedLog.acceptedTime.set(System.nanoTime());
-          acceptedLog.notifyAll();
-        }
-        if 
(ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
-          member.removeAppendLogHandler(
-              new Pair<>(
-                  acceptedLog.getLog().getCurrLogIndex(), 
acceptedLog.getLog().getCurrLogTerm()));
+          logger.error("Fail to commit {}", newCommitIndex, e);
         }
       }
     }
   }
 
-  public synchronized void clear() {
-    logList.clear();
-  }
-
-  public int size() {
-    return logList.size();
+  public int totalAcceptedNodeNum(VotingLog log) {
+    long index = log.getLog().getCurrLogIndex();
+    int num = log.getWeaklyAcceptedNodeIds().size();
+    for (Entry<Integer, Long> entry : stronglyAcceptedIndices.entrySet()) {
+      if (entry.getValue() >= index) {
+        num ++;
+      }
+    }
+    return num;
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
index 8b87c100fb..f60523a514 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -103,6 +103,10 @@ public class BlockingLogAppender implements LogAppender {
     if (success != -1) {
       logger.debug("{} append a new log {}", member.getName(), log);
       result.status = Response.RESPONSE_STRONG_ACCEPT;
+      if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) 
{
+        request.entry.rewind();
+        member.getLogRelay().offer(request, request.subReceivers);
+      }
     } else {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -118,12 +122,13 @@ public class BlockingLogAppender implements LogAppender {
     Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
     long lastLogIndex = logManager.getLastLogIndex();
     Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
+    long waitTime = 10;
     while (lastLogIndex < prevLogIndex
         && alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
       try {
         // each time new logs are appended, this will be notified
         synchronized (logUpdateCondition) {
-          logUpdateCondition.wait(1);
+          logUpdateCondition.wait(waitTime);
         }
         lastLogIndex = logManager.getLastLogIndex();
         if (lastLogIndex >= prevLogIndex) {
@@ -133,6 +138,7 @@ public class BlockingLogAppender implements LogAppender {
         Thread.currentThread().interrupt();
         return false;
       }
+      waitTime = waitTime * 2;
       alreadyWait = System.currentTimeMillis() - waitStart;
     }
 
@@ -190,29 +196,11 @@ public class BlockingLogAppender implements LogAppender {
               logManager.maybeAppend(
                   request.prevLogIndex, request.prevLogTerm, 
request.leaderCommit, logs);
           
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
-          if (resp != -1) {
-            if (logger.isDebugEnabled()) {
-              logger.debug(
-                  "{} append a new log list {}, commit to {}",
-                  member.getName(),
-                  logs,
-                  request.leaderCommit);
-            }
-            result.status = Response.RESPONSE_STRONG_ACCEPT;
-            result.setLastLogIndex(logManager.getLastLogIndex());
-            result.setLastLogTerm(logManager.getLastLogTerm());
-
-            if (request.isSetSubReceivers()) {
-              request.entries.forEach(Buffer::rewind);
-              member.getLogRelay().offer(request, request.subReceivers);
-            }
-          } else {
-            // the incoming log points to an illegal position, reject it
-            result.status = Response.RESPONSE_LOG_MISMATCH;
-          }
+
           break;
         }
       }
+
       try {
         TimeUnit.MILLISECONDS.sleep(
             
IoTDBDescriptor.getInstance().getConfig().getCheckPeriodWhenInsertBlocked());
@@ -225,6 +213,27 @@ public class BlockingLogAppender implements LogAppender {
         Thread.currentThread().interrupt();
       }
     }
+
+    if (resp != -1) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "{} append a new log list {}, commit to {}",
+            member.getName(),
+            logs,
+            request.leaderCommit);
+      }
+      result.status = Response.RESPONSE_STRONG_ACCEPT;
+      result.setLastLogIndex(logManager.getLastLogIndex());
+      result.setLastLogTerm(logManager.getLastLogTerm());
+
+      if (request.isSetSubReceivers()) {
+        request.entries.forEach(Buffer::rewind);
+        member.getLogRelay().offer(request, request.subReceivers);
+      }
+    } else {
+      // the incoming log points to an illegal position, reject it
+      result.status = Response.RESPONSE_LOG_MISMATCH;
+    }
     return result;
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index bcd5e6c03c..a717bf4543 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.log.applier;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -33,6 +34,7 @@ import 
org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.db.service.IoTDB;
 
 import org.slf4j.Logger;
@@ -130,6 +132,8 @@ public class AsyncDataLogApplier implements LogApplier {
         // unreachable
       }
       return partialPath;
+    } else if (log instanceof FragmentedLog) {
+      return new PartialPath("dummy", false);
     }
     return null;
   }
@@ -164,6 +168,8 @@ public class AsyncDataLogApplier implements LogApplier {
     } else if (plan instanceof CreateTimeSeriesPlan) {
       PartialPath path = ((CreateTimeSeriesPlan) plan).getPath();
       sgPath = IoTDB.schemaProcessor.getBelongedStorageGroup(path);
+    } else if (plan instanceof DummyPlan) {
+      sgPath = new PartialPath("dummy", false);
     }
     return sgPath;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 7a4160a2c7..f17ce02b13 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
@@ -87,7 +88,7 @@ public class DataLogApplier extends BaseApplier {
                 closeFileLog.getPartitionId(),
                 closeFileLog.isSeq(),
                 false);
-      } else {
+      } else if (!(log instanceof FragmentedLog)) {
         logger.error("Unsupported log: {}", log);
       }
     } catch (Exception e) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index a3ab0bb2be..1a22b26a5d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -661,6 +661,11 @@ public abstract class RaftLogManager {
       // success or fail together approximately.
       // TODO: make it real atomic
       getCommittedEntryManager().append(entries);
+      for (Log entry : entries) {
+        synchronized (entry) {
+          entry.notifyAll();
+        }
+      }
       Log lastLog = entries.get(entries.size() - 1);
       getUnCommittedEntryManager().stableTo(lastLog.getCurrLogIndex());
       commitIndex = lastLog.getCurrLogIndex();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 465c4f0dd5..a6fda19576 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -124,7 +124,6 @@ public class AsynchronousSequencer implements LogSequencer {
             log.getReceiveTime());
         log.setCreateTime(System.nanoTime());
         if (member.getAllNodes().size() > 1) {
-          member.getVotingLogList().insert(sendLogRequest.getVotingLog());
           member.getLogDispatcher().offer(sendLogRequest);
         }
         
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 9a54c626d8..07ce3a1cc7 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -92,7 +92,6 @@ public class SynchronousSequencer implements LogSequencer {
           startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
           log.setCreateTime(System.nanoTime());
           if (member.getAllNodes().size() > 1) {
-            member.getVotingLogList().insert(sendLogRequest.getVotingLog());
             member.getLogDispatcher().offer(sendLogRequest);
           }
           
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 692d89c56e..c9a97388d4 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -73,8 +73,7 @@ public class AppendNodeEntryHandler implements 
AsyncMethodCallback<AppendEntryRe
     if (Timer.ENABLE_INSTRUMENTING) {
       
Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
     }
-    if (log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
-      // the request already failed
+    if (log.isHasFailed()) {
       return;
     }
 
@@ -123,10 +122,6 @@ public class AppendNodeEntryHandler implements 
AsyncMethodCallback<AppendEntryRe
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (log) {
         log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier);
-        if (log.getWeaklyAcceptedNodeIds().size() + 
log.getStronglyAcceptedNodeIds().size()
-            >= quorumSize) {
-          log.acceptedTime.set(System.nanoTime());
-        }
         log.notifyAll();
       }
     } else {
@@ -173,7 +168,7 @@ public class AppendNodeEntryHandler implements 
AsyncMethodCallback<AppendEntryRe
       log.getFailedNodeIds().add(trueReceiver.nodeIdentifier);
       if (log.getFailedNodeIds().size() > quorumSize) {
         // quorum members have failed, there is no need to wait for others
-        log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
+        log.setHasFailed(true);
         log.notifyAll();
         if 
(ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
           member.removeAppendLogHandler(
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 670f86a068..1ba9f12415 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -894,9 +894,6 @@ public class MetaGroupMember extends RaftMember implements 
IService, MetaGroupMe
       addNodeLog.setNewNode(newNode);
 
       logManager.append(addNodeLog);
-      if (getAllNodes().size() > 1) {
-        votingLogList.insert(votingLog);
-      }
     }
 
     int retryTime = 0;
@@ -909,7 +906,7 @@ public class MetaGroupMember extends RaftMember implements 
IService, MetaGroupMe
       AppendLogResult result = sendLogToFollowers(votingLog);
       switch (result) {
         case OK:
-          commitLog(addNodeLog);
+          waitApply(addNodeLog);
           logger.info("{}: Join request of {} is accepted", name, newNode);
 
           synchronized (partitionTable) {
@@ -1649,9 +1646,6 @@ public class MetaGroupMember extends RaftMember 
implements IService, MetaGroupMe
       removeNodeLog.setRemovedNode(target);
 
       logManager.append(removeNodeLog);
-      if (getAllNodes().size() > 1) {
-        votingLogList.insert(votingLog);
-      }
     }
 
     int retryTime = 0;
@@ -1664,7 +1658,7 @@ public class MetaGroupMember extends RaftMember 
implements IService, MetaGroupMe
       AppendLogResult result = sendLogToFollowers(votingLog);
       switch (result) {
         case OK:
-          commitLog(removeNodeLog);
+          waitApply(removeNodeLog);
           logger.info("{}: Removal request of {} is accepted", name, target);
           return Response.RESPONSE_AGREE;
         case TIME_OUT:
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 1bb143536f..027b706af0 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -151,7 +151,7 @@ public abstract class RaftMember implements RaftMemberMBean 
{
   public static boolean USE_LOG_DISPATCHER = true;
   private static final boolean ENABLE_WEAK_ACCEPTANCE =
       ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance();
-  public static boolean USE_CRAFT = false;
+  public static boolean USE_CRAFT = 
ClusterDescriptor.getInstance().getConfig().isUseCRaft();
 
   protected LogAppenderFactory appenderFactory = new 
BlockingLogAppender.Factory();
   protected static final LogSequencerFactory SEQUENCER_FACTORY =
@@ -679,12 +679,10 @@ public abstract class RaftMember implements 
RaftMemberMBean {
     log.setByteSize(logByteSize);
     
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
-    long appendStartTime = 
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     AppendEntryResult result = getLogAppender().appendEntry(request, log);
     if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft() && 
isVerifier) {
       result.setSignature(KeyManager.INSTANCE.getNodeSignature());
     }
-    
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
 
     logger.debug("{} AppendEntryRequest of {} completed with result {}", name, 
log, result.status);
 
@@ -736,9 +734,7 @@ public abstract class RaftMember implements RaftMemberMBean 
{
 
     
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
-    long appendStartTime = 
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     response = getLogAppender().appendEntries(request, logs);
-    
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
 
     if (logger.isDebugEnabled()) {
       logger.debug(
@@ -1219,9 +1215,6 @@ public abstract class RaftMember implements 
RaftMemberMBean {
           log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
           logManager.append(log);
           votingLog = buildVotingLog(log);
-          if (getAllNodes().size() > 1) {
-            votingLogList.insert(votingLog);
-          }
           break;
         }
       }
@@ -1307,9 +1300,8 @@ public abstract class RaftMember implements 
RaftMemberMBean {
           return includeLogNumbersInStatus(
               StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), log);
         case OK:
-          logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
           startTime = 
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
-          commitLog(log);
+          waitApply(log);
           
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
           
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
               log.getCreateTime());
@@ -1770,64 +1762,48 @@ public abstract class RaftMember implements 
RaftMemberMBean {
    */
   @SuppressWarnings({"java:S2445"}) // safe synchronized
   private void waitAppendResultLoop(VotingLog log, int quorumSize) {
-    int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
-    int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
-    int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+    int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
     long nextTimeToPrint = 5000;
 
     long waitStart = System.nanoTime();
     long alreadyWait = 0;
 
     String threadBaseName = Thread.currentThread().getName();
-    synchronized (log) {
+    long waitTime = 1;
+    synchronized (log.getLog()) {
       while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
           || (!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-          && stronglyAcceptedNodeNum < quorumSize
+          && getCommitIndex() < log.getLog().getCurrLogIndex()
           || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
           && log.getSignatures().size() < 
TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
           && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
-          || (totalAccepted < quorumSize)
-          || votingLogList.size() > config.getMaxNumOfLogsInMem())
+          || (totalAccepted < quorumSize))
           && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
-          && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+          && !log.isHasFailed()) {
         try {
-          log.wait(1);
+          log.getLog().wait(waitTime);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("Unexpected interruption when sending a log", e);
         }
-        if (logger.isDebugEnabled()) {
-          Thread.currentThread()
-              .setName(
-                  threadBaseName
-                      + "-waiting-"
-                      + log.getLog().getCurrLogIndex()
-                      + "-"
-                      + log.getStronglyAcceptedNodeIds()
-                      + "-"
-                      + log.getWeaklyAcceptedNodeIds());
-        }
+        waitTime = waitTime * 2;
 
         alreadyWait = (System.nanoTime() - waitStart) / 1000000;
         if (alreadyWait > nextTimeToPrint) {
           logger.info(
-              "Still not receive enough votes for {}, strongly accepted {}, 
weakly "
-                  + "accepted {}, voting logs {}, wait {}ms, wait to sequence 
{}ms, wait to enqueue "
+              "Still not receive enough votes for {}, weakly "
+                  + "accepted {}, wait {}ms, wait to sequence {}ms, wait to 
enqueue "
                   + "{}ms, wait to accept "
                   + "{}ms",
               log,
-              log.getStronglyAcceptedNodeIds(),
               log.getWeaklyAcceptedNodeIds(),
-              votingLogList.size(),
               alreadyWait,
               (log.getLog().getSequenceStartTime() - waitStart) / 1000000,
               (log.getLog().getEnqueueTime() - waitStart) / 1000000,
               (log.acceptedTime.get() - waitStart) / 1000000);
           nextTimeToPrint *= 2;
         }
-        stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
-        weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
-        totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+        totalAccepted = votingLogList.totalAcceptedNodeNum(log);
       }
     }
     if (logger.isDebugEnabled()) {
@@ -1836,9 +1812,8 @@ public abstract class RaftMember implements 
RaftMemberMBean {
 
     if (alreadyWait > 15000) {
       logger.info(
-          "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, 
waited time {}ms",
+          "Slow entry {}, weakly " + "accepted {}, waited time {}ms",
           log,
-          log.getStronglyAcceptedNodeIds(),
           log.getWeaklyAcceptedNodeIds(),
           alreadyWait);
     }
@@ -1852,26 +1827,20 @@ public abstract class RaftMember implements 
RaftMemberMBean {
       VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, 
int quorumSize) {
     // wait for the followers to vote
     long startTime = 
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
-
-    int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
-    int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
-    int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+    int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
 
     if (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
         || ((!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-        && stronglyAcceptedNodeNum < quorumSize
+        && log.getLog().getCurrLogIndex() > getCommitIndex()
         || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
         && log.getSignatures().size() < 
TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
         && (!ENABLE_WEAK_ACCEPTANCE
-        || (totalAccepted < quorumSize)
-        || votingLogList.size() > config.getMaxNumOfLogsInMem())
-        && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE))) {
+        || (totalAccepted < quorumSize))
+        && !log.isHasFailed())) {
 
       waitAppendResultLoop(log, quorumSize);
     }
-    stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
-    weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
-    totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+    totalAccepted = votingLogList.totalAcceptedNodeNum(log);
 
     if (log.acceptedTime.get() != 0) {
       
Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
@@ -1888,13 +1857,13 @@ public abstract class RaftMember implements 
RaftMemberMBean {
       return AppendLogResult.LEADERSHIP_STALE;
     }
 
-    // cannot get enough agreements within a certain amount of time
-    if (stronglyAcceptedNodeNum < quorumSize && totalAccepted < quorumSize) {
-      return AppendLogResult.TIME_OUT;
+    if (totalAccepted >= quorumSize && log.getLog().getCurrLogIndex() > 
getCommitIndex()) {
+      return AppendLogResult.WEAK_ACCEPT;
     }
 
-    if (stronglyAcceptedNodeNum < quorumSize && totalAccepted >= quorumSize) {
-      return AppendLogResult.WEAK_ACCEPT;
+    // cannot get enough agreements within a certain amount of time
+    if (log.getLog().getCurrLogIndex() > getCommitIndex()) {
+      return AppendLogResult.TIME_OUT;
     }
 
     // voteCounter has counted down to zero
@@ -1902,24 +1871,8 @@ public abstract class RaftMember implements 
RaftMemberMBean {
   }
 
   @SuppressWarnings("java:S2445")
-  protected void commitLog(Log log) throws LogExecutionException {
+  protected void waitApply(Log log) throws LogExecutionException {
     long startTime;
-    //    if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-    //      startTime =
-    // 
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
-    //      synchronized (logManager) {
-    //        
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
-    //            startTime);
-    //        if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-    //          startTime = 
Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
-    //          logManager.commitTo(log.getCurrLogIndex());
-    //
-    // 
Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
-    //        }
-    //        startTime = 
Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
-    //      }
-    //      
Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
-    //    }
 
     // when using async applier, the log here may not be applied. To return 
the execution
     // result, we must wait until the log is applied.
@@ -2078,7 +2031,7 @@ public abstract class RaftMember implements 
RaftMemberMBean {
       // single node group, no followers
       long startTime = 
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
       logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
-      commitLog(log.getLog());
+      waitApply(log.getLog());
       
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
       return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), 
log.getLog());
     }
@@ -2105,7 +2058,7 @@ public abstract class RaftMember implements 
RaftMemberMBean {
         case OK:
           startTime = 
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
-          commitLog(log.getLog());
+          waitApply(log.getLog());
           
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
           
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
           return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), 
log.getLog());
@@ -2169,7 +2122,7 @@ public abstract class RaftMember implements 
RaftMemberMBean {
 
     AppendEntryRequest request = buildAppendEntryRequest(log.getLog(), true);
     log.getFailedNodeIds().clear();
-    log.getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
+    log.setHasFailed(false);
 
     try {
       if (allNodes.size() > 2) {
diff --git 
a/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
 
b/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index f69f144054..538040ca72 100644
--- 
a/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ 
b/cluster/src/test_back/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -70,7 +70,6 @@ public class AppendNodeEntryHandlerTest {
     try {
       ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
       VotingLog votingLog = new VotingLog(log, 10);
-      member.getVotingLogList().insert(votingLog);
       PeerInfo peerInfo = new PeerInfo(1);
       for (int i = 0; i < 10; i++) {
         AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -85,11 +84,7 @@ public class AppendNodeEntryHandlerTest {
         result.setStatus(resp);
         new Thread(() -> handler.onComplete(result)).start();
       }
-      while (votingLog.getStronglyAcceptedNodeIds().size() < 5) {
-        synchronized (votingLog) {
-          votingLog.wait(1);
-        }
-      }
+
       assertEquals(-1, receiverTerm.get());
       assertFalse(leadershipStale.get());
       assertEquals(5, votingLog.getStronglyAcceptedNodeIds().size());
@@ -104,7 +99,6 @@ public class AppendNodeEntryHandlerTest {
     AtomicBoolean leadershipStale = new AtomicBoolean(false);
     Log log = new TestLog();
     VotingLog votingLog = new VotingLog(log, 10);
-    member.getVotingLogList().insert(votingLog);
     PeerInfo peerInfo = new PeerInfo(1);
 
     for (int i = 0; i < 3; i++) {

Reply via email to