http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
new file mode 100644
index 0000000..f5a18ac
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.util.Iterator;
+
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.ProtoUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRaftLogCache {
+  private RaftLogCache cache;
+
+  /** Gives every test case its own, initially empty cache. */
+  @Before
+  public void setup() {
+    this.cache = new RaftLogCache();
+  }
+
+  private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
+    LogSegment s = LogSegment.newOpenSegment(start);
+    for (long i = start; i <= end; i++) {
+      SimpleOperation m = new SimpleOperation("m" + i);
+      LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
+          0, i);
+      s.appendToOpenSegment(entry);
+    }
+    if (!isOpen) {
+      s.close();
+    }
+    return s;
+  }
+
+  private void checkCache(long start, long end, int segmentSize) {
+    Assert.assertEquals(start, cache.getStartIndex());
+    Assert.assertEquals(end, cache.getEndIndex());
+
+    for (long index = start; index <= end; index++) {
+      LogEntryProto entry = cache.getEntry(index);
+      Assert.assertEquals(index, entry.getIndex());
+    }
+
+    long[] offsets = new long[]{start, start + 1, start + (end - start) / 2,
+        end - 1, end};
+    for (long offset : offsets) {
+      checkCacheEntries(offset, (int) (end - offset + 1), end);
+      checkCacheEntries(offset, 1, end);
+      checkCacheEntries(offset, 20, end);
+      checkCacheEntries(offset, segmentSize, end);
+      checkCacheEntries(offset, segmentSize - 1, end);
+    }
+  }
+
+  private void checkCacheEntries(long offset, int size, long end) {
+    LogEntryProto[] entries = cache.getEntries(offset, offset + size);
+    long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
+    Assert.assertEquals(realEnd - offset, entries.length);
+    for (long i = offset; i < realEnd; i++) {
+      Assert.assertEquals(i, entries[(int) (i - offset)].getIndex());
+    }
+  }
+
+  /**
+   * Adds segments one after another and checks that a segment leaving a
+   * gap, or one arriving while an open segment is still cached, is
+   * rejected with an IllegalStateException.
+   */
+  @Test
+  public void testAddSegments() throws Exception {
+    cache.addSegment(prepareLogSegment(1, 100, false));
+    checkCache(1, 100, 100);
+
+    expectAddSegmentFailure(102, 103,
+        "should fail since there is gap between two segments");
+
+    cache.addSegment(prepareLogSegment(101, 200, true));
+    checkCache(1, 200, 100);
+
+    expectAddSegmentFailure(201, 202,
+        "should fail since there is still an open segment in cache");
+
+    cache.rollOpenSegment(false);
+    checkCache(1, 200, 100);
+
+    expectAddSegmentFailure(202, 203,
+        "should fail since there is gap between two segments");
+
+    cache.addSegment(prepareLogSegment(201, 300, true));
+    Assert.assertNotNull(cache.getOpenSegment());
+    checkCache(1, 300, 100);
+
+    cache.rollOpenSegment(true);
+    Assert.assertNotNull(cache.getOpenSegment());
+    checkCache(1, 300, 100);
+  }
+
+  /**
+   * Tries to add an open segment covering [start, end] and fails the test
+   * with {@code failureMessage} unless an IllegalStateException is thrown.
+   */
+  private void expectAddSegmentFailure(long start, long end,
+      String failureMessage) throws Exception {
+    try {
+      cache.addSegment(prepareLogSegment(start, end, true));
+      Assert.fail(failureMessage);
+    } catch (IllegalStateException ignored) {
+    }
+  }
+
+  @Test
+  public void testAppendEntry() throws Exception {
+    LogSegment closedSegment = prepareLogSegment(0, 99, false);
+    cache.addSegment(closedSegment);
+
+    final SimpleOperation m = new SimpleOperation("m");
+    try {
+      LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
+          0, 0);
+      cache.appendEntry(entry);
+      Assert.fail("the open segment is null");
+    } catch (IllegalStateException ignored) {
+    }
+
+    LogSegment openSegment = prepareLogSegment(100, 100, true);
+    cache.addSegment(openSegment);
+    for (long index = 101; index < 200; index++) {
+      LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
+          0, index);
+      cache.appendEntry(entry);
+    }
+
+    Assert.assertNotNull(cache.getOpenSegment());
+    checkCache(0, 199, 100);
+  }
+
+  /**
+   * Truncates a cache made of five closed segments plus one open segment
+   * in several steps, checking after each step both the surviving entries
+   * and the {@link TruncationSegments} returned by the truncation.
+   */
+  @Test
+  public void testTruncate() throws Exception {
+    // indices 0-499: five closed segments of 100 entries each
+    for (long first = 0; first < 500; first += 100) {
+      cache.addSegment(prepareLogSegment(first, first + 99, false));
+    }
+    // indices 500-599: the open segment
+    cache.addSegment(prepareLogSegment(500, 599, true));
+
+    long end = cache.getEndIndex();
+    Assert.assertEquals(599, end);
+
+    // cut 37 entries off the tail, ten times in a row
+    int segmentsLeft = 6;
+    for (int round = 0; round < 10; round++) {
+      end -= 37;
+      TruncationSegments result = cache.truncate(end + 1);
+      checkCache(0, end, 100);
+
+      // one segment is expected in toDelete only when the new end index
+      // lies in an earlier segment than before
+      final int segmentsNow = (int) (end / 100 + 1);
+      if (segmentsNow >= segmentsLeft) {
+        Assert.assertEquals(0, result.toDelete.length);
+      } else {
+        Assert.assertEquals(1, result.toDelete.length);
+        segmentsLeft = segmentsNow;
+      }
+    }
+
+    // 230 entries remaining. truncate at the segment boundary
+    TruncationSegments ts = cache.truncate(200);
+    checkCache(0, 199, 100);
+    Assert.assertEquals(1, ts.toDelete.length);
+    Assert.assertEquals(200, ts.toDelete[0].startIndex);
+    Assert.assertEquals(229, ts.toDelete[0].endIndex);
+    Assert.assertEquals(0, ts.toDelete[0].targetLength);
+    Assert.assertFalse(ts.toDelete[0].isOpen);
+    Assert.assertNull(ts.toTruncate);
+
+    // a fresh open segment 200-249, removed as a whole
+    cache.addSegment(prepareLogSegment(200, 249, true));
+    ts = cache.truncate(200);
+    checkCache(0, 199, 100);
+    Assert.assertEquals(1, ts.toDelete.length);
+    Assert.assertEquals(200, ts.toDelete[0].startIndex);
+    Assert.assertEquals(249, ts.toDelete[0].endIndex);
+    Assert.assertEquals(0, ts.toDelete[0].targetLength);
+    Assert.assertTrue(ts.toDelete[0].isOpen);
+    Assert.assertNull(ts.toTruncate);
+
+    // another open segment 200-249, this time only partly truncated
+    cache.addSegment(prepareLogSegment(200, 249, true));
+    ts = cache.truncate(220);
+    checkCache(0, 219, 100);
+    Assert.assertNull(cache.getOpenSegment());
+    Assert.assertEquals(0, ts.toDelete.length);
+    Assert.assertTrue(ts.toTruncate.isOpen);
+    Assert.assertEquals(219, ts.toTruncate.newEndIndex);
+    Assert.assertEquals(200, ts.toTruncate.startIndex);
+    Assert.assertEquals(249, ts.toTruncate.endIndex);
+  }
+
+  private void testIterator(long startIndex) {
+    Iterator<LogEntryProto> iterator = cache.iterator(startIndex);
+    LogEntryProto prev = null;
+    while (iterator.hasNext()) {
+      LogEntryProto entry = iterator.next();
+      Assert.assertEquals(cache.getEntry(entry.getIndex()), entry);
+      if (prev != null) {
+        Assert.assertEquals(prev.getIndex() + 1, entry.getIndex());
+      }
+      prev = entry;
+    }
+    if (startIndex <= cache.getEndIndex()) {
+      Assert.assertNotNull(prev);
+      Assert.assertEquals(cache.getEndIndex(), prev.getIndex());
+    }
+  }
+
+  /**
+   * Iterates over a cache of two closed segments and one open segment
+   * from various start indices, including one just past the end.
+   */
+  @Test
+  public void testIterator() throws Exception {
+    // closed segments 0-99 and 100-199
+    for (long first = 0; first < 200; first += 100) {
+      cache.addSegment(prepareLogSegment(first, first + 99, false));
+    }
+    // open segment 200-299
+    cache.addSegment(prepareLogSegment(200, 299, true));
+
+    long startIndex = 0;
+    while (startIndex < 300) {
+      testIterator(startIndex);
+      startIndex += 50;
+    }
+    testIterator(299);
+
+    Assert.assertFalse(cache.iterator(300).hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
new file mode 100644
index 0000000..bcdb958
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ChecksumException;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
+import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.ProtoUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic functionality of LogReader, LogInputStream, and LogOutputStream.
+ */
+public class TestRaftLogReadWrite {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestRaftLogReadWrite.class);
+
+  private File storageDir;
+  private RaftProperties properties;
+  private int segmentMaxSize;
+
+  /** Picks the test directory and registers it as the storage dir. */
+  @Before
+  public void setup() throws Exception {
+    storageDir = RaftTestUtil.getTestDir(TestRaftLogReadWrite.class);
+    final String storageUri = FileUtils.fileAsURI(storageDir).toString();
+    properties = new RaftProperties();
+    properties.set(RAFT_SERVER_STORAGE_DIR_KEY, storageUri);
+  }
+
+  /** Deletes the parent of the storage directory, if one was set up. */
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir == null) {
+      return;
+    }
+    FileUtils.fullyDelete(storageDir.getParentFile());
+  }
+
+  private LogEntryProto[] readLog(File file, long startIndex, long endIndex,
+      boolean isOpen) throws IOException {
+    List<LogEntryProto> list = new ArrayList<>();
+    try (LogInputStream in =
+             new LogInputStream(file, startIndex, endIndex, isOpen)) {
+      LogEntryProto entry;
+      while ((entry = in.nextEntry()) != null) {
+        list.add(entry);
+      }
+    }
+    return list.toArray(new LogEntryProto[list.size()]);
+  }
+
+  /**
+   * Fills {@code entries} with freshly created entries (term 0, index
+   * equal to the array position), writes each one to {@code out}, and
+   * returns the number of bytes the test expects them to occupy.
+   */
+  private long writeMessages(LogEntryProto[] entries, LogOutputStream out)
+      throws IOException {
+    long expectedBytes = 0;
+    for (int i = 0; i < entries.length; i++) {
+      final LogEntryProto entry = ProtoUtils.toLogEntryProto(
+          new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
+      entries[i] = entry;
+      final int serialized = entry.getSerializedSize();
+      // varint length prefix + payload + 4 extra bytes per entry
+      // (presumably the checksum -- confirm against LogOutputStream)
+      expectedBytes +=
+          CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized + 4;
+      out.write(entry);
+    }
+    return expectedBytes;
+  }
+
+  /**
+   * Test basic functionality: write several log entries, then read them
+   * back and compare both the file length and the entries.
+   */
+  @Test
+  public void testReadWriteLog() throws IOException {
+    final RaftStorage storage =
+        new RaftStorage(properties, StartupOption.REGULAR);
+    final File openSegment = storage.getStorageDir().getOpenLogFile(0);
+    final LogEntryProto[] entries = new LogEntryProto[100];
+
+    // header plus whatever writeMessages reports for the 100 entries
+    long expectedLength = SegmentedRaftLog.HEADER_BYTES.length;
+    try (LogOutputStream out =
+             new LogOutputStream(openSegment, false, properties)) {
+      expectedLength += writeMessages(entries, out);
+    } finally {
+      storage.close();
+    }
+
+    Assert.assertEquals(expectedLength, openSegment.length());
+
+    Assert.assertArrayEquals(entries, readLog(openSegment, 0,
+        RaftServerConstants.INVALID_LOG_INDEX, true));
+  }
+
+  /**
+   * Writes 100 entries to a new open segment, reopens the same file with
+   * the second LogOutputStream constructor flag set to true (presumably
+   * append mode) for another 100 entries, and checks that all 200 entries
+   * are read back in order.
+   */
+  @Test
+  public void testAppendLog() throws IOException {
+    final RaftStorage storage =
+        new RaftStorage(properties, StartupOption.REGULAR);
+    // release the storage even if a write or an assertion fails
+    try {
+      final File openSegment = storage.getStorageDir().getOpenLogFile(0);
+      final LogEntryProto[] entries = new LogEntryProto[200];
+      try (LogOutputStream out =
+               new LogOutputStream(openSegment, false, properties)) {
+        for (int i = 0; i < 100; i++) {
+          SimpleOperation m = new SimpleOperation("m" + i);
+          entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+          out.write(entries[i]);
+        }
+      }
+
+      try (LogOutputStream out =
+               new LogOutputStream(openSegment, true, properties)) {
+        for (int i = 100; i < 200; i++) {
+          SimpleOperation m = new SimpleOperation("m" + i);
+          entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+          out.write(entries[i]);
+        }
+      }
+
+      LogEntryProto[] readEntries = readLog(openSegment, 0,
+          RaftServerConstants.INVALID_LOG_INDEX, true);
+      Assert.assertArrayEquals(entries, readEntries);
+    } finally {
+      storage.close();
+    }
+  }
+
+  /**
+   * Simulate the scenario that the peer is shutdown without truncating
+   * log segment file padding. Make sure the reader can correctly handle this.
+   *
+   * The segment is read while the output stream is still open (after a
+   * flush), when the file is expected to have the preallocated length;
+   * once the stream is closed the file must shrink to the written size.
+   */
+  @Test
+  public void testReadWithPadding() throws IOException {
+    final RaftStorage storage =
+        new RaftStorage(properties, StartupOption.REGULAR);
+    // release the storage even if a write or an assertion fails
+    try {
+      final File openSegment = storage.getStorageDir().getOpenLogFile(0);
+      long size = SegmentedRaftLog.HEADER_BYTES.length;
+
+      final LogEntryProto[] entries = new LogEntryProto[100];
+      try (LogOutputStream out =
+               new LogOutputStream(openSegment, false, properties)) {
+        size += writeMessages(entries, out);
+        out.flush();
+
+        // make sure the file contains padding
+        Assert.assertEquals(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
+            openSegment.length());
+
+        // check if the reader can correctly read the log file
+        LogEntryProto[] readEntries = readLog(openSegment, 0,
+            RaftServerConstants.INVALID_LOG_INDEX, true);
+        Assert.assertArrayEquals(entries, readEntries);
+      }
+
+      // the stream has been closed by now: no padding may be left
+      Assert.assertEquals(size, openSegment.length());
+    } finally {
+      storage.close();
+    }
+  }
+
+  /**
+   * Corrupt the padding by inserting non-zero bytes. Make sure the reader
+   * throws an IOException raised from verifyTerminator, and that the
+   * entries written before the padding are still read.
+   */
+  @Test
+  public void testReadWithCorruptPadding() throws IOException {
+    properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY,
+        4 * 1024 * 1024);
+    properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 16 * 1024 * 1024);
+
+    final RaftStorage storage =
+        new RaftStorage(properties, StartupOption.REGULAR);
+    // release the storage even if a write or an assertion fails
+    try {
+      final File openSegment = storage.getStorageDir().getOpenLogFile(0);
+      final LogEntryProto[] entries = new LogEntryProto[10];
+      final List<LogEntryProto> list = new ArrayList<>();
+
+      // The stream stays open for the whole check so the padding is
+      // still in the file while it is corrupted and read; it is closed
+      // when this block exits.
+      try (LogOutputStream out =
+               new LogOutputStream(openSegment, false, properties)) {
+        for (int i = 0; i < 10; i++) {
+          SimpleOperation m = new SimpleOperation("m" + i);
+          entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+          out.write(entries[i]);
+        }
+        out.flush();
+
+        // make sure the file contains padding
+        Assert.assertEquals(4 * 1024 * 1024, openSegment.length());
+
+        // NOTE(review): the stream is opened in append mode, so the
+        // requested position may be ignored and the bytes may land at
+        // the end of the file -- confirm the intended offset.
+        try (FileOutputStream fout = new FileOutputStream(openSegment, true)) {
+          ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{-1, 1});
+          fout.getChannel()
+              .write(byteBuffer, 16 * 1024 * 1024 - 10);
+        }
+
+        try (LogInputStream in = new LogInputStream(openSegment, 0,
+            RaftServerConstants.INVALID_LOG_INDEX, true)) {
+          LogEntryProto entry;
+          while ((entry = in.nextEntry()) != null) {
+            list.add(entry);
+          }
+          Assert.fail("should fail since we corrupt the padding");
+        } catch (IOException e) {
+          // the failure must come from the terminator check
+          boolean findVerifyTerminator = false;
+          for (StackTraceElement s : e.getStackTrace()) {
+            if (s.getMethodName().equals("verifyTerminator")) {
+              findVerifyTerminator = true;
+              break;
+            }
+          }
+          Assert.assertTrue(findVerifyTerminator);
+        }
+        Assert.assertArrayEquals(entries,
+            list.toArray(new LogEntryProto[list.size()]));
+      }
+    } finally {
+      storage.close();
+    }
+  }
+
+  /**
+   * Test the log reader to make sure it can detect the checksum mismatch:
+   * one byte of a written segment is altered and reading the segment is
+   * then expected to raise a ChecksumException.
+   */
+  @Test
+  public void testReadWithEntryCorruption() throws IOException {
+    final RaftStorage storage =
+        new RaftStorage(properties, StartupOption.REGULAR);
+    final File openSegment = storage.getStorageDir().getOpenLogFile(0);
+    try (LogOutputStream out =
+             new LogOutputStream(openSegment, false, properties)) {
+      int i = 0;
+      while (i < 100) {
+        final SimpleOperation op = new SimpleOperation("m" + i);
+        out.write(ProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, i));
+        i++;
+      }
+    } finally {
+      storage.close();
+    }
+
+    // corrupt the log file: bump the byte at offset 100 by one
+    final File canonical = openSegment.getCanonicalFile();
+    try (RandomAccessFile raf = new RandomAccessFile(canonical, "rw")) {
+      raf.seek(100);
+      final int originalByte = raf.read();
+      raf.seek(100);
+      raf.write(originalByte + 1);
+    }
+
+    try {
+      readLog(openSegment, 0, RaftServerConstants.INVALID_LOG_INDEX, true);
+      Assert.fail("The read of corrupted log file should fail");
+    } catch (ChecksumException e) {
+      LOG.info("Caught ChecksumException as expected", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
new file mode 100644
index 0000000..3092a21
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_KEY;
+import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+import static org.apache.ratis.server.storage.LogSegment.getEntrySize;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.ProtoUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test basic functionality of {@link LogSegment}
+ */
+public class TestRaftLogSegment {
+  private File storageDir;
+  private final RaftProperties properties = new RaftProperties();
+
+  /** Picks the test directory and registers its canonical path. */
+  @Before
+  public void setup() throws Exception {
+    storageDir = RaftTestUtil.getTestDir(TestRaftLogSegment.class);
+    final String canonicalPath = storageDir.getCanonicalPath();
+    properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY,
+        canonicalPath);
+  }
+
+  /** Deletes the parent of the storage directory, if one was set up. */
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir == null) {
+      return;
+    }
+    FileUtils.fullyDelete(storageDir.getParentFile());
+  }
+
+  private File prepareLog(boolean isOpen, long start, int size, long term)
+      throws IOException {
+    RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
+    File file = isOpen ? storage.getStorageDir().getOpenLogFile(start) :
+        storage.getStorageDir().getClosedLogFile(start, start + size - 1);
+
+    LogEntryProto[] entries = new LogEntryProto[size];
+    try (LogOutputStream out = new LogOutputStream(file, false, properties)) {
+      for (int i = 0; i < size; i++) {
+        SimpleOperation op = new SimpleOperation("m" + i);
+        entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
+            term, i + start);
+        out.write(entries[i]);
+      }
+    }
+    storage.close();
+    return file;
+  }
+
+  private void checkLogSegment(LogSegment segment, long start, long end,
+      boolean isOpen, long totalSize, long term) {
+    Assert.assertEquals(start, segment.getStartIndex());
+    Assert.assertEquals(end, segment.getEndIndex());
+    Assert.assertEquals(isOpen, segment.isOpen());
+    Assert.assertEquals(totalSize, segment.getTotalSize());
+
+    long offset = SegmentedRaftLog.HEADER_BYTES.length;
+    for (long i = start; i <= end; i++) {
+      LogSegment.LogRecord record = segment.getLogRecord(i);
+      Assert.assertEquals(i, record.entry.getIndex());
+      Assert.assertEquals(term, record.entry.getTerm());
+      Assert.assertEquals(offset, record.offset);
+
+      offset += getEntrySize(record.entry);
+    }
+  }
+
+  /**
+   * Loads an open and a closed segment from files written by prepareLog
+   * and checks boundaries, record offsets, and that the loaded total size
+   * matches the length of the file on disk.
+   */
+  @Test
+  public void testLoadLogSegment() throws Exception {
+    // load an open segment
+    File openSegmentFile = prepareLog(true, 0, 100, 0);
+    LogSegment openSegment = LogSegment.loadSegment(openSegmentFile, 0,
+        INVALID_LOG_INDEX, true, null);
+    checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0);
+
+    // load a closed segment (1000-1099)
+    File closedSegmentFile = prepareLog(false, 1000, 100, 1);
+    LogSegment closedSegment = LogSegment.loadSegment(closedSegmentFile, 1000,
+        1099, false, null);
+    // compare against the file length, as for the open segment; passing
+    // the segment's own getTotalSize() would compare the value with itself
+    checkLogSegment(closedSegment, 1000, 1099, false,
+        closedSegmentFile.length(), 1);
+  }
+
+  /**
+   * Collects entries until their accumulated size reaches 8 MB, appends
+   * them to an open segment in a single call, and checks the result.
+   */
+  @Test
+  public void testAppendEntries() throws Exception {
+    final long start = 1000;
+    final long max = 8 * 1024 * 1024;
+    final long term = 0;
+    final LogSegment segment = LogSegment.newOpenSegment(start);
+
+    // an empty segment ends one index before its start
+    long size = SegmentedRaftLog.HEADER_BYTES.length;
+    checkLogSegment(segment, start, start - 1, true, size, 0);
+
+    // append till full
+    final List<LogEntryProto> batch = new ArrayList<>();
+    int count = 0;
+    for (; size < max; count++) {
+      final SimpleOperation op = new SimpleOperation("m" + count);
+      final LogEntryProto entry = ProtoUtils.toLogEntryProto(
+          op.getLogEntryContent(), term, count + start);
+      size += getEntrySize(entry);
+      batch.add(entry);
+    }
+
+    segment.appendToOpenSegment(batch.toArray(new LogEntryProto[batch.size()]));
+    Assert.assertTrue(segment.getTotalSize() >= max);
+    checkLogSegment(segment, start, count - 1 + start, true, size, term);
+  }
+
+  /**
+   * An open segment starting at 1000 must reject, with an
+   * IllegalArgumentException, any entry that is not the next index and
+   * any batch whose entries are not consecutive.
+   */
+  @Test
+  public void testAppendWithGap() throws Exception {
+    final LogSegment segment = LogSegment.newOpenSegment(1000);
+    final SMLogEntryProto content =
+        new SimpleOperation("m").getLogEntryContent();
+
+    // 1001 skips the very first index
+    try {
+      segment.appendToOpenSegment(ProtoUtils.toLogEntryProto(content, 0, 1001));
+      Assert.fail("should fail since the entry's index needs to be 1000");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IllegalArgumentException);
+    }
+
+    segment.appendToOpenSegment(ProtoUtils.toLogEntryProto(content, 0, 1000));
+
+    // 1002 skips 1001
+    try {
+      segment.appendToOpenSegment(ProtoUtils.toLogEntryProto(content, 0, 1002));
+      Assert.fail("should fail since the entry's index needs to be 1001");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IllegalArgumentException);
+    }
+
+    // a batch that starts correctly (1001) but then jumps to 1003
+    final LogEntryProto[] gapped = {
+        ProtoUtils.toLogEntryProto(content, 0, 1001),
+        ProtoUtils.toLogEntryProto(content, 0, 1003)};
+    try {
+      segment.appendToOpenSegment(gapped);
+      Assert.fail("should fail since there is gap between entries");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IllegalArgumentException);
+    }
+  }
+
+  /**
+   * Truncates a 100-entry segment in three steps (to 80, to 50, to 0
+   * entries); after each step the segment is expected to be non-open
+   * and to report the offset of the first removed record as its size.
+   */
+  @Test
+  public void testTruncate() throws Exception {
+    final long term = 1;
+    final long start = 1000;
+    final LogSegment segment = LogSegment.newOpenSegment(start);
+    int appended = 0;
+    while (appended < 100) {
+      final SimpleOperation op = new SimpleOperation("m" + appended);
+      segment.appendToOpenSegment(ProtoUtils.toLogEntryProto(
+          op.getLogEntryContent(), term, appended + start));
+      appended++;
+    }
+
+    // first cut, on the still-open segment: drops 1080~1099
+    long expectedSize = segment.getLogRecord(start + 80).offset;
+    segment.truncate(start + 80);
+    Assert.assertEquals(80, segment.numOfEntries());
+    checkLogSegment(segment, start, start + 79, false, expectedSize, term);
+
+    // second cut, on the now closed segment: drops 1050~1079
+    expectedSize = segment.getLogRecord(start + 50).offset;
+    segment.truncate(start + 50);
+    Assert.assertEquals(50, segment.numOfEntries());
+    checkLogSegment(segment, start, start + 49, false, expectedSize, term);
+
+    // last cut removes everything; only the header size remains
+    segment.truncate(start);
+    Assert.assertEquals(0, segment.numOfEntries());
+    checkLogSegment(segment, start, start - 1, false,
+        SegmentedRaftLog.HEADER_BYTES.length, term);
+  }
+
+  private RaftProperties getProperties(long maxSegmentSize,
+      long preallocatedSize) {
+    RaftProperties p = new RaftProperties();
+    p.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
+        maxSegmentSize);
+    p.setLong(RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY,
+        preallocatedSize);
+    return p;
+  }
+
+  /**
+   * Checks the length of a freshly opened log file for many combinations
+   * of max segment size and preallocated size, and then the case of a
+   * single entry that is larger than the max segment size.
+   */
+  @Test
+  public void testPreallocateSegment() throws Exception {
+    final RaftStorage storage =
+        new RaftStorage(properties, StartupOption.REGULAR);
+    // release the storage even if an assertion or an I/O call fails
+    try {
+      final File file = storage.getStorageDir().getOpenLogFile(0);
+      final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1,
+          1024 * 1024, 1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024,
+          2 * 1024 * 1024 + 1, 8 * 1024 * 1024};
+      final int[] preallocated = new int[]{512, 1024, 1025, 1024 * 1024,
+          1024 * 1024 + 1, 2 * 1024 * 1024};
+
+      // make sure preallocation is correct with different max/pre-allocated
+      // size: the file is expected to be as long as the smaller of the two
+      for (int max : maxSizes) {
+        for (int a : preallocated) {
+          try (LogOutputStream ignored =
+                   new LogOutputStream(file, false, getProperties(max, a))) {
+            Assert.assertEquals(Math.min(max, a), file.length());
+          }
+          // nothing was written, so the reader must find no entry
+          try (LogInputStream in =
+                   new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
+            LogEntryProto entry = in.nextEntry();
+            Assert.assertNull(entry);
+          }
+        }
+      }
+
+      // test the scenario where an entry's size is larger than the max size
+      final byte[] content = new byte[1024 * 2];
+      Arrays.fill(content, (byte) 1);
+      final long size;
+      try (LogOutputStream out = new LogOutputStream(file, false,
+          getProperties(1024, 1024))) {
+        SimpleOperation op = new SimpleOperation(new String(content));
+        LogEntryProto entry = ProtoUtils.toLogEntryProto(
+            op.getLogEntryContent(), 0, 0);
+        size = LogSegment.getEntrySize(entry);
+        out.write(entry);
+      }
+      Assert.assertEquals(size + SegmentedRaftLog.HEADER_BYTES.length,
+          file.length());
+      try (LogInputStream in = new LogInputStream(file, 0,
+          INVALID_LOG_INDEX, true)) {
+        LogEntryProto entry = in.nextEntry();
+        Assert.assertArrayEquals(content,
+            entry.getSmLogEntry().getData().toByteArray());
+        Assert.assertNull(in.nextEntry());
+      }
+    } finally {
+      storage.close();
+    }
+  }
+
+  /**
+   * Keep appending and check if pre-allocation is correct: whenever the
+   * written size passes the currently preallocated size, the file is
+   * expected to have grown by another 16 KB; after the stream is closed
+   * the file length must equal the written size.
+   */
+  @Test
+  public void testPreallocationAndAppend() throws Exception {
+    final long max = 2 * 1024 * 1024;
+    properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, max);
+    properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024);
+    properties.setLong(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, 10 * 1024);
+    final RaftStorage storage =
+        new RaftStorage(properties, StartupOption.REGULAR);
+    // release the storage even if an assertion or an I/O call fails
+    try {
+      final File file = storage.getStorageDir().getOpenLogFile(0);
+
+      final byte[] content = new byte[1024];
+      Arrays.fill(content, (byte) 1);
+      SimpleOperation op = new SimpleOperation(new String(content));
+      LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
+          0, 0);
+      final long entrySize = LogSegment.getEntrySize(entry);
+
+      long totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+      long preallocated = 16 * 1024;
+      try (LogOutputStream out = new LogOutputStream(file, false, properties)) {
+        Assert.assertEquals(preallocated, file.length());
+        // stop before the next entry would reach the max segment size
+        while (totalSize + entrySize < max) {
+          totalSize += entrySize;
+          out.write(entry);
+          if (totalSize > preallocated) {
+            Assert.assertEquals("totalSize==" + totalSize,
+                preallocated + 16 * 1024, file.length());
+            preallocated += 16 * 1024;
+          }
+        }
+      }
+
+      Assert.assertEquals(totalSize, file.length());
+    } finally {
+      storage.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
new file mode 100644
index 0000000..a51e933
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.io.nativeio.NativeIO;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.MetaFile;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
+import org.apache.ratis.statemachine.SimpleStateMachineStorage;
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Test RaftStorage and RaftStorageDirectory
+ */
+public class TestRaftStorage {
+  private File storageDir;
+  private final RaftProperties properties = new RaftProperties();
+
+  /**
+   * Resolve the test directory for this class and store its canonical path
+   * under the storage-dir key of the shared properties.
+   */
+  @Before
+  public void setup() throws Exception {
+    storageDir = RaftTestUtil.getTestDir(TestRaftStorage.class);
+    final String canonicalPath = storageDir.getCanonicalPath();
+    properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY,
+        canonicalPath);
+  }
+
+  /** Delete the parent of the test directory, if a test directory was set. */
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir == null) {
+      return;
+    }
+    FileUtils.fullyDelete(storageDir.getParentFile());
+  }
+
+  /**
+   * Starting with REGULAR on a deleted directory must end in NORMAL state,
+   * a FORMAT attempt while that storage is still open must fail, and a plain
+   * file in place of the storage directory must be rejected.
+   */
+  @Test
+  public void testNotExistent() throws IOException {
+    FileUtils.fullyDelete(storageDir);
+
+    // we will format the empty directory
+    RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
+    try {
+      Assert.assertEquals(StorageState.NORMAL, storage.getState());
+
+      try {
+        new RaftStorage(properties, StartupOption.FORMAT).close();
+        Assert.fail("the format should fail since the storage is still locked");
+      } catch (IOException e) {
+        Assert.assertTrue(
+            e.getMessage().contains("directory is already locked"));
+      }
+    } finally {
+      // release the storage even when one of the assertions above fails
+      storage.close();
+    }
+
+    // replace the storage directory with a regular file of the same name
+    FileUtils.fullyDelete(storageDir);
+    Assert.assertTrue(storageDir.createNewFile());
+    try {
+      // close() only runs if the constructor unexpectedly succeeds
+      new RaftStorage(properties, StartupOption.REGULAR).close();
+      Assert.fail("opening a regular file as the storage directory should fail");
+    } catch (IOException e) {
+      Assert.assertTrue(
+          e.getMessage().contains(StorageState.NON_EXISTENT.name()));
+    }
+  }
+
+  /**
+   * Make sure the RaftStorage format works: an unformatted directory becomes
+   * NORMAL after a REGULAR startup, values written to the meta file can be
+   * read back, and a FORMAT startup resets the meta file to its defaults.
+   */
+  @Test
+  public void testStorage() throws Exception {
+    RaftStorageDirectory sd = new RaftStorageDirectory(storageDir);
+    try {
+      StorageState state = sd.analyzeStorage(true);
+      Assert.assertEquals(StorageState.NOT_FORMATTED, state);
+      Assert.assertTrue(sd.isCurrentEmpty());
+    } finally {
+      sd.unlock();
+    }
+
+    RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
+    try {
+      Assert.assertEquals(StorageState.NORMAL, storage.getState());
+    } finally {
+      storage.close();
+    }
+
+    Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false));
+    File m = sd.getMetaFile();
+    Assert.assertTrue(m.exists());
+    MetaFile metaFile = new MetaFile(m);
+    Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
+    Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+
+    metaFile.set(123, "peer1");
+    metaFile.readFile();
+    Assert.assertEquals(123, metaFile.getTerm());
+    Assert.assertEquals("peer1", metaFile.getVotedFor());
+
+    // A second MetaFile on the same path starts unloaded and must return the
+    // values persisted through the first instance.
+    // NOTE(review): assumes the getters load the file on first use - confirm.
+    MetaFile metaFile2 = new MetaFile(m);
+    Assert.assertFalse(
+        (Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
+    Assert.assertEquals(123, metaFile2.getTerm());
+    Assert.assertEquals("peer1", metaFile2.getVotedFor());
+
+    // test format
+    storage = new RaftStorage(properties, StartupOption.FORMAT);
+    try {
+      Assert.assertEquals(StorageState.NORMAL, storage.getState());
+      metaFile = new MetaFile(sd.getMetaFile());
+      Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
+      Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+    } finally {
+      storage.close();
+    }
+  }
+
+  /**
+   * A freshly formatted storage has a meta file with default term and vote;
+   * values set through one MetaFile instance must be readable both from that
+   * instance and from a second instance opened on the same file.
+   */
+  @Test
+  public void testMetaFile() throws Exception {
+    RaftStorage storage = new RaftStorage(properties, StartupOption.FORMAT);
+    try {
+      File m = storage.getStorageDir().getMetaFile();
+      Assert.assertTrue(m.exists());
+      MetaFile metaFile = new MetaFile(m);
+      Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
+      Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+
+      metaFile.set(123, "peer1");
+      metaFile.readFile();
+      Assert.assertEquals(123, metaFile.getTerm());
+      Assert.assertEquals("peer1", metaFile.getVotedFor());
+
+      // The second instance starts unloaded and must see the persisted values.
+      // NOTE(review): assumes the getters load the file on first use - confirm.
+      MetaFile metaFile2 = new MetaFile(m);
+      Assert.assertFalse(
+          (Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
+      Assert.assertEquals(123, metaFile2.getTerm());
+      Assert.assertEquals("peer1", metaFile2.getVotedFor());
+    } finally {
+      // release the storage even when an assertion fails
+      storage.close();
+    }
+  }
+
+  /**
+   * check if RaftStorage deletes tmp metafile when startup
+   */
+  @Test
+  public void testCleanMetaTmpFile() throws Exception {
+    RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
+    Assert.assertEquals(StorageState.NORMAL, storage.getState());
+    storage.close();
+
+    // leave only the tmp meta file behind by renaming the real one
+    RaftStorageDirectory sd = new RaftStorageDirectory(storageDir);
+    File metaFile = sd.getMetaFile();
+    NativeIO.renameTo(metaFile, sd.getMetaTmpFile());
+
+    Assert.assertEquals(StorageState.NOT_FORMATTED, sd.analyzeStorage(false));
+
+    try {
+      // close() only runs if the constructor unexpectedly succeeds
+      new RaftStorage(properties, StartupOption.REGULAR).close();
+      Assert.fail("should throw IOException since storage dir is not formatted");
+    } catch (IOException e) {
+      Assert.assertTrue(
+          e.getMessage().contains(StorageState.NOT_FORMATTED.name()));
+    }
+
+    // let the storage dir contain both raft-meta and raft-meta.tmp
+    new RaftStorage(properties, StartupOption.FORMAT).close();
+    Assert.assertTrue(sd.getMetaFile().exists());
+    Assert.assertTrue(sd.getMetaTmpFile().createNewFile());
+    Assert.assertTrue(sd.getMetaTmpFile().exists());
+
+    // Open before entering the try block so that a failing constructor does
+    // not make the finally clause close the earlier, already closed instance.
+    storage = new RaftStorage(properties, StartupOption.REGULAR);
+    try {
+      Assert.assertEquals(StorageState.NORMAL, storage.getState());
+      Assert.assertFalse(sd.getMetaTmpFile().exists());
+      Assert.assertTrue(sd.getMetaFile().exists());
+    } finally {
+      storage.close();
+    }
+  }
+
+  /**
+   * Build a snapshot file name from a random term and index, parse it back
+   * and expect the same values; a file named "foo" must be rejected with an
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testSnapshotFileName() throws Exception {
+    final ThreadLocalRandom random = ThreadLocalRandom.current();
+    final long expectedTerm = random.nextLong(Long.MAX_VALUE);
+    final long expectedIndex = random.nextLong(Long.MAX_VALUE);
+    final String snapshotName =
+        SimpleStateMachineStorage.getSnapshotFileName(expectedTerm,
+            expectedIndex);
+    System.out.println("name = " + snapshotName);
+    final File snapshotFile = new File(storageDir, snapshotName);
+    final TermIndex parsed =
+        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
+    System.out.println("file = " + snapshotFile);
+    Assert.assertEquals(expectedTerm, parsed.getTerm());
+    Assert.assertEquals(expectedIndex, parsed.getIndex());
+    System.out.println("ti = " + parsed);
+
+    final File notASnapshot = new File(storageDir, "foo");
+    try {
+      SimpleStateMachineStorage.getTermIndexFromSnapshotFile(notASnapshot);
+      Assert.fail();
+    } catch (IllegalArgumentException expected) {
+      System.out.println("Good " + expected);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
new file mode 100644
index 0000000..405a1a5
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.ConfigurationManager;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSegmentedRaftLog {
+  // Set the RaftLogWorker logger to DEBUG when this test class is initialized.
+  static {
+    RaftUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG);
+  }
+
+  private static final String peerId = "s0";
+
+  /** Immutable description of one log segment used to generate test data. */
+  private static class SegmentRange {
+    /** index of the first entry of the segment */
+    final long start;
+    /** index of the last entry of the segment (inclusive) */
+    final long end;
+    /** term given to every entry generated for the segment */
+    final long term;
+    /** whether the open-log file (rather than a closed-log file) is used */
+    final boolean isOpen;
+
+    SegmentRange(long start, long end, long term, boolean isOpen) {
+      this.start = start;
+      this.end = end;
+      this.term = term;
+      this.isOpen = isOpen;
+    }
+  }
+
+  private File storageDir;
+  private RaftProperties properties;
+  private RaftStorage storage;
+  private final ConfigurationManager cm = new ConfigurationManager(
+      MiniRaftCluster.initConfiguration(MiniRaftCluster.generateIds(3, 0)));
+
+  /**
+   * Create fresh properties pointing at this class's test directory and open
+   * a RaftStorage on it with the REGULAR startup option.
+   */
+  @Before
+  public void setup() throws Exception {
+    storageDir = RaftTestUtil.getTestDir(TestSegmentedRaftLog.class);
+    properties = new RaftProperties();
+    final String canonicalPath = storageDir.getCanonicalPath();
+    properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY,
+        canonicalPath);
+    storage = new RaftStorage(properties,
+        RaftServerConstants.StartupOption.REGULAR);
+  }
+
+  /** Delete the parent of the test directory, if a test directory was set. */
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir == null) {
+      return;
+    }
+    FileUtils.fullyDelete(storageDir.getParentFile());
+  }
+
+  /**
+   * Write one log file per range (the open-log file when the range is open,
+   * a closed-log file otherwise), filling it with entries "m&lt;index&gt;".
+   *
+   * @param list the segments to generate, in order
+   * @return all written entries, in the order they were written
+   */
+  private LogEntryProto[] prepareLog(List<SegmentRange> list)
+      throws IOException {
+    final List<LogEntryProto> written = new ArrayList<>();
+    for (SegmentRange range : list) {
+      final File segmentFile;
+      if (range.isOpen) {
+        segmentFile = storage.getStorageDir().getOpenLogFile(range.start);
+      } else {
+        segmentFile = storage.getStorageDir()
+            .getClosedLogFile(range.start, range.end);
+      }
+
+      final int count = (int) (range.end - range.start + 1);
+      final LogEntryProto[] segmentEntries = new LogEntryProto[count];
+      try (LogOutputStream out =
+               new LogOutputStream(segmentFile, false, properties)) {
+        for (int i = 0; i < count; i++) {
+          final long index = i + range.start;
+          final SimpleOperation op = new SimpleOperation("m" + index);
+          final LogEntryProto entry = ProtoUtils.toLogEntryProto(
+              op.getLogEntryContent(), range.term, index);
+          segmentEntries[i] = entry;
+          out.write(entry);
+        }
+      }
+      // only collect a segment's entries once its file was written completely
+      written.addAll(Arrays.asList(segmentEntries));
+    }
+    return written.toArray(new LogEntryProto[0]);
+  }
+
+  /**
+   * Build consecutive, equally sized segment ranges. The i-th range gets
+   * term i, and only the last range is marked as open.
+   *
+   * @param number      how many ranges to create
+   * @param segmentSize number of indices covered by each range
+   * @param startIndex  first index of the first range
+   */
+  private List<SegmentRange> prepareRanges(int number, int segmentSize,
+      long startIndex) {
+    final List<SegmentRange> ranges = new ArrayList<>(number);
+    long first = startIndex;
+    for (int term = 0; term < number; term++) {
+      final long last = first + segmentSize - 1;
+      final boolean isLastRange = term == number - 1;
+      ranges.add(new SegmentRange(first, last, term, isLastRange));
+      first += segmentSize;
+    }
+    return ranges;
+  }
+
+  /**
+   * Write five segments of 100 entries straight to disk, then open a
+   * SegmentedRaftLog on the storage and expect to read the same entries.
+   */
+  @Test
+  public void testLoadLogSegments() throws Exception {
+    // first generate log files
+    final LogEntryProto[] expected = prepareLog(prepareRanges(5, 100, 0));
+
+    // create RaftLog object and load log file
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // check if log entries are loaded correctly
+      for (int i = 0; i < expected.length; i++) {
+        final LogEntryProto loaded = raftLog.get(expected[i].getIndex());
+        Assert.assertEquals(expected[i], loaded);
+      }
+
+      Assert.assertArrayEquals(expected, raftLog.getEntries(0, 500));
+      final LogEntryProto lastExpected = expected[expected.length - 1];
+      Assert.assertEquals(lastExpected, raftLog.getLastEntry());
+    }
+  }
+
+  /**
+   * Create in-memory entries for every index of every range, using the
+   * range's term.
+   *
+   * @param slist          the ranges to cover, in order
+   * @param stringSupplier source of each entry's content; when null the
+   *                       content is "m" followed by the entry index
+   */
+  List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist,
+      Supplier<String> stringSupplier) {
+    final List<LogEntryProto> result = new ArrayList<>();
+    for (SegmentRange range : slist) {
+      for (long index = range.start; index <= range.end; index++) {
+        final String content;
+        if (stringSupplier == null) {
+          content = "m" + index;
+        } else {
+          content = stringSupplier.get();
+        }
+        final SimpleOperation op = new SimpleOperation(content);
+        result.add(ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
+            range.term, index));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Append 1000 entries one at a time, sync, then reopen the log and verify
+   * that all of them can be read back.
+   */
+  @Test
+  public void testAppendEntry() throws Exception {
+    final List<LogEntryProto> entries =
+        prepareLogEntries(prepareRanges(5, 200, 0), null);
+
+    try (SegmentedRaftLog writer =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      writer.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // append entries to the raftlog
+      for (LogEntryProto entry : entries) {
+        writer.appendEntry(entry);
+      }
+      writer.logSync();
+    }
+
+    try (SegmentedRaftLog reader =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      reader.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // check if the raft log is correct
+      checkEntries(reader, entries, 0, entries.size());
+    }
+  }
+
+  /**
+   * Keep appending entries, make sure the rolling is correct: with a 128KB
+   * segment limit, 1024 entries built from a 1024-byte payload are expected
+   * to end up in 9 segments.
+   */
+  @Test
+  public void testAppendAndRoll() throws Exception {
+    properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024);
+    properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 128 * 1024);
+
+    final byte[] payload = new byte[1024];
+    final Supplier<String> contentSupplier = () -> new String(payload);
+    final List<LogEntryProto> entries =
+        prepareLogEntries(prepareRanges(1, 1024, 0), contentSupplier);
+
+    try (SegmentedRaftLog writer =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      writer.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // append entries to the raftlog
+      for (LogEntryProto entry : entries) {
+        writer.appendEntry(entry);
+      }
+      writer.logSync();
+    }
+
+    try (SegmentedRaftLog reader =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      reader.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // check if the raft log is correct
+      checkEntries(reader, entries, 0, entries.size());
+      final int segments = reader.getRaftLogCache().getNumOfSegments();
+      Assert.assertEquals(9, segments);
+    }
+  }
+
+  /**
+   * Fill the log with 1000 entries, then truncate it step by step at indices
+   * 900, 750, ... down to 0, verifying the log after every step.
+   */
+  @Test
+  public void testTruncate() throws Exception {
+    // prepare the log for truncation
+    final List<LogEntryProto> entries =
+        prepareLogEntries(prepareRanges(5, 200, 0), null);
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // append entries to the raftlog
+      for (LogEntryProto entry : entries) {
+        raftLog.appendEntry(entry);
+      }
+      raftLog.logSync();
+    }
+
+    long fromIndex = 900;
+    while (fromIndex >= 0) {
+      testTruncate(entries, fromIndex);
+      fromIndex -= 150;
+    }
+  }
+
+  /**
+   * Truncate the stored log at {@code fromIndex} and verify the remaining
+   * entries, first on the instance that truncated and then on a reopened one.
+   *
+   * @param entries   the entries originally appended, ordered by index
+   * @param fromIndex the index passed to truncate; entries before it must stay
+   */
+  private void testTruncate(List<LogEntryProto> entries, long fromIndex)
+      throws Exception {
+    final int remaining = (int) fromIndex;
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // truncate the log
+      raftLog.truncate(fromIndex);
+      raftLog.logSync();
+
+      checkEntries(raftLog, entries, 0, remaining);
+    }
+
+    try (SegmentedRaftLog reopened =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      reopened.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // check if the raft log is correct
+      if (fromIndex <= 0) {
+        Assert.assertNull(reopened.getLastEntry());
+      } else {
+        Assert.assertEquals(entries.get((int) (fromIndex - 1)),
+            reopened.getLastEntry());
+      }
+      checkEntries(reopened, entries, 0, remaining);
+    }
+  }
+
+  /**
+   * Verify that {@code size} entries of {@code expected}, starting at list
+   * position {@code offset}, are returned by the log both one by one and as
+   * a single range. Does nothing when {@code size} is not positive.
+   */
+  private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected,
+      int offset, int size) {
+    if (size <= 0) {
+      return;
+    }
+    final int endExclusive = offset + size;
+    for (int i = offset; i < endExclusive; i++) {
+      final LogEntryProto want = expected.get(i);
+      Assert.assertEquals(want, raftLog.get(want.getIndex()));
+    }
+
+    // the range query takes the first index and one past the last index
+    final LogEntryProto[] actual = raftLog.getEntries(
+        expected.get(offset).getIndex(),
+        expected.get(endExclusive - 1).getIndex() + 1);
+    final LogEntryProto[] wanted = expected.subList(offset, endExclusive)
+        .toArray(SegmentedRaftLog.EMPTY_LOGENTRY_ARRAY);
+    Assert.assertArrayEquals(wanted, actual);
+  }
+
+  /**
+   * Test append with inconsistent entries: a batch whose first half matches
+   * the stored log and whose second half carries a different term.
+   */
+  @Test
+  public void testAppendEntriesWithInconsistency() throws Exception {
+    // prepare the existing log: indices 0-999 in five ranges with terms 0-4
+    final List<LogEntryProto> entries =
+        prepareLogEntries(prepareRanges(5, 200, 0), null);
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      // append entries to the raftlog
+      for (LogEntryProto entry : entries) {
+        raftLog.appendEntry(entry);
+      }
+      raftLog.logSync();
+    }
+
+    // append entries whose first 100 entries are the same with existing log,
+    // and the next 100 are with different term
+    final List<SegmentRange> newRanges = Arrays.asList(
+        new SegmentRange(550, 599, 2, false),
+        new SegmentRange(600, 649, 3, false),
+        new SegmentRange(650, 749, 10, false));
+    final List<LogEntryProto> newEntries = prepareLogEntries(newRanges, null);
+    final LogEntryProto lastNewEntry = newEntries.get(newEntries.size() - 1);
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()]));
+      raftLog.logSync();
+
+      // indices below 650 keep the old entries, 650-749 hold the new ones
+      checkEntries(raftLog, entries, 0, 650);
+      checkEntries(raftLog, newEntries, 100, 100);
+      Assert.assertEquals(lastNewEntry, raftLog.getLastEntry());
+      Assert.assertEquals(lastNewEntry.getIndex(),
+          raftLog.getLatestFlushedIndex());
+    }
+
+    // load the raftlog again and check
+    try (SegmentedRaftLog reloaded =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      reloaded.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      checkEntries(reloaded, entries, 0, 650);
+      checkEntries(reloaded, newEntries, 100, 100);
+      Assert.assertEquals(lastNewEntry, reloaded.getLastEntry());
+      Assert.assertEquals(lastNewEntry.getIndex(),
+          reloaded.getLatestFlushedIndex());
+
+      final RaftLogCache cache = reloaded.getRaftLogCache();
+      Assert.assertEquals(5, cache.getNumOfSegments());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
new file mode 100644
index 0000000..6854b42
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RaftSnapshotBaseTest {
+  // Set the server, log, request-handler and client loggers to DEBUG when
+  // this class is initialized.
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  static final Logger LOG = 
LoggerFactory.getLogger(RaftSnapshotBaseTest.class);
+  private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
+
+  /**
+   * Look up the snapshot file of the current leader's state machine for the
+   * leader's current term and the given index.
+   *
+   * @param cluster the cluster whose leader is queried
+   * @param i       the index passed to the state machine storage
+   */
+  static File getSnapshotFile(MiniRaftCluster cluster, int i) {
+    final RaftServerImpl leader = cluster.getLeader();
+    final long currentTerm = leader.getState().getCurrentTerm();
+    return SimpleStateMachine4Testing.get(leader)
+        .getStateMachineStorage()
+        .getSnapshotFile(currentTerm, i);
+  }
+
+  /**
+   * Wait for a leader, then check its last committed index and the entries
+   * held by its state machine.
+   *
+   * @throws InterruptedException declared because of the wait for a leader
+   */
+  static void assertLeaderContent(MiniRaftCluster cluster)
+      throws InterruptedException {
+    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+    final int expectedCommitted = SNAPSHOT_TRIGGER_THRESHOLD * 2;
+    Assert.assertEquals(expectedCommitted,
+        leader.getState().getLog().getLastCommittedIndex());
+    final LogEntryProto[] content =
+        SimpleStateMachine4Testing.get(leader).getContent();
+
+    // Position i must hold log index i + 1 and the data of message "m" + i.
+    // NOTE(review): position 0 is not checked here - confirm this is intended.
+    int i = 1;
+    while (i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1) {
+      final LogEntryProto entry = content[i];
+      Assert.assertEquals(i + 1, entry.getIndex());
+      final byte[] expectedData =
+          new SimpleMessage("m" + i).getContent().toByteArray();
+      Assert.assertArrayEquals(expectedData,
+          entry.getSmLogEntry().getData().toByteArray());
+      i++;
+    }
+  }
+
+  private MiniRaftCluster cluster;
+
+  public abstract MiniRaftCluster initCluster(int numServer, RaftProperties 
prop)
+      throws IOException;
+
+  /**
+   * Start a one-server cluster that uses SimpleStateMachine4Testing and has
+   * automatic snapshots enabled with the test's trigger threshold.
+   */
+  @Before
+  public void setup() throws IOException {
+    final RaftProperties raftProperties = new RaftProperties();
+    raftProperties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    raftProperties.setLong(RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
+        SNAPSHOT_TRIGGER_THRESHOLD);
+    raftProperties.setBoolean(RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, true);
+
+    cluster = initCluster(1, raftProperties);
+    cluster.start();
+  }
+
+  /** Shut the cluster down if one was created. */
+  @After
+  public void tearDown() {
+    if (cluster == null) {
+      return;
+    }
+    cluster.shutdown();
+  }
+
+  /**
+   * Keep generating writing traffic and make sure snapshots are taken.
+   * We then restart the whole raft peer and check if it can correctly load
+   * snapshots + raft log.
+   */
+  @Test
+  public void testRestartPeer() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    final String leaderId = cluster.getLeader().getId();
+    final int numMessages = SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1;
+    try (final RaftClient client = cluster.createClient("client", leaderId)) {
+      for (int n = 0; n < numMessages; n++) {
+        final RaftClientReply reply = client.send(new SimpleMessage("m" + n));
+        Assert.assertTrue(reply.isSuccess());
+      }
+    }
+
+    // wait for the snapshot to be done
+    final File snapshotFile = getSnapshotFile(cluster, numMessages);
+
+    // poll once per second; give up after the retry budget is used up
+    for (int retries = 0; ; retries++) {
+      Thread.sleep(1000);
+      if (snapshotFile.exists() || retries >= 10) {
+        break;
+      }
+    }
+
+    Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
+
+    // restart the peer and check if it can correctly load snapshot
+    cluster.restart(false);
+    try {
+      // SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1 messages were sent above;
+      // assertLeaderContent expects the last committed index to be
+      // SNAPSHOT_TRIGGER_THRESHOLD * 2
+      assertLeaderContent(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Basic test for install snapshot: start a one node cluster and let it
+   * generate a snapshot. Then delete the log and restart the node, and add
+   * more nodes as followers.
+   */
+  @Test
+  public void testBasicInstallSnapshot() throws Exception {
+    final int numMessages = SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1;
+    List<LogPathAndIndex> segmentFiles = new ArrayList<>();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client =
+               cluster.createClient("client", leaderId)) {
+        for (int n = 0; n < numMessages; n++) {
+          final RaftClientReply reply =
+              client.send(new SimpleMessage("m" + n));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // wait for the snapshot to be done
+      final RaftStorageDirectory storageDirectory =
+          cluster.getLeader().getState().getStorage().getStorageDir();
+      final File snapshotFile = getSnapshotFile(cluster, numMessages);
+      // remember the segment files so they can be deleted after shutdown
+      segmentFiles = storageDirectory.getLogSegmentFiles();
+
+      // poll once per second; give up after the retry budget is used up
+      for (int retries = 0; ; retries++) {
+        Thread.sleep(1000);
+        if (snapshotFile.exists() || retries >= 10) {
+          break;
+        }
+      }
+
+      Assert.assertTrue(snapshotFile + " does not exist",
+          snapshotFile.exists());
+    } finally {
+      cluster.shutdown();
+    }
+
+    // delete the log segments from the leader
+    for (LogPathAndIndex segment : segmentFiles) {
+      FileUtils.deleteFile(segment.path.toFile());
+    }
+
+    // restart the peer
+    LOG.info("Restarting the cluster");
+    cluster.restart(false);
+    try {
+      assertLeaderContent(cluster);
+
+      // generate some more traffic
+      try (final RaftClient client = cluster.createClient("client",
+          cluster.getLeader().getId())) {
+        final RaftClientReply reply = client.send(new SimpleMessage("test"));
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      // add two more peers
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+          new String[]{"s3", "s4"}, true);
+      // trigger setConfiguration
+      final SetConfigurationRequest request = new SetConfigurationRequest(
+          "client", cluster.getLeader().getId(), DEFAULT_SEQNUM,
+          change.allPeersInNewConf);
+      LOG.info("Start changing the configuration: {}", request);
+      cluster.getLeader().setConfiguration(request);
+
+      RaftServerTestUtil.waitAndCheckNewConf(cluster,
+          change.allPeersInNewConf, 0, null);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
new file mode 100644
index 0000000..cc82371
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.LogInputStream;
+import org.apache.ratis.server.storage.LogOutputStream;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MD5FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link StateMachine} implementation example that simply stores all the log
+ * entries in a list. Mainly used for test.
+ *
+ * For snapshot it simply merges all the log segments together: every applied
+ * entry whose index is not greater than the latest tracked index is written
+ * to a single snapshot file.
+ *
+ * When {@link #RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY} is set to true
+ * a background checkpointer thread is started by
+ * {@link #initialize(String, RaftProperties, RaftStorage)}. It polls once per
+ * second and takes a snapshot when the last applied index is at least
+ * {@link #SNAPSHOT_THRESHOLD} ahead of the previous checkpoint.
+ */
+public class SimpleStateMachine4Testing extends BaseStateMachine {
+  /** Index distance from the last checkpoint that triggers a new snapshot. */
+  static volatile int SNAPSHOT_THRESHOLD = 100;
+  static final Logger LOG =
+      LoggerFactory.getLogger(SimpleStateMachine4Testing.class);
+  /** Property key that enables the background checkpointer thread. */
+  public static final String RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY
+      = "raft.test.simple.state.machine.take.snapshot";
+  public static final boolean
+      RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false;
+
+  /**
+   * @return the state machine of the given server, cast to this class.
+   */
+  public static SimpleStateMachine4Testing get(RaftServerImpl s) {
+    return (SimpleStateMachine4Testing)RaftServerTestUtil.getStateMachine(s);
+  }
+
+  /** All applied (or snapshot-loaded) entries, in the order they were added. */
+  private final List<LogEntryProto> list =
+      Collections.synchronizedList(new ArrayList<>());
+  private final Daemon checkpointer;
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
+  private final TermIndexTracker termIndexTracker = new TermIndexTracker();
+  private final RaftProperties properties = new RaftProperties();
+
+  /** Cleared by {@link #close()} to stop the checkpointer loop. */
+  private volatile boolean running = true;
+  /** End index returned by the most recent snapshot taken or loaded. */
+  private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
+
+  SimpleStateMachine4Testing() {
+    checkpointer = new Daemon(() -> {
+      while (running) {
+        try {
+          if (isSnapshotDue()) {
+            endIndexLastCkpt = takeSnapshot();
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Received IOException in Checkpointer", ioe);
+        }
+        // Sleep outside the try block above so that a failed snapshot is
+        // retried after the usual pause instead of in a tight loop.
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ignored) {
+          // close() clears "running" before interrupting this thread, so the
+          // loop condition decides whether to exit.
+        }
+      }
+    });
+  }
+
+  /**
+   * Checks whether enough entries were added since the last checkpoint.
+   *
+   * The last element is read while holding the list lock so that the size
+   * check and the access are atomic. An empty list never requires a snapshot;
+   * without this guard the lookup of element -1 throws an unchecked exception
+   * that terminates the checkpointer thread.
+   */
+  private boolean isSnapshotDue() {
+    final long lastIndex;
+    synchronized (list) {
+      if (list.isEmpty()) {
+        return false;
+      }
+      lastIndex = list.get(list.size() - 1).getIndex();
+    }
+    return lastIndex - endIndexLastCkpt >= SNAPSHOT_THRESHOLD;
+  }
+
+  /**
+   * Initializes the base class and the storage, loads the latest snapshot if
+   * one exists, and starts the checkpointer when the take-snapshot property is
+   * enabled in the given properties.
+   */
+  @Override
+  public synchronized void initialize(String id, RaftProperties properties,
+      RaftStorage raftStorage) throws IOException {
+    LOG.info("Initializing {}:{}", getClass().getSimpleName(), id);
+    lifeCycle.startAndTransition(() -> {
+      super.initialize(id, properties, raftStorage);
+      storage.init(raftStorage);
+      loadSnapshot(storage.findLatestSnapshot());
+
+      if (properties.getBoolean(
+          RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY,
+          RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT)) {
+        checkpointer.start();
+      }
+    });
+  }
+
+  /** Moves the life cycle to PAUSING and then to PAUSED. */
+  @Override
+  public synchronized void pause() {
+    lifeCycle.transition(LifeCycle.State.PAUSING);
+    lifeCycle.transition(LifeCycle.State.PAUSED);
+  }
+
+  /** Logs the request and delegates to {@link #initialize}. */
+  @Override
+  public synchronized void reinitialize(String id, RaftProperties properties,
+      RaftStorage storage) throws IOException {
+    LOG.info("Reinitializing {}:{}", getClass().getSimpleName(), id);
+    initialize(id, properties, storage);
+  }
+
+  /**
+   * Appends the log entry of the transaction to the list and advances the
+   * term-index tracker.
+   *
+   * @return a completed future carrying the message "&lt;index&gt; OK".
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    LogEntryProto entry = trx.getLogEntry().get();
+    Preconditions.checkNotNull(entry);
+    list.add(entry);
+    termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
+    return CompletableFuture.completedFuture(
+        new SimpleMessage(entry.getIndex() + " OK"));
+  }
+
+  /**
+   * Writes all entries up to the latest tracked index into a snapshot file,
+   * stores its MD5 digest and asks the storage to reload the latest snapshot.
+   *
+   * NOTE: IOExceptions raised by any of these three steps are only logged;
+   * the end index is returned regardless.
+   *
+   * @return the index of the last entry covered by the snapshot, or
+   *         {@link RaftServerConstants#INVALID_LOG_INDEX} when the tracked
+   *         term or index is not positive.
+   */
+  @Override
+  public long takeSnapshot() throws IOException {
+    TermIndex termIndex = termIndexTracker.getLatestTermIndex();
+    if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+    final long endIndex = termIndex.getIndex();
+
+    // TODO: snapshot should be written to a tmp file, then renamed
+    File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(),
+        termIndex.getIndex());
+    LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}",
+        termIndex.getTerm(), termIndex.getIndex(), snapshotFile);
+
+    // A synchronized list must be locked by the caller while it is iterated.
+    // Copy it under the lock so that concurrent applyTransaction calls cannot
+    // cause a ConcurrentModificationException and are not blocked by file I/O.
+    final List<LogEntryProto> entries;
+    synchronized (list) {
+      entries = new ArrayList<>(list);
+    }
+    try (LogOutputStream out =
+        new LogOutputStream(snapshotFile, false, properties)) {
+      for (final LogEntryProto entry : entries) {
+        if (entry.getIndex() > endIndex) {
+          break;
+        }
+        out.write(entry);
+      }
+      out.flush();
+    } catch (IOException e) {
+      LOG.warn("Failed to take snapshot", e);
+    }
+
+    try {
+      final MD5Hash digest = MD5FileUtil.computeMd5ForFile(snapshotFile);
+      MD5FileUtil.saveMD5File(snapshotFile, digest);
+    } catch (IOException e) {
+      LOG.warn("Hit IOException when computing MD5 for snapshot file "
+          + snapshotFile, e);
+    }
+
+    try {
+      this.storage.loadLatestSnapshot();
+    } catch (IOException e) {
+      LOG.warn(
+          "Hit IOException when loading latest snapshot for snapshot file "
+          + snapshotFile, e);
+    }
+    // TODO: purge log segments
+    return endIndex;
+  }
+
+  @Override
+  public SimpleStateMachineStorage getStateMachineStorage() {
+    return storage;
+  }
+
+  /**
+   * Reads every entry of the given snapshot file into the list and sets the
+   * tracker and the last checkpoint index to the snapshot's values.
+   *
+   * @param snapshot the snapshot to load; may be null.
+   * @return the snapshot index, or
+   *         {@link RaftServerConstants#INVALID_LOG_INDEX} when the snapshot is
+   *         null or its file does not exist.
+   * @throws IllegalStateException if, after loading, the list is empty or its
+   *         last entry does not have the snapshot index.
+   */
+  public synchronized long loadSnapshot(SingleFileSnapshotInfo snapshot)
+      throws IOException {
+    if (snapshot == null || !snapshot.getFile().getPath().toFile().exists()) {
+      LOG.info("The snapshot file {} does not exist",
+          snapshot == null ? null : snapshot.getFile());
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    } else {
+      LOG.info("Loading snapshot with t:{}, i:{}, file:{}", snapshot.getTerm(),
+          snapshot.getIndex(), snapshot.getFile().getPath());
+      final long endIndex = snapshot.getIndex();
+      try (LogInputStream in = new LogInputStream(
+          snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
+        LogEntryProto entry;
+        while ((entry = in.nextEntry()) != null) {
+          list.add(entry);
+          termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
+        }
+      }
+      Preconditions.checkState(
+          !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
+          "endIndex=%s, list=%s", endIndex, list);
+      this.endIndexLastCkpt = endIndex;
+      termIndexTracker.init(snapshot.getTermIndex());
+      this.storage.loadLatestSnapshot();
+      return endIndex;
+    }
+  }
+
+  /**
+   * @return a completed future whose reply carries the fixed message
+   *         "query success".
+   */
+  @Override
+  public CompletableFuture<RaftClientReply> query(
+      RaftClientRequest request) {
+    return CompletableFuture.completedFuture(
+        new RaftClientReply(request, new SimpleMessage("query success")));
+  }
+
+  /**
+   * Wraps the content of the request message into a state machine log entry.
+   */
+  @Override
+  public TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException {
+    return new TransactionContext(this, request, SMLogEntryProto.newBuilder()
+        .setData(request.getMessage().getContent())
+        .build());
+  }
+
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+    // do nothing
+  }
+
+  /** Stops the checkpointer loop and interrupts its thread. */
+  @Override
+  public void close() {
+    lifeCycle.checkStateAndClose(() -> {
+      running = false;
+      checkpointer.interrupt();
+    });
+  }
+
+  /**
+   * @return an array copy of the entries currently held in the list.
+   */
+  public LogEntryProto[] getContent() {
+    return list.toArray(new LogEntryProto[list.size()]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
new file mode 100644
index 0000000..31768e8
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+
+import org.apache.ratis.server.protocol.TermIndex;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Keeps the most recent {@link TermIndex} handed to a simple state machine,
+ * i.e. one without concurrent snapshotting capabilities.
+ */
+class TermIndexTracker {
+  /** Initial value: both term and index are INVALID_LOG_INDEX. */
+  static final TermIndex INIT_TERMINDEX =
+      TermIndex.newTermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX);
+
+  private TermIndex current = INIT_TERMINDEX;
+
+  //TODO: developer note: everything is synchronized for now for convenience.
+
+  /**
+   * Return latest term and index that is inserted to this tracker as an atomic
+   * entity.
+   */
+  public synchronized TermIndex getLatestTermIndex() {
+    return current;
+  }
+
+  /**
+   * Replace the tracked value unconditionally, e.g. with the term index of a
+   * snapshot.
+   */
+  public synchronized void init(TermIndex termIndex) {
+    current = termIndex;
+  }
+
+  /** Go back to {@link #INIT_TERMINDEX}. */
+  public synchronized void reset() {
+    init(INIT_TERMINDEX);
+  }
+
+  /**
+   * Advance the tracker to the given term index, meaning that the
+   * StateMachine has this index in memory. The argument must be non-null and
+   * must not compare lower than the tracked value.
+   */
+  public synchronized void update(TermIndex termIndex) {
+    final boolean acceptable =
+        termIndex != null && termIndex.compareTo(current) >= 0;
+    Preconditions.checkArgument(acceptable);
+    current = termIndex;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
new file mode 100644
index 0000000..cdce568
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Test StateMachine related functionality
+ */
+public class TestStateMachine {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  public static final int NUM_SERVERS = 5;
+
+  private final RaftProperties properties = new RaftProperties();
+  {
+    // TODO: fix and run with in-memory log. It fails with NPE
+    properties.setBoolean(
+        RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY, false);
+  }
+
+  private MiniRaftClusterWithSimulatedRpc cluster;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(60 * 1000);
+
+  @Before
+  public void setup() throws IOException {
+  }
+
+  /**
+   * Creates a cluster of {@link #NUM_SERVERS} servers with the current
+   * properties, checks that it has no leader yet, and starts it.
+   */
+  private void startCluster() {
+    cluster = new MiniRaftClusterWithSimulatedRpc(NUM_SERVERS, properties);
+    Assert.assertNull(getCluster().getLeader());
+    getCluster().start();
+  }
+
+  /** Shuts the cluster down if a test started one. */
+  @After
+  public void tearDown() {
+    final MiniRaftCluster cluster = getCluster();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public MiniRaftClusterWithSimulatedRpc getCluster() {
+    return cluster;
+  }
+
+  public RaftProperties getProperties() {
+    return properties;
+  }
+
+  /**
+   * State machine that attaches an increasing transaction id as the context of
+   * every transaction it starts, and checks in
+   * {@link #applyTransaction(TransactionContext)} whether that context is
+   * present. Failed checks are stored in {@link #throwable} rather than
+   * thrown; the test surfaces them via {@link #rethrowIfException()}.
+   */
+  static class SMTransactionContext extends SimpleStateMachine4Testing {
+    public static SMTransactionContext get(RaftServerImpl s) {
+      return (SMTransactionContext)RaftServerTestUtil.getStateMachine(s);
+    }
+
+    /** Last throwable caught while applying a transaction, if any. */
+    AtomicReference<Throwable> throwable = new AtomicReference<>(null);
+    /** Number of transactions started; also the source of context ids. */
+    AtomicLong transactions = new AtomicLong(0);
+    /** Set once startTransaction has been called on this instance. */
+    AtomicBoolean isLeader = new AtomicBoolean(false);
+    /** Number of transactions that passed the checks in applyTransaction. */
+    AtomicLong numApplied = new AtomicLong(0);
+    /** Context ids seen by applyTransaction on the leader. */
+    ConcurrentLinkedQueue<Long> applied = new ConcurrentLinkedQueue<>();
+
+    @Override
+    public TransactionContext startTransaction(RaftClientRequest request)
+        throws IOException {
+      // only leader will get this call
+      isLeader.set(true);
+      // send the next transaction id as the "context" from SM
+      return new TransactionContext(this, request, SMLogEntryProto.newBuilder()
+          .setData(request.getMessage().getContent())
+          .build(), transactions.incrementAndGet());
+    }
+
+    @Override
+    public CompletableFuture<Message> applyTransaction(
+        TransactionContext trx) {
+      try {
+        assertTrue(trx.getLogEntry().isPresent());
+        assertTrue(trx.getSMLogEntry().isPresent());
+        Optional<Object> context = trx.getStateMachineContext();
+        if (isLeader.get()) {
+          assertTrue(trx.getClientRequest().isPresent());
+          assertTrue(context.isPresent());
+          assertTrue(context.get() instanceof Long);
+          Long val = (Long)context.get();
+          assertTrue(val <= transactions.get());
+          applied.add(val);
+        } else {
+          assertFalse(trx.getClientRequest().isPresent());
+          assertFalse(context.isPresent());
+        }
+        numApplied.incrementAndGet();
+      } catch (Throwable t) {
+        // Keep the failure so that the test thread can rethrow it.
+        throwable.set(t);
+      }
+      return CompletableFuture.completedFuture(null);
+    }
+
+    /** Rethrows the throwable recorded by applyTransaction, if there is one. */
+    void rethrowIfException() throws Throwable {
+      Throwable t = throwable.get();
+      if (t != null) {
+        throw t;
+      }
+    }
+  }
+
+  @Test
+  public void testTransactionContextIsPassedBack() throws Throwable {
+    // tests that the TrxContext set by the StateMachine in Leader is passed
+    // back to the SM
+    properties.setClass(
+        MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SMTransactionContext.class, StateMachine.class);
+    startCluster();
+
+    int numTrx = 100;
+    final RaftTestUtil.SimpleMessage[] messages =
+        RaftTestUtil.SimpleMessage.create(numTrx);
+    try(final RaftClient client = cluster.createClient("client", null)) {
+      for (RaftTestUtil.SimpleMessage message : messages) {
+        client.send(message);
+      }
+    }
+
+    // TODO: there should be a better way to ensure all data is replicated
+    // and applied
+    Thread.sleep(cluster.getMaxTimeout() + 100);
+
+    for (RaftServerImpl raftServer : cluster.getServers()) {
+      final SMTransactionContext sm = SMTransactionContext.get(raftServer);
+      sm.rethrowIfException();
+      assertEquals(numTrx, sm.numApplied.get());
+    }
+
+    // check leader
+    RaftServerImpl raftServer = cluster.getLeader();
+    // assert every transaction has obtained context in leader
+    final SMTransactionContext sm = SMTransactionContext.get(raftServer);
+    List<Long> ll = sm.applied.stream().collect(Collectors.toList());
+    Collections.sort(ll);
+    // JUnit's argument order is (message, expected, actual).
+    assertEquals(ll.toString(), numTrx, ll.size());
+    for (int i=0; i < numTrx; i++) {
+      assertEquals(ll.toString(), Long.valueOf(i+1), ll.get(i));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/resources/log4j.properties 
b/ratis-server/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ced0687
--- /dev/null
+++ b/ratis-server/src/test/resources/log4j.properties
@@ -0,0 +1,18 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

Reply via email to