This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9dcb75a RATIS-243. Add log purge function after taking snapshot.
9dcb75a is described below
commit 9dcb75a94eacae49b4fed7f7105b97deee4aba51
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Apr 23 13:55:51 2019 +0800
RATIS-243. Add log purge function after taking snapshot.
---
.../ratis/server/impl/StateMachineUpdater.java | 5 +--
.../apache/ratis/server/storage/MemoryRaftLog.java | 16 ++++++++
.../org/apache/ratis/server/storage/RaftLog.java | 14 +++++++
.../apache/ratis/server/storage/RaftLogCache.java | 31 ++++++++++++++-
.../apache/ratis/server/storage/RaftLogWorker.java | 30 +++++++++++++++
.../ratis/server/storage/SegmentedRaftLog.java | 14 +++++++
.../ratis/server/storage/TestRaftLogCache.java | 45 ++++++++++++++++++++++
.../ratis/server/storage/TestSegmentedRaftLog.java | 32 +++++++++++++++
8 files changed, 183 insertions(+), 4 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index c80e6ba..8e67907 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -183,9 +183,8 @@ class StateMachineUpdater implements Runnable {
if (futures.isInitialized()) {
JavaUtils.allOf(futures.get()).get();
}
- stateMachine.takeSnapshot();
- // TODO purge logs, including log cache. but should keep log for
leader's RPCSenders
- lastSnapshotIndex = lastAppliedIndex;
+ lastSnapshotIndex = stateMachine.takeSnapshot();
+ raftLog.purge(lastSnapshotIndex);
}
if (shouldStop()) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index 3bc2dd2..83ae36f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -56,6 +56,12 @@ public class MemoryRaftLog extends RaftLog {
}
}
+ void purge(int index) {
+ if (entries.size() > index) {
+ entries.subList(0, index).clear();
+ }
+ }
+
void add(LogEntryProto entry) {
entries.add(entry);
}
@@ -116,6 +122,16 @@ public class MemoryRaftLog extends RaftLog {
return CompletableFuture.completedFuture(index);
}
+
+ @Override
+ CompletableFuture<Long> purgeImpl(long index) {
+ try (AutoCloseableLock writeLock = writeLock()) {
+ Preconditions.assertTrue(index >= 0);
+ entries.purge(Math.toIntExact(index));
+ }
+ return CompletableFuture.completedFuture(index);
+ }
+
@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 749d371..f1379e4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -314,6 +314,20 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
abstract CompletableFuture<Long> truncateImpl(long index);
+
+ /**
+ * Purge asynchronously delete the segment files which does not overlap with
the given index.
+ * Open segment will not be considered for purging.
+ *
+ * @param index - is inclusive.
+ */
+ public final CompletableFuture<Long> purge(long index) {
+ return purgeImpl(index);
+ }
+
+ abstract CompletableFuture<Long> purgeImpl(long index);
+
+
@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
return runner.runSequentially(() -> appendEntryImpl(entry));
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 1ea11e4..8d4fc12 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.storage;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
@@ -26,7 +27,6 @@ import org.apache.ratis.server.protocol.TermIndex;
import
org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
import org.apache.ratis.server.storage.LogSegment.LogRecord;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AutoCloseableReadWriteLock;
import org.apache.ratis.util.JavaUtils;
@@ -238,6 +238,31 @@ class RaftLogCache {
}
}
+ TruncationSegments purge(long index) {
+ try (AutoCloseableLock writeLock = writeLock()) {
+ int segmentIndex = binarySearch(index);
+ List<SegmentFileInfo> list = new ArrayList<>();
+
+ if (segmentIndex == -segments.size() - 1) {
+ for (LogSegment ls : segments) {
+ list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(),
false, 0, 0));
+ }
+ segments.clear();
+ } else if (segmentIndex >= 0) {
+ // we start to purge the closedSegments which do not overlap with
index.
+ for (int i = segmentIndex - 1; i >= 0; i--) {
+ LogSegment ls = segments.get(i);
+ list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(),
false, 0, 0));
+ segments.remove(i);
+ }
+ } else {
+ throw new IllegalStateException("Unexpected gap in segments:
binarySearch(" + index + ") returns "
+ + segmentIndex + ", segments=" + segments);
+ }
+ return list.isEmpty() ? null : new TruncationSegments(null, list);
+ }
+ }
+
static SegmentFileInfo deleteOpenSegment(LogSegment openSegment, Runnable
clearOpenSegment) {
final long oldEnd = openSegment.getEndIndex();
openSegment.clear();
@@ -432,6 +457,10 @@ class RaftLogCache {
return closedSegments.truncate(index, openSegment, this::clearOpenSegment);
}
+ TruncationSegments purge(long index) {
+ return closedSegments.purge(index);
+ }
+
Iterator<TermIndex> iterator(long startIndex) {
return new EntryIterator(startIndex);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 345baed..58e5a20 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -315,6 +315,36 @@ class RaftLogWorker implements Runnable {
return addIOTask(new TruncateLog(ts, index));
}
+ Task purge(TruncationSegments ts) {
+ return addIOTask(new PurgeLog(ts, storage));
+ }
+
+ private static final class PurgeLog extends Task {
+ private final TruncationSegments segments;
+ private final RaftStorage storage;
+
+ private PurgeLog(TruncationSegments segments, RaftStorage storage) {
+ this.segments = segments;
+ this.storage = storage;
+ }
+
+ @Override
+ void execute() throws IOException {
+ if (segments.toDelete != null) {
+ for (SegmentFileInfo fileInfo : segments.toDelete) {
+ File delFile = storage.getStorageDir()
+ .getClosedLogFile(fileInfo.startIndex, fileInfo.endIndex);
+ FileUtils.deleteFile(delFile);
+ }
+ }
+ }
+
+ @Override
+ long getEndIndex() {
+ return 0;
+ }
+ }
+
private class WriteLog extends Task {
private final LogEntryProto entry;
private final CompletableFuture<?> stateMachineFuture;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 5d151c6..607816e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -266,6 +266,20 @@ public class SegmentedRaftLog extends RaftLog {
return CompletableFuture.completedFuture(index);
}
+
+ @Override
+ public CompletableFuture<Long> purgeImpl(long index) {
+ try (AutoCloseableLock writeLock = writeLock()) {
+ RaftLogCache.TruncationSegments ts = cache.purge(index);
+ LOG.debug("truncating segments:{}", ts);
+ if (ts != null) {
+ Task task = fileLogWorker.purge(ts);
+ return task.getFuture();
+ }
+ }
+ return CompletableFuture.completedFuture(index);
+ }
+
@Override
CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
checkLogState();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index d3c216d..a1157e9 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -19,6 +19,8 @@ package org.apache.ratis.server.storage;
import java.io.IOException;
import java.util.Iterator;
+import java.util.function.IntConsumer;
+import java.util.stream.IntStream;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
@@ -217,6 +219,49 @@ public class TestRaftLogCache {
Assert.assertEquals(249, ts.toTruncate.endIndex);
}
+ @Test
+ public void testOpenSegmentPurge() {
+ int start = 0;
+ int end = 5;
+ int segmentSize = 100;
+ populatedSegment(start, end, segmentSize, false);
+
+ int sIndex = (end - start) * segmentSize;
+ populatedSegment(end, end + 1, segmentSize, true);
+
+ int purgeIndex = sIndex;
+ // open segment should never be purged
+ TruncationSegments ts = cache.purge(purgeIndex);
+ Assert.assertNull(ts.toTruncate);
+ Assert.assertEquals(end - start, ts.toDelete.length);
+ Assert.assertEquals(sIndex, cache.getStartIndex());
+ }
+
+ @Test
+ public void testCloseSegmentPurge() {
+ int start = 0;
+ int end = 5;
+ int segmentSize = 100;
+ populatedSegment(start, end, segmentSize, false);
+
+ int purgeIndex = (end - start) * segmentSize - 1;
+
+ // overlapped close segment will not purged.
+ TruncationSegments ts = cache.purge(purgeIndex);
+ Assert.assertNull(ts.toTruncate);
+ Assert.assertEquals(end - start - 1, ts.toDelete.length);
+ Assert.assertEquals(1, cache.getNumOfSegments());
+ }
+
+ private void populatedSegment(int start, int end, int segmentSize, boolean
isOpen) {
+ IntStream.range(start, end).forEach(x -> {
+ int startIndex = x * segmentSize;
+ int endIndex = startIndex + segmentSize - 1;
+ LogSegment s = prepareLogSegment(startIndex, endIndex, isOpen);
+ cache.addSegment(s);
+ });
+ }
+
private void testIterator(long startIndex) throws IOException {
Iterator<TermIndex> iterator = cache.iterator(startIndex);
TermIndex prev = null;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index a247678..8007f4d 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -362,6 +362,38 @@ public class TestSegmentedRaftLog extends BaseTest {
}
}
+ @Test
+ public void testPurgeOnOpenSegment() throws Exception {
+ int startTerm = 0;
+ int endTerm = 5;
+ int segmentSize = 200;
+ long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
+ long expectedIndex = segmentSize * (endTerm - startTerm - 1);
+ purgeAndVerify(startTerm, endTerm, segmentSize, beginIndexOfOpenSegment,
expectedIndex);
+ }
+
+ @Test
+ public void testPurgeOnClosedSegments() throws Exception {
+ int startTerm = 0;
+ int endTerm = 5;
+ int segmentSize = 200;
+ long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
+ long expectedIndex = segmentSize * (endTerm - startTerm - 2);
+ purgeAndVerify(startTerm, endTerm, segmentSize, endIndexOfClosedSegment,
expectedIndex);
+ }
+
+ private void purgeAndVerify(int startTerm, int endTerm, int segmentSize,
long purgeIndex, long expectedIndex) throws IOException {
+ List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize,
0);
+ List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null,
storage, -1, properties)) {
+ raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+ raftLog.purge(purgeIndex).join();
+ Assert.assertEquals(expectedIndex,
raftLog.getRaftLogCache().getStartIndex());
+ }
+ }
+
/**
* Test append with inconsistent entries
*/