Repository: incubator-ratis Updated Branches: refs/heads/branch-0.3 68bb4c918 -> 91eafe761
RATIS-430. RaftLogCache#getCachedSegmentNum hits ConcurrentModificationException. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/91eafe76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/91eafe76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/91eafe76 Branch: refs/heads/branch-0.3 Commit: 91eafe761d666657d0491a03f2076c68d5ad7ff4 Parents: 68bb4c9 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Thu Nov 22 11:41:35 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Thu Nov 22 11:44:16 2018 -0800 ---------------------------------------------------------------------- .../server/storage/CacheInvalidationPolicy.java | 15 +- .../ratis/server/storage/RaftLogCache.java | 301 +++++++++++++------ .../server/storage/RaftStorageDirectory.java | 4 +- .../ratis/server/storage/SegmentedRaftLog.java | 52 +--- .../ratis/server/storage/TestCacheEviction.java | 11 +- 5 files changed, 246 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/91eafe76/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java index 12534cf..a794092 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java @@ -21,6 +21,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.ratis.server.storage.RaftLogCache.LogSegmentList; +import org.apache.ratis.util.AutoCloseableLock; + public interface CacheInvalidationPolicy { /** * Determine which log segments should evict their log entry cache @@ -34,13 +37,21 @@ public interface CacheInvalidationPolicy { * @return the log segments that should evict cache */ List<LogSegment> evict(long[] followerNextIndices, long localFlushedIndex, - long lastAppliedIndex, List<LogSegment> segments, int maxCachedSegments); + long lastAppliedIndex, LogSegmentList segments, int maxCachedSegments); class CacheInvalidationPolicyDefault implements CacheInvalidationPolicy { @Override public List<LogSegment> evict(long[] followerNextIndices, long localFlushedIndex, long lastAppliedIndex, - List<LogSegment> segments, final int maxCachedSegments) { + LogSegmentList segments, final int maxCachedSegments) { + try(AutoCloseableLock readLock = segments.readLock()) { + return evictImpl(followerNextIndices, localFlushedIndex, lastAppliedIndex, segments, maxCachedSegments); + } + } + + private List<LogSegment> evictImpl(long[] followerNextIndices, + long localFlushedIndex, long lastAppliedIndex, + LogSegmentList segments, final int maxCachedSegments) { List<LogSegment> result = new ArrayList<>(); int safeIndex = segments.size() - 1; for (; safeIndex >= 0; safeIndex--) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/91eafe76/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 451e713..f0cab43 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -21,17 +21,20 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault; import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; @@ -86,9 +89,159 @@ class RaftLogCache { } } + static class LogSegmentList { + private final List<LogSegment> segments = new ArrayList<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + AutoCloseableLock readLock() { + return AutoCloseableLock.acquire(lock.readLock()); + } + + AutoCloseableLock writeLock() { + return AutoCloseableLock.acquire(lock.writeLock()); + } + + boolean isEmpty() { + try(AutoCloseableLock readLock = readLock()) { + return segments.isEmpty(); + } + } + + int size() { + try(AutoCloseableLock readLock = readLock()) { + return segments.size(); + } + } + + long countCached() { + try(AutoCloseableLock readLock = readLock()) { + return segments.stream().filter(LogSegment::hasCache).count(); + } + } + + LogSegment getLast() { + try(AutoCloseableLock readLock = readLock()) { + return segments.isEmpty()? null: segments.get(segments.size() - 1); + } + } + + LogSegment get(int i) { + try(AutoCloseableLock readLock = readLock()) { + return segments.get(i); + } + } + + int binarySearch(long index) { + try(AutoCloseableLock readLock = readLock()) { + return Collections.binarySearch(segments, index); + } + } + + LogSegment search(long index) { + try(AutoCloseableLock readLock = readLock()) { + final int i = Collections.binarySearch(segments, index); + return i < 0? null: segments.get(i); + } + } + + TermIndex[] getTermIndex(long startIndex, long realEnd, LogSegment openSegment) { + final TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)]; + final int searchIndex; + long index = startIndex; + + try(AutoCloseableLock readLock = readLock()) { + searchIndex = Collections.binarySearch(segments, startIndex); + if (searchIndex >= 0) { + for(int i = searchIndex; i < segments.size() && index < realEnd; i++) { + final LogSegment s = segments.get(i); + final int numberFromSegment = Math.toIntExact(Math.min(realEnd - index, s.getEndIndex() - index + 1)); + getFromSegment(s, index, entries, Math.toIntExact(index - startIndex), numberFromSegment); + index += numberFromSegment; + } + } + } + + // openSegment is read outside the lock. + if (searchIndex < 0) { + getFromSegment(openSegment, startIndex, entries, 0, entries.length); + } else if (index < realEnd) { + getFromSegment(openSegment, index, entries, + Math.toIntExact(index - startIndex), Math.toIntExact(realEnd - index)); + } + return entries; + } + + boolean add(LogSegment logSegment) { + try(AutoCloseableLock writeLock = writeLock()) { + return segments.add(logSegment); + } + } + + void clear() { + try(AutoCloseableLock writeLock = writeLock()) { + segments.forEach(LogSegment::clear); + segments.clear(); + } + } + + TruncationSegments truncate(long index, LogSegment openSegment, Runnable clearOpenSegment) { + try(AutoCloseableLock writeLock = writeLock()) { + final int segmentIndex = binarySearch(index); + if (segmentIndex == -segments.size() - 1) { + if (openSegment != null && openSegment.getEndIndex() >= index) { + final long oldEnd = openSegment.getEndIndex(); + if (index == openSegment.getStartIndex()) { + // the open segment should be deleted + final SegmentFileInfo deleted = deleteOpenSegment(openSegment, clearOpenSegment); + return new TruncationSegments(null, Collections.singletonList(deleted)); + } else { + openSegment.truncate(index); + Preconditions.assertTrue(!openSegment.isOpen()); + final SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), + oldEnd, true, openSegment.getTotalSize(), openSegment.getEndIndex()); + segments.add(openSegment); + clearOpenSegment.run(); + return new TruncationSegments(info, Collections.emptyList()); + } + } + } else if (segmentIndex >= 0) { + final LogSegment ts = segments.get(segmentIndex); + final long oldEnd = ts.getEndIndex(); + final List<SegmentFileInfo> list = new ArrayList<>(); + ts.truncate(index); + final int size = segments.size(); + for(int i = size - 1; + i >= (ts.numOfEntries() == 0? segmentIndex: segmentIndex + 1); + i--) { + LogSegment s = segments.remove(i); + final long endOfS = i == segmentIndex? oldEnd: s.getEndIndex(); + s.clear(); + list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, s.getEndIndex())); + } + if (openSegment != null) { + list.add(deleteOpenSegment(openSegment, clearOpenSegment)); + } + SegmentFileInfo t = ts.numOfEntries() == 0? null: + new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, ts.getTotalSize(), ts.getEndIndex()); + return new TruncationSegments(t, list); + } + return null; + } + } + + static SegmentFileInfo deleteOpenSegment(LogSegment openSegment, Runnable clearOpenSegment) { + final long oldEnd = openSegment.getEndIndex(); + openSegment.clear(); + final SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), oldEnd, true, + 0, openSegment.getEndIndex()); + clearOpenSegment.run(); + return info; + } + } + private final String name; private volatile LogSegment openSegment; - private final List<LogSegment> closedSegments; + private final LogSegmentList closedSegments = new LogSegmentList(); private final RaftStorage storage; private final int maxCachedSegments; @@ -98,7 +251,6 @@ class RaftLogCache { this.name = selfId + "-" + getClass().getSimpleName(); this.storage = storage; maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties); - closedSegments = new ArrayList<>(); } int getMaxCachedSegments() { @@ -115,11 +267,11 @@ class RaftLogCache { } long getCachedSegmentNum() { - return closedSegments.stream().filter(LogSegment::hasCache).count(); + return closedSegments.countCached(); } boolean shouldEvict() { - return getCachedSegmentNum() > maxCachedSegments; + return closedSegments.countCached() > maxCachedSegments; } void evictCache(long[] followerIndices, long flushedIndex, @@ -131,13 +283,9 @@ class RaftLogCache { } } - private LogSegment getLastClosedSegment() { - return closedSegments.isEmpty() ? - null : closedSegments.get(closedSegments.size() - 1); - } private void validateAdding(LogSegment segment) { - final LogSegment lastClosed = getLastClosedSegment(); + final LogSegment lastClosed = closedSegments.getLast(); if (lastClosed != null) { Preconditions.assertTrue(!lastClosed.isOpen()); Preconditions.assertTrue(lastClosed.getEndIndex() + 1 == segment.getStartIndex()); @@ -192,8 +340,7 @@ class RaftLogCache { if (openSegment != null && index >= openSegment.getStartIndex()) { return openSegment; } else { - int segmentIndex = Collections.binarySearch(closedSegments, index); - return segmentIndex < 0 ? null : closedSegments.get(segmentIndex); + return closedSegments.search(index); } } @@ -219,31 +366,10 @@ class RaftLogCache { if (startIndex >= realEnd) { return TermIndex.EMPTY_TERMINDEX_ARRAY; } - - TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)]; - int segmentIndex = Collections.binarySearch(closedSegments, startIndex); - if (segmentIndex < 0) { - getFromSegment(openSegment, startIndex, entries, 0, entries.length); - } else { - long index = startIndex; - for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) { - LogSegment s = closedSegments.get(i); - int numberFromSegment = Math.toIntExact( - Math.min(realEnd - index, s.getEndIndex() - index + 1)); - getFromSegment(s, index, entries, - Math.toIntExact(index - startIndex), numberFromSegment); - index += numberFromSegment; - } - if (index < realEnd) { - getFromSegment(openSegment, index, entries, - Math.toIntExact(index - startIndex), - Math.toIntExact(realEnd - index)); - } - } - return entries; + return closedSegments.getTermIndex(startIndex, realEnd, openSegment); } - private void getFromSegment(LogSegment segment, long startIndex, + private static void getFromSegment(LogSegment segment, long startIndex, TermIndex[] entries, int offset, int size) { long endIndex = segment.getEndIndex(); endIndex = Math.min(endIndex, startIndex + size - 1); @@ -289,68 +415,68 @@ class RaftLogCache { openSegment.appendToOpenSegment(entry); } - private SegmentFileInfo deleteOpenSegment() { - final long oldEnd = openSegment.getEndIndex(); - openSegment.clear(); - SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), - oldEnd, true, 0, openSegment.getEndIndex()); - clearOpenSegment(); - return info; - } - /** * truncate log entries starting from the given index (inclusive) */ TruncationSegments truncate(long index) { - int segmentIndex = Collections.binarySearch(closedSegments, index); - if (segmentIndex == -closedSegments.size() - 1) { - if (openSegment != null && openSegment.getEndIndex() >= index) { - final long oldEnd = openSegment.getEndIndex(); - if (index == openSegment.getStartIndex()) { - // the open segment should be deleted - return new TruncationSegments(null, - Collections.singletonList(deleteOpenSegment())); - } else { - openSegment.truncate(index); - Preconditions.assertTrue(!openSegment.isOpen()); - SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), - oldEnd, true, openSegment.getTotalSize(), - openSegment.getEndIndex()); - closedSegments.add(openSegment); - clearOpenSegment(); - return new TruncationSegments(info, Collections.emptyList()); - } - } - } else if (segmentIndex >= 0) { - LogSegment ts = closedSegments.get(segmentIndex); - final long oldEnd = ts.getEndIndex(); - List<SegmentFileInfo> list = new ArrayList<>(); - ts.truncate(index); - final int size = closedSegments.size(); - for (int i = size - 1; - i >= (ts.numOfEntries() == 0 ? segmentIndex : segmentIndex + 1); - i-- ) { - LogSegment s = closedSegments.remove(i); - final long endOfS = i == segmentIndex ? oldEnd : s.getEndIndex(); - s.clear(); - list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, - s.getEndIndex())); - } - if (openSegment != null) { - list.add(deleteOpenSegment()); - } - SegmentFileInfo t = ts.numOfEntries() == 0 ? null : - new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, - ts.getTotalSize(), ts.getEndIndex()); - return new TruncationSegments(t, list); - } - return null; + return closedSegments.truncate(index, openSegment, this::clearOpenSegment); } Iterator<TermIndex> iterator(long startIndex) { return new EntryIterator(startIndex); } + static class TruncateIndices { + final int arrayIndex; + final long truncateIndex; + + TruncateIndices(int arrayIndex, long truncateIndex) { + this.arrayIndex = arrayIndex; + this.truncateIndex = truncateIndex; + } + + int getArrayIndex() { + return arrayIndex; + } + + long getTruncateIndex() { + return truncateIndex; + } + } + + TruncateIndices computeTruncateIndices(Consumer<TermIndex> failClientRequest, LogEntryProto... entries) { + int arrayIndex = 0; + long truncateIndex = -1; + + try(AutoCloseableLock readLock = closedSegments.readLock()) { + final Iterator<TermIndex> i = iterator(entries[0].getIndex()); + for(; i.hasNext() && arrayIndex < entries.length; arrayIndex++) { + final TermIndex storedEntry = i.next(); + Preconditions.assertTrue(storedEntry.getIndex() == entries[arrayIndex].getIndex(), + "The stored entry's index %s is not consistent with the received entries[%s]'s index %s", + storedEntry.getIndex(), arrayIndex, entries[arrayIndex].getIndex()); + + if (storedEntry.getTerm() != entries[arrayIndex].getTerm()) { + // we should truncate from the storedEntry's arrayIndex + truncateIndex = storedEntry.getIndex(); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: truncate to {}, arrayIndex={}, ti={}, storedEntry={}, entries={}", + name, truncateIndex, arrayIndex, + ServerProtoUtils.toTermIndex(entries[arrayIndex]), storedEntry, + ServerProtoUtils.toString(entries)); + } + + // fail all requests starting at truncateIndex + failClientRequest.accept(storedEntry); + for(; i.hasNext(); ) { + failClientRequest.accept(i.next()); + } + } + } + } + return new TruncateIndices(arrayIndex, truncateIndex); + } + private class EntryIterator implements Iterator<TermIndex> { private long nextIndex; private LogSegment currentSegment; @@ -358,7 +484,7 @@ class RaftLogCache { EntryIterator(long start) { this.nextIndex = start; - segmentIndex = Collections.binarySearch(closedSegments, nextIndex); + segmentIndex = closedSegments.binarySearch(nextIndex); if (segmentIndex >= 0) { currentSegment = closedSegments.get(segmentIndex); } else { @@ -412,7 +538,6 @@ class RaftLogCache { openSegment.clear(); clearOpenSegment(); } - closedSegments.forEach(LogSegment::clear); closedSegments.clear(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/91eafe76/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java index 2242934..5127e74 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java @@ -210,9 +210,7 @@ public class RaftStorageDirectory { } } } - Collections.sort(list, - (o1, o2) -> o1.startIndex == o2.startIndex ? - 0 : (o1.startIndex < o2.startIndex ? -1 : 1)); + list.sort(Comparator.comparingLong(o -> o.startIndex)); return list; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/91eafe76/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 3aee7e1..f977c45 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -25,6 +25,7 @@ import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; +import org.apache.ratis.server.storage.RaftLogCache.TruncateIndices; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.StateMachine; @@ -36,7 +37,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -319,51 +319,25 @@ public class SegmentedRaftLog extends RaftLog { } } + private void failClientRequest(TermIndex ti) { + try { + final LogEntryProto entry = get(ti.getIndex()); + server.failClientRequest(entry); + } catch(RaftLogIOException e) { + LOG.error(getName() + ": Failed to read log " + ti, e); + } + } + @Override public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) { checkLogState(); if (entries == null || entries.length == 0) { return Collections.emptyList(); } - try(AutoCloseableLock writeLock = writeLock()) { - Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex()); - int index = 0; - long truncateIndex = -1; - for (; iter.hasNext() && index < entries.length; index++) { - TermIndex storedEntry = iter.next(); - Preconditions.assertTrue( - storedEntry.getIndex() == entries[index].getIndex(), - "The stored entry's index %s is not consistent with" + - " the received entries[%s]'s index %s", storedEntry.getIndex(), - index, entries[index].getIndex()); - - if (storedEntry.getTerm() != entries[index].getTerm()) { - // we should truncate from the storedEntry's index - truncateIndex = storedEntry.getIndex(); - if (LOG.isTraceEnabled()) { - LOG.trace("{}: truncate to {}, index={}, ti={}, storedEntry={}, entries={}", - server.getId(), truncateIndex, index, - ServerProtoUtils.toTermIndex(entries[index]), storedEntry, - ServerProtoUtils.toString(entries)); - } - while (true) { - try { - final LogEntryProto entry = get(storedEntry.getIndex()); - server.failClientRequest(entry); - } catch (RaftLogIOException e) { - LOG.error("Failed to read log " + storedEntry, e); - } - - if (iter.hasNext()) { - storedEntry = iter.next(); - } else { - break; - } - } - break; - } - } + final TruncateIndices ti = cache.computeTruncateIndices(this::failClientRequest, entries); + final long truncateIndex = ti.getTruncateIndex(); + final int index = ti.getArrayIndex(); final List<CompletableFuture<Long>> futures; if (truncateIndex != -1) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/91eafe76/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java index 1cd41a5..b34d58f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -21,6 +21,7 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; @@ -28,8 +29,8 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.impl.ServerState; import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault; +import org.apache.ratis.server.storage.RaftLogCache.LogSegmentList; import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.SizeInBytes; @@ -46,9 +47,9 @@ import java.util.concurrent.CompletableFuture; public class TestCacheEviction extends BaseTest { private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault(); - private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, long start, long size) { + static LogSegmentList prepareSegments(int numSegments, boolean[] cached, long start, long size) { Assert.assertEquals(numSegments, cached.length); - List<LogSegment> segments = new ArrayList<>(numSegments); + final LogSegmentList segments = new LogSegmentList(); for (int i = 0; i < numSegments; i++) { LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1); if (cached[i]) { @@ -64,7 +65,7 @@ public class TestCacheEviction extends BaseTest { @Test public void testBasicEviction() throws Exception { final int maxCached = 5; - List<LogSegment> segments = prepareSegments(5, + final LogSegmentList segments = prepareSegments(5, new boolean[]{true, true, true, true, true}, 0, 10); // case 1, make sure we do not evict cache for segments behind local flushed index @@ -103,7 +104,7 @@ public class TestCacheEviction extends BaseTest { @Test public void testEvictionWithFollowerIndices() throws Exception { final int maxCached = 6; - List<LogSegment> segments = prepareSegments(6, + final LogSegmentList segments = prepareSegments(6, new boolean[]{true, true, true, true, true, true}, 0, 10); // case 1, no matter where the followers are, we do not evict segments behind local
