This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch nbraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 44427a98301e6dd02e60dfc51fd359e932e97e3b
Author: jt <[email protected]>
AuthorDate: Fri Jul 8 21:44:26 2022 +0800

    nbraft snapshot
---
 .../org/apache/iotdb/cluster/config/ClusterConfig.java  |  9 +++++++++
 .../apache/iotdb/cluster/config/ClusterDescriptor.java  |  4 ++++
 .../iotdb/cluster/log/FragmentedLogDispatcher.java      |  6 +++---
 .../org/apache/iotdb/cluster/log/LogDispatcher.java     |  2 +-
 .../java/org/apache/iotdb/cluster/log/VotingLog.java    |  6 +++---
 .../org/apache/iotdb/cluster/log/VotingLogList.java     |  6 ++----
 .../cluster/log/appender/SlidingWindowLogAppender.java  | 16 ++++++++++++++--
 .../iotdb/cluster/log/applier/DataLogApplier.java       |  3 ++-
 .../iotdb/cluster/log/logtypes/PhysicalPlanLog.java     | 14 +++++++++++++-
 .../server/handlers/caller/AppendNodeEntryHandler.java  | 16 +++++++++++-----
 .../apache/iotdb/cluster/server/member/RaftMember.java  | 17 +++++++++--------
 .../org/apache/iotdb/cluster/server/monitor/Timer.java  | 12 ++++++++++++
 12 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 187509b2ef..fb2bbcb8e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -185,6 +185,7 @@ public class ClusterConfig {
   private boolean useAsyncSequencing = true;
 
   private boolean useFollowerSlidingWindow = true;
+  private int slidingWindowSize = 10000;
 
   private boolean enableWeakAcceptance = true;
 
@@ -600,4 +601,12 @@ public class ClusterConfig {
   public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
     this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
   }
+
+  public int getSlidingWindowSize() {
+    return slidingWindowSize;
+  }
+
+  public void setSlidingWindowSize(int slidingWindowSize) {
+    this.slidingWindowSize = slidingWindowSize;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a089e2d75a..765ddbdc1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -323,6 +323,10 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "use_follower_sliding_window",
                 String.valueOf(config.isUseFollowerSlidingWindow()))));
+    config.setSlidingWindowSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "sliding_window_size", String.valueOf(config.getSlidingWindowSize()))));
 
     config.setEnableWeakAcceptance(
         Boolean.parseBoolean(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index f8954effdc..c6f3c8498a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -45,13 +45,13 @@ public class FragmentedLogDispatcher extends LogDispatcher {
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
-    for (int i = 0; i < nodesLogQueues.size(); i++) {
-      BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
+    int i = 0;
+    for (BlockingQueue<SendLogRequest> nodeLogQueue : nodesLogQueues.values()) {
       SendLogRequest fragmentedRequest = new SendLogRequest(request);
       fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
       fragmentedRequest
           .getVotingLog()
-          .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i));
+          .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i++));
       try {
         boolean addSucceeded;
         if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 3e659a1468..23729063ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -80,7 +80,7 @@ public class LogDispatcher {
   Map<Node, Boolean> nodesEnabled = new HashMap<>();
   ExecutorService executorService;
   private static ExecutorService serializationService =
-      IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
+      IoTDBThreadPoolFactory.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
 
   public static int bindingThreadNum = clusterConfig.getDispatcherBindingThreadNum();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index ebfdccc999..447ff36363 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -32,10 +32,10 @@ public class VotingLog {
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
-    stronglyAcceptedNodeIds = new HashSet<>(groupSize);
-    weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+    stronglyAcceptedNodeIds = new HashSet<>(groupSize * 2);
+    weaklyAcceptedNodeIds = new HashSet<>(groupSize * 2);
     acceptedTime = new AtomicLong();
-    failedNodeIds = new HashSet<>(groupSize);
+    failedNodeIds = new HashSet<>(groupSize * 2);
   }
 
   public VotingLog(VotingLog another) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 7b1dde2d3e..86721ed8dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -78,9 +78,7 @@ public class VotingLogList {
         VotingLog votingLog = logList.get(i);
         if (votingLog.getLog().getCurrLogIndex() <= index
             && votingLog.getLog().getCurrLogTerm() == term) {
-          synchronized (votingLog) {
-            votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
-          }
+          votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
           if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
             lastEntryIndexToCommit = i;
           }
@@ -110,8 +108,8 @@ public class VotingLogList {
       }
 
       for (VotingLog acceptedLog : acceptedLogs) {
+        acceptedLog.acceptedTime.set(System.nanoTime());
         synchronized (acceptedLog) {
-          acceptedLog.acceptedTime.set(System.nanoTime());
           acceptedLog.notifyAll();
         }
         if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index 1ab08449bb..c3b2339465 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -40,7 +40,7 @@ public class SlidingWindowLogAppender implements LogAppender {
 
   private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class);
 
-  private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+  private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getSlidingWindowSize();
   private int windowLength = 0;
   private Log[] logWindow = new Log[windowCapacity];
   private long firstPosPrevIndex;
@@ -48,6 +48,7 @@ public class SlidingWindowLogAppender implements LogAppender {
 
   private RaftMember member;
   private RaftLogManager logManager;
+  private Object oowWaitCond = new Object();
 
   public SlidingWindowLogAppender(RaftMember member) {
     this.member = member;
@@ -144,6 +145,7 @@ public class SlidingWindowLogAppender implements LogAppender {
     for (int i = 1; i <= step; i++) {
       logWindow[windowCapacity - i] = null;
     }
+    windowLength -= step;
     firstPosPrevIndex = logManager.getLastLogIndex();
   }
 
@@ -197,7 +199,9 @@ public class SlidingWindowLogAppender implements LogAppender {
       retryTime = System.currentTimeMillis() - start;
       if (result.status == Response.RESPONSE_OUT_OF_WINDOW && retryTime < maxRetry) {
         try {
-          Thread.sleep(10);
+          synchronized (oowWaitCond) {
+            oowWaitCond.wait(1);
+          }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           break;
@@ -219,6 +223,7 @@ public class SlidingWindowLogAppender implements LogAppender {
     long appendedPos = 0;
 
     AppendEntryResult result = new AppendEntryResult();
+    boolean flushed = false;
     synchronized (logManager) {
       int windowPos = (int) (log.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
       if (windowPos < 0) {
@@ -238,6 +243,7 @@ public class SlidingWindowLogAppender implements LogAppender {
         checkLog(windowPos);
         if (windowPos == 0) {
           appendedPos = flushWindow(result, leaderCommit);
+          flushed = true;
         } else {
           result.status = Response.RESPONSE_WEAK_ACCEPT;
         }
@@ -250,6 +256,12 @@ public class SlidingWindowLogAppender implements LogAppender {
       }
     }
 
+    if (flushed) {
+      synchronized (oowWaitCond) {
+        oowWaitCond.notifyAll();
+      }
+    }
+
     if (appendedPos == -1) {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 7e0e033483..545c39ace7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
@@ -91,7 +92,7 @@ public class DataLogApplier extends BaseApplier {
                 closeFileLog.getPartitionId(),
                 closeFileLog.isSeq(),
                 false);
-      } else {
+      } else if (!(log instanceof FragmentedLog)) {
         logger.error("Unsupported log: {}", log);
       }
     } catch (Exception e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index 5d5793d350..2a2092fbbd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -57,9 +57,21 @@ public class PhysicalPlanLog extends Log {
     return DEFAULT_BUFFER_SIZE;
   }
 
+  private ThreadLocal<PublicBAOS> baosThreadLocal = new ThreadLocal<>();
+
+  private PublicBAOS getSerializeOutputStream() {
+    PublicBAOS publicBAOS = baosThreadLocal.get();
+    if (publicBAOS == null) {
+      publicBAOS = new PublicBAOS(getDefaultBufferSize());
+      baosThreadLocal.set(publicBAOS);
+    }
+    publicBAOS.reset();
+    return publicBAOS;
+  }
+
   @Override
   public ByteBuffer serialize() {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize());
+    PublicBAOS byteArrayOutputStream = getSerializeOutputStream();
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) PHYSICAL_PLAN.ordinal());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 7f8c7df4f4..cbd0682f5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -90,6 +90,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     long resp = response.status;
 
     if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
+      long operationStartTime = Statistic.RAFT_SENDER_HANDLE_STRONG_ACCEPT.getOperationStartTime();
       member
           .getVotingLogList()
           .onStronglyAccept(
@@ -97,6 +98,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
               log.getLog().getCurrLogTerm(),
               trueReceiver.nodeIdentifier);
       member.getPeer(trueReceiver).setMatchIndex(response.lastLogIndex);
+      Statistic.RAFT_SENDER_HANDLE_STRONG_ACCEPT.calOperationCostTimeFromStart(operationStartTime);
     } else if (resp > 0) {
       // a response > 0 is the follower's term
       // the leader ship is stale, wait for the new leader's heartbeat
@@ -120,14 +122,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
             new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
-      synchronized (log) {
+      long operationStartTime = Statistic.RAFT_SENDER_HANDLE_WEAK_ACCEPT.getOperationStartTime();
+      synchronized (log.getWeaklyAcceptedNodeIds()) {
         log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier);
-        if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
-            >= quorumSize) {
-          log.acceptedTime.set(System.nanoTime());
-        }
+      }
+      if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
+          >= quorumSize) {
+        log.acceptedTime.set(System.nanoTime());
+      }
+      synchronized (log) {
         log.notifyAll();
       }
+      Statistic.RAFT_SENDER_HANDLE_WEAK_ACCEPT.calOperationCostTimeFromStart(operationStartTime);
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
       if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0d605d3f46..9401d6c9cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -598,6 +598,14 @@ public abstract class RaftMember implements RaftMemberMBean {
     long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
     Thread.currentThread()
         .setName(getThreadBaseName() + "-appending-" + (request.prevLogIndex + 1));
+    //    if (true) {
+    //      AppendEntryResult result = new AppendEntryResult();
+    //      result.setLastLogTerm(request.prevLogTerm);
+    //      result.setLastLogIndex(request.prevLogIndex + 1);
+    //      result.setStatus(Response.RESPONSE_STRONG_ACCEPT);
+    //      result.setHeader(getHeader());
+    //      return result;
+    //    }
     AppendEntryResult result = appendEntryInternal(request);
     
Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(operationStartTime);
     return result;
@@ -1761,14 +1769,7 @@ public abstract class RaftMember implements RaftMemberMBean {
           logger.warn("Unexpected interruption when sending a log", e);
         }
         Thread.currentThread()
-            .setName(
-                threadBaseName
-                    + "-waiting-"
-                    + log.getLog().getCurrLogIndex()
-                    + "-"
-                    + log.getStronglyAcceptedNodeIds()
-                    + "-"
-                    + log.getWeaklyAcceptedNodeIds());
+            .setName(threadBaseName + "-waiting-" + log.getLog().getCurrLogIndex());
         alreadyWait = (System.nanoTime() - waitStart) / 1000000;
         if (alreadyWait > nextTimeToPrint) {
           logger.info(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 72892bdb56..4abde4d974 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -152,6 +152,18 @@ public class Timer {
         TIME_SCALE,
         true,
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_HANDLE_STRONG_ACCEPT(
+        RAFT_MEMBER_SENDER,
+        "handle strong accept",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_HANDLE_WEAK_ACCEPT(
+        RAFT_MEMBER_SENDER,
+        "handle weak accept",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_RELAY_OFFER_LOG(
         RAFT_MEMBER_RECEIVER,
         "relay offer log",

Reply via email to