http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
new file mode 100644
index 0000000..5f1d901
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.shaded.io.grpc.Status;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.grpc.RaftGRpcService;
+import org.apache.ratis.grpc.RaftGrpcConfigKeys;
+import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.impl.LeaderState;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.CodeInjectionForTesting;
+
+import static org.apache.ratis.grpc.RaftGRpcService.GRPC_SEND_SERVER_REQUEST;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A new log appender implementation using grpc bi-directional stream API.
+ */
+public class GRpcLogAppender extends LogAppender {
+  private final RaftServerProtocolClient client;
+  private final Queue<AppendEntriesRequestProto> pendingRequests;
+  private final int maxPendingRequestsNum;
+  private volatile boolean firstResponseReceived = false;
+
+  private final AppendLogResponseHandler appendResponseHandler;
+  private final InstallSnapshotResponseHandler snapshotResponseHandler;
+
+  private volatile StreamObserver<AppendEntriesRequestProto> 
appendLogRequestObserver;
+  private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver;
+
+  public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState,
+                         FollowerInfo f) {
+    super(server, leaderState, f);
+
+    RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc();
+    client = rpcService.getRpcClient(f.getPeer());
+    maxPendingRequestsNum = server.getProperties().getInt(
+        RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY,
+        RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT);
+    pendingRequests = new ConcurrentLinkedQueue<>();
+
+    appendResponseHandler = new AppendLogResponseHandler();
+    snapshotResponseHandler = new InstallSnapshotResponseHandler();
+  }
+
+  /**
+   * Main loop of the appender. While the appender is running, each iteration
+   * either installs a snapshot or sends an append request when
+   * {@code shouldSendRequest()} is true, and otherwise waits for the time
+   * returned by {@code getHeartbeatRemainingTime}. When the loop ends, the
+   * append request stream is completed if one has been opened.
+   */
+  @Override
+  public void run() {
+    while (isAppenderRunning()) {
+      if (shouldSendRequest()) {
+        SnapshotInfo snapshot = shouldInstallSnapshot();
+        if (snapshot != null) {
+          installSnapshot(snapshot, snapshotResponseHandler);
+        } else {
+          // keep appending log entries or sending heartbeats
+          appendLog();
+        }
+      }
+
+      if (isAppenderRunning() && !shouldSendRequest()) {
+        // use lastSend time instead of lastResponse time
+        final long waitTime = getHeartbeatRemainingTime(
+            follower.getLastRpcTime());
+        if (waitTime > 0) {
+          synchronized (this) {
+            try {
+              LOG.debug("{} decides to wait {}ms before appending to {}",
+                  server.getId(), waitTime, follower.getPeer());
+              wait(waitTime);
+            } catch (InterruptedException ignored) {
+              // deliberately ignored: the loop condition re-checks whether
+              // the appender is still running
+            }
+          }
+        }
+      }
+    }
+
+    // The append stream is created lazily by appendLog(), so it is still null
+    // if the appender stopped before sending any append request (for example
+    // when it only installed a snapshot). Read the volatile field once.
+    final StreamObserver<AppendEntriesRequestProto> observer =
+        appendLogRequestObserver;
+    if (observer != null) {
+      observer.onCompleted();
+    }
+  }
+
+  /**
+   * @return true if no further append request should be sent right now:
+   *         either the number of pending requests has reached
+   *         {@code maxPendingRequestsNum}, or the reply to the first request
+   *         is still outstanding.
+   */
+  private boolean shouldWait() {
+    if (pendingRequests.size() >= maxPendingRequestsNum) {
+      return true;
+    }
+    return shouldWaitForFirstResponse();
+  }
+
+  /**
+   * Creates the next append request and sends it to the follower.
+   *
+   * The request stream is opened on first use. While holding this appender's
+   * monitor, the method waits until {@link #shouldWait()} is false, creates
+   * the request, puts it into the pending queue and advances the follower's
+   * next index. The request itself is sent after the monitor is released.
+   */
+  private void appendLog() {
+    // open the request stream lazily on the first append
+    if (appendLogRequestObserver == null) {
+      appendLogRequestObserver = client.appendEntries(appendResponseHandler);
+    }
+    AppendEntriesRequestProto pending = null;
+    final StreamObserver<AppendEntriesRequestProto> s;
+    synchronized (this) {
+      // if the queue's size >= maxSize, wait
+      while (isAppenderRunning() && shouldWait()) {
+        try {
+          LOG.debug("{} wait to send the next AppendEntries to {}",
+              server.getId(), follower.getPeer());
+          this.wait();
+        } catch (InterruptedException ignored) {
+          // the loop condition is re-evaluated after an interrupt
+        }
+      }
+
+      if (isAppenderRunning()) {
+        // prepare and enqueue the append request. note changes on follower's
+        // nextIndex and ops on pendingRequests should always be associated
+        // together and protected by the lock
+        pending = createRequest();
+        if (pending != null) {
+          Preconditions.checkState(pendingRequests.offer(pending));
+          updateNextIndex(pending);
+        }
+      }
+      // capture the stream while holding the lock; the field may be replaced
+      // by the response handler's onError
+      s = appendLogRequestObserver;
+    }
+
+    // send outside of the lock, and only if the appender is still running
+    if (pending != null && isAppenderRunning()) {
+      sendRequest(pending, s);
+    }
+  }
+
+  /**
+   * Writes one append request to the given request stream and then records
+   * the send time on the follower.
+   *
+   * @param request the request to send
+   * @param requestObserver the stream the request is written to
+   */
+  private void sendRequest(AppendEntriesRequestProto request,
+      StreamObserver<AppendEntriesRequestProto> requestObserver) {
+    // code injection point keyed by GRPC_SEND_SERVER_REQUEST, executed before
+    // the request is written to the stream
+    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, server.getId(),
+        null, request);
+
+    requestObserver.onNext(request);
+    follower.updateLastRpcSendTime();
+  }
+
+  /**
+   * Moves the follower's next index to one past the last entry carried by the
+   * given request. A request without entries leaves the next index untouched.
+   */
+  private void updateNextIndex(AppendEntriesRequestProto request) {
+    final int entryCount = request.getEntriesCount();
+    if (entryCount <= 0) {
+      return;
+    }
+    final long lastEntryIndex = request.getEntries(entryCount - 1).getIndex();
+    follower.updateNextIndex(lastEntryIndex + 1);
+  }
+
+  /**
+   * While the reply to the first append has not arrived yet, do not send
+   * anything else: that reply is what tells us the correct next index.
+   *
+   * @return true if a request is pending and no response was received so far
+   */
+  private boolean shouldWaitForFirstResponse() {
+    if (pendingRequests.isEmpty()) {
+      return false;
+    }
+    return !firstResponseReceived;
+  }
+
+  /**
+   * StreamObserver for handling responses from the follower
+   */
+  private class AppendLogResponseHandler
+      implements StreamObserver<AppendEntriesReplyProto> {
+    /**
+     * After receiving a appendEntries reply, do the following:
+     * 1. If the reply is success, update the follower's match index and submit
+     *    an event to leaderState
+     * 2. If the reply is NOT_LEADER, step down
+     * 3. If the reply is INCONSISTENCY, decrease the follower's next index
+     *    based on the response
+     * In all cases the last response time is updated first and the appender
+     * is notified at the end.
+     */
+    @Override
+    public void onNext(AppendEntriesReplyProto reply) {
+      LOG.debug("{} received {} response from {}", server.getId(),
+          (!firstResponseReceived ? "the first" : "a"),
+          follower.getPeer());
+
+      // update the last rpc time
+      follower.updateLastRpcResponseTime();
+
+      if (!firstResponseReceived) {
+        firstResponseReceived = true;
+      }
+      switch (reply.getResult()) {
+        case SUCCESS:
+          onSuccess(reply);
+          break;
+        case NOT_LEADER:
+          onNotLeader(reply);
+          break;
+        case INCONSISTENCY:
+          onInconsistency(reply);
+          break;
+        default:
+          break;
+      }
+      notifyAppend();
+    }
+
+    /**
+     * for now we simply retry the first pending request: the pending queue is
+     * cleared and the follower's next index is set back to the index of the
+     * first pending request. If the error status code is INTERNAL, the request
+     * stream is re-created as well. Nothing is done once the appender has
+     * stopped.
+     */
+    @Override
+    public void onError(Throwable t) {
+      if (!isAppenderRunning()) {
+        LOG.info("{} is stopped", GRpcLogAppender.this);
+        return;
+      }
+      LOG.warn("{} got error when appending entries to {}, exception: {}.",
+          server.getId(), follower.getPeer().getId(), t);
+
+      // Lock the enclosing appender, not this handler: appendLog() and
+      // onInconsistency() guard pendingRequests and the follower's next index
+      // with the appender's monitor, so the same monitor must be used here.
+      synchronized (GRpcLogAppender.this) {
+        final Status cause = Status.fromThrowable(t);
+        if (cause != null && cause.getCode() == Status.Code.INTERNAL) {
+          // TODO check other Status. Add sleep to avoid tight loop
+          LOG.debug("{} restarts Append call to {} due to error {}",
+              server.getId(), follower.getPeer(), t);
+          // recreate the StreamObserver
+          appendLogRequestObserver =
+              client.appendEntries(appendResponseHandler);
+          // reset firstResponseReceived to false
+          firstResponseReceived = false;
+        }
+
+        // clear the pending requests queue and reset the next index of
+        // follower
+        AppendEntriesRequestProto request = pendingRequests.peek();
+        if (request != null) {
+          final long nextIndex = request.hasPreviousLog() ?
+              request.getPreviousLog().getIndex() + 1 :
+              raftLog.getStartIndex();
+          clearPendingRequests(nextIndex);
+        }
+      }
+    }
+
+    /** Only logs that the response stream has completed. */
+    @Override
+    public void onCompleted() {
+      LOG.info("{} stops appending log entries to follower {}", server.getId(),
+          follower);
+    }
+  }
+
+  /**
+   * Drops all pending append requests and passes the given index to
+   * {@code follower.decreaseNextIndex}.
+   * NOTE(review): callers are expected to hold this appender's monitor, per
+   * the locking comment in appendLog() -- confirm for every call site.
+   *
+   * @param newNextIndex the next index to fall back to
+   */
+  private void clearPendingRequests(long newNextIndex) {
+    pendingRequests.clear();
+    follower.decreaseNextIndex(newNextIndex);
+  }
+
+  /**
+   * Handles a SUCCESS reply: removes the oldest pending request and checks
+   * that the reply's next index is consistent with it. For a request that
+   * carried entries, the follower's match index is advanced to the last entry
+   * and a success event is submitted; a request without entries only goes
+   * through the consistency check.
+   */
+  private void onSuccess(AppendEntriesReplyProto reply) {
+    final AppendEntriesRequestProto acked = pendingRequests.poll();
+    final long replyNextIndex = reply.getNextIndex();
+    Preconditions.checkNotNull(acked,
+        "Got reply with next index %s but the pending queue is empty",
+        replyNextIndex);
+
+    final int entryCount = acked.getEntriesCount();
+    if (entryCount != 0) {
+      // check if the reply and the pending request is consistent
+      final long lastEntryIndex = acked.getEntries(entryCount - 1).getIndex();
+      final boolean matches = replyNextIndex == lastEntryIndex + 1;
+      Preconditions.checkState(matches,
+          "reply's next index is %s, request's last entry index is %s",
+          replyNextIndex, lastEntryIndex);
+      follower.updateMatchIndex(lastEntryIndex);
+      submitEventOnSuccessAppend();
+      return;
+    }
+
+    // no entries in the request: only the previous-log position can be checked
+    final boolean matches = !acked.hasPreviousLog()
+        || replyNextIndex - 1 == acked.getPreviousLog().getIndex();
+    Preconditions.checkState(matches,
+        "reply's next index is %s, request's previous is %s",
+        replyNextIndex, acked.getPreviousLog());
+  }
+
+  /**
+   * Handles a NOT_LEADER reply by passing the term carried in the reply to
+   * {@code checkResponseTerm}.
+   */
+  private void onNotLeader(AppendEntriesReplyProto reply) {
+    checkResponseTerm(reply.getTerm());
+    // the running loop will end and the connection will onComplete
+  }
+
+  private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
+    AppendEntriesRequestProto request = pendingRequests.peek();
+    Preconditions.checkState(request.hasPreviousLog());
+    if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
+      clearPendingRequests(reply.getNextIndex());
+    }
+  }
+
+  private class InstallSnapshotResponseHandler
+      implements StreamObserver<InstallSnapshotReplyProto> {
+    private final Queue<Integer> pending;
+    private final AtomicBoolean done = new AtomicBoolean(false);
+
+    InstallSnapshotResponseHandler() {
+      pending = new LinkedList<>();
+    }
+
+    synchronized void addPending(InstallSnapshotRequestProto request) {
+      pending.offer(request.getRequestIndex());
+    }
+
+    synchronized void removePending(InstallSnapshotReplyProto reply) {
+      int index = pending.poll();
+      Preconditions.checkState(index == reply.getRequestIndex());
+    }
+
+    boolean isDone() {
+      return done.get();
+    }
+
+    void close() {
+      done.set(true);
+      GRpcLogAppender.this.notifyAppend();
+    }
+
+    synchronized boolean hasAllResponse() {
+      return pending.isEmpty();
+    }
+
+    @Override
+    public void onNext(InstallSnapshotReplyProto reply) {
+      LOG.debug("{} received {} response from {}", server.getId(),
+          (!firstResponseReceived ? "the first" : "a"),
+          follower.getPeer());
+
+      // update the last rpc time
+      follower.updateLastRpcResponseTime();
+
+      if (!firstResponseReceived) {
+        firstResponseReceived = true;
+      }
+
+      switch (reply.getResult()) {
+        case SUCCESS:
+          removePending(reply);
+          break;
+        case NOT_LEADER:
+          checkResponseTerm(reply.getTerm());
+          break;
+        case UNRECOGNIZED:
+          break;
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      if (!isAppenderRunning()) {
+        LOG.info("{} is stopped", GRpcLogAppender.this);
+        return;
+      }
+      LOG.info("{} got error when installing snapshot to {}, exception: {}",
+          server.getId(), follower.getPeer(), t);
+      close();
+    }
+
+    @Override
+    public void onCompleted() {
+      LOG.info("{} stops sending snapshots to follower {}", server.getId(),
+          follower);
+      close();
+    }
+  }
+
+  private void installSnapshot(SnapshotInfo snapshot,
+      InstallSnapshotResponseHandler responseHandler) {
+    LOG.info("{}: follower {}'s next index is {}," +
+            " log's start index is {}, need to install snapshot",
+        server.getId(), follower.getPeer(), follower.getNextIndex(),
+        raftLog.getStartIndex());
+
+    snapshotRequestObserver = client.installSnapshot(snapshotResponseHandler);
+    final String requestId = UUID.randomUUID().toString();
+    try {
+      for (InstallSnapshotRequestProto request :
+          new SnapshotRequestIter(snapshot, requestId)) {
+        if (isAppenderRunning()) {
+          snapshotRequestObserver.onNext(request);
+          follower.updateLastRpcSendTime();
+          responseHandler.addPending(request);
+        } else {
+          break;
+        }
+      }
+      snapshotRequestObserver.onCompleted();
+    } catch (Exception e) {
+      LOG.warn("{} failed to install snapshot {}. Exception: {}", this,
+          snapshot.getFiles(), e);
+      snapshotRequestObserver.onError(e);
+      return;
+    } finally {
+      snapshotRequestObserver = null;
+    }
+
+    synchronized (this) {
+      while (isAppenderRunning() && !responseHandler.isDone()) {
+        try {
+          wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+    }
+
+    if (responseHandler.hasAllResponse()) {
+      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
+      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
+      LOG.info("{}: install snapshot-{} successfully on follower {}",
+          server.getId(), snapshot.getTermIndex().getIndex(), 
follower.getPeer());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
new file mode 100644
index 0000000..d30b391
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.impl.LeaderState;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.LogAppenderFactory;
+import org.apache.ratis.server.impl.RaftServerImpl;
+
+/**
+ * {@link LogAppenderFactory} that creates a {@link GRpcLogAppender} for every
+ * follower.
+ */
+public class PipelinedLogAppenderFactory implements LogAppenderFactory {
+  /**
+   * @return a new {@link GRpcLogAppender} built from the given arguments
+   */
+  @Override
+  public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+                                    FollowerInfo f) {
+    final LogAppender appender = new GRpcLogAppender(server, state, f);
+    return appender;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
new file mode 100644
index 0000000..53df10d
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc;
+import 
org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
+import 
org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
+import org.apache.ratis.protocol.RaftPeer;
+
+/**
+ * gRPC client for the raft server protocol service of one target peer. It
+ * offers a blocking requestVote call and stream based appendEntries and
+ * installSnapshot calls, all sharing one plaintext channel to the peer.
+ */
+public class RaftServerProtocolClient {
+  private final ManagedChannel channel;
+  private final RaftServerProtocolServiceBlockingStub blockingStub;
+  private final RaftServerProtocolServiceStub asyncStub;
+
+  /**
+   * Builds a plaintext channel to the address of the given peer and creates
+   * the blocking and the asynchronous stub on top of it.
+   */
+  public RaftServerProtocolClient(RaftPeer target) {
+    final ManagedChannel newChannel = ManagedChannelBuilder
+        .forTarget(target.getAddress())
+        .usePlaintext(true)
+        .build();
+    this.channel = newChannel;
+    this.blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(newChannel);
+    this.asyncStub = RaftServerProtocolServiceGrpc.newStub(newChannel);
+  }
+
+  /** Shuts the channel down via {@code shutdownNow()}. */
+  public void shutdown() {
+    channel.shutdownNow();
+  }
+
+  /**
+   * Sends a requestVote call through the blocking stub.
+   * The StatusRuntimeException will be handled by the caller.
+   */
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
+    final RequestVoteReplyProto reply = blockingStub.requestVote(request);
+    return reply;
+  }
+
+  /**
+   * Opens an appendEntries stream.
+   *
+   * @param responseHandler observer receiving the replies
+   * @return the observer used to send requests
+   */
+  StreamObserver<AppendEntriesRequestProto> appendEntries(
+      StreamObserver<AppendEntriesReplyProto> responseHandler) {
+    return asyncStub.appendEntries(responseHandler);
+  }
+
+  /**
+   * Opens an installSnapshot stream.
+   *
+   * @param responseHandler observer receiving the replies
+   * @return the observer used to send requests
+   */
+  StreamObserver<InstallSnapshotRequestProto> installSnapshot(
+      StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+    return asyncStub.installSnapshot(responseHandler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
new file mode 100644
index 0000000..08e6a51
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import 
org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * gRPC service implementation that forwards requestVote, appendEntries and
+ * installSnapshot requests to a {@link RaftServerProtocol} and writes the
+ * replies back to the caller.
+ */
+public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RaftServerProtocolService.class);
+
+  private final String id;
+  private final RaftServerProtocol server;
+
+  /**
+   * @param id identifier used in log messages
+   * @param server the protocol implementation requests are forwarded to
+   */
+  public RaftServerProtocolService(String id, RaftServerProtocol server) {
+    this.id = id;
+    this.server = server;
+  }
+
+  /**
+   * Handles one requestVote call: the reply is sent and the call completed,
+   * or the wrapped exception is reported to the caller.
+   */
+  @Override
+  public void requestVote(RequestVoteRequestProto request,
+      StreamObserver<RequestVoteReplyProto> responseObserver) {
+    try {
+      final RequestVoteReplyProto reply = server.requestVote(request);
+      responseObserver.onNext(reply);
+      responseObserver.onCompleted();
+    } catch (Throwable e) {
+      LOG.info("{} got exception when handling requestVote {}: {}",
+          id, request.getServerRequest(), e);
+      responseObserver.onError(RaftGrpcUtil.wrapException(e));
+    }
+  }
+
+  /**
+   * Opens an appendEntries stream. Every request is forwarded to the server
+   * and answered on the response stream. Once the response stream has been
+   * terminated by an error or by completion, nothing more is written to it
+   * and later requests of this stream are dropped.
+   */
+  @Override
+  public StreamObserver<AppendEntriesRequestProto> appendEntries(
+      StreamObserver<AppendEntriesReplyProto> responseObserver) {
+    return new StreamObserver<AppendEntriesRequestProto>() {
+      // set once onError/onCompleted was called on responseObserver; a
+      // StreamObserver must not be used after either of them
+      private final java.util.concurrent.atomic.AtomicBoolean closed =
+          new java.util.concurrent.atomic.AtomicBoolean(false);
+
+      @Override
+      public void onNext(AppendEntriesRequestProto request) {
+        if (closed.get()) {
+          LOG.debug("{}: appendEntries response stream is closed, drop {}",
+              id, request.getServerRequest());
+          return;
+        }
+        try {
+          final AppendEntriesReplyProto reply = server.appendEntries(request);
+          responseObserver.onNext(reply);
+        } catch (Throwable e) {
+          LOG.info("{} got exception when handling appendEntries {}: {}",
+              id, request.getServerRequest(), e);
+          if (closed.compareAndSet(false, true)) {
+            responseObserver.onError(RaftGrpcUtil.wrapException(e));
+          }
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        // for now we just log a msg
+        LOG.info("{}: appendEntries on error. Exception: {}", id, t);
+      }
+
+      @Override
+      public void onCompleted() {
+        LOG.info("{}: appendEntries completed", id);
+        if (closed.compareAndSet(false, true)) {
+          responseObserver.onCompleted();
+        }
+      }
+    };
+  }
+
+  /**
+   * Opens an installSnapshot stream. Every request is forwarded to the server
+   * and answered on the response stream. Once the response stream has been
+   * terminated by an error or by completion, nothing more is written to it
+   * and later requests of this stream are dropped.
+   */
+  @Override
+  public StreamObserver<InstallSnapshotRequestProto> installSnapshot(
+      StreamObserver<InstallSnapshotReplyProto> responseObserver) {
+    return new StreamObserver<InstallSnapshotRequestProto>() {
+      // set once onError/onCompleted was called on responseObserver; a
+      // StreamObserver must not be used after either of them
+      private final java.util.concurrent.atomic.AtomicBoolean closed =
+          new java.util.concurrent.atomic.AtomicBoolean(false);
+
+      @Override
+      public void onNext(InstallSnapshotRequestProto request) {
+        if (closed.get()) {
+          LOG.debug("{}: installSnapshot response stream is closed, drop {}",
+              id, request.getServerRequest());
+          return;
+        }
+        try {
+          final InstallSnapshotReplyProto reply =
+              server.installSnapshot(request);
+          responseObserver.onNext(reply);
+        } catch (Throwable e) {
+          LOG.info("{} got exception when handling installSnapshot {}: {}",
+              id, request.getServerRequest(), e);
+          if (closed.compareAndSet(false, true)) {
+            responseObserver.onError(RaftGrpcUtil.wrapException(e));
+          }
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        // only logged, as for appendEntries
+        LOG.info("{}: installSnapshot on error. Exception: {}", id, t);
+      }
+
+      @Override
+      public void onCompleted() {
+        LOG.info("{}: installSnapshot completed", id);
+        if (closed.compareAndSet(false, true)) {
+          responseObserver.onCompleted();
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
new file mode 100644
index 0000000..f5c7b3f
--- /dev/null
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.RaftGRpcService;
+import org.apache.ratis.grpc.RaftGrpcConfigKeys;
+import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc;
+import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.LogAppenderFactory;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.NetUtils;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link MiniRaftCluster} whose servers use {@link RaftGRpcService} as rpc
+ * service and {@link PipelinedLogAppenderFactory} as log appender factory.
+ */
+public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
+  public static final Factory<MiniRaftClusterWithGRpc> FACTORY
+      = new Factory<MiniRaftClusterWithGRpc>() {
+    @Override
+    public MiniRaftClusterWithGRpc newCluster(
+        String[] ids, RaftProperties prop, boolean formatted) throws IOException {
+      return new MiniRaftClusterWithGRpc(ids, prop, formatted);
+    }
+  };
+
+  /** Injection registered for the point where a server request is sent. */
+  public static final DelayLocalExecutionInjection sendServerRequestInjection =
+      new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST);
+
+  /** Creates a formatted cluster with {@code numServers} generated ids. */
+  public MiniRaftClusterWithGRpc(int numServers, RaftProperties properties)
+      throws IOException {
+    this(generateIds(numServers, 0), properties, true);
+  }
+
+  /**
+   * Creates the cluster with the given ids and initializes one
+   * {@link RaftGRpcService} per server.
+   */
+  public MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties,
+      boolean formatted) throws IOException {
+    super(ids, getPropForGrpc(properties), formatted);
+    init(initRpcServices(getServers(), properties));
+  }
+
+  /**
+   * @return a copy of the given properties with the log appender factory set
+   *         to {@link PipelinedLogAppenderFactory}
+   */
+  private static RaftProperties getPropForGrpc(RaftProperties prop) {
+    RaftProperties newProp = new RaftProperties(prop);
+    newProp.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
+    return newProp;
+  }
+
+  /**
+   * Creates a {@link RaftGRpcService} for each server.
+   *
+   * @return the services keyed by a peer made of the server id and the
+   *         service's socket address
+   */
+  private static Map<RaftPeer, RaftGRpcService> initRpcServices(
+      Collection<RaftServerImpl> servers, RaftProperties prop) throws IOException {
+    final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>();
+
+    for (RaftServerImpl s : servers) {
+      final RaftGRpcService rpc = new RaftGRpcService(s, prop);
+      peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
+    }
+    return peerRpcs;
+  }
+
+  @Override
+  public RaftClientRequestSender getRaftClientRequestSender() {
+    return new RaftClientSenderWithGrpc(getPeers());
+  }
+
+  /**
+   * Creates rpc services for the new servers and attaches them. A server is
+   * started when {@code startService} is true; otherwise its replies are
+   * blocked through {@link BlockRequestHandlingInjection}.
+   */
+  @Override
+  protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
+      Collection<RaftServerImpl> newServers, boolean startService)
+      throws IOException {
+    final Map<RaftPeer, RaftGRpcService> peers =
+        initRpcServices(newServers, properties);
+    for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) {
+      RaftServerImpl server = servers.get(entry.getKey().getId());
+      server.setServerRpc(entry.getValue());
+      if (!startService) {
+        BlockRequestHandlingInjection.getInstance().blockReplier(server.getId());
+      } else {
+        server.start();
+      }
+    }
+    return new ArrayList<>(peers.keySet());
+  }
+
+  /**
+   * Creates a new rpc service for the server of the given peer, listening on
+   * the port of the peer's address. The port property is changed only while
+   * the service is created and is restored afterwards, also when creating
+   * the service or the address check fails.
+   */
+  @Override
+  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
+    RaftServerImpl server = servers.get(peer.getId());
+    int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort();
+    final int oldPort =
+        properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY,
+            RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT);
+    properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port);
+    try {
+      final RaftGRpcService rpc = new RaftGRpcService(server, properties);
+      Preconditions.checkState(
+          rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
+          "address in the raft conf: %s, address in rpc server: %s",
+          peer.getAddress(), rpc.getInetSocketAddress().toString());
+      server.setServerRpc(rpc);
+    } finally {
+      // always put the shared property back so a failure here does not leak
+      // the temporary port into later service creations
+      properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, oldPort);
+    }
+    return server;
+  }
+
+  /** Starts the server and unblocks its replies. */
+  @Override
+  public void startServer(String id) {
+    super.startServer(id);
+    BlockRequestHandlingInjection.getInstance().unblockReplier(id);
+  }
+
+  @Override
+  protected void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException {
+    RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
+        leaderId, delayMs, getMaxTimeout());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
new file mode 100644
index 0000000..351e406
--- /dev/null
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftNotLeaderExceptionBaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
+import org.apache.ratis.server.impl.LogAppenderFactory;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
+
+import java.io.IOException;
+
+public class TestNotLeaderExceptionWithGrpc extends 
RaftNotLeaderExceptionBaseTest {
+  @Override
+  public MiniRaftCluster initCluster() throws IOException {
+    String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0);
+    RaftProperties prop = new RaftProperties();
+    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
+    return new MiniRaftClusterWithGRpc(s, prop, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
new file mode 100644
index 0000000..450eb6e
--- /dev/null
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
+import org.apache.ratis.grpc.server.RaftServerProtocolService;
+import org.apache.ratis.server.impl.LogAppenderFactory;
+import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.BeforeClass;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
+
+import java.io.IOException;
+
+/**
+ * Runs {@link RaftReconfigurationBaseTest} with clusters of type
+ * {@link MiniRaftClusterWithGRpc}.
+ */
+public class TestRaftReconfigurationWithGRpc extends 
RaftReconfigurationBaseTest {
+  static {
+    // Raise the RaftServerProtocolService logger to DEBUG for these tests.
+    RaftUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG);
+  }
+
+  /**
+   * Selects the pipelined log appender factory in the inherited {@code prop}
+   * before any test in this class runs.
+   */
+  @BeforeClass
+  public static void setProp() {
+    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
+  }
+
+  /**
+   * Creates a gRPC mini cluster from {@code peerNum} and the shared
+   * {@code prop}.
+   *
+   * @param peerNum passed to the cluster constructor (presumably the number
+   *                of peers; confirm against MiniRaftClusterWithGRpc)
+   */
+  @Override
+  public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException {
+    return new MiniRaftClusterWithGRpc(peerNum, prop);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
new file mode 100644
index 0000000..8a26435
--- /dev/null
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
+
+import java.io.IOException;
+
+/**
+ * Runs {@link RaftSnapshotBaseTest} against clusters produced by
+ * {@code MiniRaftClusterWithGRpc.FACTORY}.
+ */
+public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest {
+  /**
+   * Asks the gRPC cluster factory for a new cluster.
+   *
+   * @param numServer passed to the factory (presumably the server count)
+   * @param prop the properties passed to the factory
+   */
+  @Override
+  public MiniRaftCluster initCluster(int numServer, RaftProperties prop)
+      throws IOException {
+    return MiniRaftClusterWithGRpc.FACTORY.newCluster(numServer, prop, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
new file mode 100644
index 0000000..f4c8d27
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.client.AppendStreamer;
+import org.apache.ratis.grpc.client.RaftOutputStream;
+import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
+import org.apache.ratis.server.impl.LogAppenderFactory;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
+import static org.junit.Assert.fail;
+
+public class TestRaftStream {
+  static {
+    RaftUtils.setLogLevel(AppendStreamer.LOG, Level.ALL);
+  }
+  static final Logger LOG = LoggerFactory.getLogger(TestRaftStream.class);
+
+  private static final RaftProperties prop = new RaftProperties();
+  private static final int NUM_SERVERS = 3;
+
+  private MiniRaftClusterWithGRpc cluster;
+
+
+  /**
+   * Selects the pipelined log appender factory in the shared {@code prop}
+   * before any test in this class runs.
+   */
+  @BeforeClass
+  public static void setProp() {
+    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
+  }
+
+  /** Shuts down the cluster after each test, if one was created. */
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /** Returns the request payload for {@code count}; same as {@link #toBytes}. */
+  private byte[] genContent(int count) {
+    return toBytes(count);
+  }
+
+  private byte[] toBytes(int i) {
+    byte[] b = new byte[4];
+    b[0] = (byte) ((i >>> 24) & 0xFF);
+    b[1] = (byte) ((i >>> 16) & 0xFF);
+    b[2] = (byte) ((i >>> 8) & 0xFF);
+    b[3] = (byte) (i & 0xFF);
+    return b;
+  }
+
+  /**
+   * Writes 500 four-byte values through a {@link RaftOutputStream} whose
+   * buffer size is 4 bytes, then checks that the leader's log holds the
+   * values 1..500 in order.
+   */
+  @Test
+  public void testSimpleWrite() throws Exception {
+    LOG.info("Running testSimpleWrite");
+
+    // default 64K is too large for a test
+    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4);
+    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
+
+    cluster.start();
+    RaftServerImpl leader = waitForLeader(cluster);
+
+    int count = 1;
+    try (RaftOutputStream out = new RaftOutputStream(prop, "writer-1",
+        cluster.getPeers(), leader.getId())) {
+      for (int i = 0; i < 500; i++) { // generate 500 requests
+        out.write(genContent(count++));
+      }
+    }
+
+    // check the leader's raft log
+    final RaftLog raftLog = leader.getState().getLog();
+    // The supplier replays the written sequence 1, 2, 3, ... for comparison.
+    final AtomicInteger currentNum = new AtomicInteger(1);
+    checkLog(raftLog, 500, () -> {
+      int value = currentNum.getAndIncrement();
+      return toBytes(value);
+    });
+  }
+
+  private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
+      Supplier<byte[]> s) {
+    long committedIndex = raftLog.getLastCommittedIndex();
+    Assert.assertEquals(expectedCommittedIndex, committedIndex);
+    // check the log content
+    LogEntryProto[] entries = raftLog.getEntries(1, expectedCommittedIndex + 
1);
+    for (LogEntryProto entry : entries) {
+      byte[] logData = entry.getSmLogEntry().getData().toByteArray();
+      byte[] expected = s.get();
+      Assert.assertEquals("log entry: " + entry,
+          expected.length, logData.length);
+      Assert.assertArrayEquals(expected, logData);
+    }
+  }
+
+  /**
+   * Writes payloads of several lengths around the 1024-byte buffer size,
+   * flushing after each one, and checks after every flush that the leader's
+   * last applied index equals the number of transactions expected so far.
+   * Afterwards verifies that writing to the closed stream fails with an
+   * IOException and that the leader's log matches the expected transactions.
+   */
+  @Test
+  public void testWriteAndFlush() throws Exception {
+    LOG.info("Running testWriteAndFlush");
+
+    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE);
+    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
+    cluster.start();
+
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftOutputStream out = new RaftOutputStream(prop, "writer",
+        cluster.getPeers(), leader.getId());
+
+    // Lengths below, at and above multiples of the buffer size.
+    int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
+    ByteValue[] values = new ByteValue[lengths.length];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = new ByteValue(lengths[i], (byte) 9);
+    }
+
+    List<byte[]> expectedTxs = new ArrayList<>();
+    for (ByteValue v : values) {
+      // genData() must run before getTransactions(), which reads the data.
+      byte[] data = v.genData();
+      expectedTxs.addAll(v.getTransactions());
+      out.write(data);
+      out.flush();
+
+      // make sure after the flush the data has been committed
+      Assert.assertEquals(expectedTxs.size(),
+          leader.getState().getLastAppliedIndex());
+    }
+    out.close();
+
+    // A write after close is expected to be rejected.
+    try {
+      out.write(0);
+      fail("The OutputStream has been closed");
+    } catch (IOException ignored) {
+    }
+
+    LOG.info("Start to check leader's log");
+    final AtomicInteger index = new AtomicInteger(0);
+    checkLog(leader.getState().getLog(), expectedTxs.size(),
+        () -> expectedTxs.get(index.getAndIncrement()));
+  }
+
+  /**
+   * Test payload made of {@code length} copies of one byte value, together
+   * with the transactions it is expected to be split into when written
+   * through a stream whose buffer holds {@link #BUFFERSIZE} bytes.
+   */
+  private static class ByteValue {
+    static final int BUFFERSIZE = 1024;
+
+    final int length;
+    final byte value;
+    /** Number of BUFFERSIZE-sized pieces needed to hold {@code length} bytes. */
+    final int numTx;
+    /** Set by {@link #genData()}; read by {@link #getTransactions()}. */
+    byte[] data;
+
+    ByteValue(int length, byte value) {
+      this.length = length;
+      this.value = value;
+      this.numTx = (length - 1) / BUFFERSIZE + 1;
+    }
+
+    /** Creates, stores and returns a fresh array of {@code length} bytes. */
+    byte[] genData() {
+      final byte[] filled = new byte[length];
+      Arrays.fill(filled, value);
+      data = filled;
+      return filled;
+    }
+
+    /**
+     * Returns the expected transactions: the stored data itself when it fits
+     * in one buffer, otherwise {@code numTx} freshly filled pieces of at most
+     * BUFFERSIZE bytes each. Requires {@link #genData()} to have been called.
+     */
+    Collection<byte[]> getTransactions() {
+      if (data.length <= BUFFERSIZE) {
+        return Collections.singletonList(data);
+      }
+      final List<byte[]> pieces = new ArrayList<>();
+      int remaining = length;
+      for (int i = 0; i < numTx; i++) {
+        final byte[] piece = new byte[Math.min(BUFFERSIZE, remaining)];
+        Arrays.fill(piece, value);
+        pieces.add(piece);
+        remaining -= BUFFERSIZE;
+      }
+      return pieces;
+    }
+  }
+
+  /**
+   * Writes four arrays (0.5, 1, 2.5 and 4 buffers long, filled with 1, 2, 3
+   * and 4 respectively) through {@code write(byte[], int, int)} in randomly
+   * sized slices, then checks that the leader's log holds eight full-buffer
+   * entries whose concatenation equals the concatenated input.
+   */
+  @Test
+  public void testWriteWithOffset() throws Exception {
+    LOG.info("Running testWriteWithOffset");
+    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE);
+
+    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
+    cluster.start();
+    RaftServerImpl leader = waitForLeader(cluster);
+
+    RaftOutputStream out = new RaftOutputStream(prop, "writer",
+        cluster.getPeers(), leader.getId());
+
+    byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
+    Arrays.fill(b1, (byte) 1);
+    byte[] b2 = new byte[ByteValue.BUFFERSIZE];
+    Arrays.fill(b2, (byte) 2);
+    byte[] b3 = new byte[ByteValue.BUFFERSIZE * 2 + ByteValue.BUFFERSIZE / 2];
+    Arrays.fill(b3, (byte) 3);
+    byte[] b4 = new byte[ByteValue.BUFFERSIZE * 4];
+    // Fill b4 (not b3 again) so each array carries its own marker value.
+    Arrays.fill(b4, (byte) 4);
+
+    byte[] expected = new byte[ByteValue.BUFFERSIZE * 8];
+    byte[][] data = new byte[][]{b1, b2, b3, b4};
+    final Random random = new Random();
+    int totalSize = 0;
+    for (byte[] b : data) {
+      System.arraycopy(b, 0, expected, totalSize, b.length);
+      totalSize += b.length;
+
+      // Push the array out in random slices of 1..remaining bytes.
+      int written = 0;
+      while (written < b.length) {
+        int toWrite = random.nextInt(b.length - written) + 1;
+        LOG.info("write {} bytes", toWrite);
+        out.write(b, written, toWrite);
+        written += toWrite;
+      }
+    }
+    out.close();
+
+    final RaftLog log = leader.getState().getLog();
+    // 0.5 + 1 + 2.5 + 4 = 8
+    Assert.assertEquals(8, leader.getState().getLastAppliedIndex());
+    Assert.assertEquals(8, log.getLastCommittedIndex());
+    LogEntryProto[] entries = log.getEntries(1, 9);
+    byte[] actual = new byte[ByteValue.BUFFERSIZE * 8];
+    totalSize = 0;
+    for (LogEntryProto e : entries) {
+      byte[] eValue = e.getSmLogEntry().getData().toByteArray();
+      Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length);
+      System.arraycopy(eValue, 0, actual, totalSize, eValue.length);
+      totalSize += eValue.length;
+    }
+    Assert.assertArrayEquals(expected, actual);
+  }
+
+  /**
+   * Write while leader is killed.
+   *
+   * A background thread keeps writing four-byte values while the current
+   * leader is killed and a new one is elected. The test then stops the writer
+   * and checks that it finished without an exception and that the new
+   * leader's last applied index covers the number of values written.
+   */
+  @Test
+  public void testKillLeader() throws Exception {
+    LOG.info("Running testKillLeader");
+
+    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4);
+    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
+    cluster.start();
+    final RaftServerImpl leader = waitForLeader(cluster);
+
+    final AtomicBoolean running  = new AtomicBoolean(true);
+    final AtomicBoolean success = new AtomicBoolean(false);
+    final AtomicInteger result = new AtomicInteger(0);
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    new Thread(() -> {
+      LOG.info("Writer thread starts");
+      int count = 0;
+      try (RaftOutputStream out = new RaftOutputStream(prop, "writer",
+          cluster.getPeers(), leader.getId())) {
+        while (running.get()) {
+          out.write(toBytes(count++));
+          Thread.sleep(10);
+        }
+        success.set(true);
+        result.set(count);
+      } catch (Exception e) {
+        LOG.info("Got exception when writing", e);
+        success.set(false);
+      } finally {
+        // Signal completion on both the success and the failure path.
+        latch.countDown();
+      }
+    }).start();
+
+    // force change the leader
+    RaftTestUtil.waitAndKillLeader(cluster, true);
+    final RaftServerImpl newLeader = waitForLeader(cluster);
+    Assert.assertNotEquals(leader.getId(), newLeader.getId());
+    Thread.sleep(500);
+
+    running.set(false);
+    // Fail with a clear message if the writer is still running after 5s,
+    // instead of silently reading half-finished results below.
+    Assert.assertTrue("writer thread did not finish within 5 seconds",
+        latch.await(5, TimeUnit.SECONDS));
+    Assert.assertTrue(success.get());
+    // total number of tx should be >= result + 2, where 2 means two NoOp from
+    // leaders. It may be larger than result+2 because the client may resend
+    // requests and we do not have retry cache on servers yet.
+    // NOTE(review): the assertion below checks result + 1, not result + 2 as
+    // the comment above states -- confirm which bound is intended.
+    LOG.info("last applied index: {}. total number of requests: {}",
+        newLeader.getState().getLastAppliedIndex(), result.get());
+    Assert.assertTrue(
+        newLeader.getState().getLastAppliedIndex() >= result.get() + 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
new file mode 100644
index 0000000..b60e30d
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftBasicTests;
+import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.impl.LogAppenderFactory;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
+
+import java.io.IOException;
+
+/**
+ * Runs {@link RaftBasicTests} against a {@link MiniRaftClusterWithGRpc}.
+ *
+ * The overridden tests reset the request-blocking injections in a finally
+ * block so that a failing test does not leave them active for later tests.
+ */
+public class TestRaftWithGrpc extends RaftBasicTests {
+  static {
+    // Raise the RaftServerImpl logger to DEBUG for these tests.
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithGRpc cluster;
+
+  /**
+   * Selects the pipelined log appender factory in the inherited
+   * {@code properties} before any test in this class runs.
+   */
+  @BeforeClass
+  public static void setProp() {
+    properties.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
+  }
+
+  /**
+   * Creates the cluster under test and checks that it reports no leader yet.
+   *
+   * @throws IOException as declared by the cluster constructor
+   */
+  public TestRaftWithGrpc() throws IOException {
+    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, properties);
+    Assert.assertNull(cluster.getLeader());
+  }
+
+  /** Returns the cluster created in the constructor. */
+  @Override
+  public MiniRaftClusterWithGRpc getCluster() {
+    return cluster;
+  }
+
+  /**
+   * Runs the inherited test, then always clears the send-request injection
+   * and unblocks all request handling, whether or not the test passed.
+   */
+  @Override
+  @Test
+  public void testEnforceLeader() throws Exception {
+    try {
+      super.testEnforceLeader();
+    } finally {
+      MiniRaftClusterWithGRpc.sendServerRequestInjection.clear();
+      BlockRequestHandlingInjection.getInstance().unblockAll();
+    }
+  }
+
+  /**
+   * Runs the inherited test, then always unblocks all request handling,
+   * whether or not the test passed.
+   */
+  @Override
+  @Test
+  public void testWithLoad() throws Exception {
+    try {
+      super.testWithLoad();
+    } finally {
+      BlockRequestHandlingInjection.getInstance().unblockAll();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/resources/log4j.properties 
b/ratis-grpc/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ced0687
--- /dev/null
+++ b/ratis-grpc/src/test/resources/log4j.properties
@@ -0,0 +1,18 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-hadoop/pom.xml b/ratis-hadoop/pom.xml
new file mode 100644
index 0000000..ab7fd9f
--- /dev/null
+++ b/ratis-hadoop/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>ratis-project-dist</artifactId>
+    <groupId>org.apache.ratis</groupId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../ratis-project-dist</relativePath>
+  </parent>
+
+  <artifactId>ratis-hadoop</artifactId>
+  <name>Ratis Hadoop Support</name>
+
+  <dependencies>
+    <dependency>
+      <artifactId>ratis-proto-shaded</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <artifactId>ratis-client</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-client</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <artifactId>ratis-server</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-server</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>    
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java 
b/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
new file mode 100644
index 0000000..a5a9654
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputOutputStream;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ProtoUtil;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.shaded.com.google.protobuf.*;
+import 
org.apache.ratis.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import 
org.apache.ratis.shaded.org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
+import 
org.apache.ratis.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
+import 
org.apache.ratis.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+
+import javax.net.SocketFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Copied from {@link org.apache.hadoop.ipc.ProtobufRpcEngine}
+ * and replaced the protobuf classes with the shaded classes.
+ */
+@InterfaceStability.Evolving
+public class ProtobufRpcEngineShaded implements RpcEngine {
+  public static final Log LOG = 
LogFactory.getLog(ProtobufRpcEngineShaded.class);
+
+  static { // Register the request wrapper and invoker for RPC_PROTOCOL_BUFFER
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
+        new Server.ProtoBufRpcInvoker());
+  }
+
+  // Cache from which Invoker and getClient(Configuration) obtain their Client;
+  // Invoker.close() hands its client back through stopClient.
+  private static final ClientCache CLIENTS = new ClientCache();
+
+  /**
+   * Creates a protocol proxy without a connection retry policy.
+   * Delegates to the overload that takes a {@link RetryPolicy}, passing
+   * {@code null} for it.
+   */
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout) throws IOException {
+    final RetryPolicy noRetryPolicy = null;
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+        rpcTimeout, noRetryPolicy);
+  }
+
+  /**
+   * Creates a protocol proxy with the given connection retry policy.
+   * Delegates to the overload that also takes the fallback-to-simple-auth
+   * flag, passing {@code null} for it.
+   */
+  @Override
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
+      ) throws IOException {
+    final AtomicBoolean noFallbackFlag = null;
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+        rpcTimeout, connectionRetryPolicy, noFallbackFlag);
+  }
+
+  /**
+   * Creates a dynamic proxy for {@code protocol} whose calls are handled by a
+   * new {@link Invoker} built from the given connection parameters, and wraps
+   * it in a {@link ProtocolProxy}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    final Invoker handler = new Invoker(protocol, addr, ticket, conf, factory,
+        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
+    final ClassLoader loader = protocol.getClassLoader();
+    final Class<?>[] interfaces = {protocol};
+    final T proxyInstance = (T) Proxy.newProxyInstance(loader, interfaces, handler);
+    return new ProtocolProxy<T>(protocol, proxyInstance, false);
+  }
+
+  /**
+   * Creates a {@link ProtocolMetaInfoPB} proxy that reuses the supplied
+   * connection id instead of building a new one.
+   */
+  @Override
+  public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    final Class<ProtocolMetaInfoPB> metaInfoProtocol = ProtocolMetaInfoPB.class;
+    final ClassLoader loader = metaInfoProtocol.getClassLoader();
+    final Class<?>[] interfaces = {metaInfoProtocol};
+    final Invoker handler = new Invoker(metaInfoProtocol, connId, conf, factory);
+    final ProtocolMetaInfoPB proxyInstance =
+        (ProtocolMetaInfoPB) Proxy.newProxyInstance(loader, interfaces, handler);
+    return new ProtocolProxy<ProtocolMetaInfoPB>(
+        metaInfoProtocol, proxyInstance, false);
+  }
+
+  private static class Invoker implements RpcInvocationHandler {
+    // Cache of response prototypes, keyed by method name
+    // (filled lazily by getReturnProtoType).
+    private final Map<String, Message> returnTypes =
+        new ConcurrentHashMap<String, Message>();
+    // Set by close() so the client is handed back to CLIENTS only once.
+    private boolean isClosed = false;
+    // Connection id passed to client.call(...) and returned by getConnectionId().
+    private final Client.ConnectionId remoteId;
+    // Client obtained from the shared CLIENTS cache.
+    private final Client client;
+    // Protocol version and name placed into every request header.
+    private final long clientProtocolVersion;
+    private final String protocolName;
+    // Only assigned by the address-based constructor; stays null otherwise.
+    private AtomicBoolean fallbackToSimpleAuth;
+
+    /**
+     * Builds the connection id from the address, ticket, timeout and retry
+     * policy, delegates to the connection-id constructor, and then records
+     * the fallback-to-simple-auth flag that is later passed to client.call.
+     */
+    private Invoker(Class<?> protocol, InetSocketAddress addr,
+        UserGroupInformation ticket, Configuration conf, SocketFactory factory,
+        int rpcTimeout, RetryPolicy connectionRetryPolicy,
+        AtomicBoolean fallbackToSimpleAuth) throws IOException {
+      this(protocol, Client.ConnectionId.getConnectionId(
+          addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
+          conf, factory);
+      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
+    }
+
+    /**
+     * This constructor takes a connectionId, instead of creating a new one.
+     * It obtains the client from the shared CLIENTS cache and resolves the
+     * protocol name and version from {@code protocol} via {@link RPC}.
+     * The fallbackToSimpleAuth field is left unset (null) on this path.
+     */
+    private Invoker(Class<?> protocol, Client.ConnectionId connId,
+        Configuration conf, SocketFactory factory) {
+      this.remoteId = connId;
+      this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
+      this.protocolName = RPC.getProtocolName(protocol);
+      this.clientProtocolVersion = RPC
+          .getProtocolVersion(protocol);
+    }
+
+    /**
+     * Builds the per-call request header: the invoked method's name plus the
+     * protocol name and version captured when this invoker was created.
+     */
+    private RequestHeaderProto constructRpcRequestHeader(Method method) {
+      // The protocol name/version are taken from the class used to create the
+      // proxy, not from Method.getDeclaringClass(): for protobuf the declaring
+      // class is the generated BlockingInterface, which does not carry the
+      // ProtocolName/Version annotations. The annotated interface is the one
+      // extending BlockingInterface. For PB this may limit the use of mixins
+      // on the client side.
+      return RequestHeaderProto.newBuilder()
+          .setMethodName(method.getName())
+          .setDeclaringClassProtocolName(protocolName)
+          .setClientProtocolVersion(clientProtocolVersion)
+          .build();
+    }
+
+    /**
+     * This is the client side invoker of RPC method. It only throws
+     * ServiceException, since the invocation proxy expects only
+     * ServiceException to be thrown by the method in case protobuf service.
+     *
+     * ServiceException has the following causes:
+     * <ol>
+     * <li>Exceptions encountered on the client side in this method are
+     * set as cause in ServiceException as is.</li>
+     * <li>Exceptions from the server are wrapped in RemoteException and are
+     * set as cause in ServiceException</li>
+     * </ol>
+     *
+     * Note that the client calling protobuf RPC methods, must handle
+     * ServiceException by getting the cause from the ServiceException. If the
+     * cause is RemoteException, then unwrap it to get the exception thrown by
+     * the server.
+     *
+     * @param proxy the proxy instance the method was invoked on (unused)
+     * @param method the protocol method being invoked
+     * @param args must hold exactly two elements: the RpcController and the
+     *             request Message; {@code null} is treated as zero arguments
+     * @return the response message, parsed with the method's return prototype
+     * @throws ServiceException on a wrong argument count, a null request,
+     *         a failed call, or a response that cannot be parsed
+     */
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws ServiceException {
+      long startTime = 0;
+      if (LOG.isDebugEnabled()) {
+        startTime = Time.now();
+      }
+
+      // A dynamic proxy passes null (not an empty array) for no-arg methods,
+      // so guard the length check to keep the ServiceException-only contract.
+      final int argCount = (args == null) ? 0 : args.length;
+      if (argCount != 2) { // RpcController + Message
+        throw new ServiceException("Wrong number of parameters for request."
+            + " Method: [" + method.getName() + "]" + ", Expected: 2, Actual: "
+            + argCount);
+      }
+      if (args[1] == null) {
+        throw new ServiceException("null param while calling Method: ["
+            + method.getName() + "]");
+      }
+
+      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(Thread.currentThread().getId() + ": Call -> " +
+            remoteId + ": " + method.getName() +
+            " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
+      }
+
+      Message theRequest = (Message) args[1];
+      final RpcResponseWrapper val;
+      try {
+        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
+            fallbackToSimpleAuth);
+
+      } catch (Throwable e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
+              remoteId + ": " + method.getName() +
+                " {" + e + "}");
+        }
+        throw new ServiceException(e);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        long callTime = Time.now() - startTime;
+        LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
+      }
+
+      final Message prototype;
+      try {
+        prototype = getReturnProtoType(method);
+      } catch (Exception e) {
+        throw new ServiceException(e);
+      }
+      Message returnMessage;
+      try {
+        // The wrapper only carries raw bytes on the receiving side; parse
+        // them with the prototype of the method's declared return type.
+        returnMessage = prototype.newBuilderForType()
+            .mergeFrom(val.theResponseRead).build();
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(Thread.currentThread().getId() + ": Response <- " +
+              remoteId + ": " + method.getName() +
+                " {" + TextFormat.shortDebugString(returnMessage) + "}");
+        }
+
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+      return returnMessage;
+    }
+
+    /**
+     * Returns the client to the shared cache the first time it is called;
+     * later calls do nothing.
+     */
+    @Override
+    public void close() throws IOException {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true;
+      CLIENTS.stopClient(client);
+    }
+
+    private Message getReturnProtoType(Method method) throws Exception {
+      if (returnTypes.containsKey(method.getName())) {
+        return returnTypes.get(method.getName());
+      }
+
+      Class<?> returnType = method.getReturnType();
+      Method newInstMethod = returnType.getMethod("getDefaultInstance");
+      newInstMethod.setAccessible(true);
+      Message prototype = (Message) newInstMethod.invoke(null, (Object[]) 
null);
+      returnTypes.put(method.getName(), prototype);
+      return prototype;
+    }
+
+    /** Returns the connection id this invoker passes to every client call. */
+    @Override //RpcInvocationHandler
+    public ConnectionId getConnectionId() {
+      return remoteId;
+    }
+  }
+
+  /** A {@link Writable} wrapper that can also report its length. */
+  interface RpcWrapper extends Writable {
+    /**
+     * Returns the wrapper's length; the implementations in this file compute
+     * it as each part's size plus the size of its varint length prefix.
+     */
+    int getLength();
+  }
+  /**
+   * Wrapper for Protocol Buffer Requests
+   *
+   * Note while this wrapper is writable, the request on the wire is in
+   * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
+   * use type Writable as a wrapper to work across multiple RpcEngine kinds.
+   */
+  private static abstract class RpcMessageWithHeader<T extends 
GeneratedMessage>
+    implements RpcWrapper {
+    T requestHeader;
+    Message theRequest; // for clientSide, the request is here
+    byte[] theRequestRead; // for server side, the request is here
+
+    public RpcMessageWithHeader() {
+    }
+
+    public RpcMessageWithHeader(T requestHeader, Message theRequest) {
+      this.requestHeader = requestHeader;
+      this.theRequest = theRequest;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
+
+      ((Message)requestHeader).writeDelimitedTo(os);
+      theRequest.writeDelimitedTo(os);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      requestHeader = parseHeaderFrom(readVarintBytes(in));
+      theRequestRead = readMessageRequest(in);
+    }
+
+    abstract T parseHeaderFrom(byte[] bytes) throws IOException;
+
+    byte[] readMessageRequest(DataInput in) throws IOException {
+      return readVarintBytes(in);
+    }
+
+    private static byte[] readVarintBytes(DataInput in) throws IOException {
+      final int length = ProtoUtil.readRawVarint32(in);
+      final byte[] bytes = new byte[length];
+      in.readFully(bytes);
+      return bytes;
+    }
+
+    public T getMessageHeader() {
+      return requestHeader;
+    }
+
+    public byte[] getMessageBytes() {
+      return theRequestRead;
+    }
+
+    @Override
+    public int getLength() {
+      int headerLen = requestHeader.getSerializedSize();
+      int reqLen;
+      if (theRequest != null) {
+        reqLen = theRequest.getSerializedSize();
+      } else if (theRequestRead != null ) {
+        reqLen = theRequestRead.length;
+      } else {
+        throw new IllegalArgumentException(
+            "getLength on uninitialized RpcWrapper");
+      }
+      return CodedOutputStream.computeRawVarint32Size(headerLen) +  headerLen
+          + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;
+    }
+  }
+
+  /**
+   * Request wrapper whose header is a {@link RequestHeaderProto}; this is the
+   * wrapper the client-side invoker sends and the server-side invoker reads.
+   */
+  private static class RpcRequestWrapper
+      extends RpcMessageWithHeader<RequestHeaderProto> {
+    // NOTE(review): presumably instantiated reflectively before readFields
+    // is called - confirm against the RPC server code.
+    @SuppressWarnings("unused")
+    public RpcRequestWrapper() {
+    }
+
+    public RpcRequestWrapper(
+        RequestHeaderProto requestHeader, Message theRequest) {
+      super(requestHeader, theRequest);
+    }
+
+    @Override
+    RequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
+      final RequestHeaderProto header = RequestHeaderProto.parseFrom(bytes);
+      return header;
+    }
+
+    /** Renders the call as {@code <protocol name>.<method name>}. */
+    @Override
+    public String toString() {
+      final String declaringProtocol =
+          requestHeader.getDeclaringClassProtocolName();
+      final String methodName = requestHeader.getMethodName();
+      return declaringProtocol + "." + methodName;
+    }
+  }
+
+  /**
+   * Message wrapper whose header is an {@link RpcRequestHeaderProto}.
+   */
+  @InterfaceAudience.LimitedPrivate({"RPC"})
+  public static class RpcRequestMessageWrapper
+  extends RpcMessageWithHeader<RpcRequestHeaderProto> {
+    public RpcRequestMessageWrapper() {}
+
+    public RpcRequestMessageWrapper(
+        RpcRequestHeaderProto requestHeader, Message theRequest) {
+      super(requestHeader, theRequest);
+    }
+
+    /** Parses the header bytes as an {@link RpcRequestHeaderProto}. */
+    @Override
+    RpcRequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
+      return RpcRequestHeaderProto.parseFrom(bytes);
+    }
+  }
+
+  /**
+   * Message wrapper whose header is an {@link RpcResponseHeaderProto}.
+   * The body is only read when the header status is neither ERROR nor FATAL.
+   */
+  @InterfaceAudience.LimitedPrivate({"RPC"})
+  public static class RpcResponseMessageWrapper
+  extends RpcMessageWithHeader<RpcResponseHeaderProto> {
+    public RpcResponseMessageWrapper() {}
+
+    public RpcResponseMessageWrapper(
+        RpcResponseHeaderProto responseHeader, Message theRequest) {
+      super(responseHeader, theRequest);
+    }
+
+    /**
+     * Returns {@code null} without consuming any input when the already
+     * parsed header reports ERROR or FATAL; otherwise reads the body bytes.
+     */
+    @Override
+    byte[] readMessageRequest(DataInput in) throws IOException {
+      // error responses contain no message body
+      switch (requestHeader.getStatus()) {
+        case ERROR:
+        case FATAL:
+          return null;
+        default:
+          return super.readMessageRequest(in);
+      }
+    }
+
+    /** Parses the header bytes as an {@link RpcResponseHeaderProto}. */
+    @Override
+    RpcResponseHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
+      return RpcResponseHeaderProto.parseFrom(bytes);
+    }
+  }
+
+  /**
+   *  Wrapper for Protocol Buffer Responses
+   *
+   * Note while this wrapper is writable, the response on the wire is in
+   * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server} and
+   * {@link RPC} use type Writable as a wrapper to work across multiple
+   * RpcEngine kinds.
+   */
+  @InterfaceAudience.LimitedPrivate({"RPC"}) // temporarily exposed
+  public static class RpcResponseWrapper implements RpcWrapper {
+    Message theResponse; // for senderSide, the response is here
+    byte[] theResponseRead; // for receiver side, the response is here
+
+    public RpcResponseWrapper() {
+    }
+
+    public RpcResponseWrapper(Message message) {
+      this.theResponse = message;
+    }
+
+    /** Writes the response message, length-delimited. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
+      theResponse.writeDelimitedTo(os);
+    }
+
+    /**
+     * Reads a varint length prefix followed by that many raw response bytes;
+     * the bytes are left unparsed in {@code theResponseRead}.
+     *
+     * @throws IOException if the length prefix is negative or the input ends
+     *         before the announced number of bytes has been read
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      final int length = ProtoUtil.readRawVarint32(in);
+      // A malformed prefix can decode to a negative int; report it as an I/O
+      // error instead of letting the array allocation fail.
+      if (length < 0) {
+        throw new IOException("Invalid negative response length: " + length);
+      }
+      final byte[] bytes = new byte[length];
+      in.readFully(bytes);
+      theResponseRead = bytes;
+    }
+
+    /**
+     * Returns the response size plus the size of its varint length prefix.
+     * The size comes from {@code theResponse} if set, otherwise from
+     * {@code theResponseRead}.
+     *
+     * @throws IllegalArgumentException if neither form is set
+     */
+    @Override
+    public int getLength() {
+      int resLen;
+      if (theResponse != null) {
+        resLen = theResponse.getSerializedSize();
+      } else if (theResponseRead != null ) {
+        resLen = theResponseRead.length;
+      } else {
+        throw new IllegalArgumentException(
+            "getLength on uninitialized RpcWrapper");
+      }
+      return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;
+    }
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static Client getClient(Configuration conf) {
+    return CLIENTS.getClient(conf, SocketFactory.getDefault(),
+        RpcResponseWrapper.class);
+  }
+
+
+
+  /**
+   * Creates the protobuf RPC {@link Server} for the given protocol and
+   * implementation, forwarding all settings to its constructor.
+   */
+  @Override
+  public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
+      String bindAddress, int port, int numHandlers, int numReaders,
+      int queueSizePerHandler, boolean verbose, Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig)
+      throws IOException {
+    final Server protobufServer = new Server(protocol, protocolImpl, conf,
+        bindAddress, port, numHandlers, numReaders, queueSizePerHandler,
+        verbose, secretManager, portRangeConfig);
+    return protobufServer;
+  }
+
+  public static class Server extends RPC.Server {
+    /**
+     * Construct an RPC server.
+     *
+     * @param protocolClass the class of protocol
+     * @param protocolImpl the protocolImpl whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param numReaders passed through to the {@code RPC.Server} constructor
+     * @param queueSizePerHandler passed through to the {@code RPC.Server}
+     * constructor
+     * @param verbose whether each call should be logged
+     * @param secretManager passed through to the {@code RPC.Server}
+     * constructor
+     * @param portRangeConfig A config parameter that can be used to restrict
+     * the range of ports used when port is 0 (an ephemeral port)
+     */
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose,
+        SecretManager<? extends TokenIdentifier> secretManager,
+        String portRangeConfig)
+        throws IOException {
+      super(bindAddress, port, null, numHandlers,
+          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
+              .getClass().getName()), secretManager, portRangeConfig);
+      this.verbose = verbose;
+      // Make the implementation reachable under the protobuf RPC kind.
+      registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
+          protocolImpl);
+    }
+
+    /**
+     * Protobuf invoker for {@link RpcInvoker}
+     */
+    static class ProtoBufRpcInvoker implements RpcInvoker {
+      private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
+          String protoName, long clientVersion) throws RpcServerException {
+        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
+        ProtoClassProtoImpl impl =
+            server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+        if (impl == null) { // no match for Protocol AND Version
+          VerProtocolImpl highest =
+              
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+                  protoName);
+          if (highest == null) {
+            throw new RpcNoSuchProtocolException(
+                "Unknown protocol: " + protoName);
+          }
+          // protocol supported but not the version that client wants
+          throw new RPC.VersionMismatch(protoName, clientVersion,
+              highest.version);
+        }
+        return impl;
+      }
+
+      /**
+       * This is a server side method, which is invoked over RPC. On success
+       * the return response has protobuf response payload. On failure, the
+       * exception name and the stack trace are returned in the response.
+       * See {@code HadoopRpcResponseProto}
+       *
+       * In this method there are three types of exceptions possible and they
+       * are returned in response as follows.
+       * <ol>
+       * <li> Exceptions encountered in this method that are returned
+       * as {@link RpcServerException} </li>
+       * <li> Exceptions thrown by the service is wrapped in ServiceException.
+       * In that case this method rethrows the exception thrown by the
+       * service; if the ServiceException carries no cause, or its cause is
+       * not an {@link Exception}, the ServiceException itself is rethrown.
+       * </li>
+       * <li> Other exceptions thrown by the service. They are returned as
+       * it is.</li>
+       * </ol>
+       *
+       * @param server the server the call arrived on
+       * @param protocol protocol name, used only for logging here
+       * @param writableRequest the request; must be an RpcRequestWrapper
+       * @param receiveTime time the call was received, used for queue time
+       * @return the service's response wrapped in an RpcResponseWrapper
+       */
+      @Override
+      public Writable call(RPC.Server server, String protocol,
+          Writable writableRequest, long receiveTime) throws Exception {
+        RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
+        RequestHeaderProto rpcRequest = request.requestHeader;
+        String methodName = rpcRequest.getMethodName();
+        String protoName = rpcRequest.getDeclaringClassProtocolName();
+        long clientVersion = rpcRequest.getClientProtocolVersion();
+        if (server.verbose) {
+          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+        }
+
+        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
+            clientVersion);
+        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
+        MethodDescriptor methodDescriptor = service.getDescriptorForType()
+            .findMethodByName(methodName);
+        if (methodDescriptor == null) {
+          String msg = "Unknown method " + methodName + " called on " + protocol
+              + " protocol.";
+          LOG.warn(msg);
+          throw new RpcNoSuchMethodException(msg);
+        }
+        // The request body arrives as raw bytes; parse it with the prototype
+        // the service declares for this method.
+        Message prototype = service.getRequestPrototype(methodDescriptor);
+        Message param = prototype.newBuilderForType()
+            .mergeFrom(request.theRequestRead).build();
+
+        Message result;
+        long startTime = Time.now();
+        int qTime = (int) (startTime - receiveTime);
+        Exception exception = null;
+        try {
+          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          result = service.callBlockingMethod(methodDescriptor, null, param);
+        } catch (ServiceException e) {
+          // Unwrap to the service's own exception. Fall back to the wrapper
+          // when the cause is missing or is not an Exception, so that this
+          // never ends in "throw null" or a ClassCastException.
+          final Throwable cause = e.getCause();
+          exception = (cause instanceof Exception) ? (Exception) cause : e;
+          throw exception;
+        } catch (Exception e) {
+          exception = e;
+          throw e;
+        } finally {
+          int processingTime = (int) (Time.now() - startTime);
+          if (LOG.isDebugEnabled()) {
+            String msg = "Served: " + methodName + " queueTime= " + qTime +
+                " procesingTime= " + processingTime;
+            if (exception != null) {
+              msg += " exception= " + exception.getClass().getSimpleName();
+            }
+            LOG.debug(msg);
+          }
+          // Failed calls are recorded under the exception's simple class name
+          // rather than the method name.
+          String detailedMetricsName = (exception == null) ?
+              methodName :
+              exception.getClass().getSimpleName();
+          server.rpcMetrics.addRpcQueueTime(qTime);
+          server.rpcMetrics.addRpcProcessingTime(processingTime);
+          server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
+              processingTime);
+        }
+        return new RpcResponseWrapper(result);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
new file mode 100644
index 0000000..a50b938
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+/**
+ * String constants used by the Hadoop RPC integration.
+ * NOTE(review): the *_KEY constants look like configuration keys and the
+ * *_PROTOCOL_NAME constants like RPC protocol names - confirm at the use
+ * sites, which are not visible here.
+ */
+public interface HadoopConstants {
+  // Kerberos principal keys for the server and client sides.
+  String RAFT_SERVER_KERBEROS_PRINCIPAL_KEY
+      = "raft.server.kerberos.principal";
+  String RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY
+      = "raft.client.kerberos.principal";
+  // Protocol names; note that they use the org.apache.hadoop.raft prefix,
+  // not this file's org.apache.ratis package.
+  String RAFT_SERVER_PROTOCOL_NAME
+      = "org.apache.hadoop.raft.server.protocol.RaftServerProtocol";
+  String RAFT_CLIENT_PROTOCOL_NAME
+      = "org.apache.hadoop.raft.protocol.RaftClientProtocol";
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java
new file mode 100644
index 0000000..22edef1
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Holds a Hadoop RPC proxy for a protocol interface and stops it on close.
+ *
+ * @param <PROTOCOL> the protocol interface the proxy implements
+ */
+public class Proxy<PROTOCOL> implements Closeable {
+  /**
+   * Registers {@link ProtobufRpcEngineShaded} as the engine for
+   * {@code clazz} in {@code conf} and creates an RPC proxy to
+   * {@code addressStr} for the current user.
+   *
+   * @param clazz the protocol interface
+   * @param addressStr the server address, parsed by the Ratis NetUtils
+   * @param conf the Hadoop configuration to use (modified by this call)
+   * @return the RPC proxy implementing {@code clazz}
+   * @throws IOException if the current user or the proxy cannot be obtained
+   */
+  public static <PROTOCOL> PROTOCOL getProxy(
+      Class<PROTOCOL> clazz, String addressStr, Configuration conf)
+      throws IOException {
+    RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngineShaded.class);
+
+    // Evaluated in the same order as the arguments are passed below.
+    final long protocolVersion = RPC.getProtocolVersion(clazz);
+    final java.net.InetSocketAddress serverAddress =
+        org.apache.ratis.util.NetUtils.newInetSocketAddress(addressStr);
+    final UserGroupInformation currentUser =
+        UserGroupInformation.getCurrentUser();
+    final javax.net.SocketFactory socketFactory =
+        NetUtils.getSocketFactory(conf, clazz);
+
+    return RPC.getProxy(clazz, protocolVersion, serverAddress, currentUser,
+        conf, socketFactory);
+  }
+
+  private final PROTOCOL protocol;
+
+  /** Creates the underlying RPC proxy via {@link #getProxy}. */
+  public Proxy(Class<PROTOCOL> clazz, String addressStr, Configuration conf)
+      throws IOException {
+    this.protocol = getProxy(clazz, addressStr, conf);
+  }
+
+  /** Returns the RPC proxy created by the constructor. */
+  public PROTOCOL getProtocol() {
+    return protocol;
+  }
+
+  /** Stops the underlying RPC proxy. */
+  @Override
+  public void close() {
+    RPC.stopProxy(protocol);
+  }
+}

Reply via email to