This is an automated email from the ASF dual-hosted git repository. chaow pushed a commit to branch limit_memory_for_raftlog in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a02653f8403dfbe75f5d7dbe79ec4fe84265e47f Author: chaow <[email protected]> AuthorDate: Sat Mar 20 16:40:54 2021 +0800 IOTDB-854 Limit the memory footprint of the committed log cache --- .../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++++++ .../iotdb/cluster/config/ClusterDescriptor.java | 6 ++++ .../java/org/apache/iotdb/cluster/log/Log.java | 10 +++++++ .../cluster/log/manage/CommittedEntryManager.java | 34 +++++++++++++++++++++ .../iotdb/cluster/log/manage/RaftLogManager.java | 29 ++++++++++++++---- .../iotdb/cluster/server/member/RaftMember.java | 5 ++++ .../org/apache/iotdb/cluster/common/TestUtils.java | 1 + .../cluster/log/manage/RaftLogManagerTest.java | 35 ++++++++++++++++++++++ 8 files changed, 125 insertions(+), 5 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 08f7608..386bb25 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -65,6 +65,8 @@ public class ClusterConfig { /** max number of committed logs in memory */ private int maxNumOfLogsInMem = 1000; + private long maxMemorySizeForRaftLog = 536870912; + /** deletion check period of the submitted log */ private int logDeleteCheckIntervalSecond = -1; @@ -381,6 +383,14 @@ public class ClusterConfig { this.maxRaftLogIndexSizeInMemory = maxRaftLogIndexSizeInMemory; } + public long getMaxMemorySizeForRaftLog() { + return maxMemorySizeForRaftLog; + } + + public void setMaxMemorySizeForRaftLog(long maxMemorySizeForRaftLog) { + this.maxMemorySizeForRaftLog = maxMemorySizeForRaftLog; + } + public int getMaxRaftLogPersistDataSizePerFile() { return maxRaftLogPersistDataSizePerFile; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java 
index d6bbf82..25f85e0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java @@ -200,6 +200,12 @@ public class ClusterDescriptor { properties.getProperty( "max_num_of_logs_in_mem", String.valueOf(config.getMaxNumOfLogsInMem())))); + config.setMaxMemorySizeForRaftLog( + Long.parseLong( + properties.getProperty( + "max_memory_size_for_raft_log", + String.valueOf(config.getMaxMemorySizeForRaftLog())))); + config.setLogDeleteCheckIntervalSecond( Integer.parseInt( properties.getProperty( diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java index 6977634..d2294eb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java @@ -45,6 +45,8 @@ public abstract class Log implements Comparable<Log> { private long createTime; private long enqueueTime; + private int byteSize; + public abstract ByteBuffer serialize(); public abstract void deserialize(ByteBuffer buffer); @@ -132,4 +134,12 @@ public abstract class Log implements Comparable<Log> { public void setEnqueueTime(long enqueueTime) { this.enqueueTime = enqueueTime; } + + public long getByteSize() { + return byteSize; + } + + public void setByteSize(int byteSize) { + this.byteSize = byteSize; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java index a5d83a8..4e53e26 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java @@ -41,6 +41,8 @@ public class CommittedEntryManager { // memory cache for logs which have been persisted in disk. 
private List<Log> entries; + private long entryTotalMemSize; + /** * Note that it is better to use applyingSnapshot to update dummy entry immediately after this * instance is created. @@ -48,6 +50,7 @@ public class CommittedEntryManager { CommittedEntryManager(int maxNumOfLogInMem) { entries = Collections.synchronizedList(new ArrayList<>(maxNumOfLogInMem)); entries.add(new EmptyContentLog(-1, -1)); + entryTotalMemSize = 0; } /** @@ -209,6 +212,9 @@ public class CommittedEntryManager { 0, new EmptyContentLog( entries.get(index).getCurrLogIndex(), entries.get(index).getCurrLogTerm())); + for (int i = 1; i <= index; i++) { + entryTotalMemSize -= entries.get(i).getByteSize(); + } entries.subList(1, index + 1).clear(); } @@ -225,6 +231,9 @@ public class CommittedEntryManager { } long offset = appendingEntries.get(0).getCurrLogIndex() - getDummyIndex(); if (entries.size() - offset == 0) { + for (int i = 0; i < appendingEntries.size(); i++) { + entryTotalMemSize += appendingEntries.get(i).getByteSize(); + } entries.addAll(appendingEntries); } else if (entries.size() - offset > 0) { throw new TruncateCommittedEntryException( @@ -246,4 +255,29 @@ public class CommittedEntryManager { List<Log> getAllEntries() { return entries; } + + public long getEntryTotalMemSize() { + return entryTotalMemSize; + } + + public void setEntryTotalMemSize(long entryTotalMemSize) { + this.entryTotalMemSize = entryTotalMemSize; + } + + /** + * check how many logs could be reserved in memory. 
+ * + * @param maxMemSize the max memory size for old committed log + * @return max num to reserve old committed log + */ + public int maxLogNumShouldReserve(long maxMemSize) { + long totalSize = 0; + for (int i = entries.size() - 1; i >= 1; i--) { + if (totalSize + entries.get(i).getByteSize() > maxMemSize) { + return entries.size() - 1 - i; + } + totalSize += entries.get(i).getByteSize(); + } + return entries.size(); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java index 86b2d33..fce98bc 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java @@ -100,6 +100,9 @@ public abstract class RaftLogManager { private int maxNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem(); + private long maxLogMemSize = + ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog(); + /** * Each time new logs are appended, this condition will be notified so logs that have larger * indices but arrived earlier can proceed. 
@@ -582,14 +585,30 @@ public abstract class RaftLogManager { .clear(); } try { - int currentSize = committedEntryManager.getTotalSize(); - int deltaSize = entries.size(); - if (currentSize + deltaSize > maxNumOfLogsInMem) { - int sizeToReserveForNew = maxNumOfLogsInMem - deltaSize; + boolean needCompactedLog = false; + int numToReserveForNew = minNumOfLogsInMem; + if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) { + needCompactedLog = true; + numToReserveForNew = maxNumOfLogsInMem - entries.size(); + } + + long newEntryMemSize = 0; + for (Log entry : entries) { + newEntryMemSize += entry.getByteSize(); + } + int sizeToReserveForNew = minNumOfLogsInMem; + if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) { + needCompactedLog = true; + sizeToReserveForNew = + committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize); + } + + if (needCompactedLog) { + int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew); int sizeToReserveForConfig = minNumOfLogsInMem; startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime(); synchronized (this) { - innerDeleteLog(Math.min(sizeToReserveForConfig, sizeToReserveForNew)); + innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew)); } Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 8783e6c..d128e56 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -508,7 +508,9 @@ public abstract class RaftMember { } long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime(); + int logByteSize = request.getEntry().length; Log log = 
LogParser.getINSTANCE().parse(request.entry); + log.setByteSize(logByteSize); Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime); long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log); @@ -529,12 +531,15 @@ public abstract class RaftMember { long response; List<Log> logs = new ArrayList<>(); + int logByteSize = 0; long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime(); for (ByteBuffer buffer : request.getEntries()) { buffer.mark(); Log log; + logByteSize = buffer.limit() - buffer.position(); try { log = LogParser.getINSTANCE().parse(buffer); + log.setByteSize(logByteSize); } catch (BufferUnderflowException e) { buffer.reset(); throw e; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java index 53becb4..d8aebfc 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java @@ -126,6 +126,7 @@ public class TestUtils { Log log = new LargeTestLog(); log.setCurrLogIndex(i); log.setCurrLogTerm(i); + log.setByteSize(8192); logList.add(log); } return logList; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java index 2eddaf8..4d47219 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java @@ -973,6 +973,41 @@ public class RaftLogManagerTest { } @Test + public void testInnerDeleteLogsWithLargeLog() { + long maxMemSize = ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog(); + int minNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMinNumOfLogsInMem(); + int maxNumOfLogsInMem = 
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem(); + ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(10); + ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(10); + ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(1024 * 56); + CommittedEntryManager committedEntryManager = + new CommittedEntryManager( + ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()); + RaftLogManager instance = + new TestRaftLogManager( + committedEntryManager, new SyncLogDequeSerializer(testIdentifier), logApplier); + List<Log> logs = TestUtils.prepareLargeTestLogs(12); + + try { + instance.append(logs.subList(0, 7)); + instance.maybeCommit(6, 6); + while (instance.getMaxHaveAppliedCommitIndex() < 6) { + // wait + } + instance.append(logs.subList(7, 12)); + instance.maybeCommit(11, 11); + + List<Log> entries = instance.getEntries(0, 12); + assertEquals(logs.subList(5, 12), entries); + } finally { + instance.close(); + ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(maxNumOfLogsInMem); + ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(minNumOfLogsInMem); + ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(maxMemSize); + } + } + + @Test @SuppressWarnings("java:S2925") public void testReapplyBlockedLogs() throws LogExecutionException, InterruptedException { CommittedEntryManager committedEntryManager =
