This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dcb75a  RATIS-243. Add log purge function after taking snapshot.
9dcb75a is described below

commit 9dcb75a94eacae49b4fed7f7105b97deee4aba51
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Apr 23 13:55:51 2019 +0800

    RATIS-243. Add log purge function after taking snapshot.
---
 .../ratis/server/impl/StateMachineUpdater.java     |  5 +--
 .../apache/ratis/server/storage/MemoryRaftLog.java | 16 ++++++++
 .../org/apache/ratis/server/storage/RaftLog.java   | 14 +++++++
 .../apache/ratis/server/storage/RaftLogCache.java  | 31 ++++++++++++++-
 .../apache/ratis/server/storage/RaftLogWorker.java | 30 +++++++++++++++
 .../ratis/server/storage/SegmentedRaftLog.java     | 14 +++++++
 .../ratis/server/storage/TestRaftLogCache.java     | 45 ++++++++++++++++++++++
 .../ratis/server/storage/TestSegmentedRaftLog.java | 32 +++++++++++++++
 8 files changed, 183 insertions(+), 4 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index c80e6ba..8e67907 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -183,9 +183,8 @@ class StateMachineUpdater implements Runnable {
           if (futures.isInitialized()) {
             JavaUtils.allOf(futures.get()).get();
           }
-          stateMachine.takeSnapshot();
-          // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
-          lastSnapshotIndex = lastAppliedIndex;
+          lastSnapshotIndex = stateMachine.takeSnapshot();
+          raftLog.purge(lastSnapshotIndex);
         }
 
         if (shouldStop()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index 3bc2dd2..83ae36f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -56,6 +56,12 @@ public class MemoryRaftLog extends RaftLog {
       }
     }
 
+    void purge(int index) {
+      if (entries.size() > index) {
+        entries.subList(0, index).clear();
+      }
+    }
+
     void add(LogEntryProto entry) {
       entries.add(entry);
     }
@@ -116,6 +122,16 @@ public class MemoryRaftLog extends RaftLog {
     return CompletableFuture.completedFuture(index);
   }
 
+
+  @Override
+  CompletableFuture<Long> purgeImpl(long index) {
+    try (AutoCloseableLock writeLock = writeLock()) {
+      Preconditions.assertTrue(index >= 0);
+      entries.purge(Math.toIntExact(index));
+    }
+    return CompletableFuture.completedFuture(index);
+  }
+
   @Override
   public TermIndex getLastEntryTermIndex() {
     checkLogState();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 749d371..f1379e4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -314,6 +314,20 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   abstract CompletableFuture<Long> truncateImpl(long index);
 
+
+  /**
+   * Purge asynchronously deletes the segment files which do not overlap with the given index.
+   * Open segment will not be considered for purging.
+   *
+   * @param index - is inclusive.
+   */
+  public final CompletableFuture<Long> purge(long index) {
+    return purgeImpl(index);
+  }
+
+  abstract CompletableFuture<Long> purgeImpl(long index);
+
+
   @Override
   public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
     return runner.runSequentially(() -> appendEntryImpl(entry));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 1ea11e4..8d4fc12 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.storage;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
@@ -26,7 +27,6 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
 import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.AutoCloseableReadWriteLock;
 import org.apache.ratis.util.JavaUtils;
@@ -238,6 +238,31 @@ class RaftLogCache {
       }
     }
 
+    TruncationSegments purge(long index) {
+      try (AutoCloseableLock writeLock = writeLock()) {
+        int segmentIndex = binarySearch(index);
+        List<SegmentFileInfo> list = new ArrayList<>();
+
+        if (segmentIndex == -segments.size() - 1) {
+          for (LogSegment ls : segments) {
+            list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(), false, 0, 0));
+          }
+          segments.clear();
+        } else if (segmentIndex >= 0) {
+          // we start to purge the closedSegments which do not overlap with index.
+          for (int i = segmentIndex - 1; i >= 0; i--) {
+            LogSegment ls = segments.get(i);
+            list.add(new SegmentFileInfo(ls.getStartIndex(), ls.getEndIndex(), false, 0, 0));
+            segments.remove(i);
+          }
+        } else {
+          throw new IllegalStateException("Unexpected gap in segments: binarySearch(" + index + ") returns "
+                  + segmentIndex + ", segments=" + segments);
+        }
+        return list.isEmpty() ? null : new TruncationSegments(null, list);
+      }
+    }
+
     static SegmentFileInfo deleteOpenSegment(LogSegment openSegment, Runnable clearOpenSegment) {
       final long oldEnd = openSegment.getEndIndex();
       openSegment.clear();
@@ -432,6 +457,10 @@ class RaftLogCache {
     return closedSegments.truncate(index, openSegment, this::clearOpenSegment);
   }
 
+  TruncationSegments purge(long index) {
+    return closedSegments.purge(index);
+  }
+
   Iterator<TermIndex> iterator(long startIndex) {
     return new EntryIterator(startIndex);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 345baed..58e5a20 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -315,6 +315,36 @@ class RaftLogWorker implements Runnable {
     return addIOTask(new TruncateLog(ts, index));
   }
 
+  Task purge(TruncationSegments ts) {
+    return addIOTask(new PurgeLog(ts, storage));
+  }
+
+  private static final class PurgeLog extends Task {
+    private final TruncationSegments segments;
+    private final RaftStorage storage;
+
+    private PurgeLog(TruncationSegments segments, RaftStorage storage) {
+      this.segments = segments;
+      this.storage = storage;
+    }
+
+    @Override
+    void execute() throws IOException {
+      if (segments.toDelete != null) {
+        for (SegmentFileInfo fileInfo : segments.toDelete) {
+          File delFile = storage.getStorageDir()
+                  .getClosedLogFile(fileInfo.startIndex, fileInfo.endIndex);
+          FileUtils.deleteFile(delFile);
+        }
+      }
+    }
+
+    @Override
+    long getEndIndex() {
+      return 0;
+    }
+  }
+
   private class WriteLog extends Task {
     private final LogEntryProto entry;
     private final CompletableFuture<?> stateMachineFuture;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 5d151c6..607816e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -266,6 +266,20 @@ public class SegmentedRaftLog extends RaftLog {
     return CompletableFuture.completedFuture(index);
   }
 
+
+  @Override
+  public CompletableFuture<Long> purgeImpl(long index) {
+    try (AutoCloseableLock writeLock = writeLock()) {
+      RaftLogCache.TruncationSegments ts = cache.purge(index);
+      LOG.debug("truncating segments:{}", ts);
+      if (ts != null) {
+        Task task = fileLogWorker.purge(ts);
+        return task.getFuture();
+      }
+    }
+    return CompletableFuture.completedFuture(index);
+  }
+
   @Override
   CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
     checkLogState();
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index d3c216d..a1157e9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -19,6 +19,8 @@ package org.apache.ratis.server.storage;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.function.IntConsumer;
+import java.util.stream.IntStream;
 
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
@@ -217,6 +219,49 @@ public class TestRaftLogCache {
     Assert.assertEquals(249, ts.toTruncate.endIndex);
   }
 
+  @Test
+  public void testOpenSegmentPurge() {
+    int start = 0;
+    int end = 5;
+    int segmentSize = 100;
+    populatedSegment(start, end, segmentSize, false);
+
+    int sIndex = (end - start) * segmentSize;
+    populatedSegment(end, end + 1, segmentSize, true);
+
+    int purgeIndex = sIndex;
+    // open segment should never be purged
+    TruncationSegments ts = cache.purge(purgeIndex);
+    Assert.assertNull(ts.toTruncate);
+    Assert.assertEquals(end - start, ts.toDelete.length);
+    Assert.assertEquals(sIndex, cache.getStartIndex());
+  }
+
+  @Test
+  public void testCloseSegmentPurge() {
+    int start = 0;
+    int end = 5;
+    int segmentSize = 100;
+    populatedSegment(start, end, segmentSize, false);
+
+    int purgeIndex = (end - start) * segmentSize - 1;
+
+    // overlapped closed segment will not be purged.
+    TruncationSegments ts = cache.purge(purgeIndex);
+    Assert.assertNull(ts.toTruncate);
+    Assert.assertEquals(end - start - 1, ts.toDelete.length);
+    Assert.assertEquals(1, cache.getNumOfSegments());
+  }
+
+  private void populatedSegment(int start, int end, int segmentSize, boolean isOpen) {
+    IntStream.range(start, end).forEach(x -> {
+      int startIndex = x * segmentSize;
+      int endIndex = startIndex + segmentSize - 1;
+      LogSegment s = prepareLogSegment(startIndex, endIndex, isOpen);
+      cache.addSegment(s);
+    });
+  }
+
   private void testIterator(long startIndex) throws IOException {
     Iterator<TermIndex> iterator = cache.iterator(startIndex);
     TermIndex prev = null;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index a247678..8007f4d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -362,6 +362,38 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
+  @Test
+  public void testPurgeOnOpenSegment() throws Exception {
+    int startTerm = 0;
+    int endTerm = 5;
+    int segmentSize = 200;
+    long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
+    long expectedIndex = segmentSize * (endTerm - startTerm - 1);
+    purgeAndVerify(startTerm, endTerm, segmentSize, beginIndexOfOpenSegment, expectedIndex);
+  }
+
+  @Test
+  public void testPurgeOnClosedSegments() throws Exception {
+    int startTerm = 0;
+    int endTerm = 5;
+    int segmentSize = 200;
+    long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
+    long expectedIndex = segmentSize * (endTerm - startTerm - 2);
+    purgeAndVerify(startTerm, endTerm, segmentSize, endIndexOfClosedSegment, expectedIndex);
+  }
+
+  private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, long purgeIndex, long expectedIndex) throws IOException {
+    List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, 0);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+      raftLog.purge(purgeIndex).join();
+      Assert.assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex());
+    }
+  }
+
   /**
    * Test append with inconsistent entries
    */

Reply via email to