This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 420fa10  [To rel/0.12] Add a judgement to determine raft log size can 
fit into buffer before log appending (#4670)
420fa10 is described below

commit 420fa10e7991a5e4377aaa5bdc1643f0a88b7ab1
Author: Mrquan <[email protected]>
AuthorDate: Fri Dec 31 09:01:12 2021 +0800

    [To rel/0.12] Add a judgement to determine raft log size can fit into 
buffer before log appending (#4670)
    
    * fix the client ip bug in cluster
    
    * fix a typo
    
    * Add an example for Cluster setup on 3 nodes
    
    * add a judgement
    
    Co-authored-by: 权思屹 <[email protected]>
---
 .../iotdb/cluster/server/member/RaftMember.java    | 35 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 99aeb15..36794bd 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -970,12 +970,22 @@ public abstract class RaftMember {
     }
     long startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
     PhysicalPlanLog log = new PhysicalPlanLog();
+    log.setPlan(plan);
+    // if a single serialized log cannot fit into the raft log buffer,
+    // return an error code to the client instead of appending it, as in server mode
+    if 
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+        & log.serialize().capacity() + Integer.BYTES
+            >= 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+              + "or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
+    }
     // assign term and index to the new log and append it
     synchronized (logManager) {
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
-      log.setPlan(plan);
       plan.setIndex(log.getCurrLogIndex());
       // appendLogInGroup will serialize log, and set log size, and we will 
use the size after it
       logManager.append(log);
@@ -1001,6 +1011,17 @@ public abstract class RaftMember {
     // assign term and index to the new log and append it
     SendLogRequest sendLogRequest;
 
+    log.setPlan(plan);
+    // just like processPlanLocally, we need to check the size of the log
+    if 
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+        & log.serialize().capacity() + Integer.BYTES
+            >= 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+              + "or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
+    }
+
     long startTime =
         
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
     synchronized (logManager) {
@@ -1009,7 +1030,7 @@ public abstract class RaftMember {
 
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-      log.setPlan(plan);
+
       plan.setIndex(log.getCurrLogIndex());
 
       startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
@@ -1020,13 +1041,13 @@ public abstract class RaftMember {
       startTime = 
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
       sendLogRequest = buildSendLogRequest(log);
       
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
-
-      startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
-      log.setCreateTime(System.nanoTime());
-      getLogDispatcher().offer(sendLogRequest);
-      Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
     }
 
+    startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+    log.setCreateTime(System.nanoTime());
+    getLogDispatcher().offer(sendLogRequest);
+    Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+
     try {
       AppendLogResult appendLogResult =
           waitAppendResult(

Reply via email to