This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2400c36 [ISSUE-3856] refine exception handling logic in commitTo in
Raft… (#3848)
2400c36 is described below
commit 2400c36179229c55364539632f2fcc3212ac6bed
Author: Jianyun Cheng <[email protected]>
AuthorDate: Tue Sep 7 11:01:57 2021 +0800
[ISSUE-3856] refine exception handling logic in commitTo in Raft… (#3848)
* [discussion-3833] refine exception handling logic in commitTo in
RaftLogManager
* fix grammar error
Co-authored-by: chengjianyun <[email protected]>
---
.../iotdb/cluster/log/manage/RaftLogManager.java | 115 ++++++++++++---------
1 file changed, 65 insertions(+), 50 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 322b541..8709203 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -585,71 +585,86 @@ public abstract class RaftLogManager {
.subList(0, (int) (getCommitLogIndex() -
entries.get(0).getCurrLogIndex() + 1))
.clear();
}
- try {
- boolean needToCompactLog = false;
- int numToReserveForNew = minNumOfLogsInMem;
- if (committedEntryManager.getTotalSize() + entries.size() >
maxNumOfLogsInMem) {
- needToCompactLog = true;
- numToReserveForNew = maxNumOfLogsInMem - entries.size();
- }
- long newEntryMemSize = 0;
- for (Log entry : entries) {
- if (entry.getByteSize() == 0) {
- logger.debug(
- "{} should not go here, must be send to the follower, "
- + "so the log has been serialized exclude single node mode",
- entry);
- entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
- }
- newEntryMemSize += entry.getByteSize();
- }
- int sizeToReserveForNew = minNumOfLogsInMem;
- if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() >
maxLogMemSize) {
- needToCompactLog = true;
- sizeToReserveForNew =
- committedEntryManager.maxLogNumShouldReserve(maxLogMemSize -
newEntryMemSize);
+ boolean needToCompactLog = false;
+ int numToReserveForNew = minNumOfLogsInMem;
+ if (committedEntryManager.getTotalSize() + entries.size() >
maxNumOfLogsInMem) {
+ needToCompactLog = true;
+ numToReserveForNew = maxNumOfLogsInMem - entries.size();
+ }
+
+ long newEntryMemSize = 0;
+ for (Log entry : entries) {
+ if (entry.getByteSize() == 0) {
+ logger.debug(
+ "{} should not go here, must be send to the follower, "
+ + "so the log has been serialized exclude single node mode",
+ entry);
+ entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
}
+ newEntryMemSize += entry.getByteSize();
+ }
+ int sizeToReserveForNew = minNumOfLogsInMem;
+ if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() >
maxLogMemSize) {
+ needToCompactLog = true;
+ sizeToReserveForNew =
+ committedEntryManager.maxLogNumShouldReserve(maxLogMemSize -
newEntryMemSize);
+ }
- if (needToCompactLog) {
- int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
- int sizeToReserveForConfig = minNumOfLogsInMem;
- startTime =
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
- synchronized (this) {
- innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
- }
-
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+ if (needToCompactLog) {
+ int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
+ int sizeToReserveForConfig = minNumOfLogsInMem;
+ startTime =
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
+ synchronized (this) {
+ innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
}
+
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+ }
- startTime =
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+ startTime =
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+ try {
+ // Operations here are so simple that the execution could be thought
+ // success or fail together approximately.
+ // TODO: make it real atomic
getCommittedEntryManager().append(entries);
- if
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
- getStableEntryManager().append(entries, maxHaveAppliedCommitIndex);
- }
Log lastLog = entries.get(entries.size() - 1);
getUnCommittedEntryManager().stableTo(lastLog.getCurrLogIndex());
-
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(startTime);
-
commitIndex = lastLog.getCurrLogIndex();
- startTime =
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
- applyEntries(entries);
-
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
- long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
- if (unappliedLogSize >
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
- logger.debug(
- "There are too many unapplied logs [{}], wait for a while to avoid
memory "
- + "overflow",
- unappliedLogSize);
- Thread.sleep(
- unappliedLogSize -
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+ if
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+ // Cluster could continue provide service when exception is thrown here
+ getStableEntryManager().append(entries, maxHaveAppliedCommitIndex);
}
} catch (TruncateCommittedEntryException e) {
+ // fatal error, node won't recover from the error anymore
+ // TODO: let node quit the raft group once encounter the error
logger.error("{}: Unexpected error:", name, e);
} catch (IOException e) {
+ // The exception will block the raft service continue accept log.
+ // TODO: Notify user that the persisted logs before these
entries(include) are corrupted.
+ // TODO: An idea is that we can degrade the service by disable raft log
persistent for
+ // TODO: the group. It needs fine-grained control for the config of Raft
log persistence.
+ logger.error("{}: persistent raft log error:", name, e);
throw new LogExecutionException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } finally {
+
Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(startTime);
+ }
+
+ startTime =
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
+ applyEntries(entries);
+
Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
+
+ long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
+ if (unappliedLogSize >
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
+ logger.debug(
+ "There are too many unapplied logs [{}], wait for a while to avoid
memory overflow",
+ unappliedLogSize);
+ try {
+ Thread.sleep(
+ unappliedLogSize -
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}