This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2400c36  [ISSUE-3856] refine exception handling logic in commitTo in 
Raft… (#3848)
2400c36 is described below

commit 2400c36179229c55364539632f2fcc3212ac6bed
Author: Jianyun Cheng <[email protected]>
AuthorDate: Tue Sep 7 11:01:57 2021 +0800

    [ISSUE-3856] refine exception handling logic in commitTo in Raft… (#3848)
    
    * [discussion-3833] refine exception handling logic in commitTo in 
RaftLogManager
    
    * fix grammer error
    
    Co-authored-by: chengjianyun <[email protected]>
---
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 115 ++++++++++++---------
 1 file changed, 65 insertions(+), 50 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 322b541..8709203 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -585,71 +585,86 @@ public abstract class RaftLogManager {
           .subList(0, (int) (getCommitLogIndex() - 
entries.get(0).getCurrLogIndex() + 1))
           .clear();
     }
-    try {
-      boolean needToCompactLog = false;
-      int numToReserveForNew = minNumOfLogsInMem;
-      if (committedEntryManager.getTotalSize() + entries.size() > 
maxNumOfLogsInMem) {
-        needToCompactLog = true;
-        numToReserveForNew = maxNumOfLogsInMem - entries.size();
-      }
 
-      long newEntryMemSize = 0;
-      for (Log entry : entries) {
-        if (entry.getByteSize() == 0) {
-          logger.debug(
-              "{} should not go here, must be send to the follower, "
-                  + "so the log has been serialized exclude single node mode",
-              entry);
-          entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
-        }
-        newEntryMemSize += entry.getByteSize();
-      }
-      int sizeToReserveForNew = minNumOfLogsInMem;
-      if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > 
maxLogMemSize) {
-        needToCompactLog = true;
-        sizeToReserveForNew =
-            committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - 
newEntryMemSize);
+    boolean needToCompactLog = false;
+    int numToReserveForNew = minNumOfLogsInMem;
+    if (committedEntryManager.getTotalSize() + entries.size() > 
maxNumOfLogsInMem) {
+      needToCompactLog = true;
+      numToReserveForNew = maxNumOfLogsInMem - entries.size();
+    }
+
+    long newEntryMemSize = 0;
+    for (Log entry : entries) {
+      if (entry.getByteSize() == 0) {
+        logger.debug(
+            "{} should not go here, must be send to the follower, "
+                + "so the log has been serialized exclude single node mode",
+            entry);
+        entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
       }
+      newEntryMemSize += entry.getByteSize();
+    }
+    int sizeToReserveForNew = minNumOfLogsInMem;
+    if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > 
maxLogMemSize) {
+      needToCompactLog = true;
+      sizeToReserveForNew =
+          committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - 
newEntryMemSize);
+    }
 
-      if (needToCompactLog) {
-        int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
-        int sizeToReserveForConfig = minNumOfLogsInMem;
-        startTime = 
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
-        synchronized (this) {
-          innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
-        }
-        
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+    if (needToCompactLog) {
+      int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
+      int sizeToReserveForConfig = minNumOfLogsInMem;
+      startTime = 
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
+      synchronized (this) {
+        innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
       }
+      
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+    }
 
-      startTime = 
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+    startTime = 
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+    try {
+      // Operations here are so simple that they can be assumed to
+      // succeed or fail together.
+      // TODO: make this truly atomic
       getCommittedEntryManager().append(entries);
-      if 
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
-        getStableEntryManager().append(entries, maxHaveAppliedCommitIndex);
-      }
       Log lastLog = entries.get(entries.size() - 1);
       getUnCommittedEntryManager().stableTo(lastLog.getCurrLogIndex());
-      
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(startTime);
-
       commitIndex = lastLog.getCurrLogIndex();
-      startTime = 
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
-      applyEntries(entries);
-      
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
 
-      long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
-      if (unappliedLogSize > 
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
-        logger.debug(
-            "There are too many unapplied logs [{}], wait for a while to avoid 
memory "
-                + "overflow",
-            unappliedLogSize);
-        Thread.sleep(
-            unappliedLogSize - 
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+      if 
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+        // The cluster can continue to provide service when an exception is thrown here
+        getStableEntryManager().append(entries, maxHaveAppliedCommitIndex);
       }
     } catch (TruncateCommittedEntryException e) {
+      // fatal error, the node won't recover from it
+      // TODO: let the node quit the raft group once it encounters this error
       logger.error("{}: Unexpected error:", name, e);
     } catch (IOException e) {
+      // The exception will block the raft service from continuing to accept logs.
+      // TODO: Notify the user that the persisted logs before these entries (inclusive) are corrupted.
+      // TODO: One idea is to degrade the service by disabling raft log persistence for
+      // TODO: the group. It needs fine-grained control over the Raft log persistence config.
+      logger.error("{}: persistent raft log error:", name, e);
       throw new LogExecutionException(e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
+    } finally {
+      
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(startTime);
+    }
+
+    startTime = 
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
+    applyEntries(entries);
+    
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
+
+    long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
+    if (unappliedLogSize > 
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
+      logger.debug(
+          "There are too many unapplied logs [{}], wait for a while to avoid 
memory overflow",
+          unappliedLogSize);
+      try {
+        Thread.sleep(
+            unappliedLogSize - 
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
   }
 

Reply via email to