RATIS-122. Add a FileStore example.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8a40ee4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8a40ee4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8a40ee4b

Branch: refs/heads/master
Commit: 8a40ee4bccc23c37027829a9cc17838200650a0d
Parents: 830bd61
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Wed Nov 8 10:44:52 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Wed Nov 8 15:02:40 2017 -0800

----------------------------------------------------------------------
 .../ratis/client/impl/ClientProtoUtils.java     |   4 +-
 .../org/apache/ratis/util/CollectionUtils.java  |  13 +
 .../java/org/apache/ratis/util/FileUtils.java   |   9 +-
 .../java/org/apache/ratis/util/JavaUtils.java   |   6 +
 .../java/org/apache/ratis/util/LogUtils.java    |  73 ++++-
 .../java/org/apache/ratis/util/ProtoUtils.java  |  30 ++-
 .../org/apache/ratis/util/ReflectionUtils.java  |   3 +-
 .../java/org/apache/ratis/util/StringUtils.java |  23 +-
 .../java/org/apache/ratis/util/TaskQueue.java   | 125 +++++++++
 .../test/java/org/apache/ratis/BaseTest.java    |   2 +-
 .../ratis/examples/filestore/FileInfo.java      | 266 +++++++++++++++++++
 .../ratis/examples/filestore/FileStore.java     | 212 +++++++++++++++
 .../examples/filestore/FileStoreClient.java     | 122 +++++++++
 .../examples/filestore/FileStoreCommon.java     |  56 ++++
 .../filestore/FileStoreStateMachine.java        | 186 +++++++++++++
 .../examples/filestore/FileStoreBaseTest.java   | 201 ++++++++++++++
 .../filestore/TestFileStoreWithGrpc.java        |  25 ++
 .../filestore/TestFileStoreWithNetty.java       |  25 ++
 .../ratis/netty/server/NettyRpcService.java     |   6 +-
 .../src/main/proto/Examples.proto               |  67 +++++
 ratis-proto-shaded/src/main/proto/Raft.proto    |   2 +
 .../ratis/server/impl/RaftServerImpl.java       |   2 +-
 .../ratis/server/storage/RaftLogWorker.java     |  37 ++-
 .../ratis/statemachine/BaseStateMachine.java    |   6 +
 .../apache/ratis/statemachine/StateMachine.java |  13 +
 .../ratis/statemachine/TransactionContext.java  |  11 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  |  27 +-
 .../org/apache/ratis/RaftExceptionBaseTest.java |   2 +-
 .../java/org/apache/ratis/RaftTestUtil.java     |  15 +-
 .../server/impl/ServerInformationBaseTest.java  |   9 +-
 30 files changed, 1531 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 053d952..a01c376 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -23,9 +23,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.ReflectionUtils;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
 import static 
org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
 import static 
org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
@@ -125,7 +123,7 @@ public class ClientProtoUtils {
         final Throwable t = sme.getCause() != null ? sme.getCause() : sme;
         smeBuilder.setExceptionClassName(t.getClass().getName())
             .setErrorMsg(t.getMessage())
-            .setStacktrace(ProtoUtils.toByteString(t.getStackTrace()));
+            
.setStacktrace(ProtoUtils.writeObject2ByteString(t.getStackTrace()));
         b.setStateMachineException(smeBuilder.build());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 5ea8d4f..57222a6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -23,6 +23,7 @@ package org.apache.ratis.util;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -82,4 +83,16 @@ public interface CollectionUtils {
       INPUT[] array, Function<INPUT, OUTPUT> converter) {
     return as(Arrays.asList(array), converter);
   }
+
+  static <K, V> void putNew(K key, V value, Map<K, V> map, Supplier<String> 
name) {
+    final V returned = map.put(key, value);
+    Preconditions.assertTrue(returned == null,
+        () -> "Entry already exists for key " + key + " in map " + name.get());
+  }
+
+  static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> 
map, Supplier<String> name) {
+    final boolean replaced = map.replace(key, oldValue, newValue);
+    Preconditions.assertTrue(replaced,
+        () -> "Entry not found for key " + key + " in map " + name.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index 38fdff3..3171d4e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.*;
 import java.nio.file.attribute.BasicFileAttributes;
 
@@ -38,6 +39,12 @@ public interface FileUtils {
         () -> "FileOutputStream.getChannel().truncate " + f + " to target 
length " + target);
   }
 
+  static OutputStream createNewFile(Path p) throws IOException {
+    return LogUtils.supplyAndLog(LOG,
+        () -> Files.newOutputStream(p, StandardOpenOption.CREATE_NEW),
+        () -> "Files.newOutputStream " + StandardOpenOption.CREATE_NEW + " " + 
p);
+  }
+
   static void createDirectories(File dir) throws IOException {
     createDirectories(dir.toPath());
   }
@@ -93,7 +100,7 @@ public interface FileUtils {
    */
   static void deleteFully(Path p) throws IOException {
     if (!Files.exists(p, LinkOption.NOFOLLOW_LINKS)) {
-      LOG.trace("deleteFully: {} does not exist.");
+      LOG.trace("deleteFully: {} does not exist.", p);
       return;
     }
     Files.walkFileTree(p, new SimpleFileVisitor<Path>() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index ed115cb..89407eb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -177,4 +177,10 @@ public interface JavaUtils {
       println.accept(ti.toString());
     }
   }
+
+  static <E> CompletableFuture<E> completeExceptionally(Throwable t) {
+    final CompletableFuture<E> future = new CompletableFuture<>();
+    future.completeExceptionally(t);
+    return future;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index ebb61be..6a4d833 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -24,6 +24,8 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 /**
@@ -40,9 +42,6 @@ public interface LogUtils {
       throws THROWABLE {
     try {
       op.run();
-      if (log.isTraceEnabled()) {
-        log.trace("Executed " + opName.get() + " successfully.");
-      }
     } catch (Throwable t) {
       if (log.isTraceEnabled()) {
         log.trace("Failed to " + opName.get(), t);
@@ -51,5 +50,73 @@ public interface LogUtils {
       }
       throw t;
     }
+
+    if (log.isTraceEnabled()) {
+      log.trace("Successfully ran " + opName.get());
+    }
+  }
+
+  static <OUTPUT, THROWABLE extends Throwable> OUTPUT supplyAndLog(
+      Logger log, CheckedSupplier<OUTPUT, THROWABLE> supplier, 
Supplier<String> name)
+      throws THROWABLE {
+    final OUTPUT output;
+    try {
+      output = supplier.get();
+    } catch (Throwable t) {
+      if (log.isTraceEnabled()) {
+        log.trace("Failed to " + name.get(), t);
+      } else if (log.isWarnEnabled()){
+        log.warn("Failed to " + name.get() + ": " + t);
+      }
+      throw (THROWABLE)t;
+    }
+
+    if (log.isTraceEnabled()) {
+      log.trace("Successfully supplied " + name.get());
+    }
+    return output;
+  }
+
+  static Runnable newRunnable(Logger log, Runnable runnable, Supplier<String> 
name) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        runAndLog(log, runnable::run, name);
+      }
+
+      @Override
+      public String toString() {
+        return name.get();
+      }
+    };
+  }
+
+  static <T> Callable<T> newCallable(Logger log, Callable<T> callable, 
Supplier<String> name) {
+    return new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        return supplyAndLog(log, callable::call, name);
+      }
+
+      @Override
+      public String toString() {
+        return name.get();
+      }
+    };
+  }
+
+  static <OUTPUT, THROWABLE extends Throwable> CheckedSupplier<OUTPUT, 
THROWABLE> newCheckedSupplier(
+      Logger log, CheckedSupplier<OUTPUT, THROWABLE> supplier, 
Supplier<String> name) {
+    return new CheckedSupplier<OUTPUT, THROWABLE>() {
+      @Override
+      public OUTPUT get() throws THROWABLE {
+        return supplyAndLog(log, supplier, name);
+      }
+
+      @Override
+      public String toString() {
+        return name.get();
+      }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 73e8646..2acda38 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -29,8 +30,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-public class ProtoUtils {
-  public static ByteString toByteString(Object obj) {
+public interface ProtoUtils {
+  public static ByteString writeObject2ByteString(Object obj) {
     final ByteString.Output byteOut = ByteString.newOutput();
     try(final ObjectOutputStream objOut = new ObjectOutputStream(byteOut)) {
       objOut.writeObject(obj);
@@ -52,6 +53,10 @@ public class ProtoUtils {
     }
   }
 
+  static ByteString toByteString(String string) {
+    return ByteString.copyFromUtf8(string);
+  }
+
   public static ByteString toByteString(byte[] bytes) {
     return toByteString(bytes, 0, bytes.length);
   }
@@ -133,6 +138,27 @@ public class ProtoUtils {
         .build();
   }
 
+  /**
+   * If the given entry is {@link LogEntryBodyCase#SMLOGENTRY} and it has state machine data,
+   * build a new entry without the state machine data.
+   *
+   * @return a new entry without the state machine data if the given entry has state machine data;
+   *         otherwise, return the given entry.
+   */
+  static LogEntryProto removeStateMachineData(LogEntryProto entry) {
+    if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) {
+      return entry;
+    }
+    final SMLogEntryProto smLog = entry.getSmLogEntry();
+    if (smLog.getStateMachineData().isEmpty()) {
+      return entry;
+    }
+    // build a new LogEntryProto without state machine data
+    return LogEntryProto.newBuilder(entry)
+        .setSmLogEntry(SMLogEntryProto.newBuilder().setData(smLog.getData()))
+        .build();
+  }
+
   public static IOException toIOException(ServiceException se) {
     final Throwable t = se.getCause();
     if (t == null) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java
index 700965f..c185e66 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java
@@ -173,7 +173,8 @@ public interface ReflectionUtils {
       ctor = Constructors.get(clazz, argClasses);
     } catch (NoSuchMethodException e) {
       throw new UnsupportedOperationException(
-          "Unable to find suitable constructor for class " + clazz.getName(), 
e);
+          "Unable to find suitable constructor for class " + clazz.getName()
+          + ", argument classes = " + Arrays.toString(argClasses), e);
     }
     return instantiate(clazz.getName(), ctor, args);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index 7a2ddd0..07a8973 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -22,8 +22,10 @@ import 
org.apache.ratis.shaded.com.google.common.collect.Interners;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.nio.ByteBuffer;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 public class StringUtils {
   public static final String[] EMPTY_STRING_ARRAY = {};
@@ -64,11 +66,17 @@ public class StringUtils {
 
   public static String bytes2HexString(byte[] bytes) {
     Objects.requireNonNull(bytes, "bytes == null");
+    return bytes2HexString(ByteBuffer.wrap(bytes));
+  }
+
+  public static String bytes2HexString(ByteBuffer bytes) {
+    Objects.requireNonNull(bytes, "bytes == null");
 
-    final StringBuilder s = new StringBuilder(2 * bytes.length);
-    for(byte b : bytes) {
-      s.append(format("%02x", b));
+    final StringBuilder s = new StringBuilder(2 * bytes.remaining());
+    for(; bytes.remaining() > 0; ) {
+      s.append(format("%02x", bytes.get()));
     }
+    bytes.flip();
     return s.toString();
   }
 
@@ -93,4 +101,13 @@ public class StringUtils {
     wrt.close();
     return stm.toString();
   }
+
+  public static Object stringSupplierAsObject(Supplier<String> supplier) {
+    return new Object() {
+      @Override
+      public String toString() {
+        return supplier.get();
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java
new file mode 100644
index 0000000..c72067b
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/**
+ * A queue with execution order guarantee such that
+ * each task is submitted for execution only if it becomes the head of the queue.
+ * Tasks are executed sequentially without any overlap.
+ *
+ * By the definition of a queue, a task can become the head iff
+ * (1) the queue is empty when offering it, or
+ * (2) it is the next to the head and the head is polled out from the queue.
+ *
+ * A typical use case is to submit concurrent tasks
+ * with in-order guarantee for some of the tasks.
+ *
+ * One example use case is to submit tasks to write multiple files:
+ * - A file may require multiple write tasks.
+ * - Multiple files are written at the same time.
+ * A solution is to create a {@link TaskQueue} for each file
+ * and then submit the write tasks to the corresponding queue.
+ * The files will be written concurrently and the writes to each file are in-order.
+ */
+public class TaskQueue {
+  public static final Logger LOG = LoggerFactory.getLogger(TaskQueue.class);
+
+  private final String name;
+  private final Queue<Runnable> q = new LinkedList<>();
+
+  public TaskQueue(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Poll the current head from this queue
+   * and then submit the next head, if there is any.
+   */
+  private synchronized Runnable pollAndSubmit(ExecutorService executor) {
+    final Runnable head = q.poll();
+    final Runnable next = q.peek();
+    if (next != null) {
+      executor.submit(next);
+    }
+    return head;
+  }
+
+  /**
+   * Offer the given task to this queue.
+   * If it is the first task, submit it.
+   */
+  private synchronized void offerAndSubmit(Runnable task, ExecutorService 
executor) {
+    q.offer(task);
+    if (q.size() == 1) {
+      executor.submit(task);
+    }
+  }
+
+  /**
+   * The same as submit(task, executor, Function.identity());
+   */
+  public <OUTPUT, THROWABLE extends Throwable> CompletableFuture<OUTPUT> 
submit(
+      CheckedSupplier<OUTPUT, THROWABLE> task, ExecutorService executor) {
+    return submit(task, executor, Function.identity());
+  }
+
+  /**
+   * Offer the given task to this queue
+   * and then submit the tasks one by one in the queue order for execution.
+   *
+   * @param task the task to be submitted.
+   * @param executor to execute tasks.
+   * @param newThrowable When the task throws a throwable, create a new Throwable
+   *                     from it in order to include a more informative error message.
+   * @param <OUTPUT> the output type of the task.
+   * @param <THROWABLE> the throwable type of the task.
+   * @return a future of the output.
+   */
+  public <OUTPUT, THROWABLE extends Throwable> CompletableFuture<OUTPUT> 
submit(
+      CheckedSupplier<OUTPUT, THROWABLE> task, ExecutorService executor,
+      Function<Throwable, Throwable> newThrowable) {
+    final CompletableFuture<OUTPUT> f = new CompletableFuture<>();
+    final Runnable runnable = LogUtils.newRunnable(LOG, () -> {
+      LOG.trace("{}: running {}", this, task);
+      try {
+        f.complete(task.get());
+      } catch (Throwable e) {
+        f.completeExceptionally(newThrowable.apply(e));
+      }
+
+      pollAndSubmit(executor);
+    }, task::toString);
+
+    offerAndSubmit(runnable, executor);
+    return f;
+  }
+
+  @Override
+  public synchronized String toString() {
+    return name + "-" + getClass().getSimpleName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java 
b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 1c27420..f4e622a 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -52,7 +52,7 @@ public abstract class BaseTest {
   private static final Supplier<File> rootTestDir = JavaUtils.memoize(
       () -> JavaUtils.callAsUnchecked(() -> {
         final File dir = new File(System.getProperty("test.build.data", 
"target/test/data"),
-            Long.toHexString(ThreadLocalRandom.current().nextLong()));
+            Integer.toHexString(ThreadLocalRandom.current().nextInt()));
         if (dir.exists() && !dir.isDirectory()) {
           throw new IOException(dir + " already exists and is not a 
directory");
         } else if (!dir.exists() && !dir.mkdirs()) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
new file mode 100644
index 0000000..d8b3eb4
--- /dev/null
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+abstract class FileInfo {
+  public static final Logger LOG = LoggerFactory.getLogger(FileInfo.class);
+
+  private final Path relativePath;
+
+  FileInfo(Path relativePath) {
+    this.relativePath = relativePath;
+  }
+
+  Path getRelativePath() {
+    return relativePath;
+  }
+
+  long getSize() {
+    throw new UnsupportedOperationException(
+        "File " + getRelativePath() + " size is unknown.");
+  }
+
+  void flush() throws IOException {
+    // no-op
+  }
+
+  ByteString read(CheckedFunction<Path, Path, IOException> resolver, long 
offset, long length)
+      throws IOException {
+    flush();
+    if (offset + length > getSize()) {
+      throw new IOException("Failed to read: offset (=" + offset
+          + " + length (=" + length + ") > size = " + getSize()
+          + ", path=" + getRelativePath());
+    }
+
+    try(final SeekableByteChannel in = Files.newByteChannel(
+        resolver.apply(getRelativePath()), StandardOpenOption.READ)) {
+      final ByteBuffer buffer = 
ByteBuffer.allocateDirect(FileStoreCommon.getChunkSize(length));
+      in.position(offset).read(buffer);
+      buffer.flip();
+      return ByteString.copyFrom(buffer);
+    }
+  }
+
+  UnderConstruction asUnderConstruction() {
+    throw new UnsupportedOperationException(
+        "File " + getRelativePath() + " is not under construction.");
+  }
+
+  static class ReadOnly extends FileInfo {
+    private final long size;
+
+    ReadOnly(UnderConstruction f) {
+      super(f.getRelativePath());
+      this.size = f.getSize();
+    }
+
+    @Override
+    long getSize() {
+      return size;
+    }
+  }
+
+  static class FileOut implements Closeable {
+    private final OutputStream out;
+    private final WritableByteChannel channel;
+
+    FileOut(Path p) throws IOException {
+      this.out = FileUtils.createNewFile(p);
+      this.channel = Channels.newChannel(out);
+    }
+
+    int write(ByteBuffer data) throws IOException {
+      return channel.write(data);
+    }
+
+    void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();
+      out.close();
+    }
+  }
+
+  static class UnderConstruction extends FileInfo {
+    private FileOut out;
+
+    /** The size written to a local file. */
+    private volatile long writeSize;
+    /** The size committed to client. */
+    private volatile long committedSize;
+    /** The size at last flush. */
+    private volatile long flushSize;
+
+    /** A queue to make sure that the writes are in order. */
+    private final TaskQueue writeQueue = new TaskQueue("writeQueue");
+    /** A queue to make sure that the commits are in order. */
+    private final TaskQueue commitQueue = new TaskQueue("commitQueue");
+    /** Futures to make sure that each commit is executed after the corresponding write. */
+    private final Map<Long, CompletableFuture<Integer>> writeFutures = new 
ConcurrentHashMap<>();
+
+    UnderConstruction(Path relativePath) {
+      super(relativePath);
+    }
+
+    @Override
+    UnderConstruction asUnderConstruction() {
+      return this;
+    }
+
+    @Override
+    long getSize() {
+      return committedSize;
+    }
+
+    CompletableFuture<Integer> submitCreate(
+        CheckedFunction<Path, Path, IOException> resolver, ByteString data, 
boolean close,
+        ExecutorService executor, RaftPeerId id, long index) {
+      final Supplier<String> name = () -> "create(" + getRelativePath() + ", "
+          + close + ") @" + id + ":" + index;
+      final CheckedSupplier<Integer, IOException> task = 
LogUtils.newCheckedSupplier(LOG, () -> {
+        if (out == null) {
+          out = new FileOut(resolver.apply(getRelativePath()));
+        }
+        return write(0L, data, close);
+      }, name);
+      return submitWrite(task, executor, id, index);
+    }
+
+    CompletableFuture<Integer> submitWrite(
+        long offset, ByteString data, boolean close, ExecutorService executor,
+        RaftPeerId id, long index) {
+      final Supplier<String> name = () -> "write(" + getRelativePath() + ", "
+          + offset + ", " + close + ") @" + id + ":" + index;
+      final CheckedSupplier<Integer, IOException> task = 
LogUtils.newCheckedSupplier(LOG,
+          () -> write(offset, data, close), name);
+      return submitWrite(task, executor, id, index);
+    }
+
+    private CompletableFuture<Integer> submitWrite(
+        CheckedSupplier<Integer, IOException> task, ExecutorService executor,
+      RaftPeerId id, long index) {
+      final CompletableFuture<Integer> f = writeQueue.submit(task, executor,
+          e -> new IOException("Failed " + task, e));
+      CollectionUtils.putNew(index, f, writeFutures, () ->  id + 
":writeFutures");
+      return f;
+    }
+
+    private int write(long offset, ByteString data, boolean close) throws 
IOException {
+      if (offset != writeSize) {
+        throw new IOException("Offset/size mismatched: offset = " + offset
+            + " != writeSize = " + writeSize + ", path=" + getRelativePath());
+      }
+      if (out == null) {
+        throw new IOException("File output is not initialized, path=" + 
getRelativePath());
+      }
+
+      synchronized (out) {
+        int n = 0;
+        if (data != null) {
+          final ByteBuffer buffer = data.asReadOnlyByteBuffer();
+          try {
+            for (; buffer.remaining() > 0; ) {
+              n += out.write(buffer);
+            }
+          } finally {
+            writeSize += n;
+          }
+        }
+
+        if (close) {
+          out.close();
+        }
+        return n;
+      }
+    }
+
+    void flush() throws IOException {
+      if (flushSize >= committedSize) {
+        return;
+      }
+      synchronized (out) {
+        if (flushSize >= committedSize) {
+          return;
+        }
+        out.flush();
+        flushSize = writeSize;
+      }
+    }
+
+    CompletableFuture<Integer> submitCommit(
+        long offset, int size, Function<UnderConstruction, ReadOnly> converter,
+        ExecutorService executor, RaftPeerId id, long index) {
+      final Supplier<String> name = () -> "commit(" + getRelativePath() + ", "
+          + offset + ", " + size + ") @" + id + ":" + index;
+      final CheckedSupplier<Integer, IOException> task = 
LogUtils.newCheckedSupplier(LOG, () -> {
+        if (offset != committedSize) {
+          throw new IOException("Offset/size mismatched: offset = "
+              + offset + " != committedSize = " + committedSize
+              + ", path=" + getRelativePath());
+        } else if (committedSize + size > writeSize) {
+          throw new IOException("Offset/size mismatched: committed (=" + 
committedSize
+              + ") + size (=" + size + ") > writeSize = " + writeSize);
+        }
+        committedSize += size;
+
+        if (converter != null) {
+          converter.apply(this);
+        }
+        return size;
+      }, name);
+
+      final CompletableFuture<Integer> write = writeFutures.remove(index);
+      if (write == null) {
+        return JavaUtils.completeExceptionally(
+            new IOException(name.get() + " is already committed."));
+      }
+      return write.thenComposeAsync(writeSize -> {
+        Preconditions.assertTrue(size == writeSize);
+        return commitQueue.submit(task, executor,
+            e -> new IOException("Failed " + task, e));
+      }, executor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
new file mode 100644
index 0000000..aba2a19
--- /dev/null
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.examples.filestore.FileInfo.ReadOnly;
+import org.apache.ratis.examples.filestore.FileInfo.UnderConstruction;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.ExamplesProtos.*;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.*;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class FileStore implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStore.class);
+
+  static class FileMap {
+    private final Object name;
+    private final Map<Path, FileInfo> map = new ConcurrentHashMap<>();
+
+    FileMap(Supplier<String> name) {
+      this.name = StringUtils.stringSupplierAsObject(name);
+    }
+
+    FileInfo get(String relative) throws FileNotFoundException {
+      return applyFunction(relative, map::get);
+    }
+
+    FileInfo remove(String relative) throws FileNotFoundException {
+      LOG.trace("{}: remove {}", name, relative);
+      return applyFunction(relative, map::remove);
+    }
+
+    private FileInfo applyFunction(String relative, Function<Path, FileInfo> f)
+        throws FileNotFoundException {
+      final FileInfo info = f.apply(normalize(relative));
+      if (info == null) {
+        throw new FileNotFoundException("File " + relative + " not found in " 
+ name);
+      }
+      return info;
+    }
+
+    void putNew(UnderConstruction uc) {
+      LOG.trace("{}: putNew {}", name, uc.getRelativePath());
+      CollectionUtils.putNew(uc.getRelativePath(), uc, map, name::toString);
+    }
+
+    ReadOnly close(UnderConstruction uc) {
+      LOG.trace("{}: close {}", name, uc.getRelativePath());
+      final ReadOnly ro = new ReadOnly(uc);
+      CollectionUtils.replaceExisting(uc.getRelativePath(), uc, ro, map, 
name::toString);
+      return ro;
+    }
+  }
+
+  private final Supplier<RaftPeerId> idSupplier;
+  private final Supplier<Path> rootSupplier;
+  private final FileMap files;
+
+  private final ExecutorService writer = Executors.newFixedThreadPool(10);
+  private final ExecutorService committer = Executors.newFixedThreadPool(3);
+  private final ExecutorService reader = Executors.newFixedThreadPool(10);
+  private final ExecutorService deleter = Executors.newFixedThreadPool(3);
+
+  public FileStore(Supplier<RaftPeerId> idSupplier, Path dir) {
+    this.idSupplier = idSupplier;
+    this.rootSupplier = JavaUtils.memoize(
+        () -> dir.resolve(getId().toString()).normalize().toAbsolutePath());
+    this.files = new FileMap(JavaUtils.memoize(() -> idSupplier.get() + 
":files"));
+  }
+
+  public RaftPeerId getId() {
+    return Objects.requireNonNull(idSupplier.get(), getClass().getSimpleName() 
+ " is not initialized.");
+  }
+
+  public Path getRoot() {
+    return rootSupplier.get();
+  }
+
+  static Path normalize(String path) {
+    Objects.requireNonNull(path, "path == null");
+    return Paths.get(path).normalize();
+  }
+
+  Path resolve(Path relative) throws IOException {
+    final Path root = getRoot();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    if (full.equals(root)) {
+      throw new IOException("The file path " + relative + " resolved to " + 
full
+          + " is the root directory " + root);
+    } else if (!full.startsWith(root)) {
+      throw new IOException("The file path " + relative + " resolved to " + 
full
+          + " is not a sub-path under root directory " + root);
+    }
+    return full;
+  }
+
+  CompletableFuture<ReadReplyProto> read(String relative, long offset, long 
length) {
+    final Supplier<String> name = () -> "read(" + relative
+        + ", " + offset + ", " + length + ") @" + getId();
+    final CheckedSupplier<ReadReplyProto, IOException> task = 
LogUtils.newCheckedSupplier(LOG, () -> {
+      final FileInfo info = files.get(relative);
+      final ReadReplyProto.Builder reply = ReadReplyProto.newBuilder()
+          
.setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath()))
+          .setOffset(offset);
+
+      final ByteString bytes = info.read(this::resolve, offset, length);
+      return reply.setData(bytes).build();
+    }, name);
+    return submit(task, reader);
+  }
+
+  CompletableFuture<Path> delete(long index, String relative) {
+    final Supplier<String> name = () -> "delete(" + relative + ") @" + getId() 
+ ":" + index;
+    final CheckedSupplier<Path, IOException> task = 
LogUtils.newCheckedSupplier(LOG, () -> {
+      final FileInfo info = files.remove(relative);
+      FileUtils.delete(resolve(info.getRelativePath()));
+      return info.getRelativePath();
+    }, name);
+    return submit(task, deleter);
+  }
+
+  static <T> CompletableFuture<T> submit(
+      CheckedSupplier<T, IOException> task, ExecutorService executor) {
+    final CompletableFuture<T> f = new CompletableFuture<>();
+    executor.submit(() -> {
+      try {
+        f.complete(task.get());
+      } catch (IOException e) {
+        f.completeExceptionally(new IOException("Failed " + task, e));
+      }
+    });
+    return f;
+  }
+
+  CompletableFuture<WriteReplyProto> submitCommit(
+      long index, String relative, boolean close, long offset, int size) {
+    final Function<UnderConstruction, ReadOnly> converter = close ? 
files::close: null;
+    final UnderConstruction uc;
+    try {
+      uc = files.get(relative).asUnderConstruction();
+    } catch (FileNotFoundException e) {
+      return FileStoreCommon.completeExceptionally(
+          index, "Failed to write to " + relative, e);
+    }
+
+    return uc.submitCommit(offset, size, converter, committer, getId(), index)
+        .thenApply(n -> WriteReplyProto.newBuilder()
+            
.setResolvedPath(FileStoreCommon.toByteString(uc.getRelativePath()))
+            .setOffset(offset)
+            .setLength(n)
+            .build());
+  }
+
+  CompletableFuture<Integer> write(
+      long index, String relative, boolean close, long offset, ByteString 
data) {
+    final int size = data != null? data.size(): 0;
+    LOG.trace("write {}, offset={}, size={}, close? {} @{}:{}",
+        relative, offset, size, close, getId(), index);
+    final boolean createNew = offset == 0L;
+    final UnderConstruction uc;
+    if (createNew) {
+      uc = new UnderConstruction(normalize(relative));
+      files.putNew(uc);
+    } else {
+      try {
+        uc = files.get(relative).asUnderConstruction();
+      } catch (FileNotFoundException e) {
+        return FileStoreCommon.completeExceptionally(
+            index, "Failed to write to " + relative, e);
+      }
+    }
+
+    return size == 0 && !close? CompletableFuture.completedFuture(0)
+        : createNew? uc.submitCreate(this::resolve, data, close, writer, 
getId(), index)
+        : uc.submitWrite(offset, data, close, writer, getId(), index);
+  }
+
+  @Override
+  public void close() {
+    writer.shutdownNow();
+    committer.shutdownNow();
+    reader.shutdownNow();
+    deleter.shutdownNow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
new file mode 100644
index 0000000..59d5079
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.ExamplesProtos.*;
+import org.apache.ratis.util.CheckedFunction;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A client of the FileStore example.
+ * It sends read, write and delete requests through a {@link RaftClient}.
+ */
+public class FileStoreClient implements Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FileStoreClient.class);
+
+  private final RaftClient client;
+
+  /** Build a client for the given group using the given properties. */
+  public FileStoreClient(RaftGroup group, RaftProperties properties)
+      throws IOException {
+    this.client = RaftClient.newBuilder()
+        .setProperties(properties)
+        .setRaftGroup(group)
+        .build();
+  }
+
+  /** Close the underlying {@link RaftClient}. */
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
+
+  /**
+   * Send the given request using the given function.
+   *
+   * @return the content of the reply message.
+   * @throws IOException if the reply has a state machine exception.
+   */
+  static ByteString send(
+      ByteString request,
+      CheckedFunction<Message, RaftClientReply, IOException> sendFunction)
+      throws IOException {
+    final RaftClientReply reply = sendFunction.apply(() -> request);
+    if (reply.hasStateMachineException()) {
+      throw new IOException("Failed to send request " + request,
+          reply.getStateMachineException());
+    }
+    Preconditions.assertTrue(reply.isSuccess(), () -> "reply=" + reply);
+    return reply.getMessage().getContent();
+  }
+
+  private ByteString send(ByteString request) throws IOException {
+    return send(request, client::send);
+  }
+
+  private ByteString sendReadOnly(ByteString request) throws IOException {
+    return send(request, client::sendReadOnly);
+  }
+
+  /** @return the data returned for a read of the given file range. */
+  public ByteString read(String path, long offset, long length)
+      throws IOException {
+    return readImpl(path, offset, length).getData();
+  }
+
+  private ReadReplyProto readImpl(String path, long offset, long length)
+      throws IOException {
+    final ReadRequestProto read = ReadRequestProto.newBuilder()
+        .setPath(ProtoUtils.toByteString(path))
+        .setOffset(offset)
+        .setLength(length)
+        .build();
+
+    return ReadReplyProto.parseFrom(sendReadOnly(read.toByteString()));
+  }
+
+  /**
+   * Write at most one chunk, see {@link FileStoreCommon#getChunkSize(long)},
+   * of the remaining bytes in the given buffer to the given file.
+   * Note that the limit of the buffer is changed to the end of the chunk
+   * and the position of the buffer is advanced to it.
+   *
+   * @param offset the file offset to write to.
+   * @param close should the file be closed after this write?
+   * @return the length in the reply of the write.
+   */
+  public long write(String path, long offset, boolean close, ByteBuffer buffer)
+      throws IOException {
+    final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
+    // The limit is an absolute index but chunkSize is relative to the
+    // current position, so the position must be added.
+    buffer.limit(buffer.position() + chunkSize);
+    final WriteReplyProto proto = writeImpl(
+        path, offset, close, ByteString.copyFrom(buffer));
+    return proto.getLength();
+  }
+
+  private WriteReplyProto writeImpl(
+      String path, long offset, boolean close, ByteString data)
+      throws IOException {
+    final WriteRequestHeaderProto.Builder header =
+        WriteRequestHeaderProto.newBuilder()
+        .setPath(ProtoUtils.toByteString(path))
+        .setOffset(offset)
+        .setClose(close);
+
+    final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
+        .setHeader(header)
+        .setData(data);
+
+    final FileStoreRequestProto request =
+        FileStoreRequestProto.newBuilder().setWrite(write).build();
+    return WriteReplyProto.parseFrom(send(request.toByteString()));
+  }
+
+  private DeleteReplyProto deleteImpl(String path) throws IOException {
+    final DeleteRequestProto.Builder delete = DeleteRequestProto.newBuilder()
+        .setPath(ProtoUtils.toByteString(path));
+    final FileStoreRequestProto request =
+        FileStoreRequestProto.newBuilder().setDelete(delete).build();
+    return DeleteReplyProto.parseFrom(send(request.toByteString()));
+  }
+
+  /** Delete the given file. */
+  public void delete(String path) throws IOException {
+    deleteImpl(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
new file mode 100644
index 0000000..8a92adf
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.*;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/** Constants and static helpers shared by the FileStore example classes. */
+public interface FileStoreCommon {
+  /** The property key of the state machine directory. */
+  String STATEMACHINE_DIR_KEY = "example.filestore.statemachine.dir";
+
+  /** The upper bound of the values returned by {@link #getChunkSize(long)}. */
+  SizeInBytes MAX_CHUNK_SIZE =
+      SizeInBytes.valueOf(64, TraditionalBinaryPrefix.MEGA);
+
+  /** @return the suggested size capped at {@link #MAX_CHUNK_SIZE}. */
+  static int getChunkSize(long suggestedSize) {
+    final long max = MAX_CHUNK_SIZE.getSize();
+    final long chunk = suggestedSize < max ? suggestedSize : max;
+    return Math.toIntExact(chunk);
+  }
+
+  /** @return the string form of the given path as a {@link ByteString}. */
+  static ByteString toByteString(Path p) {
+    final String s = p.toString();
+    return ProtoUtils.toByteString(s);
+  }
+
+  /** The same as completeExceptionally(index, message, null). */
+  static <T> CompletableFuture<T> completeExceptionally(
+      long index, String message) {
+    return completeExceptionally(index, message, null);
+  }
+
+  /**
+   * @return a future completed exceptionally with an {@link IOException}
+   *         whose message is the given message followed by the index.
+   */
+  static <T> CompletableFuture<T> completeExceptionally(
+      long index, String message, Throwable cause) {
+    final String messageWithIndex = message + ", index=" + index;
+    return completeExceptionally(messageWithIndex, cause);
+  }
+
+  /**
+   * @return a future completed exceptionally with an {@link IOException}
+   *         built from the given message and cause.
+   */
+  static <T> CompletableFuture<T> completeExceptionally(
+      String message, Throwable cause) {
+    final Throwable t = new IOException(message).initCause(cause);
+    return JavaUtils.completeExceptionally(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
new file mode 100644
index 0000000..538c970
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import 
org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.shaded.proto.ExamplesProtos.*;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.statemachine.BaseStateMachine;
+import org.apache.ratis.statemachine.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The state machine of the FileStore example.
+ * It parses the requests carried by client messages and log entries
+ * and forwards them to a {@link FileStore}.
+ */
+public class FileStoreStateMachine extends BaseStateMachine {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FileStoreStateMachine.class);
+
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
+  private final AtomicReference<TermIndex> latestTermIndex =
+      new AtomicReference<>();
+
+  private final FileStore files;
+
+  /**
+   * @param properties must have
+   *                   {@link FileStoreCommon#STATEMACHINE_DIR_KEY} set.
+   */
+  public FileStoreStateMachine(RaftProperties properties) {
+    final File directory = ConfUtils.getFile(
+        properties::getFile, FileStoreCommon.STATEMACHINE_DIR_KEY, null);
+    Objects.requireNonNull(directory,
+        FileStoreCommon.STATEMACHINE_DIR_KEY + " is not set.");
+    this.files = new FileStore(this::getId, directory.toPath());
+  }
+
+  /** Initialize the storage and create the root directory of the files. */
+  @Override
+  public void initialize(
+      RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
+      throws IOException {
+    super.initialize(id, properties, raftStorage);
+    this.storage.init(raftStorage);
+    FileUtils.createDirectories(files.getRoot());
+  }
+
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return storage;
+  }
+
+  /** Close the file store and forget the latest term-index. */
+  @Override
+  public void close() {
+    files.close();
+    latestTermIndex.set(null);
+  }
+
+  /** Serve a read request; the request message is a ReadRequestProto. */
+  @Override
+  public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+    final ReadRequestProto read;
+    try {
+      read = ReadRequestProto.parseFrom(request.getMessage().getContent());
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse " + request, e);
+    }
+
+    final String path = read.getPath().toStringUtf8();
+    final CompletableFuture<ReadReplyProto> future =
+        files.read(path, read.getOffset(), read.getLength());
+    return future.thenApply(
+        reply -> new RaftClientReply(request, () -> reply.toByteString()));
+  }
+
+  /**
+   * Build the log entry of the given request.
+   * For a WRITE request, the log data is the write header only and
+   * the data to be written becomes the state machine data.
+   */
+  @Override
+  public TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException {
+    final ByteString content = request.getMessage().getContent();
+    final FileStoreRequestProto proto =
+        FileStoreRequestProto.parseFrom(content);
+    return new TransactionContext(
+        this, request, toSMLogEntry(proto, content));
+  }
+
+  /** Convert the given request, parsed from the content, to a log entry. */
+  private static SMLogEntryProto toSMLogEntry(
+      FileStoreRequestProto proto, ByteString content) {
+    if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITE) {
+      return SMLogEntryProto.newBuilder().setData(content).build();
+    }
+
+    final WriteRequestProto write = proto.getWrite();
+    final ByteString header = FileStoreRequestProto.newBuilder()
+        .setWriteHeader(write.getHeader())
+        .build()
+        .toByteString();
+    return SMLogEntryProto.newBuilder()
+        .setData(header)
+        .setStateMachineData(write.getData())
+        .build();
+  }
+
+  /**
+   * Write the state machine data of a WRITEHEADER entry to the file store.
+   *
+   * @return the future of the write if the file is being closed;
+   *         null for the other writes and the other request cases.
+   */
+  @Override
+  public CompletableFuture<Integer> writeStateMachineData(
+      LogEntryProto entry) {
+    final SMLogEntryProto smLog = entry.getSmLogEntry();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(smLog.getData());
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          entry.getIndex(), "Failed to parse data, entry=" + entry, e);
+    }
+    if (proto.getRequestCase()
+        != FileStoreRequestProto.RequestCase.WRITEHEADER) {
+      return null;
+    }
+
+    final WriteRequestHeaderProto header = proto.getWriteHeader();
+    final boolean close = header.getClose();
+    final CompletableFuture<Integer> future = files.write(
+        entry.getIndex(), header.getPath().toStringUtf8(), close,
+        header.getOffset(), smLog.getStateMachineData());
+    // sync only if closing the file
+    if (close) {
+      return future;
+    }
+    return null;
+  }
+
+  /** Apply a DELETE or a WRITEHEADER entry; any other case is an error. */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    final LogEntryProto entry = trx.getLogEntry();
+
+    final long index = entry.getIndex();
+    updateLatestTermIndex(entry.getTerm(), index);
+
+    final SMLogEntryProto smLog = entry.getSmLogEntry();
+    final FileStoreRequestProto request;
+    try {
+      request = FileStoreRequestProto.parseFrom(smLog.getData());
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(index,
+          "Failed to parse SmLogEntry", e);
+    }
+
+    final FileStoreRequestProto.RequestCase requestCase =
+        request.getRequestCase();
+    if (requestCase == FileStoreRequestProto.RequestCase.DELETE) {
+      return delete(index, request.getDelete());
+    }
+    if (requestCase == FileStoreRequestProto.RequestCase.WRITEHEADER) {
+      return writeCommit(index, request.getWriteHeader(),
+          smLog.getStateMachineData().size());
+    }
+
+    // WRITE should not happen here since
+    // startTransaction converts WRITE requests to WRITEHEADER requests.
+    // It is reported as an error like any other unexpected case.
+    LOG.error(getId() + ": Unexpected request case " + requestCase);
+    return FileStoreCommon.completeExceptionally(index,
+        "Unexpected request case " + requestCase);
+  }
+
+  private CompletableFuture<Message> writeCommit(
+      long index, WriteRequestHeaderProto header, int size) {
+    final String file = header.getPath().toStringUtf8();
+    return files.submitCommit(
+            index, file, header.getClose(), header.getOffset(), size)
+        .thenApply(reply -> () -> reply.toByteString());
+  }
+
+  private CompletableFuture<Message> delete(
+      long index, DeleteRequestProto request) {
+    final String file = request.getPath().toStringUtf8();
+    return files.delete(index, file).thenApply(resolved -> () ->
+        DeleteReplyProto.newBuilder()
+            .setResolvedPath(FileStoreCommon.toByteString(resolved))
+            .build()
+            .toByteString());
+  }
+
+  /** Set the latest term-index and assert that it does not go backward. */
+  private void updateLatestTermIndex(long term, long index) {
+    final TermIndex current = TermIndex.newTermIndex(term, index);
+    final TermIndex previous = latestTermIndex.getAndSet(current);
+    if (previous == null) {
+      return;
+    }
+    Preconditions.assertTrue(current.compareTo(previous) >= 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
new file mode 100644
index 0000000..8991b0d
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.io.netty.util.internal.ThreadLocalRandom;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.*;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * The base class of the FileStore example tests.
+ * It writes files to a {@link MiniRaftCluster} through
+ * {@link FileStoreClient}, reads them back for verification
+ * and then deletes them.
+ */
+public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FileStoreBaseTest.class);
+
+  // Use FileStoreStateMachine and set its directory under the test dir.
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(BaseTest.getRootTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+
+  @Test
+  public void testFileStore() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    try {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+
+      final CheckedSupplier<FileStoreClient, IOException> newClient =
+          () -> new FileStoreClient(cluster.getGroup(), getProperties());
+
+      testSingleFile("foo", SizeInBytes.valueOf("10M"), newClient);
+      testMultipleFiles("file", 100, SizeInBytes.valueOf("1M"), newClient);
+    } finally {
+      // shut down the cluster even if a test step failed
+      cluster.shutdown();
+    }
+  }
+
+  /** Write, verify and then delete a single file. */
+  private static void testSingleFile(
+      String path, SizeInBytes fileLength,
+      CheckedSupplier<FileStoreClient, IOException> newClient)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+
+    try (final Writer w = new Writer(path, fileLength, newClient)) {
+      w.write().verify().delete();
+    }
+  }
+
+  /**
+   * Write all the files concurrently;
+   * once all the writes are done, verify and delete them concurrently.
+   */
+  private static void testMultipleFiles(
+      String pathPrefix, int numFile, SizeInBytes fileLength,
+      CheckedSupplier<FileStoreClient, IOException> newClient)
+      throws Exception {
+    LOG.info("runTestMultipleFile with pathPrefix={}, numFile={}, "
+        + "fileLength={}", pathPrefix, numFile, fileLength);
+
+    final ExecutorService executor = Executors.newFixedThreadPool(20);
+    try {
+      final List<Future<Writer>> writerFutures = new ArrayList<>();
+      for (int i = 0; i < numFile; i++) {
+        final String path = String.format("%s%02d", pathPrefix, i);
+        final Callable<Writer> callable = LogUtils.newCallable(LOG,
+            () -> new Writer(path, fileLength, newClient).write(),
+            () -> path + ":" + fileLength);
+        writerFutures.add(executor.submit(callable));
+      }
+
+      final List<Writer> writers = new ArrayList<>();
+      for(Future<Writer> f : writerFutures) {
+        writers.add(f.get());
+      }
+
+      writerFutures.clear();
+      for (Writer w : writers) {
+        writerFutures.add(executor.submit(() -> w.verify().delete()));
+      }
+      for(Future<Writer> f : writerFutures) {
+        f.get().close();
+      }
+    } finally {
+      // do not leave the threads behind if a writer failed
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Writes a file of pseudo-random bytes generated from a seed and
+   * verifies the file by regenerating the same bytes from the same seed.
+   */
+  static class Writer implements Closeable {
+    final long seed = ThreadLocalRandom.current().nextLong();
+    final byte[] buffer = new byte[4 << 10];
+
+    final String fileName;
+    final SizeInBytes fileSize;
+    final FileStoreClient client;
+
+    Writer(String fileName, SizeInBytes fileSize,
+        CheckedSupplier<FileStoreClient, IOException> newClient)
+        throws IOException {
+      this.fileName = fileName;
+      this.fileSize = fileSize;
+      this.client = newClient.get();
+    }
+
+    /**
+     * Refill the whole buffer from the given random.
+     *
+     * @return a {@link ByteBuffer} wrapping the first length bytes.
+     */
+    ByteBuffer randomBytes(int length, Random random) {
+      Preconditions.assertTrue(length <= buffer.length);
+      random.nextBytes(buffer);
+      final ByteBuffer b = ByteBuffer.wrap(buffer);
+      b.limit(length);
+      return b;
+    }
+
+    /** Write the file in chunks of at most buffer.length bytes. */
+    Writer write() throws IOException {
+      final Random r = new Random(seed);
+      final int size = fileSize.getSizeInt();
+
+      for(int offset = 0; offset < size; ) {
+        final int remaining = size - offset;
+        final int n = Math.min(remaining, buffer.length);
+        // close the file with the last chunk
+        final boolean close = n == remaining;
+
+        final ByteBuffer b = randomBytes(n, r);
+
+        LOG.trace("client write {}, offset={}", fileName, offset);
+        final long written = client.write(fileName, offset, close, b);
+        Assert.assertEquals(n, written);
+        offset += written;
+      }
+      return this;
+    }
+
+    /** Read the file back and compare it with the regenerated bytes. */
+    Writer verify() throws IOException {
+      final Random r = new Random(seed);
+      final int size = fileSize.getSizeInt();
+
+      for(int offset = 0; offset < size; ) {
+        final int remaining = size - offset;
+        final int n = Math.min(remaining, buffer.length);
+
+        final ByteString read = client.read(fileName, offset, n);
+        Assert.assertEquals(n, read.size());
+
+        final ByteBuffer b = randomBytes(n, r);
+
+        assertBuffers(offset, n, b, read.asReadOnlyByteBuffer());
+        offset += n;
+      }
+      return this;
+    }
+
+    Writer delete() throws IOException {
+      client.delete(fileName);
+      return this;
+    }
+
+    @Override
+    public void close() throws IOException {
+      client.close();
+    }
+  }
+
+  /** Assert that the buffers are equal; log both in hex on a mismatch. */
+  static void assertBuffers(
+      int offset, int length, ByteBuffer expected, ByteBuffer computed) {
+    try {
+      Assert.assertEquals(expected, computed);
+    } catch(AssertionError e) {
+      LOG.error("Buffer mismatched at offset=" + offset
+          + ", length=" + length + "\n"
+          + "expected = " + StringUtils.bytes2HexString(expected) + "\n"
+          + "computed = " + StringUtils.bytes2HexString(computed) + "\n", e);
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
new file mode 100644
index 0000000..71ae294
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+
+public class TestFileStoreWithGrpc // binds FileStoreBaseTest to MiniRaftClusterWithGRpc
+    extends FileStoreBaseTest<MiniRaftClusterWithGRpc>
+    implements MiniRaftClusterWithGRpc.FactoryGet { // no members declared here; everything comes from the supertypes
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
new file mode 100644
index 0000000..9b38e21
--- /dev/null
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+
+public class TestFileStoreWithNetty // binds FileStoreBaseTest to MiniRaftClusterWithNetty
+    extends FileStoreBaseTest<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet { // no members declared here; everything comes from the supertypes
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index cb337b5..ebe3184 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -21,7 +21,6 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyRpcProxy;
 import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.ServerInformationReply;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -44,15 +43,12 @@ import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyExceptionReplyPr
 import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto;
 import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
 import org.apache.ratis.util.CodeInjectionForTesting;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.ProtoUtils;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.Objects;
-import java.util.function.Supplier;
 
 /**
  * A netty server endpoint that acts as the communication layer.
@@ -237,7 +233,7 @@ public final class NettyRpcService extends 
RaftServerRpcWithProxy<NettyRpcProxy,
         .setSuccess(false);
     final RaftNettyExceptionReplyProto.Builder ioe = 
RaftNettyExceptionReplyProto.newBuilder()
         .setRpcReply(rpcReply)
-        .setException(ProtoUtils.toByteString(e));
+        .setException(ProtoUtils.writeObject2ByteString(e));
     return 
RaftNettyServerReplyProto.newBuilder().setExceptionReply(ioe).build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-proto-shaded/src/main/proto/Examples.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Examples.proto 
b/ratis-proto-shaded/src/main/proto/Examples.proto
new file mode 100644
index 0000000..6efef5b
--- /dev/null
+++ b/ratis-proto-shaded/src/main/proto/Examples.proto
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+option java_package = "org.apache.ratis.shaded.proto";
+option java_outer_classname = "ExamplesProtos";
+option java_generate_equals_and_hash = true;
+package ratis.example;
+
+message FileStoreRequestProto { // envelope carrying one request alternative
+  oneof Request { // protobuf oneof: at most one of the fields below is set
+    WriteRequestHeaderProto writeHeader = 1; // write metadata only, no data field
+    WriteRequestProto write = 2; // header together with data
+    DeleteRequestProto delete = 3;
+  }
+} // note: ReadRequestProto is not one of the alternatives of this envelope
+
+message ReadRequestProto { // presumably "read length bytes starting at offset" - TODO confirm against the reader
+  bytes path = 1; // declared as bytes, not as a proto string
+  uint64 offset = 2;
+  uint64 length = 3;
+}
+
+message WriteRequestHeaderProto { // used as WriteRequestProto.header and, on its own, as FileStoreRequestProto.writeHeader
+  bytes path = 1; // declared as bytes, not as a proto string
+  bool close = 2; // close the file after write?
+  uint64 offset = 3;
+}
+
+message WriteRequestProto { // a write header combined with a data payload
+  WriteRequestHeaderProto header = 1;
+  bytes data = 2; // presumably the bytes to write at header.offset - TODO confirm
+}
+
+message DeleteRequestProto { // delete request; the path is its only field
+  bytes path = 1;
+}
+
+message ReadReplyProto { // presumably the reply paired with ReadRequestProto - TODO confirm
+  bytes resolvedPath = 1; // NOTE(review): presumably the server-side resolved form of the requested path - confirm
+  uint64 offset = 2;
+  bytes data = 3; // returned data size may be smaller than the requested size
+}
+
+message WriteReplyProto { // presumably the reply paired with a write request - TODO confirm
+  bytes resolvedPath = 1; // NOTE(review): presumably the server-side resolved form of the requested path - confirm
+  uint64 offset = 2;
+  uint64 length = 3; // bytes actually written
+}
+
+message DeleteReplyProto { // presumably the reply paired with DeleteRequestProto - TODO confirm
+  bytes resolvedPath = 1; // NOTE(review): presumably the server-side resolved form of the deleted path - confirm
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto 
b/ratis-proto-shaded/src/main/proto/Raft.proto
index 52a1761..a8284fb 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -44,6 +44,8 @@ message SMLogEntryProto {
   // TODO: This is not super efficient if the SM itself uses PB to serialize 
its own data for a
   // log entry. Data will be copied twice. We should directly support having 
any Message from SM
   bytes data = 1;
+
+  bytes stateMachineData = 2; // State machine specific data which is not 
written to log.
 }
 
 message LeaderNoOp {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f761a24..122ff51 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -136,7 +136,7 @@ public class RaftServerImpl implements RaftServerProtocol,
     return groupId;
   }
 
-  StateMachine getStateMachine() {
+  public StateMachine getStateMachine() {
     return proxy.getStateMachine();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index e80ca02..23d7c9a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -27,18 +27,14 @@ import 
org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo;
 import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
 import org.apache.ratis.server.storage.SegmentedRaftLog.Task;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * This class takes the responsibility of all the raft log related I/O ops for 
a
@@ -58,6 +54,7 @@ class RaftLogWorker implements Runnable {
   private final RaftStorage storage;
   private volatile LogOutputStream out;
   private final RaftServerImpl raftServer;
+  private final StateMachine stateMachine;
 
   /**
    * The number of entries that have been written into the LogOutputStream but
@@ -81,6 +78,7 @@ class RaftLogWorker implements Runnable {
     LOG.info("new {} for {}", name, storage);
 
     this.raftServer = raftServer;
+    this.stateMachine = raftServer != null? raftServer.getStateMachine(): null;
 
     this.storage = storage;
     this.segmentMaxSize =
@@ -251,9 +249,32 @@ class RaftLogWorker implements Runnable {
 
   private class WriteLog extends Task {
     private final LogEntryProto entry;
+    private final CompletableFuture<?> stateMachineFuture;
 
     WriteLog(LogEntryProto entry) {
-      this.entry = entry;
+      this.entry = ProtoUtils.removeStateMachineData(entry);
+      if (this.entry == entry || stateMachine == null) {
+        this.stateMachineFuture = null;
+      } else {
+        // this.entry != entry iff the entry has state machine data
+        this.stateMachineFuture = stateMachine.writeStateMachineData(entry);
+      }
+    }
+
+    /**
+     * Waits for this log-write task and then, if a state machine data write was
+     * started for the entry, waits for that write as well. A failure of the
+     * state machine write is logged and does not fail this call.
+     *
+     * @throws InterruptedException if interrupted while waiting
+     */
+    @Override
+    void waitForDone() throws InterruptedException {
+      super.waitForDone();
+      // TODO: It does not work since logSync only wait for the last task L.
+      // TODO: If some task T earlier than L has a writeStateMachineData future, it will not be sync'ed.
+      // TODO: Need RATIS-124
+
+      if (stateMachineFuture != null) {
+        try {
+          stateMachineFuture.get();
+        } catch (ExecutionException e) {
+          // Still best effort (no rethrow), but leave a trace of the failure
+          // instead of dropping it silently.
+          LOG.warn("Failed to write state machine data", e);
+        }
+      }
+    }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
index d35dc88..dbf19b0 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
@@ -38,11 +38,16 @@ import org.apache.ratis.util.LifeCycle;
  */
 public class BaseStateMachine implements StateMachine {
 
+  private volatile RaftPeerId id;
   protected RaftProperties properties;
   protected RaftStorage storage;
   protected RaftConfiguration raftConf;
   protected final LifeCycle lifeCycle = new 
LifeCycle(getClass().getSimpleName());
 
+  public RaftPeerId getId() { // id is assigned in initialize(..); as far as this hunk shows it is null before that
+    return id;
+  }
+
   @Override
   public LifeCycle.State getLifeCycleState() {
     return lifeCycle.getCurrentState();
@@ -51,6 +56,7 @@ public class BaseStateMachine implements StateMachine {
   @Override
   public void initialize(RaftPeerId id, RaftProperties properties,
       RaftStorage storage) throws IOException {
+    this.id = id;
     lifeCycle.setName(getClass().getSimpleName() + ":" + id);
     this.properties = properties;
     this.storage = storage;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 17990cd..ae26cb1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -24,7 +24,9 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.LifeCycle;
 
 import java.io.Closeable;
@@ -124,6 +126,17 @@ public interface StateMachine extends Closeable {
       throws IOException;
 
   /**
+   * Write asynchronously the state machine data to this state machine.
+   *
+   * The default implementation does nothing and returns null.
+   *
+   * @param entry the log entry passed by the caller for writing its state machine data
+   * @return a future for the write task
+   *         if {@link RaftLog#logSync()} should also sync writing the state machine data;
+   *         otherwise, return null.
+   */
+  default CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
+    return null;
+  }
+
+  /**
    * This is called before the transaction passed from the StateMachine is 
appended to the raft log.
    * This method will be called from log append and having the same strict 
serial order that the
    * transactions will have in the RAFT log. Since this is called serially in 
the critical path of

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index 3a6adfc..df9a4fe 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -19,11 +19,13 @@ package org.apache.ratis.statemachine;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Optional;
+import java.util.Objects;
 
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.util.Preconditions;
 
 /**
  * Context for a transaction.
@@ -129,8 +131,8 @@ public class TransactionContext {
    */
   public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) 
{
     this(stateMachine);
+    setLogEntry(logEntry);
     this.smLogEntryProto = logEntry.getSmLogEntry();
-    this.logEntry = logEntry;
   }
 
   public RaftClientRequest getClientRequest() {
@@ -155,6 +157,11 @@ public class TransactionContext {
   }
 
   public TransactionContext setLogEntry(LogEntryProto logEntry) {
+    Objects.requireNonNull(logEntry, "logEntry == null");
+    Preconditions.assertTrue(logEntry.getLogEntryBodyCase() == 
LogEntryBodyCase.SMLOGENTRY,
+        () -> "LogEntryBodyCase = " + logEntry.getLogEntryBodyCase()
+            + " != " + LogEntryBodyCase.SMLOGENTRY + ", logEntry=" + logEntry);
+    Preconditions.assertTrue(this.logEntry == null, "this.logEntry != null");
     this.logEntry = logEntry;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index a7a9412..019e8cd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -43,6 +43,7 @@ import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -56,7 +57,17 @@ public abstract class MiniRaftCluster {
 
   public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
     public interface Get<CLUSTER extends MiniRaftCluster> {
+      Supplier<RaftProperties> properties = JavaUtils.memoize(() -> new 
RaftProperties()); // NOTE(review): interface fields are implicitly public static final, so this supplier is shared by every implementer; if memoize caches its result (verify), they all see the same RaftProperties instance
+
       Factory<CLUSTER> getFactory();
+
+      default RaftProperties getProperties() { // implementers may override to supply their own properties
+        return properties.get();
+      }
+
+      default CLUSTER newCluster(int numPeers) throws IOException { // builds a cluster from getFactory() using getProperties()
+        return getFactory().newCluster(numPeers, getProperties());
+      }
     }
 
     public abstract CLUSTER newCluster(
@@ -216,7 +227,21 @@ public abstract class MiniRaftCluster {
         STATEMACHINE_CLASS_KEY,
         STATEMACHINE_CLASS_DEFAULT,
         StateMachine.class);
-    return ReflectionUtils.newInstance(smClass);
+
+    final RuntimeException exception;
+    try {
+      return ReflectionUtils.newInstance(smClass);
+    } catch(RuntimeException e) {
+      exception = e;
+    }
+
+    try {
+      final Class<?>[] argClasses = {RaftProperties.class};
+      return ReflectionUtils.newInstance(smClass, argClasses, properties);
+    } catch(RuntimeException e) {
+      exception.addSuppressed(e);
+    }
+    throw exception;
   }
 
   public static List<RaftPeer> toRaftPeers(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 3877083..e5439d6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -50,7 +50,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends 
MiniRaftCluster>
 
   @Before
   public void setup() throws IOException {
-    cluster = getFactory().newCluster(NUM_PEERS, new RaftProperties());
+    cluster = newCluster(NUM_PEERS);
     cluster.start();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 4fe9edc..927ad88 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -33,12 +33,12 @@ import 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Objects;
@@ -162,10 +162,6 @@ public interface RaftTestUtil {
     }
   }
 
-  static ByteString toByteString(String string) {
-    return ByteString.copyFrom(string, StandardCharsets.UTF_8);
-  }
-
   class SimpleMessage implements Message {
     public static SimpleMessage[] create(int numMessages) {
       return create(numMessages, "m");
@@ -180,9 +176,11 @@ public interface RaftTestUtil {
     }
 
     final String messageId;
+    final ByteString bytes;
 
     public SimpleMessage(final String messageId) {
       this.messageId = messageId;
+      this.bytes = ProtoUtils.toByteString(messageId);
     }
 
     @Override
@@ -209,15 +207,18 @@ public interface RaftTestUtil {
 
     @Override
     public ByteString getContent() {
-      return toByteString(messageId);
+      return bytes;
     }
   }
 
   class SimpleOperation {
     private final String op;
+    private final SMLogEntryProto smLogEntryProto;
 
     public SimpleOperation(String op) {
       this.op = Objects.requireNonNull(op);
+      this.smLogEntryProto = SMLogEntryProto.newBuilder()
+          .setData(ProtoUtils.toByteString(op)).build();
     }
 
     @Override
@@ -238,7 +239,7 @@ public interface RaftTestUtil {
     }
 
     public SMLogEntryProto getLogEntryContent() {
-      return SMLogEntryProto.newBuilder().setData(toByteString(op)).build();
+      return smLogEntryProto;
     }
   }
 


Reply via email to