http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
new file mode 100644
index 0000000..6ed70fa
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_KEY;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.zip.Checksum;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.PureJavaCrc32C;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Output stream for a single log segment file. Entries are written through a
+ * {@link BufferedWriteChannel} layered on the file's {@link FileChannel}, and
+ * the file is padded ahead of the write position with
+ * {@link RaftServerConstants#LOG_TERMINATE_BYTE}.
+ */
+public class LogOutputStream implements Closeable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LogOutputStream.class);
+
+  private static final int BUFFER_SIZE = 1024 * 1024; // 1 MB
+
+  /**
+   * Block of terminate bytes used as padding. It is shared by all instances,
+   * so after initialization it is only ever read through
+   * {@link ByteBuffer#duplicate()} and its own position/limit never change.
+   */
+  private static final ByteBuffer fill;
+  static {
+    fill = ByteBuffer.allocateDirect(BUFFER_SIZE);
+    for (int i = 0; i < fill.capacity(); i++) {
+      fill.put(RaftServerConstants.LOG_TERMINATE_BYTE);
+    }
+    fill.flip();
+  }
+
+  private final File file;
+  private FileChannel fc; // channel of the file stream for sync
+  private BufferedWriteChannel out; // buffered FileChannel for writing
+  private final Checksum checksum;
+
+  private final long segmentMaxSize;
+  private final long preallocatedSize;
+  /** File offset up to which padding has already been written. */
+  private long preallocatedPos;
+
+  /**
+   * Open the given file for writing log entries.
+   *
+   * @param file the segment file; it is opened in "rw" mode
+   * @param append if false, the file is truncated, padded and a fresh header
+   *               is written; if true, writing continues at the current end
+   *               of the file
+   * @param properties source of the segment size, pre-allocation size and
+   *                   write buffer size settings
+   * @throws IOException if the file cannot be opened or initialized; the file
+   *                     handle is released before the exception propagates
+   */
+  public LogOutputStream(File file, boolean append, RaftProperties properties)
+      throws IOException {
+    this.file = file;
+    this.checksum = new PureJavaCrc32C();
+    this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
+        RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
+    this.preallocatedSize = properties.getLong(
+        RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY,
+        RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
+
+    final RandomAccessFile rp = new RandomAccessFile(file, "rw");
+    try {
+      fc = rp.getChannel();
+      fc.position(fc.size());
+      preallocatedPos = fc.size();
+
+      int bufferSize = properties.getInt(RAFT_LOG_WRITE_BUFFER_SIZE_KEY,
+          RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT);
+      out = new BufferedWriteChannel(fc, bufferSize);
+
+      if (!append) {
+        create();
+      }
+    } catch (IOException | RuntimeException e) {
+      // Do not leak the file descriptor when initialization fails. Closing
+      // the RandomAccessFile also closes its channel.
+      try {
+        rp.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      fc = null;
+      out = null;
+      throw e;
+    }
+  }
+
+  /**
+   * Write one entry.
+   *
+   * Format:
+   * varint-encoded size of the serialized LogEntryProto
+   * LogEntryProto's protobuf
+   * 4-byte big-endian checksum of the two parts above
+   */
+  public void write(LogEntryProto entry) throws IOException {
+    final int serialized = entry.getSerializedSize();
+    final int bufferSize = CodedOutputStream.computeUInt32SizeNoTag(serialized)
+        + serialized;
+
+    // the extra 4 bytes are the trailing checksum
+    preallocateIfNecessary(bufferSize + 4);
+
+    byte[] buf = new byte[bufferSize];
+    CodedOutputStream cout = CodedOutputStream.newInstance(buf);
+    cout.writeUInt32NoTag(serialized);
+    entry.writeTo(cout);
+
+    checksum.reset();
+    checksum.update(buf, 0, buf.length);
+    final int sum = (int) checksum.getValue();
+
+    out.write(buf);
+    writeInt(sum);
+  }
+
+  /** Write the given int, most significant byte first. */
+  private void writeInt(int v) throws IOException {
+    out.write((v >>> 24) & 0xFF);
+    out.write((v >>> 16) & 0xFF);
+    out.write((v >>>  8) & 0xFF);
+    out.write((v) & 0xFF);
+  }
+
+  /** Reset the file to empty, pre-allocate padding and write the header. */
+  private void create() throws IOException {
+    fc.truncate(0);
+    fc.position(0);
+    preallocatedPos = 0;
+    preallocate(); // preallocate file
+
+    out.write(SegmentedRaftLog.HEADER_BYTES);
+    flush();
+  }
+
+  /**
+   * Flush buffered data, cut off the unused padding after the current channel
+   * position and release the file. Calling this method on an already closed
+   * stream has no effect.
+   */
+  @Override
+  public void close() throws IOException {
+    if (out == null) {
+      // already closed
+      return;
+    }
+    try {
+      out.flush(false);
+      if (fc != null && fc.isOpen()) {
+        fc.truncate(fc.position());
+      }
+    } finally {
+      RaftUtils.cleanup(LOG, fc, out);
+      fc = null;
+      out = null;
+    }
+  }
+
+  /**
+   * Flush data to persistent store.
+   *
+   * @throws IOException if the stream has already been closed or the flush
+   *                     fails
+   */
+  public void flush() throws IOException {
+    if (out == null) {
+      throw new IOException("Trying to use aborted output stream");
+    }
+    out.flush(true);
+  }
+
+  /**
+   * Write padding starting at {@link #preallocatedPos}. The amount is
+   * {@link #preallocatedSize}, capped by the room left up to
+   * {@link #segmentMaxSize}; nothing is written when that is not positive.
+   */
+  private void preallocate() throws IOException {
+    final long targetSize =
+        Math.min(segmentMaxSize - fc.size(), preallocatedSize);
+    // long counter: the configured size may exceed Integer.MAX_VALUE
+    long allocated = 0;
+    while (allocated < targetSize) {
+      final int size = (int) Math.min(BUFFER_SIZE, targetSize - allocated);
+      // use an independent view so the shared buffer's state is untouched
+      final ByteBuffer buffer = fill.duplicate();
+      buffer.clear();
+      buffer.limit(size);
+      RaftUtils.writeFully(fc, buffer, preallocatedPos);
+      preallocatedPos += size;
+      allocated += size;
+    }
+    LOG.debug("Pre-allocated {} bytes for the log segment", allocated);
+  }
+
+  /** Pre-allocate when writing {@code size} more bytes would pass the padding. */
+  private void preallocateIfNecessary(int size) throws IOException {
+    if (out.position() + size > preallocatedPos) {
+      preallocate();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "(" + file + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
new file mode 100644
index 0000000..0e5a168
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.BufferedInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Checksum;
+
+import org.apache.commons.io.Charsets;
+import org.apache.ratis.protocol.ChecksumException;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.shaded.com.google.protobuf.CodedInputStream;
+import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.PureJavaCrc32C;
+import org.apache.ratis.util.RaftUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Reads the header and the checksummed entries of a log segment file.
+ */
+public class LogReader implements Closeable {
+  /**
+   * InputStream wrapper that keeps track of the current stream position.
+   *
+   * This stream also allows us to set a limit on how many bytes we can read
+   * without getting an exception.
+   */
+  public static class LimitedInputStream extends FilterInputStream {
+    private long curPos = 0;
+    private long markPos = -1;
+    private long limitPos = Long.MAX_VALUE;
+
+    public LimitedInputStream(InputStream is) {
+      super(is);
+    }
+
+    /** Fail if reading {@code amt} more bytes would pass the limit. */
+    private void checkLimit(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to read " + amt + " byte(s) past " +
+            "the limit at offset " + limitPos);
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      checkLimit(1);
+      int ret = super.read();
+      if (ret != -1) curPos++;
+      return ret;
+    }
+
+    @Override
+    public int read(byte[] data) throws IOException {
+      // FilterInputStream.read(byte[]) dispatches to read(byte[], int, int),
+      // which is overridden below. Delegate to it directly so the bytes are
+      // checked against the limit and added to the position exactly once.
+      return read(data, 0, data.length);
+    }
+
+    @Override
+    public int read(byte[] data, int offset, int length) throws IOException {
+      checkLimit(length);
+      int ret = super.read(data, offset, length);
+      if (ret > 0) curPos += ret;
+      return ret;
+    }
+
+    /** Allow at most {@code limit} more bytes from the current position. */
+    public void setLimit(long limit) {
+      limitPos = curPos + limit;
+    }
+
+    public void clearLimit() {
+      limitPos = Long.MAX_VALUE;
+    }
+
+    @Override
+    public void mark(int limit) {
+      super.mark(limit);
+      markPos = curPos;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      if (markPos == -1) {
+        throw new IOException("Not marked!");
+      }
+      super.reset();
+      curPos = markPos;
+      // a mark can be reset to only once; mark() must be called again
+      markPos = -1;
+    }
+
+    public long getPos() {
+      return curPos;
+    }
+
+    @Override
+    public long skip(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to skip " + extra + " bytes past " +
+            "the limit at offset " + limitPos);
+      }
+      long ret = super.skip(amt);
+      curPos += ret;
+      return ret;
+    }
+  }
+
+  /** Upper bound for the size of one entry and for the mark read limit. */
+  private static final int maxOpSize = 32 * 1024 * 1024;
+
+  private final LimitedInputStream limiter;
+  private final DataInputStream in;
+  /** Scratch buffer; grown on demand by {@link #checkBufferSize(int)}. */
+  private byte[] temp = new byte[4096];
+  private final Checksum checksum;
+
+  LogReader(File file) throws FileNotFoundException {
+    this.limiter = new LimitedInputStream(
+        new BufferedInputStream(new FileInputStream(file)));
+    in = new DataInputStream(limiter);
+    checksum = new PureJavaCrc32C();
+  }
+
+  /**
+   * Read the segment header.
+   *
+   * @return the header bytes decoded as UTF-8
+   * @throws EOFException if the file ends before a complete header is read
+   */
+  String readLogHeader() throws IOException {
+    byte[] header = new byte[SegmentedRaftLog.HEADER_BYTES.length];
+    try {
+      // a single read() may legally return fewer bytes than requested even
+      // when more data follows, so keep reading until the header is complete
+      in.readFully(header);
+    } catch (EOFException e) {
+      final EOFException eof =
+          new EOFException("EOF before reading a complete log header");
+      eof.initCause(e);
+      throw eof;
+    }
+    return new String(header, Charsets.UTF_8);
+  }
+
+  /**
+   * Read a log entry from the input stream. On failure the stream is reset to
+   * the mark set by {@link #decodeEntry()} before the exception is thrown.
+   *
+   * @return the entry read from the stream, or null at the end of the file
+   * @throws IOException on error. Failures that are not IOExceptions are
+   *         wrapped in an IOException.
+   */
+  LogEntryProto readEntry() throws IOException {
+    try {
+      return decodeEntry();
+    } catch (IOException e) {
+      resetAfterFailure(e);
+      throw e;
+    } catch (Throwable e) {
+      // raft log requires no gap between any two entries. thus if an entry is
+      // broken, throw the exception instead of skipping broken entries
+      final IOException wrapped =
+          new IOException("got unexpected exception " + e.getMessage(), e);
+      resetAfterFailure(wrapped);
+      throw wrapped;
+    }
+  }
+
+  /**
+   * Reset the stream to its mark. A failure of the reset itself is recorded
+   * as suppressed so that it cannot hide the original decode failure.
+   */
+  private void resetAfterFailure(Throwable failure) {
+    try {
+      in.reset();
+    } catch (IOException resetFailure) {
+      failure.addSuppressed(resetFailure);
+    }
+  }
+
+  /**
+   * Scan and validate a log entry.
+   * @return the index of the log entry, or
+   *         {@link RaftServerConstants#INVALID_LOG_INDEX} if there is no more
+   *         entry
+   */
+  long scanEntry() throws IOException {
+    LogEntryProto entry = decodeEntry();
+    return entry != null ? entry.getIndex()
+        : RaftServerConstants.INVALID_LOG_INDEX;
+  }
+
+  /**
+   * Check that everything up to the end of the file is padding.
+   *
+   * @throws IOException if a byte other than
+   *         {@link RaftServerConstants#LOG_TERMINATE_BYTE} is found
+   */
+  void verifyTerminator() throws IOException {
+    // The end of the log should contain only terminate bytes.
+    // If it contains other bytes, the log itself may be corrupt.
+    limiter.clearLimit();
+    int numRead = -1, idx = 0;
+    while (true) {
+      try {
+        numRead = -1;
+        numRead = in.read(temp);
+        if (numRead == -1) {
+          return;
+        }
+        for (idx = 0; idx < numRead; idx++) {
+          if (temp[idx] != RaftServerConstants.LOG_TERMINATE_BYTE) {
+            throw new IOException("Read extra bytes after the terminator!");
+          }
+        }
+      } finally {
+        // After reading each group of bytes, we reposition the mark one
+        // byte before the next group. Similarly, if there is an error, we
+        // want to reposition the mark one byte before the error
+        if (numRead != -1) {
+          in.reset();
+          RaftUtils.skipFully(in, idx);
+          in.mark(temp.length + 1);
+          RaftUtils.skipFully(in, 1);
+        }
+      }
+    }
+  }
+
+  /**
+   * Decode the log entry "frame". This includes reading the log entry, and
+   * validating the checksum.
+   *
+   * The input stream will be advanced to the end of the op at the end of this
+   * function.
+   *
+   * @return The log entry, or null if we hit EOF.
+   * @throws IOException if the length prefix is out of range, the checksum
+   *         does not match or the underlying read fails
+   */
+  private LogEntryProto decodeEntry() throws IOException {
+    limiter.setLimit(maxOpSize);
+    in.mark(maxOpSize);
+
+    byte nextByte;
+    try {
+      nextByte = in.readByte();
+    } catch (EOFException eof) {
+      // EOF at an opcode boundary is expected.
+      return null;
+    }
+    // Each log entry starts with a var-int. Thus a valid entry's first byte
+    // should not be 0. So if the terminate byte is 0, we should hit the end
+    // of the segment.
+    if (nextByte == RaftServerConstants.LOG_TERMINATE_BYTE) {
+      verifyTerminator();
+      return null;
+    }
+
+    // Here, we verify that the Op size makes sense and that the
+    // data matches its checksum before attempting to construct an Op.
+    int entryLength = CodedInputStream.readRawVarint32(nextByte, in);
+    // a corrupt length prefix can also decode to a negative int
+    if (entryLength < 0 || entryLength > maxOpSize) {
+      throw new IOException("Entry has size " + entryLength
+          + ", but maxOpSize = " + maxOpSize);
+    }
+
+    final int varintLength = CodedOutputStream.computeUInt32SizeNoTag(
+        entryLength);
+    final int totalLength = varintLength + entryLength;
+    checkBufferSize(totalLength);
+    // go back to the start of the frame so that the length prefix is part of
+    // the checksummed bytes, then mark again because a mark is single-use
+    in.reset();
+    in.mark(maxOpSize);
+    RaftUtils.readFully(in, temp, 0, totalLength);
+
+    // verify checksum
+    checksum.reset();
+    checksum.update(temp, 0, totalLength);
+    int expectedChecksum = in.readInt();
+    int calculatedChecksum = (int) checksum.getValue();
+    if (expectedChecksum != calculatedChecksum) {
+      throw new ChecksumException("LogEntry is corrupt. Calculated checksum is "
+          + calculatedChecksum + " but read checksum " + expectedChecksum,
+          limiter.markPos);
+    }
+
+    // parse the buffer
+    return LogEntryProto.parseFrom(
+        CodedInputStream.newInstance(temp, varintLength, entryLength));
+  }
+
+  /** Grow {@link #temp} by doubling until it can hold {@code entryLength}. */
+  private void checkBufferSize(int entryLength) {
+    Preconditions.checkArgument(entryLength <= maxOpSize);
+    int length = temp.length;
+    if (length < entryLength) {
+      while (length < entryLength) {
+        length = Math.min(length * 2, maxOpSize);
+      }
+      temp = new byte[length];
+    }
+  }
+
+  long getPos() {
+    return limiter.getPos();
+  }
+
+  void skipFully(long length) throws IOException {
+    limiter.clearLimit();
+    RaftUtils.skipFully(limiter, length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    RaftUtils.cleanup(null, in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
new file mode 100644
index 0000000..af9ee66
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ratis.server.impl.ConfigurationManager;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.FileUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * In-memory cache for a log segment file. All the updates will be first
+ * written into LogSegment then into corresponding files in the same order.
+ *
+ * This class will be protected by the RaftServer's lock.
+ */
+class LogSegment implements Comparable<Long> {
+  /** A cached entry together with its position in the segment file. */
+  static class LogRecord {
+    /** starting offset in the file */
+    final long offset;
+    final LogEntryProto entry;
+
+    LogRecord(long offset, LogEntryProto entry) {
+      this.offset = offset;
+      this.entry = entry;
+    }
+  }
+
+  static class SegmentFileInfo {
+    final long startIndex; // start index of the segment
+    final long endIndex; // original end index
+    final boolean isOpen;
+    final long targetLength; // position for truncation
+    final long newEndIndex; // new end index after the truncation
+
+    SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
+        long newEndIndex) {
+      this.startIndex = start;
+      this.endIndex = end;
+      this.isOpen = isOpen;
+      this.targetLength = targetLength;
+      this.newEndIndex = newEndIndex;
+    }
+  }
+
+  /**
+   * @return the number of bytes the entry occupies in a segment file: the
+   *         var-int size prefix, the serialized entry and 4 more bytes
+   */
+  static long getEntrySize(LogEntryProto entry) {
+    final int serialized = entry.getSerializedSize();
+    return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized)
+        + 4;
+  }
+
+  private boolean isOpen;
+  private final List<LogRecord> records = new ArrayList<>();
+  /** Header length plus the size of all the cached records. */
+  private long totalSize;
+  private final long startIndex;
+  private long endIndex;
+
+  private LogSegment(boolean isOpen, long start, long end) {
+    this.isOpen = isOpen;
+    this.startIndex = start;
+    this.endIndex = end;
+    totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+  }
+
+  static LogSegment newOpenSegment(long start) {
+    Preconditions.checkArgument(start >= 0);
+    // end = start - 1 denotes a segment without any entry
+    return new LogSegment(true, start, start - 1);
+  }
+
+  private static LogSegment newCloseSegment(long start, long end) {
+    Preconditions.checkArgument(start >= 0 && end >= start);
+    return new LogSegment(false, start, end);
+  }
+
+  /**
+   * Build a segment by reading all the entries of the given file. Entries of
+   * type CONFIGURATIONENTRY are also registered with {@code confManager}
+   * when it is not null, and padding after the last entry is truncated.
+   *
+   * @throws IllegalStateException if the file contains no entry, has a gap
+   *         between entries, or does not match the given start (and, for a
+   *         closed segment, end) index
+   */
+  static LogSegment loadSegment(File file, long start, long end,
+      boolean isOpen, ConfigurationManager confManager) throws IOException {
+    final LogSegment segment;
+    try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
+      segment = isOpen ? LogSegment.newOpenSegment(start) :
+          LogSegment.newCloseSegment(start, end);
+      LogEntryProto next;
+      LogEntryProto prev = null;
+      while ((next = in.nextEntry()) != null) {
+        if (prev != null) {
+          Preconditions.checkState(next.getIndex() == prev.getIndex() + 1,
+              "gap between entry %s and entry %s", prev, next);
+        }
+        segment.append(next);
+        if (confManager != null &&
+            next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
+          confManager.addConfiguration(next.getIndex(),
+              ServerProtoUtils.toRaftConfiguration(next.getIndex(),
+                  next.getConfigurationEntry()));
+        }
+        prev = next;
+      }
+    }
+
+    // truncate padding if necessary
+    if (file.length() > segment.getTotalSize()) {
+      FileUtils.truncateFile(file, segment.getTotalSize());
+    }
+
+    // fail with a description instead of an IndexOutOfBoundsException from
+    // records.get(0) when no entry could be read
+    Preconditions.checkState(!segment.records.isEmpty(),
+        "log segment file %s does not contain any entry", file);
+    Preconditions.checkState(start == segment.records.get(0).entry.getIndex());
+    if (!isOpen) {
+      Preconditions.checkState(segment.getEndIndex() == end);
+    }
+    return segment;
+  }
+
+  long getStartIndex() {
+    return startIndex;
+  }
+
+  long getEndIndex() {
+    return endIndex;
+  }
+
+  boolean isOpen() {
+    return isOpen;
+  }
+
+  int numOfEntries() {
+    return (int) (endIndex - startIndex + 1);
+  }
+
+  void appendToOpenSegment(LogEntryProto... entries) {
+    Preconditions.checkState(isOpen(),
+        "The log segment %s is not open for append", this.toString());
+    append(entries);
+  }
+
+  /**
+   * Add the entries to the cache. They must all have the same term and must
+   * continue the index sequence of this segment without a gap.
+   */
+  private void append(LogEntryProto... entries) {
+    Preconditions.checkArgument(entries != null && entries.length > 0);
+    final long term = entries[0].getTerm();
+    if (records.isEmpty()) {
+      Preconditions.checkArgument(entries[0].getIndex() == startIndex,
+          "gap between start index %s and first entry to append %s",
+          startIndex, entries[0].getIndex());
+    }
+    for (LogEntryProto entry : entries) {
+      // all these entries should be of the same term
+      Preconditions.checkArgument(entry.getTerm() == term,
+          "expected term:%s, term of the entry:%s", term, entry.getTerm());
+      final LogRecord currentLast = getLastRecord();
+      if (currentLast != null) {
+        Preconditions.checkArgument(
+            entry.getIndex() == currentLast.entry.getIndex() + 1,
+            "gap between entries %s and %s", entry.getIndex(),
+            currentLast.entry.getIndex());
+      }
+
+      // the record starts where the previous content of the file ends
+      final LogRecord record = new LogRecord(totalSize, entry);
+      records.add(record);
+      totalSize += getEntrySize(entry);
+      endIndex = entry.getIndex();
+    }
+  }
+
+  /** @return the record of the given index, or null if it is out of range */
+  LogRecord getLogRecord(long index) {
+    if (index >= startIndex && index <= endIndex) {
+      return records.get((int) (index - startIndex));
+    }
+    return null;
+  }
+
+  /** @return the last record, or null if there is none */
+  LogRecord getLastRecord() {
+    return records.isEmpty() ? null : records.get(records.size() - 1);
+  }
+
+  long getTotalSize() {
+    return totalSize;
+  }
+
+  /**
+   * Remove records from the given index (inclusive). The segment is marked
+   * as not open afterwards.
+   */
+  void truncate(long fromIndex) {
+    Preconditions.checkArgument(
+        fromIndex >= startIndex && fromIndex <= endIndex);
+    final int from = (int) (fromIndex - startIndex);
+    // the offset of the first removed record is the new size of the segment
+    final LogRecord record = records.get(from);
+    records.subList(from, records.size()).clear();
+    totalSize = record.offset;
+    isOpen = false;
+    this.endIndex = fromIndex - 1;
+  }
+
+  void close() {
+    Preconditions.checkState(isOpen());
+    isOpen = false;
+  }
+
+  @Override
+  public String toString() {
+    return isOpen() ? "log-" + startIndex + "-inprogress" :
+        "log-" + startIndex + "-" + endIndex;
+  }
+
+  /**
+   * Compare this segment with a log index.
+   *
+   * @return 0 if the index is within [startIndex, endIndex], -1 if this
+   *         segment ends before the index, 1 otherwise
+   */
+  @Override
+  public int compareTo(Long l) {
+    return (l >= getStartIndex() && l <= getEndIndex()) ? 0 :
+        (this.getEndIndex() < l ? -1 : 1);
+  }
+
+  /** Drop all the cached records. */
+  void clear() {
+    // NOTE(review): totalSize is left unchanged here, so it no longer
+    // matches the (now empty) record list -- confirm that no caller appends
+    // to or reads the size of a segment after clearing it.
+    records.clear();
+    endIndex = startIndex - 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
new file mode 100644
index 0000000..8a275ec
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.CodeInjectionForTesting;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A simple RaftLog implementation in memory. Used only for testing.
+ */
+public class MemoryRaftLog extends RaftLog {
+  /** The whole log; the list position is used as the log index. */
+  private final List<LogEntryProto> entries = new ArrayList<>();
+
+  public MemoryRaftLog(String selfId) {
+    super(selfId);
+  }
+
+  /** @return the entry at the given index, or null if there is none */
+  @Override
+  public LogEntryProto get(long index) {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) {
+      final int position = (int) index;
+      if (position < 0 || position >= entries.size()) {
+        return null;
+      }
+      return entries.get(position);
+    }
+  }
+
+  /**
+   * @return the entries in [startIndex, endIndex) that exist in the log, or
+   *         null if startIndex is at or beyond the end of the log
+   */
+  @Override
+  public LogEntryProto[] getEntries(long startIndex, long endIndex) {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) {
+      if (startIndex >= entries.size()) {
+        return null;
+      }
+      final int from = (int) startIndex;
+      final int to = (int) Math.min(entries.size(), endIndex);
+      return entries.subList(from, to).toArray(EMPTY_LOGENTRY_ARRAY);
+    }
+  }
+
+  /** Remove all the entries from the given index (inclusive) on. */
+  @Override
+  void truncate(long index) {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      Preconditions.checkArgument(index >= 0);
+      final int firstRemoved = (int) index;
+      // drop from the tail until nothing at or after firstRemoved remains
+      int last = entries.size() - 1;
+      while (last >= firstRemoved) {
+        entries.remove(last);
+        last--;
+      }
+    }
+  }
+
+  /** @return the last entry, or null if the log is empty */
+  @Override
+  public LogEntryProto getLastEntry() {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) {
+      if (entries.isEmpty()) {
+        return null;
+      }
+      return entries.get(entries.size() - 1);
+    }
+  }
+
+  @Override
+  void appendEntry(LogEntryProto entry) {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      entries.add(entry);
+    }
+  }
+
+  /**
+   * Append a configuration entry at the next index.
+   *
+   * @return the index of the new entry
+   */
+  @Override
+  public long append(long term, RaftConfiguration newConf) {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      final long newIndex = getNextIndex();
+      entries.add(ServerProtoUtils.toLogEntryProto(newConf, term, newIndex));
+      return newIndex;
+    }
+  }
+
+  @Override
+  public long getStartIndex() {
+    if (entries.isEmpty()) {
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+    return entries.get(0).getIndex();
+  }
+
+  /**
+   * Append the given entries. Leading entries that are already in the log
+   * with the same term are skipped; at the first term mismatch the log is
+   * truncated from that index and the remaining entries are added.
+   */
+  @Override
+  public void append(LogEntryProto... entries) {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      if (entries == null || entries.length == 0) {
+        return;
+      }
+      // Before truncating the entries, we first need to check if some
+      // entries are duplicated. If the leader sends entry 6, entry 7, then
+      // entry 6 again, without this check the follower may truncate entry 7
+      // when receiving entry 6 again. Then before the leader detects this
+      // truncation in the next appendEntries RPC, leader may think entry 7 has
+      // been committed but in the system the entry has not been committed to
+      // the quorum of peers' disks.
+      // TODO add a unit test for this
+      int logIndex = (int) entries[0].getIndex();
+      int offset = 0;
+      boolean conflict = false;
+      while (!conflict && logIndex < getNextIndex()
+          && offset < entries.length) {
+        final long existingTerm = this.entries.get(logIndex).getTerm();
+        if (existingTerm != entries[offset].getTerm()) {
+          conflict = true;
+        } else {
+          offset++;
+          logIndex++;
+        }
+      }
+      if (conflict) {
+        truncate(logIndex);
+      }
+      // add whatever has not been matched against the existing log
+      while (offset < entries.length) {
+        this.entries.add(entries[offset]);
+        offset++;
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    final String last = ServerProtoUtils.toString(getLastEntry());
+    final String committed =
+        ServerProtoUtils.toString(get(getLastCommittedIndex()));
+    return "last=" + last + ", committed=" + committed;
+  }
+
+  public String getEntryString() {
+    return "entries=" + entries;
+  }
+
+  @Override
+  public void logSync() {
+    // nothing to sync for an in-memory log; only run the test hook
+    CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
+  }
+
+  @Override
+  public long getLatestFlushedIndex() {
+    return getNextIndex() - 1;
+  }
+
+  @Override
+  public void writeMetadata(long term, String votedFor) {
+    // do nothing
+  }
+
+  @Override
+  public Metadata loadMetadata() {
+    return new Metadata(null, 0);
+  }
+
+  @Override
+  public void syncWithSnapshot(long lastSnapshotIndex) {
+    // do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
new file mode 100644
index 0000000..8deb7e3
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import com.google.common.base.Charsets;
+
+import org.apache.ratis.util.AtomicFileOutputStream;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Properties;
+
+/**
+ * Class that represents a file on disk which persistently stores the
+ * <code>term</code> and <code>votedFor</code> metadata as a properties file.
+ * The file is written through an {@link AtomicFileOutputStream}, which is
+ * relied upon to make each update atomic and durable (i.e. fsynced).
+ *
+ * The in-memory values are loaded lazily on first read and are refreshed by
+ * every successful {@link #set(long, String)}.
+ */
+class MetaFile {
+  private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class);
+  private static final String TERM_KEY = "term";
+  private static final String VOTEDFOR_KEY = "votedFor";
+  static final long DEFAULT_TERM = 0;
+  static final String EMPTY_VOTEFOR = "";
+
+  private final File file;
+  /** True once the fields below mirror the file (after a read or a set). */
+  private boolean loaded = false;
+  private long term;
+  private String votedFor;
+
+  MetaFile(File file) {
+    this.file = file;
+    term = DEFAULT_TERM;
+    votedFor = EMPTY_VOTEFOR;
+  }
+
+  /** @return whether the backing file currently exists on disk. */
+  boolean exists() {
+    return this.file.exists();
+  }
+
+  /**
+   * @return the stored term, reading the file first if it has not been
+   *         loaded yet.
+   * @throws IOException if the file cannot be read or is corrupted
+   */
+  long getTerm() throws IOException {
+    loadIfNeeded();
+    return term;
+  }
+
+  /**
+   * @return the stored votedFor value, reading the file first if it has not
+   *         been loaded yet.
+   * @throws IOException if the file cannot be read or is corrupted
+   */
+  String getVotedFor() throws IOException {
+    loadIfNeeded();
+    return votedFor;
+  }
+
+  /** Read the file once; later calls are no-ops until a read fails. */
+  private void loadIfNeeded() throws IOException {
+    if (!loaded) {
+      readFile();
+      loaded = true;
+    }
+  }
+
+  /**
+   * Update the term and votedFor. The file is rewritten only when nothing
+   * has been loaded yet or when at least one value actually changes.
+   *
+   * @param newTerm the term to store
+   * @param newVotedFor the votedFor to store; null is stored as
+   *                    {@link #EMPTY_VOTEFOR}
+   * @throws IOException if the file cannot be written; the in-memory values
+   *                     are left untouched in that case
+   */
+  void set(long newTerm, String newVotedFor) throws IOException {
+    newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor;
+    if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) {
+      writeFile(newTerm, newVotedFor);
+    }
+    term = newTerm;
+    votedFor = newVotedFor;
+    loaded = true;
+  }
+
+  /**
+   * Atomically write the given term and votedFor information to the given
+   * file, including fsyncing.
+   *
+   * @throws IOException if the file cannot be written
+   */
+  void writeFile(long term, String votedFor) throws IOException {
+    AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
+    Properties properties = new Properties();
+    properties.setProperty(TERM_KEY, Long.toString(term));
+    properties.setProperty(VOTEDFOR_KEY, votedFor);
+    try {
+      // Properties.store flushes the writer before returning, so closing
+      // the underlying stream afterwards does not lose buffered data.
+      properties.store(
+          new BufferedWriter(new OutputStreamWriter(fos, Charsets.UTF_8)), "");
+      fos.close();
+      fos = null;
+    } finally {
+      // fos is still non-null only if store() or close() failed.
+      if (fos != null) {
+        fos.abort();
+      }
+    }
+  }
+
+  /**
+   * Reset the in-memory values to their defaults and, if the file exists,
+   * replace them with the values stored in it.
+   *
+   * @throws IOException if the file cannot be read, lacks one of the two
+   *                     keys, or holds a value that cannot be parsed
+   */
+  void readFile() throws IOException {
+    term = DEFAULT_TERM;
+    votedFor = EMPTY_VOTEFOR;
+    if (!file.exists()) {
+      return;
+    }
+    BufferedReader br = new BufferedReader(
+        new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
+    try {
+      final Properties properties = new Properties();
+      try {
+        properties.load(br);
+        final String termValue = properties.getProperty(TERM_KEY);
+        final String votedForValue = properties.getProperty(VOTEDFOR_KEY);
+        if (termValue == null || votedForValue == null) {
+          throw new IOException("Corrupted term/votedFor properties: "
+              + properties);
+        }
+        // parseLong throws before the assignment, so a bad term leaves both
+        // fields at their defaults.
+        term = Long.parseLong(termValue);
+        votedFor = votedForValue;
+      } catch (IllegalArgumentException e) {
+        // Properties.load rejects malformed unicode escapes and
+        // Long.parseLong rejects a non-numeric term (NumberFormatException)
+        // with unchecked exceptions; report both as a corrupted file.
+        throw new IOException("Corrupted term/votedFor properties: "
+            + properties, e);
+      }
+    } catch(IOException e) {
+      LOG.warn("Cannot load term/votedFor properties from {}", file, e);
+      throw e;
+    } finally {
+      RaftUtils.cleanup(LOG, br);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
new file mode 100644
index 0000000..05307f2
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ratis.server.impl.ConfigurationManager;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base class of RaftLog. Currently we provide two types of RaftLog
+ * implementation:
+ * 1. MemoryRaftLog: all the log entries are stored in memory. This is only
+ *    used for testing.
+ * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored
+ *    in segments.
+ */
+public abstract class RaftLog implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
+  public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY =
+      new LogEntryProto[0];
+  /** Evaluates to "RaftLog.logSync". */
+  public static final String LOG_SYNC =
+      RaftLog.class.getSimpleName() + ".logSync";
+
+  /**
+   * The largest committed index. Note the last committed log may be included
+   * in the latest snapshot file.
+   */
+  protected final AtomicLong lastCommitted =
+      new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
+  private final String selfId;
+
+  /** Fair lock backing {@link #readLock()} and {@link #writeLock()}. */
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+  /** Set by {@link #open}, cleared by {@link #close()}. */
+  private volatile boolean isOpen = false;
+
+  public RaftLog(String selfId) {
+    this.selfId = selfId;
+  }
+
+  /** @return the current value of the last committed index. */
+  public long getLastCommittedIndex() {
+    return lastCommitted.get();
+  }
+
+  /**
+   * @throws IllegalStateException if the log is not currently open.
+   */
+  public void checkLogState() {
+    Preconditions.checkState(isOpen,
+        "The RaftLog has not been opened or has been closed");
+  }
+
+  /**
+   * Update the last committed index. The index only moves forward, and only
+   * when the entry at the given index exists and belongs to the given term.
+   * @param majorityIndex the index that has achieved majority.
+   * @param currentTerm the current term.
+   */
+  public void updateLastCommitted(long majorityIndex, long currentTerm) {
+    try(AutoCloseableLock writeLock = writeLock()) {
+      if (lastCommitted.get() < majorityIndex) {
+        // Only update last committed index for current term. See §5.4.2 in
+        // paper for details.
+        final LogEntryProto entry = get(majorityIndex);
+        if (entry != null && entry.getTerm() == currentTerm) {
+          LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex);
+          lastCommitted.set(majorityIndex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Does the log contains the given term and index? Used to check the
+   * consistency between the local log of a follower and the log entries sent
+   * by the leader.
+   *
+   * @return false if the argument is null; otherwise whether it equals the
+   *         term/index of the local entry at the same index.
+   */
+  public boolean contains(TermIndex ti) {
+    if (ti == null) {
+      return false;
+    }
+    LogEntryProto entry = get(ti.getIndex());
+    TermIndex local = ServerProtoUtils.toTermIndex(entry);
+    return ti.equals(local);
+  }
+
+  /**
+   * @return the index of the next log entry to append.
+   */
+  public long getNextIndex() {
+    final LogEntryProto last = getLastEntry();
+    if (last == null) {
+      // if the log is empty, the last committed index should be consistent
+      // with the last index included in the latest snapshot.
+      return getLastCommittedIndex() + 1;
+    }
+    return last.getIndex() + 1;
+  }
+
+  /**
+   * Generate a log entry for the given term and message, and append the entry.
+   * Used by the leader.
+   * @return the index of the new log entry.
+   * @throws IllegalStateException if the log is not open.
+   */
+  public long append(long term, TransactionContext operation)
+      throws IOException {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      final long nextIndex = getNextIndex();
+
+      // This is called here to guarantee strict serialization of callback
+      // executions in case the SM wants to attach a logic depending on
+      // ordered execution in the log commit order.
+      operation = operation.preAppendTransaction();
+
+      // build the log entry after calling the StateMachine
+      final LogEntryProto e = ProtoUtils.toLogEntryProto(
+          operation.getSMLogEntry().get(), term, nextIndex);
+
+      appendEntry(e);
+      operation.setLogEntry(e);
+      return nextIndex;
+    }
+  }
+
+  /**
+   * Generate a log entry for the given term and configurations,
+   * and append the entry. Used by the leader.
+   * @return the index of the new log entry.
+   * @throws IllegalStateException if the log is not open.
+   */
+  public long append(long term, RaftConfiguration newConf) {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      final long nextIndex = getNextIndex();
+      final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term,
+          nextIndex);
+      appendEntry(e);
+      return nextIndex;
+    }
+  }
+
+  /**
+   * Mark the log as open. This base implementation does not use either
+   * argument; subclasses are expected to override and call it.
+   */
+  public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
+      throws IOException {
+    isOpen = true;
+  }
+
+  public abstract long getStartIndex();
+
+  /**
+   * Get the log entry of the given index.
+   *
+   * @param index The given index.
+   * @return The log entry associated with the given index.
+   *         Null if there is no log entry with the index.
+   */
+  public abstract LogEntryProto get(long index);
+
+  /**
+   * @param startIndex the starting log index (inclusive)
+   * @param endIndex the ending log index (exclusive)
+   * @return all log entries within the given index range. Null if startIndex
+   *         is greater than the smallest available index.
+   */
+  public abstract LogEntryProto[] getEntries(long startIndex, long endIndex);
+
+  /**
+   * @return the last log entry.
+   */
+  public abstract LogEntryProto getLastEntry();
+
+  /**
+   * Truncate the log entries till the given index. The log with the given
+   * index will also be truncated (i.e., inclusive).
+   */
+  abstract void truncate(long index);
+
+  /**
+   * Used by the leader when appending a new entry based on client's request
+   * or configuration change.
+   */
+  abstract void appendEntry(LogEntryProto entry);
+
+  /**
+   * Append all the given log entries. Used by the followers.
+   *
+   * If an existing entry conflicts with a new one (same index but different
+   * terms), delete the existing entry and all entries that follow it (§5.3).
+   *
+   * This method, {@link #append(long, TransactionContext)},
+   * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
+   * do not guarantee the changes are persisted.
+   * Need to call {@link #logSync()} to persist the changes.
+   */
+  public abstract void append(LogEntryProto... entries);
+
+  /**
+   * Flush and sync the log.
+   * It is triggered by AppendEntries RPC request from the leader.
+   */
+  public abstract void logSync() throws InterruptedException;
+
+  /**
+   * @return the index of the latest entry that has been flushed to the local
+   *         storage.
+   */
+  public abstract long getLatestFlushedIndex();
+
+  /**
+   * Write and flush the metadata (votedFor and term) into the meta file.
+   *
+   * We need to guarantee that the order of writeMetadata calls is the same
+   * as the order in which we change the in-memory term/votedFor. Otherwise
+   * we may persist stale term/votedFor in file.
+   *
+   * Since the leader change is not frequent, currently we simply put this call
+   * in the RaftPeer's lock. Later we can use an IO task queue to enforce the
+   * order.
+   */
+  public abstract void writeMetadata(long term, String votedFor)
+      throws IOException;
+
+  public abstract Metadata loadMetadata() throws IOException;
+
+  public abstract void syncWithSnapshot(long lastSnapshotIndex);
+
+  /** @return the formatted last entry of the log. */
+  @Override
+  public String toString() {
+    return ServerProtoUtils.toString(getLastEntry());
+  }
+
+  /** Immutable holder of a votedFor/term pair. */
+  public static class Metadata {
+    private final String votedFor;
+    private final long term;
+
+    public Metadata(String votedFor, long term) {
+      this.votedFor = votedFor;
+      this.term = term;
+    }
+
+    public String getVotedFor() {
+      return votedFor;
+    }
+
+    public long getTerm() {
+      return term;
+    }
+  }
+
+  /**
+   * Acquire the read lock; intended for use in a try-with-resources
+   * statement, as done in this class for the write lock.
+   */
+  public AutoCloseableLock readLock() {
+    return AutoCloseableLock.acquire(lock.readLock());
+  }
+
+  /**
+   * Acquire the write lock; intended for use in a try-with-resources
+   * statement.
+   */
+  public AutoCloseableLock writeLock() {
+    return AutoCloseableLock.acquire(lock.writeLock());
+  }
+
+  /** @return whether the calling thread holds the write lock. */
+  public boolean hasWriteLock() {
+    return this.lock.isWriteLockedByCurrentThread();
+  }
+
+  /**
+   * @return whether the calling thread holds the read lock or the write
+   *         lock.
+   */
+  public boolean hasReadLock() {
+    return this.lock.getReadHoldCount() > 0 || hasWriteLock();
+  }
+
+  /** Mark the log as closed; this base implementation releases nothing. */
+  @Override
+  public void close() throws IOException {
+    isOpen = false;
+  }
+
+  public String getSelfId() {
+    return selfId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
new file mode 100644
index 0000000..90dd7fd
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.storage.LogSegment.LogRecord;
+import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * In-memory RaftLog Cache. Currently we provide a simple implementation that
+ * caches all the segments in the memory. The cache is not thread-safe and
+ * requires external lock protection.
+ */
+class RaftLogCache {
+  /** The segment currently being appended to; null if there is none. */
+  private LogSegment openSegment;
+  /** Closed segments, kept in index order (enforced when adding). */
+  private final List<LogSegment> closedSegments;
+
+  RaftLogCache() {
+    closedSegments = new ArrayList<>();
+  }
+
+  /** @return whether prev is closed and segment starts right after it. */
+  private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) {
+    return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex();
+  }
+
+  /** @return the last closed segment, or null if there is none. */
+  private LogSegment getLastClosedSegment() {
+    return closedSegments.isEmpty() ?
+        null : closedSegments.get(closedSegments.size() - 1);
+  }
+
+  /**
+   * Check that the segment continues the last closed segment (if any) and,
+   * for an open segment, that no open segment exists yet.
+   *
+   * @throws IllegalStateException if the check fails
+   */
+  private void validateAdding(LogSegment segment) {
+    final LogSegment lastClosed = getLastClosedSegment();
+    if (!segment.isOpen()) {
+      Preconditions.checkState(lastClosed == null ||
+          areConsecutiveSegments(lastClosed, segment));
+    } else {
+      Preconditions.checkState(openSegment == null &&
+          (lastClosed == null || areConsecutiveSegments(lastClosed, segment)));
+    }
+  }
+
+  /** Add a validated segment as the open segment or as a closed segment. */
+  void addSegment(LogSegment segment) {
+    validateAdding(segment);
+    if (segment.isOpen()) {
+      openSegment = segment;
+    } else {
+      closedSegments.add(segment);
+    }
+  }
+
+  /**
+   * @return the entry with the given index, or null if no cached segment
+   *         holds a record for it.
+   */
+  LogEntryProto getEntry(long index) {
+    final LogSegment segment;
+    if (openSegment != null && index >= openSegment.getStartIndex()) {
+      segment = openSegment;
+    } else {
+      final int segmentIndex = Collections.binarySearch(closedSegments, index);
+      if (segmentIndex < 0) {
+        return null;
+      }
+      segment = closedSegments.get(segmentIndex);
+    }
+    // Same null handling for both kinds of segment.
+    final LogRecord record = segment.getLogRecord(index);
+    return record == null ? null : record.entry;
+  }
+
+  /**
+   * @param startIndex inclusive
+   * @param endIndex exclusive
+   * @return the cached entries in the range, clipped to the end of the
+   *         cache; an empty array if the clipped range is empty.
+   * @throws IndexOutOfBoundsException if startIndex is negative, precedes
+   *         the start of the cache, or exceeds endIndex
+   */
+  LogEntryProto[] getEntries(final long startIndex, final long endIndex) {
+    if (startIndex < 0 || startIndex < getStartIndex()) {
+      throw new IndexOutOfBoundsException("startIndex = " + startIndex
+          + ", log cache starts from index " + getStartIndex());
+    }
+    if (startIndex > endIndex) {
+      throw new IndexOutOfBoundsException("startIndex(" + startIndex
+          + ") > endIndex(" + endIndex + ")");
+    }
+    final long realEnd = Math.min(getEndIndex() + 1, endIndex);
+    if (startIndex >= realEnd) {
+      return RaftLog.EMPTY_LOGENTRY_ARRAY;
+    }
+
+    LogEntryProto[] entries = new LogEntryProto[(int) (realEnd - startIndex)];
+    int segmentIndex = Collections.binarySearch(closedSegments, startIndex);
+    if (segmentIndex < 0) {
+      // not in any closed segment: the whole range comes from the open one
+      getEntriesFromSegment(openSegment, startIndex, entries, 0,
+          entries.length);
+    } else {
+      long index = startIndex;
+      for (int i = segmentIndex;
+           i < closedSegments.size() && index < realEnd; i++) {
+        LogSegment s = closedSegments.get(i);
+        int numberFromSegment = (int) Math.min(realEnd - index,
+            s.getEndIndex() - index + 1);
+        getEntriesFromSegment(s, index, entries, (int) (index - startIndex),
+            numberFromSegment);
+        index += numberFromSegment;
+      }
+      // whatever remains after the closed segments is in the open segment
+      if (index < realEnd) {
+        getEntriesFromSegment(openSegment, index, entries,
+            (int) (index - startIndex), (int) (realEnd - index));
+      }
+    }
+    return entries;
+  }
+
+  /**
+   * Copy up to size entries of the segment, starting at startIndex, into
+   * entries beginning at offset. Copying stops at the segment's end index.
+   */
+  private void getEntriesFromSegment(LogSegment segment, long startIndex,
+      LogEntryProto[] entries, int offset, int size) {
+    long endIndex = segment.getEndIndex();
+    endIndex = Math.min(endIndex, startIndex + size - 1);
+    int index = offset;
+    for (long i = startIndex; i <= endIndex; i++) {
+      entries[index++] = segment.getLogRecord(i).entry;
+    }
+  }
+
+  /**
+   * @return the start index of the first cached segment, or
+   *         INVALID_LOG_INDEX if the cache holds no segment.
+   */
+  long getStartIndex() {
+    if (closedSegments.isEmpty()) {
+      return openSegment != null ? openSegment.getStartIndex() :
+          INVALID_LOG_INDEX;
+    } else {
+      return closedSegments.get(0).getStartIndex();
+    }
+  }
+
+  /**
+   * @return the end index of the open segment if there is one, otherwise of
+   *         the last closed segment, otherwise INVALID_LOG_INDEX.
+   */
+  @VisibleForTesting
+  long getEndIndex() {
+    if (openSegment != null) {
+      return openSegment.getEndIndex();
+    }
+    final LogSegment lastClosed = getLastClosedSegment();
+    return lastClosed == null ? INVALID_LOG_INDEX : lastClosed.getEndIndex();
+  }
+
+  /**
+   * @return the last entry of a non-empty open segment, otherwise the last
+   *         entry of the last closed segment, otherwise null.
+   */
+  LogEntryProto getLastEntry() {
+    if (openSegment != null && openSegment.numOfEntries() > 0) {
+      return openSegment.getLastRecord().entry;
+    }
+    final LogSegment lastClosed = getLastClosedSegment();
+    return lastClosed == null ? null : lastClosed.getLastRecord().entry;
+  }
+
+  LogSegment getOpenSegment() {
+    return openSegment;
+  }
+
+  /**
+   * @throws IllegalStateException if there is no open segment
+   */
+  void appendEntry(LogEntryProto entry) {
+    // SegmentedRaftLog does the segment creation/rolling work. Here we just
+    // simply append the entry into the open segment.
+    Preconditions.checkState(openSegment != null);
+    openSegment.appendToOpenSegment(entry);
+  }
+
+  /**
+   * finalize the current open segment and move it to the closed segments.
+   *
+   * @param createNewOpen if true, start a new open segment at the next
+   *                      index; otherwise leave the cache without one
+   * @throws IllegalStateException if there is no non-empty open segment
+   */
+  void rollOpenSegment(boolean createNewOpen) {
+    Preconditions.checkState(openSegment != null
+        && openSegment.numOfEntries() > 0);
+    final long nextIndex = openSegment.getEndIndex() + 1;
+    openSegment.close();
+    closedSegments.add(openSegment);
+    if (createNewOpen) {
+      openSegment = LogSegment.newOpenSegment(nextIndex);
+    } else {
+      openSegment = null;
+    }
+  }
+
+  /**
+   * Clear and drop the open segment.
+   *
+   * @return file info built from the segment's start index, its end index
+   *         before clearing, and its end index after clearing
+   */
+  private SegmentFileInfo deleteOpenSegment() {
+    // remember the end index before clear() changes the segment
+    final long oldEnd = openSegment.getEndIndex();
+    openSegment.clear();
+    SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
+        oldEnd, true, 0, openSegment.getEndIndex());
+    openSegment = null;
+    return info;
+  }
+
+  /**
+   * truncate log entries starting from the given index (inclusive)
+   *
+   * @return the segment file to truncate and the segment files to delete,
+   *         or null if the index matches nothing that can be truncated
+   */
+  TruncationSegments truncate(long index) {
+    int segmentIndex = Collections.binarySearch(closedSegments, index);
+    // A result of -(size) - 1 means the insertion point is past every closed
+    // segment, so only the open segment can contain the index.
+    if (segmentIndex == -closedSegments.size() - 1) {
+      if (openSegment != null && openSegment.getEndIndex() >= index) {
+        final long oldEnd = openSegment.getEndIndex();
+        if (index == openSegment.getStartIndex()) {
+          // the open segment should be deleted
+          return new TruncationSegments(null,
+              Collections.singletonList(deleteOpenSegment()));
+        } else {
+          openSegment.truncate(index);
+          Preconditions.checkState(!openSegment.isOpen());
+          SegmentFileInfo info = new SegmentFileInfo(
+              openSegment.getStartIndex(), oldEnd, true,
+              openSegment.getTotalSize(), openSegment.getEndIndex());
+          // the truncated segment is no longer open: keep it as closed
+          closedSegments.add(openSegment);
+          openSegment = null;
+          return new TruncationSegments(info, Collections.emptyList());
+        }
+      }
+    } else if (segmentIndex >= 0) {
+      LogSegment ts = closedSegments.get(segmentIndex);
+      final long oldEnd = ts.getEndIndex();
+      List<SegmentFileInfo> list = new ArrayList<>();
+      ts.truncate(index);
+      final int size = closedSegments.size();
+      // Remove every later closed segment, and ts itself if the truncation
+      // left it empty. Walk backwards so removal does not shift indices.
+      for (int i = size - 1;
+           i >= (ts.numOfEntries() == 0 ? segmentIndex : segmentIndex + 1);
+           i-- ) {
+        LogSegment s = closedSegments.remove(i);
+        // ts was already truncated, so use its remembered end index
+        final long endOfS = i == segmentIndex ? oldEnd : s.getEndIndex();
+        s.clear();
+        list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0,
+            s.getEndIndex()));
+      }
+      if (openSegment != null) {
+        list.add(deleteOpenSegment());
+      }
+      SegmentFileInfo t = ts.numOfEntries() == 0 ? null :
+          new SegmentFileInfo(ts.getStartIndex(), oldEnd, false,
+              ts.getTotalSize(), ts.getEndIndex());
+      return new TruncationSegments(t, list);
+    }
+    return null;
+  }
+
+  /** Result of {@link #truncate(long)}. */
+  static class TruncationSegments {
+    final SegmentFileInfo toTruncate; // name of the file to be truncated
+    final SegmentFileInfo[] toDelete; // names of the files to be deleted
+
+    TruncationSegments(SegmentFileInfo toTruncate,
+        List<SegmentFileInfo> toDelete) {
+      this.toDelete = toDelete == null ? null :
+          toDelete.toArray(new SegmentFileInfo[toDelete.size()]);
+      this.toTruncate = toTruncate;
+    }
+  }
+
+  /**
+   * @return an iterator over the cached entries starting at startIndex.
+   * @throws IndexOutOfBoundsException if startIndex precedes the first
+   *         closed segment
+   */
+  Iterator<LogEntryProto> iterator(long startIndex) {
+    return new EntryIterator(startIndex);
+  }
+
+  /**
+   * Walks the closed segments in order and then the open segment. It reads
+   * the enclosing cache's segments directly, so the external locking
+   * required by the cache applies while iterating.
+   */
+  private class EntryIterator implements Iterator<LogEntryProto> {
+    private long nextIndex;
+    private LogSegment currentSegment;
+    private int segmentIndex;
+
+    EntryIterator(long start) {
+      this.nextIndex = start;
+      segmentIndex = Collections.binarySearch(closedSegments, nextIndex);
+      if (segmentIndex >= 0) {
+        currentSegment = closedSegments.get(segmentIndex);
+      } else {
+        // convert the negative result into the insertion point
+        segmentIndex = -segmentIndex - 1;
+        if (segmentIndex == closedSegments.size()) {
+          currentSegment = openSegment;
+        } else {
+          // the start index is smaller than the first closed segment's start
+          // index. We no longer keep the log entry (because of the snapshot)
+          // or the start index is invalid.
+          Preconditions.checkState(segmentIndex == 0);
+          throw new IndexOutOfBoundsException();
+        }
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currentSegment != null &&
+          currentSegment.getLogRecord(nextIndex) != null;
+    }
+
+    @Override
+    public LogEntryProto next() {
+      LogRecord record;
+      if (currentSegment == null ||
+          (record = currentSegment.getLogRecord(nextIndex)) == null) {
+        throw new NoSuchElementException();
+      }
+      // Past the end of a closed segment: move to the next closed segment,
+      // or to the open segment (possibly null) after the last closed one.
+      if (++nextIndex > currentSegment.getEndIndex()) {
+        if (currentSegment != openSegment) {
+          segmentIndex++;
+          currentSegment = segmentIndex == closedSegments.size() ?
+              openSegment : closedSegments.get(segmentIndex);
+        }
+      }
+      return record.entry;
+    }
+  }
+
+  /** @return the number of closed segments plus one if a segment is open. */
+  @VisibleForTesting
+  int getNumOfSegments() {
+    return closedSegments.size() + (openSegment == null ? 0 : 1);
+  }
+
+  /** @return whether the cache holds no segment at all. */
+  boolean isEmpty() {
+    return closedSegments.isEmpty() && openSegment == null;
+  }
+
+  /** Drop all segments from the cache. */
+  void clear() {
+    openSegment = null;
+    closedSegments.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
new file mode 100644
index 0000000..e057a51
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_DEFAULT;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_KEY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.io.nativeio.NativeIO;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo;
+import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
+import org.apache.ratis.server.storage.SegmentedRaftLog.Task;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class takes the responsibility of all the raft log related I/O ops for
+ * a raft peer.
+ */
+class RaftLogWorker implements Runnable {
+  static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
+  /**
+   * The task queue accessed by rpc handler threads and the io worker thread.
+   */
+  private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(4096);
+  private volatile boolean running = true;
+  private final Thread workerThread;
+
+  private final RaftStorage storage;
+  private LogOutputStream out;
+  private final RaftServerImpl raftServer;
+
+  /**
+   * The number of entries that have been written into the LogOutputStream but
+   * have not been flushed.
+   */
+  private int pendingFlushNum = 0;
+  /** the index of the last entry that has been written */
+  private long lastWrittenIndex;
+  /** the largest index of the entry that has been flushed */
+  private volatile long flushedIndex;
+
+  private final int forceSyncNum;
+
+  private final  RaftProperties properties;
+
+  RaftLogWorker(RaftServerImpl raftServer, RaftStorage storage,
+                RaftProperties properties) {
+    this.raftServer = raftServer;
+    this.storage = storage;
+    this.properties = properties;
+    this.forceSyncNum = properties.getInt(RAFT_LOG_FORCE_SYNC_NUM_KEY,
+        RAFT_LOG_FORCE_SYNC_NUM_DEFAULT);
+    workerThread = new Thread(this,
+        getClass().getSimpleName() + " for " + storage);
+  }
+
+  void start(long latestIndex, File openSegmentFile) throws IOException {
+    lastWrittenIndex = latestIndex;
+    flushedIndex = latestIndex;
+    if (openSegmentFile != null) {
+      Preconditions.checkArgument(openSegmentFile.exists());
+      out = new LogOutputStream(openSegmentFile, true, properties);
+    }
+    workerThread.start();
+  }
+
+  /**
+   * Stops the worker: clears the running flag, interrupts the worker thread
+   * and waits for it to terminate.
+   */
+  void close() {
+    this.running = false;
+    workerThread.interrupt();
+    try {
+      workerThread.join();
+    } catch (InterruptedException e) {
+      // Do not swallow the interruption: restore the flag so that the caller
+      // can still observe that it was interrupted while waiting.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * A snapshot has just been installed on the follower. Need to update the IO
+   * worker's state accordingly: all queued tasks are discarded, both the
+   * written and the flushed index are reset to the given index, and the
+   * pending flush counter is cleared.
+   *
+   * NOTE(review): lastWrittenIndex and pendingFlushNum are plain fields that
+   * the tasks run by the worker thread also update; presumably the caller
+   * guarantees exclusion -- TODO confirm.
+   *
+   * @param lastSnapshotIndex the index to reset the written/flushed index to
+   */
+  void syncWithSnapshot(long lastSnapshotIndex) {
+    queue.clear();
+    lastWrittenIndex = lastSnapshotIndex;
+    flushedIndex = lastSnapshotIndex;
+    pendingFlushNum = 0;
+  }
+
+  /** @return the simple class name followed by "-" and the server id, if any. */
+  @Override
+  public String toString() {
+    final Object id = raftServer != null ? raftServer.getId() : "";
+    return this.getClass().getSimpleName() + "-" + id;
+  }
+
+  /**
+   * Enqueues the given task for the worker thread. If the queue stays full
+   * for one second, the worker must still be alive and the call then blocks
+   * until there is room. Any failure other than an interruption after the
+   * worker has been stopped terminates the process.
+   *
+   * This is protected by the RaftServer and RaftLog's lock.
+   *
+   * @return the task that was passed in
+   */
+  private Task addIOTask(Task task) {
+    LOG.debug("add task {}", task);
+    try {
+      if (!queue.offer(task, 1, TimeUnit.SECONDS)) {
+        Preconditions.checkState(isAlive(),
+            "the worker thread is not alive");
+        queue.put(task);
+      }
+    } catch (Throwable t) {
+      if (t instanceof InterruptedException && !running) {
+        LOG.info("Got InterruptedException when adding task " + task
+            + ". The RaftLogWorker already stopped.");
+        // Restore the interrupt status so callers further up the stack can
+        // still react to the interruption.
+        Thread.currentThread().interrupt();
+      } else {
+        ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG);
+      }
+    }
+    return task;
+  }
+
+  /** @return true if the worker has not been stopped and its thread is alive. */
+  boolean isAlive() {
+    if (!running) {
+      return false;
+    }
+    return workerThread.isAlive();
+  }
+
+  /**
+   * Worker loop: polls the queue (one second timeout) and executes each task
+   * until {@code running} is cleared. An IOException from a task whose end
+   * index is below lastWrittenIndex is logged and ignored; any other failure
+   * terminates the process.
+   */
+  @Override
+  public void run() {
+    while (running) {
+      try {
+        Task task = queue.poll(1, TimeUnit.SECONDS);
+        if (task != null) {
+          try {
+            task.execute();
+          } catch (IOException e) {
+            if (task.getEndIndex() < lastWrittenIndex) {
+              LOG.info("Ignore IOException when handling task " + task
+                  + " which is smaller than the lastWrittenIndex."
+                  + " There should be a snapshot installed.", e);
+            } else {
+              throw e;
+            }
+          }
+          task.done();
+        }
+      } catch (InterruptedException e) {
+        // The loop condition decides whether the thread really exits.
+        LOG.info(Thread.currentThread().getName()
+            + " was interrupted, exiting. There are " + queue.size()
+            + " tasks remaining in the queue.");
+      } catch (Throwable t) {
+        // TODO avoid terminating the jvm by supporting multiple log
+        // directories
+        ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.",
+            t, LOG);
+      }
+    }
+  }
+
+  /**
+   * @return true once the number of unflushed entries reaches forceSyncNum,
+   *         or when there are unflushed entries and the queue is empty.
+   */
+  private boolean shouldFlush() {
+    if (pendingFlushNum >= forceSyncNum) {
+      return true;
+    }
+    return pendingFlushNum > 0 && queue.isEmpty();
+  }
+
+  /** Flushes the current output stream, if any, and publishes the new flushed index. */
+  private void flushWrites() throws IOException {
+    if (out == null) {
+      return;
+    }
+    LOG.debug("flush data to " + out + ", reset pending_sync_number to 0");
+    out.flush();
+    updateFlushedIndex();
+  }
+
+  /**
+   * Marks everything written so far as flushed, resets the pending flush
+   * counter and, when a server is attached, submits a local sync event to it.
+   */
+  private void updateFlushedIndex() {
+    flushedIndex = lastWrittenIndex;
+    pendingFlushNum = 0;
+    if (raftServer != null) {
+      raftServer.submitLocalSyncEvent();
+    }
+  }
+
+  /**
+   * The following several methods (startLogSegment, rollLogSegment,
+   * writeLogEntry, and truncate) are only called by SegmentedRaftLog which is
+   * protected by RaftServer's lock.
+   *
+   * Thus all the tasks are created and added sequentially.
+   */
+  Task startLogSegment(long startIndex) {
+    return addIOTask(new StartLogSegment(startIndex));
+  }
+
+  Task rollLogSegment(LogSegment segmentToClose) {
+    addIOTask(new FinalizeLogSegment(segmentToClose));
+    return addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
+  }
+
+  Task writeLogEntry(LogEntryProto entry) {
+    return addIOTask(new WriteLog(entry));
+  }
+
+  Task truncate(TruncationSegments ts) {
+    return addIOTask(new TruncateLog(ts));
+  }
+
+  // TODO we can add another level of buffer for writing here
+  /** Task that appends a single entry to the current output stream. */
+  private class WriteLog extends Task {
+    private final LogEntryProto entry;
+
+    WriteLog(LogEntryProto entry) {
+      this.entry = entry;
+    }
+
+    /**
+     * Writes the entry, which must directly follow lastWrittenIndex, and
+     * flushes when {@link #shouldFlush()} says so.
+     */
+    @Override
+    public void execute() throws IOException {
+      Preconditions.checkState(out != null);
+      final boolean isNextEntry = lastWrittenIndex + 1 == entry.getIndex();
+      Preconditions.checkState(isNextEntry,
+          "lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry);
+      out.write(entry);
+      lastWrittenIndex = entry.getIndex();
+      pendingFlushNum++;
+      if (!shouldFlush()) {
+        return;
+      }
+      flushWrites();
+    }
+
+    @Override
+    long getEndIndex() {
+      return entry.getIndex();
+    }
+  }
+
+  /**
+   * Task that closes the current output stream and turns the open segment
+   * file into a closed one (or deletes it when the segment is empty).
+   */
+  private class FinalizeLogSegment extends Task {
+    private final LogSegment segmentToClose;
+
+    FinalizeLogSegment(LogSegment segmentToClose) {
+      this.segmentToClose = segmentToClose;
+    }
+
+    @Override
+    public void execute() throws IOException {
+      RaftUtils.cleanup(null, out);
+      out = null;
+      Preconditions.checkState(segmentToClose != null);
+
+      final RaftStorageDirectory dir = storage.getStorageDir();
+      final File openFile = dir.getOpenLogFile(segmentToClose.getStartIndex());
+      Preconditions.checkState(openFile.exists(),
+          "File %s does not exist.", openFile);
+      if (segmentToClose.numOfEntries() <= 0) {
+        // delete the file of the empty segment
+        FileUtils.deleteFile(openFile);
+      } else {
+        // finalize the current open segment
+        final File closedFile = storage.getStorageDir().getClosedLogFile(
+            segmentToClose.getStartIndex(), segmentToClose.getEndIndex());
+        Preconditions.checkState(!closedFile.exists());
+
+        NativeIO.renameTo(openFile, closedFile);
+      }
+      updateFlushedIndex();
+    }
+
+    @Override
+    long getEndIndex() {
+      return segmentToClose.getEndIndex();
+    }
+  }
+
+  /** Task that opens a new output stream for a segment starting at the given index. */
+  private class StartLogSegment extends Task {
+    private final long newStartIndex;
+
+    StartLogSegment(long newStartIndex) {
+      this.newStartIndex = newStartIndex;
+    }
+
+    /**
+     * Creates the output stream for the new open segment file. The file must
+     * not exist yet, and there must be neither a current stream nor
+     * unflushed entries.
+     */
+    @Override
+    void execute() throws IOException {
+      File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
+      Preconditions.checkState(!openFile.exists(),
+          "open file %s exists for %s",
+          openFile.getAbsolutePath(), RaftLogWorker.this.toString());
+      Preconditions.checkState(out == null && pendingFlushNum == 0);
+      out = new LogOutputStream(openFile, false, properties);
+    }
+
+    @Override
+    long getEndIndex() {
+      return newStartIndex;
+    }
+  }
+
+  /**
+   * Task that closes the current output stream, truncates (and renames) one
+   * segment file and/or deletes a set of segment files.
+   */
+  private class TruncateLog extends Task {
+    private final TruncationSegments segments;
+
+    TruncateLog(TruncationSegments ts) {
+      this.segments = ts;
+    }
+
+    @Override
+    void execute() throws IOException {
+      RaftUtils.cleanup(null, out);
+      out = null;
+      if (segments.toTruncate != null) {
+        File fileToTruncate = segments.toTruncate.isOpen ?
+            storage.getStorageDir().getOpenLogFile(
+                segments.toTruncate.startIndex) :
+            storage.getStorageDir().getClosedLogFile(
+                segments.toTruncate.startIndex,
+                segments.toTruncate.endIndex);
+        FileUtils.truncateFile(fileToTruncate,
+            segments.toTruncate.targetLength);
+
+        // rename the file
+        File dstFile = storage.getStorageDir().getClosedLogFile(
+            segments.toTruncate.startIndex, segments.toTruncate.newEndIndex);
+        Preconditions.checkState(!dstFile.exists());
+        NativeIO.renameTo(fileToTruncate, dstFile);
+
+        // update lastWrittenIndex
+        lastWrittenIndex = segments.toTruncate.newEndIndex;
+      }
+      if (segments.toDelete != null && segments.toDelete.length > 0) {
+        long minStart = segments.toDelete[0].startIndex;
+        for (SegmentFileInfo del : segments.toDelete) {
+          final File delFile;
+          if (del.isOpen) {
+            delFile = storage.getStorageDir().getOpenLogFile(del.startIndex);
+          } else {
+            delFile = storage.getStorageDir()
+                .getClosedLogFile(del.startIndex, del.endIndex);
+          }
+          FileUtils.deleteFile(delFile);
+          minStart = Math.min(minStart, del.startIndex);
+        }
+        if (segments.toTruncate == null) {
+          // nothing was truncated: the log now ends right before the first
+          // deleted segment
+          lastWrittenIndex = minStart - 1;
+        }
+      }
+      updateFlushedIndex();
+    }
+
+    /**
+     * @return the new end index of the truncated segment, else the end index
+     *         of the last segment to delete, else INVALID_LOG_INDEX.
+     */
+    @Override
+    long getEndIndex() {
+      if (segments.toTruncate != null) {
+        return segments.toTruncate.newEndIndex;
+      }
+      // same null guard as execute(): toDelete may be absent
+      if (segments.toDelete != null && segments.toDelete.length > 0) {
+        return segments.toDelete[segments.toDelete.length - 1].endIndex;
+      }
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+  }
+
+  /** @return the largest index of the entry that has been flushed */
+  long getFlushedIndex() {
+    return this.flushedIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
new file mode 100644
index 0000000..09ea55c
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_DEFAULT;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class RaftStorage implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
+
+  // TODO support multiple storage directories
+  private final RaftStorageDirectory storageDir;
+  private final StorageState state;
+  private volatile MetaFile metaFile;
+  private StateMachineStorage stateMachineStorage;
+
+  /**
+   * Sets up the storage directory configured under
+   * {@code RAFT_SERVER_STORAGE_DIR_KEY}. With the FORMAT option the directory
+   * is locked and formatted; otherwise it is analyzed (and locked) and, if
+   * necessary, recovered.
+   *
+   * @throws IOException if the directory cannot be formatted or does not end
+   *         up in the NORMAL state
+   */
+  public RaftStorage(RaftProperties prop,
+      RaftServerConstants.StartupOption option) throws IOException {
+    final String dir = prop.get(RAFT_SERVER_STORAGE_DIR_KEY,
+        RAFT_SERVER_STORAGE_DIR_DEFAULT);
+    storageDir = new RaftStorageDirectory(
+        new File(FileUtils.stringAsURI(dir).getPath()));
+    if (option == RaftServerConstants.StartupOption.FORMAT) {
+      if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
+        throw new IOException("Cannot format " + storageDir);
+      }
+      storageDir.lock();
+      format();
+      state = storageDir.analyzeStorage(false);
+      Preconditions.checkState(state == StorageState.NORMAL);
+    } else {
+      state = analyzeAndRecoverStorage(true); // metaFile is initialized here
+      if (state != StorageState.NORMAL) {
+        storageDir.unlock();
+        throw new IOException("Cannot load " + storageDir
+            + ". Its state: " + state);
+      }
+    }
+  }
+
+  /** @return the storage state determined by the constructor */
+  StorageState getState() {
+    return this.state;
+  }
+
+  private void format() throws IOException {
+    storageDir.clearDirectory();
+    metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR);
+    LOG.info("Storage directory " + storageDir.getRoot()
+        + " has been successfully formatted.");
+  }
+
+  private MetaFile writeMetaFile(long term, String votedFor) throws 
IOException {
+    MetaFile metaFile = new MetaFile(storageDir.getMetaFile());
+    metaFile.set(term, votedFor);
+    return metaFile;
+  }
+
+  private void cleanMetaTmpFile() throws IOException {
+    Files.deleteIfExists(storageDir.getMetaTmpFile().toPath());
+  }
+
+  private StorageState analyzeAndRecoverStorage(boolean toLock)
+      throws IOException {
+    StorageState storageState = storageDir.analyzeStorage(toLock);
+    if (storageState == StorageState.NORMAL) {
+      metaFile = new MetaFile(storageDir.getMetaFile());
+      assert metaFile.exists();
+      metaFile.readFile();
+      // Existence of raft-meta.tmp means the change of votedFor/term has not
+      // been committed. Thus we should delete the tmp file.
+      cleanMetaTmpFile();
+      return StorageState.NORMAL;
+    } else if (storageState == StorageState.NOT_FORMATTED &&
+        storageDir.isCurrentEmpty()) {
+      format();
+      return StorageState.NORMAL;
+    } else {
+      return storageState;
+    }
+  }
+
+  /** @return the storage directory managed by this storage */
+  public RaftStorageDirectory getStorageDir() {
+    return this.storageDir;
+  }
+
+  /** Releases the lock on the storage directory. */
+  @Override
+  public void close() throws IOException {
+    this.storageDir.unlock();
+  }
+
+  /** @return the meta file set by formatting or recovering the storage */
+  MetaFile getMetaFile() {
+    return this.metaFile;
+  }
+
+  /** @return the latest snapshot reported by the state machine storage */
+  public SnapshotInfo getLastestSnapshot() throws IOException {
+    final StateMachineStorage smStorage = getStateMachineStorage();
+    return smStorage.getLatestSnapshot();
+  }
+
+  /**
+   * Called by the state machine after it has initialized the
+   * StateMachineStorage.
+   */
+  public void setStateMachineStorage(StateMachineStorage smStorage) {
+    stateMachineStorage = smStorage;
+  }
+
+  /** @return the state machine storage, or null if none has been set */
+  public StateMachineStorage getStateMachineStorage() {
+    return this.stateMachineStorage;
+  }
+
+  /** @return the string form of the storage directory */
+  @Override
+  public String toString() {
+    return String.valueOf(getStorageDir());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
new file mode 100644
index 0000000..bfa691d
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.ratis.util.AtomicFileOutputStream;
+import org.apache.ratis.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.management.ManagementFactory;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.nio.file.Files.newDirectoryStream;
+import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+
+public class RaftStorageDirectory {
+  static final Logger LOG =
+      LoggerFactory.getLogger(RaftStorageDirectory.class);
+
+  static final String STORAGE_DIR_CURRENT = "current";
+  static final String STORAGE_FILE_LOCK = "in_use.lock";
+  static final String META_FILE_NAME = "raft-meta";
+  static final String LOG_FILE_INPROGRESS = "inprogress";
+  static final String LOG_FILE_PREFIX = "log";
+  // directory containing state machine snapshots
+  static final String STATE_MACHINE = "sm";
+  static final String TEMP = "tmp";
+  // closed segment file name: log_<startIndex>-<endIndex>
+  static final Pattern CLOSED_SEGMENT_REGEX =
+      Pattern.compile("log_(\\d+)-(\\d+)");
+  // open segment file name: log_inprogress_<startIndex>[.<suffix>]
+  static final Pattern OPEN_SEGMENT_REGEX =
+      Pattern.compile("log_inprogress_(\\d+)(?:\\..*)?");
+
+  private static final List<Pattern> LOGSEGMENTS_REGEXES =
+      ImmutableList.of(CLOSED_SEGMENT_REGEX, OPEN_SEGMENT_REGEX);
+
+  /** States reported by {@link #analyzeStorage(boolean)}. */
+  enum StorageState {
+    /** the root is not a directory or cannot be accessed/written */
+    NON_EXISTENT,
+    /** the root is usable but no meta file was found */
+    NOT_FORMATTED,
+    /** the meta file exists */
+    NORMAL
+  }
+
+  /** A log segment file together with the indices parsed from its name. */
+  public static class LogPathAndIndex {
+    public final Path path;
+    public final long startIndex;
+    public final long endIndex;
+
+    LogPathAndIndex(Path path, long startIndex, long endIndex) {
+      this.endIndex = endIndex;
+      this.startIndex = startIndex;
+      this.path = path;
+    }
+
+    /** @return {@code <path>-<startIndex>-<endIndex>} */
+    @Override
+    public String toString() {
+      return new StringBuilder()
+          .append(path)
+          .append("-").append(startIndex)
+          .append("-").append(endIndex)
+          .toString();
+    }
+  }
+
+  private final File root; // root directory
+  private FileLock lock;   // storage lock
+
+  /**
+   * Creates a storage directory rooted at the given directory. No lock is
+   * held initially.
+   *
+   * @param dir directory corresponding to the storage
+   */
+  RaftStorageDirectory(File dir) {
+    this.lock = null;
+    this.root = dir;
+  }
+
+  /**
+   * @return the root directory of this storage
+   */
+  //TODO
+  public File getRoot() {
+    return this.root;
+  }
+
+  /**
+   * Clear and re-create storage directory.
+   * <p>
+   * Removes contents of the current directory and creates an empty directory.
+   *
+   * This does not fully format storage directory.
+   * It cannot write the version file since it should be written last after
+   * all other storage type dependent files are written.
+   * Derived storage is responsible for setting specific storage values and
+   * writing the version file to disk.
+   */
+  void clearDirectory() throws IOException {
+    // current/ first, then the state machine directory
+    clearDirectory(this.getCurrentDir());
+    clearDirectory(getStateMachineDir());
+  }
+
+  void clearDirectory(File dir) throws IOException {
+    if (dir.exists()) {
+      File[] files = FileUtils.listFiles(dir);
+      LOG.info("Will remove files: " + Arrays.toString(files));
+      if (!(FileUtils.fullyDelete(dir)))
+        throw new IOException("Cannot remove directory: " + dir);
+    }
+    if (!dir.mkdirs())
+      throw new IOException("Cannot create directory " + dir);
+  }
+
+  /**
+   * Directory {@code current} contains latest files defining
+   * the file system meta-data.
+   *
+   * @return the directory path
+   */
+  File getCurrentDir() {
+    return new File(root, STORAGE_DIR_CURRENT);
+  }
+
+  File getMetaFile() {
+    return new File(getCurrentDir(), META_FILE_NAME);
+  }
+
+  File getMetaTmpFile() {
+    return new File(getCurrentDir(), META_FILE_NAME
+        + AtomicFileOutputStream.TMP_EXTENSION);
+  }
+
+  File getOpenLogFile(long startIndex) {
+    return new File(getCurrentDir(), getOpenLogFileName(startIndex));
+  }
+
+  static String getOpenLogFileName(long startIndex) {
+    return LOG_FILE_PREFIX + "_" + LOG_FILE_INPROGRESS + "_" + startIndex;
+  }
+
+  /** @return the closed segment file for the given index range */
+  File getClosedLogFile(long startIndex, long endIndex) {
+    return new File(getCurrentDir(),
+        getClosedLogFileName(startIndex, endIndex));
+  }
+
+  static String getClosedLogFileName(long startIndex, long endIndex) {
+    return LOG_FILE_PREFIX + "_" + startIndex + "-" + endIndex;
+  }
+
+  /** @return the state machine directory under the root */
+  public File getStateMachineDir() {
+    final File rootDir = getRoot();
+    return new File(rootDir, STATE_MACHINE);
+  }
+
+  /** Returns a uniquely named temporary directory under $rootdir/tmp/ */
+  public File getNewTempDir() {
+    final File tmpParent = new File(getRoot(), TEMP);
+    final String uniqueName = UUID.randomUUID().toString();
+    return new File(tmpParent, uniqueName);
+  }
+
+  /**
+   * @return the given path relativized against the root when it is absolute;
+   *         otherwise the path itself.
+   */
+  public Path relativizeToRoot(Path p) {
+    return p.isAbsolute() ? getRoot().toPath().relativize(p) : p;
+  }
+
+  /**
+   * Lists the files in the current directory whose names match the closed or
+   * open segment pattern. Open segments get INVALID_LOG_INDEX as end index.
+   *
+   * @return log segment files sorted by their start index.
+   */
+  @VisibleForTesting
+  public List<LogPathAndIndex> getLogSegmentFiles() throws IOException {
+    final List<LogPathAndIndex> segments = new ArrayList<>();
+    try (DirectoryStream<Path> entries =
+             Files.newDirectoryStream(getCurrentDir().toPath())) {
+      for (Path entry : entries) {
+        final String fileName = entry.getFileName().toString();
+        for (Pattern regex : LOGSEGMENTS_REGEXES) {
+          final Matcher m = regex.matcher(fileName);
+          if (!m.matches()) {
+            continue;
+          }
+          final long start = Long.parseLong(m.group(1));
+          final long end;
+          if (m.groupCount() == 2) {
+            end = Long.parseLong(m.group(2));
+          } else {
+            end = INVALID_LOG_INDEX;
+          }
+          segments.add(new LogPathAndIndex(entry, start, end));
+        }
+      }
+    }
+    Collections.sort(segments,
+        (a, b) -> Long.compare(a.startIndex, b.startIndex));
+    return segments;
+  }
+
+  /**
+   * Check to see if current/ directory is empty.
+   */
+  boolean isCurrentEmpty() throws IOException {
+    File currentDir = getCurrentDir();
+    if(!currentDir.exists()) {
+      // if current/ does not exist, it's safe to format it.
+      return true;
+    }
+    try(DirectoryStream<Path> dirStream =
+            newDirectoryStream(currentDir.toPath())) {
+      if (dirStream.iterator().hasNext()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check consistency of the storage directory. The root is created when it
+   * does not exist yet.
+   *
+   * @param toLock whether to lock the storage once the root is usable
+   * @return NON_EXISTENT if the root is not a writable directory or cannot be
+   *         accessed, NORMAL if the meta file exists, NOT_FORMATTED otherwise
+   * @throws IOException if the root cannot be created or locking fails
+   */
+  StorageState analyzeStorage(boolean toLock) throws IOException {
+    Preconditions.checkState(root != null, "root directory is null");
+
+    String rootPath = root.getCanonicalPath();
+    try { // check that storage exists
+      if (!root.exists()) {
+        LOG.info(rootPath + " does not exist. Creating ...");
+        if (!root.mkdirs()) {
+          throw new IOException("Cannot create directory " + rootPath);
+        }
+      }
+      // or is inaccessible
+      if (!root.isDirectory()) {
+        LOG.warn(rootPath + " is not a directory");
+        return StorageState.NON_EXISTENT;
+      }
+      if (!FileUtils.canWrite(root)) {
+        LOG.warn("Cannot access storage directory " + rootPath);
+        return StorageState.NON_EXISTENT;
+      }
+    } catch(SecurityException ex) {
+      LOG.warn("Cannot access storage directory " + rootPath, ex);
+      return StorageState.NON_EXISTENT;
+    }
+
+    if (toLock) {
+      this.lock(); // lock storage if it exists
+    }
+
+    // check whether current directory is valid
+    if (hasMetaFile()) {
+      return StorageState.NORMAL;
+    } else {
+      return StorageState.NOT_FORMATTED;
+    }
+  }
+
+  boolean hasMetaFile() throws IOException {
+    return getMetaFile().exists();
+  }
+
+  /**
+   * Lock storage to provide exclusive access.
+   *
+   * <p> Locking is not supported by all file systems.
+   * E.g., NFS does not consistently support exclusive locks.
+   *
+   * <p> If locking is supported we guarantee exclusive access to the
+   * storage directory. Otherwise, no guarantee is given.
+   *
+   * @throws IOException if locking fails
+   */
+  public void lock() throws IOException {
+    final FileLock acquired = tryLock();
+    if (acquired != null) {
+      // Only overwrite the field on success - this way if we accidentally
+      // call lock twice, the internal state won't be cleared by the second
+      // (failed) lock attempt
+      lock = acquired;
+      return;
+    }
+    final String msg = "Cannot lock storage " + this.root
+        + ". The directory is already locked";
+    LOG.info(msg);
+    throw new IOException(msg);
+  }
+
+  /**
+   * Attempts to acquire an exclusive lock on the storage by locking the
+   * lock file under the root. On success the JVM name is written into the
+   * lock file and the file is registered for deletion on JVM exit.
+   *
+   * @return A lock object representing the newly-acquired lock or
+   * <code>null</code> if storage is already locked.
+   * @throws IOException if locking fails.
+   */
+  private FileLock tryLock() throws IOException {
+    boolean deletionHookAdded = false;
+    File lockF = new File(root, STORAGE_FILE_LOCK);
+    if (!lockF.exists()) {
+      // the file is about to be created by us, so we may delete it on exit
+      lockF.deleteOnExit();
+      deletionHookAdded = true;
+    }
+    RandomAccessFile file = new RandomAccessFile(lockF, "rws");
+    String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+    FileLock res;
+    try {
+      res = file.getChannel().tryLock();
+      if (null == res) {
+        // handled by the OverlappingFileLockException branch below
+        LOG.error("Unable to acquire file lock on path " + lockF.toString());
+        throw new OverlappingFileLockException();
+      }
+      file.write(jvmName.getBytes(Charsets.UTF_8));
+      LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName);
+    } catch (OverlappingFileLockException oe) {
+      // Cannot read from the locked file on Windows.
+      LOG.error("It appears that another process "
+          + "has already locked the storage directory: " + root, oe);
+      file.close();
+      return null;
+    } catch(IOException e) {
+      LOG.error("Failed to acquire lock on " + lockF
+          + ". If this storage directory is mounted via NFS, "
+          + "ensure that the appropriate nfs lock services are running.", e);
+      file.close();
+      throw e;
+    }
+    if (!deletionHookAdded) {
+      // If the file existed prior to our startup, we didn't
+      // call deleteOnExit above. But since we successfully locked
+      // the dir, we can take care of cleaning it up.
+      lockF.deleteOnExit();
+    }
+    return res;
+  }
+
+  /**
+   * Unlock storage. Does nothing when no lock is held. The lock's channel is
+   * closed even if releasing the lock fails.
+   *
+   * @throws IOException if releasing the lock or closing its channel fails
+   */
+  public void unlock() throws IOException {
+    final FileLock held = this.lock;
+    if (held == null) {
+      return;
+    }
+    // Forget the lock first so that a failure below cannot leave a stale,
+    // half-released lock behind for a later unlock() call.
+    this.lock = null;
+    try {
+      held.release();
+    } finally {
+      held.channel().close();
+    }
+  }
+
+  /** @return "Storage Directory " followed by the root directory */
+  @Override
+  public String toString() {
+    final String prefix = "Storage Directory ";
+    return prefix + this.root;
+  }
+}


Reply via email to