Repository: incubator-ratis Updated Branches: refs/heads/master 6eb4f8278 -> 2fbbe0aa9
RATIS-82. Add cache eviction policy. Contributed by Jing Zhao. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2fbbe0aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2fbbe0aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2fbbe0aa Branch: refs/heads/master Commit: 2fbbe0aa9df31e5e982b235071e0b438f0241453 Parents: 6eb4f82 Author: Jing Zhao <[email protected]> Authored: Wed May 10 10:57:20 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Wed May 10 10:57:20 2017 -0700 ---------------------------------------------------------------------- .../ratis/hadooprpc/TestRaftWithHadoopRpc.java | 2 +- .../ratis/server/RaftServerConfigKeys.java | 11 +- .../apache/ratis/server/impl/LeaderState.java | 4 + .../ratis/server/impl/RaftServerImpl.java | 8 + .../ratis/server/impl/ServerProtoUtils.java | 4 +- .../server/storage/CacheInvalidationPolicy.java | 112 ++++++++++ .../apache/ratis/server/storage/LogSegment.java | 39 +++- .../ratis/server/storage/RaftLogCache.java | 50 ++++- .../ratis/server/storage/SegmentedRaftLog.java | 34 +++- .../ratis/server/storage/TestCacheEviction.java | 202 +++++++++++++++++++ .../ratis/server/storage/TestRaftLogCache.java | 4 +- .../server/storage/TestSegmentedRaftLog.java | 4 +- 12 files changed, 450 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java index 124e7ee..0519e52 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java @@ -31,7 +31,7 @@ import static org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServer public class TestRaftWithHadoopRpc extends RaftBasicTests { static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.TRACE); LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index be0c6bc..abab5af 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -23,7 +23,6 @@ import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; import static org.apache.ratis.conf.ConfUtils.*; @@ -76,6 +75,16 @@ public interface RaftServerConfigKeys { setSizeInBytes(properties::set, SEGMENT_SIZE_MAX_KEY, segmentSizeMax); } + /** + * Besides the open segment, the max number of segments caching log entries. + */ + String SEGMENT_CACHE_MAX_NUM_KEY = PREFIX + ".segment.cache.num.max"; + int SEGMENT_CACHE_MAX_NUM_DEFAULT = 6; + static int maxCachedSegmentNum(RaftProperties properties) { + return getInt(properties::getInt, SEGMENT_CACHE_MAX_NUM_KEY, + SEGMENT_CACHE_MAX_NUM_DEFAULT, requireMin(0)); + } + String PREALLOCATED_SIZE_KEY = PREFIX + ".preallocated.size"; SizeInBytes PREALLOCATED_SIZE_DEFAULT = SizeInBytes.valueOf("4MB"); static SizeInBytes preallocatedSize(RaftProperties properties) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 96bd28b..c4f92db 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -523,6 +523,10 @@ public class LeaderState { return pendingRequests.getTransactionContext(index); } + long[] getFollowerNextIndices() { + return senders.stream().mapToLong(s -> s.getFollower().getNextIndex()).toArray(); + } + private class ConfigurationStagingState { private final Map<RaftPeerId, RaftPeer> newPeers; private final PeerConfiguration newConf; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3c704b0..dca8b08 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -913,6 +913,14 @@ public class RaftServerImpl implements RaftServer { return null; } + public synchronized long[] getFollowerNextIndices() { + LeaderState s = this.leaderState; + if (s == null || !isLeader()) { + return null; + } + return s.getFollowerNextIndices(); + } + public void applyLogToStateMachine(LogEntryProto next) { if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { // the reply should have already been set. only need to record http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 945be8d..6705e91 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -75,8 +75,8 @@ public class ServerProtoUtils { } private static String toString(RaftRpcReplyProto reply) { - return reply.getRequestorId() + "->" + reply.getReplyId() + "," - + reply.getSuccess(); + return reply.getRequestorId().toStringUtf8() + "->" + + reply.getReplyId().toString() + "," + reply.getSuccess(); } public static RaftConfigurationProto toRaftConfigurationProto( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java new file mode 100644 index 0000000..12534cf --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public interface CacheInvalidationPolicy { + /** + * Determine which log segments should evict their log entry cache + * @param followerNextIndices the next indices of all the follower peers. Null + * if the local peer is not a leader. + * @param localFlushedIndex the index that has been flushed to the local disk. + * @param lastAppliedIndex the last index that has been applied to state machine + * @param segments The list of log segments. The segments should be sorted in + * ascending order according to log index. + * @param maxCachedSegments the max number of segments with cached log entries + * @return the log segments that should evict cache + */ + List<LogSegment> evict(long[] followerNextIndices, long localFlushedIndex, + long lastAppliedIndex, List<LogSegment> segments, int maxCachedSegments); + + class CacheInvalidationPolicyDefault implements CacheInvalidationPolicy { + @Override + public List<LogSegment> evict(long[] followerNextIndices, + long localFlushedIndex, long lastAppliedIndex, + List<LogSegment> segments, final int maxCachedSegments) { + List<LogSegment> result = new ArrayList<>(); + int safeIndex = segments.size() - 1; + for (; safeIndex >= 0; safeIndex--) { + LogSegment segment = segments.get(safeIndex); + // a segment's cache can be invalidated only if it's close and all its + // entries have been flushed to the local disk + if (!segment.isOpen() && segment.getEndIndex() <= localFlushedIndex) { + break; + } + } + if (followerNextIndices == null || followerNextIndices.length == 0) { + // no followers, determine the eviction based on lastAppliedIndex + // first scan from the oldest segment to the one that is right before + // lastAppliedIndex. All these segment's cache can be invalidated. + int j = 0; + for (; j <= safeIndex; j++) { + LogSegment segment = segments.get(j); + if (segment.getEndIndex() > lastAppliedIndex) { + break; + } + if (segment.hasCache()) { + result.add(segment); + } + } + // if there is no cache invalidation target found, pick a segment that + // later (but not now) the state machine will consume + if (result.isEmpty()) { + for (int i = safeIndex; i >= j; i--) { + LogSegment s = segments.get(i); + if (s.getStartIndex() > lastAppliedIndex && s.hasCache()) { + result.add(s); + break; + } + } + } + } else { + // this peer is the leader with followers. determine the eviction based + // on followers' next indices and the local lastAppliedIndex. + Arrays.sort(followerNextIndices); + // segments covering index minToRead will still be loaded. Thus we first + // try to evict cache for segments before minToRead. + final long minToRead = Math.min(followerNextIndices[0], lastAppliedIndex); + int j = 0; + for (; j <= safeIndex; j++) { + LogSegment s = segments.get(j); + if (s.getEndIndex() >= minToRead) { + break; + } + if (s.hasCache()) { + result.add(s); + } + } + // if there is no eviction target, continue the scanning and evict + // the one that is not being read currently. + if (result.isEmpty()) { + for (; j <= safeIndex; j++) { + LogSegment s = segments.get(j); + if (Arrays.stream(followerNextIndices).noneMatch(s::containsIndex) + && !s.containsIndex(lastAppliedIndex) && s.hasCache()) { + result.add(s); + break; + } + } + } + } + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java index 6c478dd..ff7f353 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -30,8 +30,6 @@ import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -99,7 +97,8 @@ class LogSegment implements Comparable<Long> { return new LogSegment(storage, true, start, start - 1); } - private static LogSegment newCloseSegment(RaftStorage storage, + @VisibleForTesting + static LogSegment newCloseSegment(RaftStorage storage, long start, long end) { Preconditions.assertTrue(start >= 0 && end >= start); return new LogSegment(storage, false, start, end); @@ -126,14 +125,14 @@ class LogSegment implements Comparable<Long> { static LogSegment loadSegment(RaftStorage storage, File file, long start, long end, boolean isOpen, - boolean keptInCache, Consumer<LogEntryProto> logConsumer) + boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer) throws IOException { final LogSegment segment = isOpen ? LogSegment.newOpenSegment(storage, start) : LogSegment.newCloseSegment(storage, start, end); readSegmentFile(file, start, end, isOpen, entry -> { - segment.append(keptInCache | isOpen, entry); + segment.append(keepEntryInCache | isOpen, entry); if (logConsumer != null) { logConsumer.accept(entry); } @@ -162,6 +161,8 @@ class LogSegment implements Comparable<Long> { @Override public LogEntryProto load(LogRecord key) throws IOException { final File file = getSegmentFile(); + // note the loading should not exceed the endIndex: it is possible that + // the on-disk log file should be truncated but has not been done yet. readSegmentFile(file, startIndex, endIndex, isOpen, entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry)); loadingTimes.incrementAndGet(); @@ -175,14 +176,15 @@ class LogSegment implements Comparable<Long> { storage.getStorageDir().getClosedLogFile(startIndex, endIndex); } - private boolean isOpen; + private volatile boolean isOpen; private long totalSize; private final long startIndex; - private long endIndex; + private volatile long endIndex; private final RaftStorage storage; private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader(); /** later replace it with a metric */ private final AtomicInteger loadingTimes = new AtomicInteger(); + private volatile boolean hasEntryCache; /** * the list of records is more like the index of a segment @@ -200,6 +202,7 @@ class LogSegment implements Comparable<Long> { this.startIndex = start; this.endIndex = end; totalSize = SegmentedRaftLog.HEADER_BYTES.length; + hasEntryCache = isOpen; } long getStartIndex() { @@ -224,7 +227,7 @@ class LogSegment implements Comparable<Long> { append(true, entries); } - private void append(boolean keptInCache, LogEntryProto... entries) { + private void append(boolean keepEntryInCache, LogEntryProto... entries) { Preconditions.assertTrue(entries != null && entries.length > 0); final long term = entries[0].getTerm(); if (records.isEmpty()) { @@ -246,7 +249,7 @@ class LogSegment implements Comparable<Long> { final LogRecord record = new LogRecord(totalSize, entry); records.add(record); - if (keptInCache) { + if (keepEntryInCache) { entryCache.put(record.getTermIndex(), entry); } if (ProtoUtils.isConfigurationLogEntry(entry)) { @@ -274,7 +277,9 @@ class LogSegment implements Comparable<Long> { return entry; } try { - return cacheLoader.load(record); + entry = cacheLoader.load(record); + hasEntryCache = true; + return entry; } catch (Exception e) { throw new RaftLogIOException(e); } @@ -340,6 +345,7 @@ class LogSegment implements Comparable<Long> { void clear() { records.clear(); entryCache.clear(); + hasEntryCache = false; configEntries.clear(); endIndex = startIndex - 1; } @@ -347,4 +353,17 @@ class LogSegment implements Comparable<Long> { public int getLoadingTimes() { return loadingTimes.get(); } + + void evictCache() { + hasEntryCache = false; + entryCache.clear(); + } + + boolean hasCache() { + return hasEntryCache; + } + + boolean containsIndex(long index) { + return startIndex <= index && endIndex >= index; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 7f13dd8..5863f8d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -19,15 +19,21 @@ package org.apache.ratis.server.storage; import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.function.Consumer; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault; import org.apache.ratis.server.storage.LogSegment.LogRecord; +import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.Preconditions; @@ -65,17 +71,53 @@ class RaftLogCache { toDelete.toArray(new SegmentFileInfo[toDelete.size()]); this.toTruncate = toTruncate; } + + int getDeletionSize() { + return toDelete == null ? 0 : toDelete.length; + } } private LogSegment openSegment; private final List<LogSegment> closedSegments; private final RaftStorage storage; - RaftLogCache(RaftStorage storage) { + private final int maxCachedSegments; + private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault(); + + RaftLogCache(RaftStorage storage, RaftProperties properties) { this.storage = storage; + maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties); closedSegments = new ArrayList<>(); } + int getMaxCachedSegments() { + return maxCachedSegments; + } + + void loadSegment(LogPathAndIndex pi, boolean isOpen, boolean keepEntryInCache, + Consumer<LogEntryProto> logConsumer) throws IOException { + LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(), + pi.startIndex, pi.endIndex, isOpen, keepEntryInCache, logConsumer); + addSegment(logSegment); + } + + long getCachedSegmentNum() { + return closedSegments.stream().filter(LogSegment::hasCache).count(); + } + + boolean shouldEvict() { + return getCachedSegmentNum() > maxCachedSegments; + } + + void evictCache(long[] followerIndices, long flushedIndex, + long lastAppliedIndex) { + List<LogSegment> toEvict = evictionPolicy.evict(followerIndices, + flushedIndex, lastAppliedIndex, closedSegments, maxCachedSegments); + for (LogSegment s : toEvict) { + s.evictCache(); + } + } + private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) { return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex(); } @@ -345,7 +387,11 @@ class RaftLogCache { } void clear() { - openSegment = null; + if (openSegment != null) { + openSegment.clear(); + openSegment = null; + } + closedSegments.forEach(LogSegment::clear); closedSegments.clear(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 8eaa0ab..0c54af7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; +import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.CodeInjectionForTesting; @@ -67,6 +68,7 @@ import java.util.function.Consumer; public class SegmentedRaftLog extends RaftLog { static final String HEADER_STR = "RAFTLOG1"; static final byte[] HEADER_BYTES = HEADER_STR.getBytes(StandardCharsets.UTF_8); + static final LogSegment[] EMPTY_SEGMENT_ARRAY = new LogSegment[0]; /** * I/O task definitions. @@ -96,6 +98,7 @@ public class SegmentedRaftLog extends RaftLog { } private static final ThreadLocal<Task> myTask = new ThreadLocal<>(); + private final RaftServerImpl server; private final RaftStorage storage; private final RaftLogCache cache; private final RaftLogWorker fileLogWorker; @@ -105,9 +108,10 @@ public class SegmentedRaftLog extends RaftLog { RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) throws IOException { super(selfId); + this.server = server; this.storage = storage; - this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - cache = new RaftLogCache(storage); + segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + cache = new RaftLogCache(storage, properties); fileLogWorker = new RaftLogWorker(server, storage, properties); lastCommitted.set(lastIndexInSnapshot); } @@ -136,11 +140,18 @@ public class SegmentedRaftLog extends RaftLog { Consumer<LogEntryProto> logConsumer) throws IOException { try(AutoCloseableLock writeLock = writeLock()) { List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles(); + int i = 0; for (LogPathAndIndex pi : paths) { boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX; - LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(), - pi.startIndex, pi.endIndex, isOpen, true, logConsumer); - cache.addSegment(logSegment); + // During the initial loading, we can only confirm the committed + // index based on the snapshot. This means if a log segment is not kept + // in cache after the initial loading, later we have to load its content + // again for updating the state machine. + // TODO we should let raft peer persist its committed index periodically + // so that during the initial loading we can apply part of the log + // entries to the state machine + boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments(); + cache.loadSegment(pi, isOpen, keepEntryInCache, logConsumer); } // if the largest index is smaller than the last index in snapshot, we do @@ -177,9 +188,21 @@ public class SegmentedRaftLog extends RaftLog { // the entry is not in the segment's cache. Load the cache without holding // RaftLog's lock. + checkAndEvictCache(); return segment.loadCache(recordAndEntry.getRecord()); } + private void checkAndEvictCache() { + if (server != null && cache.shouldEvict()) { + // TODO if the cache is hitting the maximum size and we cannot evict any + // segment's cache, should block the new entry appending or new segment + // allocation. + cache.evictCache(server.getFollowerNextIndices(), + fileLogWorker.getFlushedIndex(), + server.getState().getLastAppliedIndex()); + } + } + @Override public TermIndex getTermIndex(long index) { checkLogState(); @@ -232,6 +255,7 @@ public class SegmentedRaftLog extends RaftLog { } else if (isSegmentFull(currentOpenSegment, entry)) { cache.rollOpenSegment(true); fileLogWorker.rollLogSegment(currentOpenSegment); + checkAndEvictCache(); } else if (currentOpenSegment.numOfEntries() > 0 && currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) { // the term changes http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java new file mode 100644 index 0000000..6df8cf7 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerState; +import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault; +import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.SizeInBytes; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class TestCacheEviction { + private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault(); + private static final ClientId clientId = ClientId.createId(); + private static final long callId = 0; + + private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, long start, long size) { + Assert.assertEquals(numSegments, cached.length); + List<LogSegment> segments = new ArrayList<>(numSegments); + for (int i = 0; i < numSegments; i++) { + LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1); + if (cached[i]) { + s = Mockito.spy(s); + Mockito.when(s.hasCache()).thenReturn(true); + } + segments.add(s); + start += size; + } + return segments; + } + + @Test + public void testBasicEviction() throws Exception { + final int maxCached = 5; + List<LogSegment> segments = prepareSegments(5, + new boolean[]{true, true, true, true, true}, 0, 10); + + // case 1, make sure we do not evict cache for segments behind local flushed index + List<LogSegment> evicted = policy.evict(null, 5, 15, segments, maxCached); + Assert.assertEquals(0, evicted.size()); + + // case 2, suppose the local flushed index is in the 3rd segment, then we + // can evict the first two segment + evicted = policy.evict(null, 25, 30, segments, maxCached); + Assert.assertEquals(2, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + Assert.assertSame(evicted.get(1), segments.get(1)); + + // case 3, similar with case 2, but the local applied index is less than + // the local flushed index. + evicted = policy.evict(null, 25, 15, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + + // case 4, the local applied index is very small, then evict cache behind it + // first and let the state machine load the segments later + evicted = policy.evict(null, 35, 5, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(2)); + + Mockito.when(segments.get(2).hasCache()).thenReturn(false); + evicted = policy.evict(null, 35, 5, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(1)); + + Mockito.when(segments.get(1).hasCache()).thenReturn(false); + evicted = policy.evict(null, 35, 5, segments, maxCached); + Assert.assertEquals(0, evicted.size()); + } + + @Test + public void testEvictionWithFollowerIndices() throws Exception { + final int maxCached = 6; + List<LogSegment> segments = prepareSegments(6, + new boolean[]{true, true, true, true, true, true}, 0, 10); + + // case 1, no matter where the followers are, we do not evict segments behind local + // flushed index + List<LogSegment> evicted = policy.evict(new long[]{20, 40, 40}, 5, 15, segments, + maxCached); + Assert.assertEquals(0, evicted.size()); + + // case 2, the follower indices are behind the local flushed index + evicted = policy.evict(new long[]{30, 40, 45}, 25, 30, segments, maxCached); + Assert.assertEquals(2, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + Assert.assertSame(evicted.get(1), segments.get(1)); + + // case 3, similar with case 3 in basic eviction test + evicted = policy.evict(new long[]{30, 40, 45}, 25, 15, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + + // case 4, the followers are slower than local flush + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + + Mockito.when(segments.get(0).hasCache()).thenReturn(false); + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(2)); + + Mockito.when(segments.get(2).hasCache()).thenReturn(false); + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(3)); + + Mockito.when(segments.get(3).hasCache()).thenReturn(false); + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(0, evicted.size()); + } + + @Test + public void testEvictionInSegmentedLog() throws Exception { + final RaftProperties prop = new RaftProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB")); + RaftServerConfigKeys.Log.setPreallocatedSize(prop, SizeInBytes.valueOf("8KB")); + final RaftPeerId peerId = new RaftPeerId("s0"); + final int maxCachedNum = RaftServerConfigKeys.Log.maxCachedSegmentNum(prop); + + File storageDir = RaftTestUtil.getTestDir(TestSegmentedRaftLog.class); + RaftServerConfigKeys.setStorageDir(prop, storageDir.getCanonicalPath()); + RaftStorage storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR); + + RaftServerImpl server = Mockito.mock(RaftServerImpl.class); + ServerState state = Mockito.mock(ServerState.class); + Mockito.when(server.getState()).thenReturn(state); + Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{}); + Mockito.when(state.getLastAppliedIndex()).thenReturn(0L); + + SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, prop); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, 7, 0); + LogEntryProto[] entries = generateEntries(slist); + raftLog.append(entries); + raftLog.logSync(); + + // check the current cached segment number: the last segment is still open + Assert.assertEquals(maxCachedNum - 1, + raftLog.getRaftLogCache().getCachedSegmentNum()); + + Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{21, 40, 40}); + Mockito.when(state.getLastAppliedIndex()).thenReturn(35L); + slist = TestSegmentedRaftLog.prepareRanges(2, 7, 7 * maxCachedNum); + entries = generateEntries(slist); + raftLog.append(entries); + raftLog.logSync(); + + // check the cached segment number again. since the slowest follower is on + // index 21, the eviction should happen and evict 3 segments + Assert.assertEquals(maxCachedNum + 1 - 3, + raftLog.getRaftLogCache().getCachedSegmentNum()); + } + + private LogEntryProto[] generateEntries(List<SegmentRange> slist) { + List<LogEntryProto> eList = new ArrayList<>(); + for (SegmentRange range : slist) { + for (long index = range.start; index <= range.end; index++) { + SimpleOperation m = new SimpleOperation(new String(new byte[1024])); + eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + range.term, index, clientId, callId)); + } + } + return eList.toArray(new LogEntryProto[eList.size()]); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java index 38c879b..86333f0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Iterator; import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; @@ -33,12 +34,13 @@ import org.junit.Test; public class TestRaftLogCache { private static final ClientId clientId = ClientId.createId(); private static final long callId = 0; + private static final RaftProperties prop = new RaftProperties(); private RaftLogCache cache; @Before public void setup() { - cache = new RaftLogCache(null); + cache = new RaftLogCache(null, prop); } private LogSegment prepareLogSegment(long start, long end, boolean isOpen) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 1d38971..1c49e70 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -54,7 +54,7 @@ public class TestSegmentedRaftLog { private static final ClientId clientId = ClientId.createId(); private static final long callId = 0; - private static class SegmentRange { + static class SegmentRange { final long start; final long end; final long term; @@ -109,7 +109,7 @@ public class TestSegmentedRaftLog { return entryList.toArray(new LogEntryProto[entryList.size()]); } - private List<SegmentRange> prepareRanges(int number, int segmentSize, + static List<SegmentRange> prepareRanges(int number, int segmentSize, long startIndex) { List<SegmentRange> list = new ArrayList<>(number); for (int i = 0; i < number; i++) {
