http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java 
b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
deleted file mode 100644
index e04d141..0000000
--- a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft;
-
-import org.apache.log4j.Level;
-import org.apache.raft.RaftTestUtil.SimpleMessage;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.examples.RaftExamplesTestUtil;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.simulation.RequestHandler;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.statemachine.SimpleStateMachine4Testing;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.util.RaftUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Test restarting raft peers.
- */
-@RunWith(Parameterized.class)
-public class TestRestartRaftPeer {
-  static Logger LOG = LoggerFactory.getLogger(TestRestartRaftPeer.class);
-  static {
-    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() throws IOException {
-    RaftProperties prop = new RaftProperties();
-    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        SimpleStateMachine4Testing.class, StateMachine.class);
-    prop.setInt(RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 1024 * 8);
-    return RaftExamplesTestUtil.getMiniRaftClusters(prop, 3);
-  }
-
-  @Parameterized.Parameter
-  public MiniRaftCluster cluster;
-
-  @Rule
-  public Timeout globalTimeout = new Timeout(60 * 1000);
-
-  @Test
-  public void restartFollower() throws Exception {
-    cluster.start();
-    RaftTestUtil.waitForLeader(cluster);
-    final String leaderId = cluster.getLeader().getId();
-    final RaftClient client = cluster.createClient("client", leaderId);
-
-    // write some messages
-    final byte[] content = new byte[1024];
-    Arrays.fill(content, (byte) 1);
-    final SimpleMessage message = new SimpleMessage(new String(content));
-    for (int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
-
-    // restart a follower
-    String followerId = cluster.getFollowers().get(0).getId();
-    LOG.info("Restart follower {}", followerId);
-    cluster.restartServer(followerId, false);
-
-    // write some more messages
-    for (int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
-    client.close();
-
-    // make sure the restarted follower can catchup
-    boolean catchup = false;
-    long lastAppliedIndex = 0;
-    for (int i = 0; i < 10 && !catchup; i++) {
-      Thread.sleep(500);
-      lastAppliedIndex = 
cluster.getServer(followerId).getState().getLastAppliedIndex();
-      catchup = lastAppliedIndex >= 20;
-    }
-    Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
-
-    // make sure the restarted peer's log segments is correct
-    cluster.restartServer(followerId, false);
-    Assert.assertTrue(cluster.getServer(followerId).getState().getLog()
-        .getLastEntry().getIndex() >= 20);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java
 
b/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java
deleted file mode 100644
index ec622f4..0000000
--- 
a/raft-examples/src/test/java/org/apache/raft/examples/RaftExamplesTestUtil.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.examples;
-
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.grpc.MiniRaftClusterWithGRpc;
-import org.apache.raft.hadooprpc.MiniRaftClusterWithHadoopRpc;
-import org.apache.raft.netty.MiniRaftClusterWithNetty;
-import org.apache.raft.server.simulation.MiniRaftClusterWithSimulatedRpc;
-import org.apache.raft.statemachine.StateMachine;
-
-import java.io.IOException;
-import java.util.*;
-
-public class RaftExamplesTestUtil {
-  private static void add(
-      Collection<Object[]> clusters, MiniRaftCluster.Factory factory,
-      String[] ids, RaftProperties properties)
-      throws IOException {
-    clusters.add(new Object[]{factory.newCluster(ids, properties, true)});
-  }
-
-  public static Collection<Object[]> getMiniRaftClusters(
-      RaftProperties prop, int clusterSize, Class<?>... clusterClasses)
-      throws IOException {
-    final List<Class<?>> classes = Arrays.asList(clusterClasses);
-    final boolean isAll = classes.isEmpty(); //empty means all
-
-    final Iterator<String[]> ids = new Iterator<String[]>() {
-      private int i = 0;
-      @Override
-      public boolean hasNext() {
-        return true;
-      }
-      @Override
-      public String[] next() {
-        return MiniRaftCluster.generateIds(clusterSize, i++*clusterSize);
-      }
-    };
-
-    final List<Object[]> clusters = new ArrayList<>();
-
-    if (isAll || classes.contains(MiniRaftClusterWithSimulatedRpc.class)) {
-      add(clusters, MiniRaftClusterWithSimulatedRpc.FACTORY, ids.next(), prop);
-    }
-    if (isAll || classes.contains(MiniRaftClusterWithHadoopRpc.class)) {
-      add(clusters, MiniRaftClusterWithHadoopRpc.FACTORY, ids.next(), prop);
-    }
-    if (isAll || classes.contains(MiniRaftClusterWithNetty.class)) {
-      add(clusters, MiniRaftClusterWithNetty.FACTORY, ids.next(), prop);
-    }
-    if (isAll || classes.contains(MiniRaftClusterWithGRpc.class)) {
-      add(clusters, MiniRaftClusterWithGRpc.FACTORY, ids.next(), prop);
-    }
-    return clusters;
-  }
-
-  public static <S extends StateMachine> Collection<Object[]> 
getMiniRaftClusters(
-      Class<S> stateMachineClass, Class<?>... clusterClasses) throws 
IOException {
-    final RaftProperties prop = new RaftProperties();
-    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        stateMachineClass, StateMachine.class);
-    return getMiniRaftClusters(prop, 3, clusterClasses);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java
 
b/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java
deleted file mode 100644
index 27b3814..0000000
--- 
a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/TestArithmetic.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.examples.arithmetic;
-
-
-import org.apache.log4j.Level;
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.examples.RaftExamplesTestUtil;
-import org.apache.raft.examples.arithmetic.expression.*;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.util.RaftUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Collection;
-
-@RunWith(Parameterized.class)
-public class TestArithmetic {
-  static {
-    RaftUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.ALL);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() throws IOException {
-    return 
RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class);
-  }
-
-  @Parameterized.Parameter
-  public MiniRaftCluster cluster;
-
-  @Test
-  public void testPythagorean() throws Exception {
-    cluster.start();
-    RaftTestUtil.waitForLeader(cluster);
-    final String leaderId = cluster.getLeader().getId();
-    final RaftClient client = cluster.createClient("pythagorean", leaderId);
-
-    final Variable a = new Variable("a");
-    final Variable b = new Variable("b");
-    final Variable c = new Variable("c");
-    final BinaryExpression a2 = new BinaryExpression(BinaryExpression.Op.MULT, 
a, a);
-    final BinaryExpression b2 = new BinaryExpression(BinaryExpression.Op.MULT, 
b, b);
-    final BinaryExpression c2 = new BinaryExpression(BinaryExpression.Op.ADD, 
a2, b2);
-    final AssignmentMessage pythagorean = new AssignmentMessage(c,
-        new UnaryExpression(UnaryExpression.Op.SQRT, c2));
-
-    final AssignmentMessage nullA = new AssignmentMessage(a, 
NullValue.getInstance());
-    final AssignmentMessage nullB = new AssignmentMessage(b, 
NullValue.getInstance());
-    final AssignmentMessage nullC = new AssignmentMessage(c, 
NullValue.getInstance());
-
-    for(int n = 3; n < 100; n += 2) {
-      int n2 = n*n;
-      int half_n2 = n2/2;
-
-      RaftClientReply r;
-      r = client.send(new AssignmentMessage(a, new DoubleValue(n)));
-      assertRaftClientReply(r, (double)n);
-      r = client.sendReadOnly(Expression.Utils.toMessage(a2));
-      assertRaftClientReply(r, (double)n2);
-      r = client.send(new AssignmentMessage(b, new DoubleValue(half_n2)));
-      assertRaftClientReply(r, (double)half_n2);
-      r = client.sendReadOnly(Expression.Utils.toMessage(b2));
-      assertRaftClientReply(r, (double)half_n2*half_n2);
-      r = client.send(pythagorean);
-      assertRaftClientReply(r, (double)half_n2 + 1);
-
-      r = client.send(nullA);
-      assertRaftClientReply(r, null);
-      r = client.send(nullB);
-      assertRaftClientReply(r, null);
-      r = client.send(nullC);
-      assertRaftClientReply(r, null);
-    }
-    client.close();
-    cluster.shutdown();
-  }
-
-  static void assertRaftClientReply(RaftClientReply reply, Double expected) {
-    Assert.assertTrue(reply.isSuccess());
-    final Expression e = Expression.Utils.bytes2Expression(
-        reply.getMessage().getContent().toByteArray(), 0);
-    Assert.assertEquals(expected, e.evaluate(null));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java
 
b/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java
deleted file mode 100644
index f06c88e..0000000
--- 
a/raft-examples/src/test/java/org/apache/raft/examples/arithmetic/expression/TestExpression.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.examples.arithmetic.expression;
-
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class TestExpression {
-  static final Logger LOG = LoggerFactory.getLogger(TestExpression.class);
-
-  @Test
-  public void testArithmeticUtils() throws Exception {
-    final Random ran = ThreadLocalRandom.current();
-    final byte[] buf = new byte[1024];
-    int offset = 0;
-
-    for(int i = 0; i < 10; i++) {
-      {
-        final int n = ran.nextInt();
-        Expression.Utils.int2bytes(n, buf, offset);
-        final int m = Expression.Utils.bytes2int(buf, offset);
-        Assert.assertEquals(n, m);
-        offset += 4;
-      }
-      {
-        final long n = ran.nextLong();
-        Expression.Utils.long2bytes(n, buf, offset);
-        final long m = Expression.Utils.bytes2long(buf, offset);
-        Assert.assertEquals(n, m);
-        offset += 8;
-      }
-      {
-        final double n = ran.nextDouble();
-        Expression.Utils.double2bytes(n, buf, offset);
-        final double m = Expression.Utils.bytes2double(buf, offset);
-        Assert.assertTrue(n == m);
-        offset += 8;
-      }
-    }
-  }
-  @Test
-  public void testOp() throws Exception {
-    for(BinaryExpression.Op op : BinaryExpression.Op.values()) {
-      final byte b = op.byteValue();
-      Assert.assertEquals(op, BinaryExpression.Op.valueOf(b));
-    }
-    for(UnaryExpression.Op op : UnaryExpression.Op.values()) {
-      final byte b = op.byteValue();
-      Assert.assertEquals(op, UnaryExpression.Op.valueOf(b));
-    }
-  }
-
-  @Test
-  public void testExpression() throws Exception {
-    final byte[] buf = new byte[1024];
-    int offset = 0;
-
-    {
-      final Variable a = new Variable("pi");
-      LOG.info("var a: " + a);
-      final int len = a.toBytes(buf, offset);
-      final Variable a2 = new Variable(buf, offset);
-      LOG.info("var a2: " + a2);
-      Assert.assertEquals(a.getName(), a2.getName());
-      Assert.assertEquals(len, a.length());
-      Assert.assertEquals(len, a2.length());
-      offset += len;
-    }
-
-    {
-      final DoubleValue three = new DoubleValue(3);
-      LOG.info("double three: " + three.evaluate(null));
-      final int len = three.toBytes(buf, offset);
-      final DoubleValue three2 = new DoubleValue(buf, offset);
-      LOG.info("double three2: " + three2.evaluate(null));
-      Assert.assertTrue(three.evaluate(null).equals(three2.evaluate(null)));
-      Assert.assertEquals(len, three.length());
-      Assert.assertEquals(len, three2.length());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
 
b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
deleted file mode 100644
index 0832579..0000000
--- 
a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.log4j.Level;
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.examples.RaftExamplesTestUtil;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.StateMachineException;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.simulation.RequestHandler;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.util.RaftUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.fail;
-
-@RunWith(Parameterized.class)
-public class TestRaftStateMachineException {
-  static {
-    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
-  }
-
-  protected static class StateMachineWithException extends 
SimpleStateMachine4Testing {
-    @Override
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
{
-      CompletableFuture<Message> future = new CompletableFuture<>();
-      future.completeExceptionally(new StateMachineException("Fake 
Exception"));
-      return future;
-    }
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() throws IOException {
-    return RaftExamplesTestUtil.getMiniRaftClusters(
-        StateMachineWithException.class);
-  }
-
-  @Parameterized.Parameter
-  public MiniRaftCluster cluster;
-
-  @Test
-  public void testHandleStateMachineException() throws Exception {
-    cluster.start();
-    RaftTestUtil.waitForLeader(cluster);
-
-    final String leaderId = cluster.getLeader().getId();
-
-    try(final RaftClient client = cluster.createClient("client", leaderId)) {
-      client.send(new RaftTestUtil.SimpleMessage("m"));
-      fail("Exception expected");
-    } catch (StateMachineException e) {
-      e.printStackTrace();
-      Assert.assertTrue(e.getMessage().contains("Fake Exception"));
-    }
-
-    cluster.shutdown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-examples/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/raft-examples/src/test/resources/log4j.properties 
b/raft-examples/src/test/resources/log4j.properties
deleted file mode 100644
index ced0687..0000000
--- a/raft-examples/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-# log4j configuration used during build and unit tests
-
-log4j.rootLogger=info,stdout
-log4j.threshold=ALL
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/pom.xml
----------------------------------------------------------------------
diff --git a/raft-grpc/pom.xml b/raft-grpc/pom.xml
deleted file mode 100644
index 9b712fd..0000000
--- a/raft-grpc/pom.xml
+++ /dev/null
@@ -1,93 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>raft-project-dist</artifactId>
-    <groupId>com.hortonworks.raft</groupId>
-    <version>1.0-SNAPSHOT</version>
-    <relativePath>../raft-project-dist</relativePath>
-  </parent>
-
-  <artifactId>raft-grpc</artifactId>
-  <name>Raft gRPC Support</name>
-
-  <dependencies>
-    <dependency>
-      <artifactId>raft-proto-shaded</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <artifactId>raft-common</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <artifactId>raft-common</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <artifactId>raft-client</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <artifactId>raft-client</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <artifactId>raft-server</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <artifactId>raft-server</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
deleted file mode 100644
index 1184e2e..0000000
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.grpc.client.RaftClientProtocolService;
-import org.apache.raft.grpc.server.RaftServerProtocolClient;
-import org.apache.raft.grpc.server.RaftServerProtocolService;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.shaded.io.grpc.Server;
-import org.apache.raft.shaded.io.grpc.ServerBuilder;
-import org.apache.raft.shaded.io.grpc.netty.NettyServerBuilder;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.util.CodeInjectionForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static 
org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT;
-import static 
org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY;
-
-public class RaftGRpcService implements RaftServerRpc {
-  static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class);
-  public static final String GRPC_SEND_SERVER_REQUEST =
-      RaftGRpcService.class.getSimpleName() + ".sendRequest";
-
-  private final Server server;
-  private final InetSocketAddress address;
-  private final Map<String, RaftServerProtocolClient> peers =
-      Collections.synchronizedMap(new HashMap<>());
-  private final String selfId;
-
-  public RaftGRpcService(RaftServer raftServer, RaftProperties properties) {
-    int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY,
-        RAFT_GRPC_SERVER_PORT_DEFAULT);
-    int maxMessageSize = properties.getInt(
-        RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
-        RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT);
-    ServerBuilder serverBuilder = ServerBuilder.forPort(port);
-    selfId = raftServer.getId();
-    server = ((NettyServerBuilder) 
serverBuilder).maxMessageSize(maxMessageSize)
-        .addService(new RaftServerProtocolService(selfId, raftServer))
-        .addService(new RaftClientProtocolService(selfId, raftServer))
-        .build();
-
-    // start service to determine the port (in case port is configured as 0)
-    startService();
-    address = new InetSocketAddress(server.getPort());
-    LOG.info("Server started, listening on " + address.getPort());
-  }
-
-  @Override
-  public void start() {
-    // do nothing
-  }
-
-  private void startService() {
-    try {
-      server.start();
-    } catch (IOException e) {
-      LOG.error("Failed to start Grpc server", e);
-      System.exit(1);
-    }
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        System.err.println("*** shutting down gRPC server since JVM is 
shutting down");
-        RaftGRpcService.this.close();
-        System.err.println("*** server shut down");
-      }
-    });
-  }
-
-  @Override
-  public void close() {
-    if (server != null) {
-      server.shutdown();
-    }
-    shutdownClients();
-  }
-
-  @Override
-  public InetSocketAddress getInetSocketAddress() {
-    return address;
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(
-      AppendEntriesRequestProto request) throws IOException {
-    throw new UnsupportedOperationException(
-        "Blocking AppendEntries call is not supported");
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    throw new UnsupportedOperationException(
-        "Blocking InstallSnapshot call is not supported");
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
-      throws IOException {
-    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId,
-        null, request);
-
-    RaftServerProtocolClient target = Preconditions.checkNotNull(
-        peers.get(request.getServerRequest().getReplyId()));
-    return target.requestVote(request);
-  }
-
-  @Override
-  public void addPeers(Iterable<RaftPeer> newPeers) {
-    for (RaftPeer p : newPeers) {
-      if (!peers.containsKey(p.getId())) {
-        peers.put(p.getId(), new RaftServerProtocolClient(p));
-      }
-    }
-  }
-
-  private void shutdownClients() {
-    peers.values().forEach(RaftServerProtocolClient::shutdown);
-  }
-
-  public RaftServerProtocolClient getRpcClient(RaftPeer peer) {
-    return peers.get(peer.getId());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java
deleted file mode 100644
index 395848b..0000000
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcConfigKeys.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import org.apache.raft.client.RaftClientConfigKeys;
-
-public interface RaftGrpcConfigKeys {
-  String PREFIX = "raft.grpc";
-
-  String RAFT_GRPC_SERVER_PORT_KEY = PREFIX + ".server.port";
-  int RAFT_GRPC_SERVER_PORT_DEFAULT = 0;
-
-  String RAFT_GRPC_MESSAGE_MAXSIZE_KEY = PREFIX + ".message.maxsize";
-  int RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB
-
-  String RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY =
-      PREFIX + "leader.max.outstanding.appends";
-  int RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;
-
-  String RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY =
-      PREFIX + "client.max.outstanding.appends";
-  int RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;
-
-  String RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY = "raft.outputstream.buffer.size";
-  int RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT = 64 * 1024;
-
-  String RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY = 
"raft.outputstream.max.retry.times";
-  int RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT = 5;
-
-  String RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY = 
"raft.outputstream.retry.interval";
-  long RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT = 
RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java
deleted file mode 100644
index 6afb39b..0000000
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGrpcUtil.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import org.apache.raft.shaded.io.grpc.Metadata;
-import org.apache.raft.shaded.io.grpc.Status;
-import org.apache.raft.shaded.io.grpc.StatusRuntimeException;
-import org.apache.raft.util.RaftUtils;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-
-public class RaftGrpcUtil {
-  public static final Metadata.Key<String> EXCEPTION_TYPE_KEY =
-      Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
-
-  public static String stringifyException(Throwable e) {
-    StringWriter stm = new StringWriter();
-    PrintWriter wrt = new PrintWriter(stm);
-    e.printStackTrace(wrt);
-    wrt.close();
-    return stm.toString();
-  }
-
-  public static StatusRuntimeException wrapException(Throwable t) {
-    Metadata trailers = new Metadata();
-    trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
-    return new StatusRuntimeException(
-        Status.INTERNAL.withDescription(RaftGrpcUtil.stringifyException(t)),
-        trailers);
-  }
-
-  public static IOException unwrapException(StatusRuntimeException se) {
-    final Metadata trailers = se.getTrailers();
-    final Status status = se.getStatus();
-    if (trailers != null && status != null) {
-      final String className = trailers.get(EXCEPTION_TYPE_KEY);
-      if (className != null) {
-        try {
-          Class<?> clazz = Class.forName(className);
-          final Exception unwrapped = instantiateException(
-              clazz.asSubclass(Exception.class), status.getDescription(), se);
-          return RaftUtils.asIOException(unwrapped);
-        } catch (Exception e) {
-          return new IOException(se);
-        }
-      }
-    }
-    return new IOException(se);
-  }
-
-  public static IOException unwrapIOException(Throwable t) {
-    final IOException e;
-    if (t instanceof StatusRuntimeException) {
-      e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
-    } else {
-      e = RaftUtils.asIOException(t);
-    }
-    return e;
-  }
-
-  private static Exception instantiateException(Class<? extends Exception> cls,
-      String message, Exception from) throws Exception {
-    Constructor<? extends Exception> cn = cls.getConstructor(String.class);
-    cn.setAccessible(true);
-    Exception ex = cn.newInstance(message);
-    ex.initCause(from);
-    return ex;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java
deleted file mode 100644
index 9cf8cd5..0000000
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/AppendStreamer.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.client;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.grpc.RaftGrpcConfigKeys;
-import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.protocol.NotLeaderException;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.shaded.com.google.protobuf.ByteString;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.PeerProxyMap;
-import org.apache.raft.util.RaftUtils;
-import org.apache.raft.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.apache.raft.client.impl.ClientProtoUtils.*;
-import static 
org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT;
-import static 
org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY;
-
-public class AppendStreamer implements Closeable {
-  public static final Logger LOG = 
LoggerFactory.getLogger(AppendStreamer.class);
-
-  enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}
-
-  private static class ExceptionAndRetry {
-    private final Map<String, IOException> exceptionMap = new HashMap<>();
-    private final AtomicInteger retryTimes = new AtomicInteger(0);
-    private final int maxRetryTimes;
-    private final long retryInterval;
-
-    ExceptionAndRetry(RaftProperties prop) {
-      maxRetryTimes = prop.getInt(
-          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY,
-          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT);
-      retryInterval = prop.getTimeDuration(
-          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY,
-          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT,
-          TimeUnit.MILLISECONDS);
-    }
-
-    void addException(String peer, IOException e) {
-      exceptionMap.put(peer, e);
-      retryTimes.incrementAndGet();
-    }
-
-    IOException getCombinedException() {
-      return new IOException("Exceptions: " + exceptionMap);
-    }
-
-    boolean shouldRetry() {
-      return retryTimes.get() <= maxRetryTimes;
-    }
-  }
-
-  private final Deque<RaftClientRequestProto> dataQueue;
-  private final Deque<RaftClientRequestProto> ackQueue;
-  private final int maxPendingNum;
-
-  private final PeerProxyMap<RaftClientProtocolProxy> proxyMap;
-  private final Map<String, RaftPeer> peers;
-  private String leaderId;
-  private volatile RaftClientProtocolProxy leaderProxy;
-  private final String clientId;
-
-  private volatile RunningState running = RunningState.RUNNING;
-  private final ExceptionAndRetry exceptionAndRetry;
-  private final Sender senderThread;
-
-  AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers,
-      String leaderId, String clientId) {
-    this.clientId = clientId;
-    maxPendingNum = prop.getInt(
-        RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY,
-        RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT);
-    dataQueue = new ConcurrentLinkedDeque<>();
-    ackQueue = new ConcurrentLinkedDeque<>();
-    exceptionAndRetry = new ExceptionAndRetry(prop);
-
-    this.peers = peers.stream().collect(
-        Collectors.toMap(RaftPeer::getId, Function.identity()));
-    proxyMap = new PeerProxyMap<>(
-        raftPeer -> new RaftClientProtocolProxy(raftPeer, 
ResponseHandler::new));
-    proxyMap.addPeers(peers);
-    refreshLeaderProxy(leaderId, null);
-
-    senderThread = new Sender();
-    senderThread.setName(this.toString() + "-sender");
-    senderThread.start();
-  }
-
-  private synchronized void refreshLeaderProxy(String suggested,
-      String oldLeader) {
-    if (suggested != null) {
-      leaderId = suggested;
-    } else {
-      if (oldLeader == null) {
-        leaderId = peers.keySet().iterator().next();
-      } else {
-        leaderId = StringUtils.next(oldLeader, peers.keySet());
-      }
-    }
-    LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
-          oldLeader, leaderId, suggested);
-    if (leaderProxy != null) {
-      leaderProxy.closeCurrentSession();
-    }
-    try {
-      leaderProxy = proxyMap.getProxy(leaderId);
-    } catch (IOException e) {
-      LOG.error("Should not hit IOException here", e);
-      refreshLeader(null, leaderId);
-    }
-  }
-
-  private boolean isRunning() {
-    return running == RunningState.RUNNING ||
-        running == RunningState.LOOK_FOR_LEADER;
-  }
-
-  private void checkState() throws IOException {
-    if (!isRunning()) {
-      throwException("The AppendStreamer has been closed");
-    }
-  }
-
-  synchronized void write(ByteString content, long seqNum)
-      throws IOException {
-    checkState();
-    while (isRunning() && dataQueue.size() >= maxPendingNum) {
-      try {
-        wait();
-      } catch (InterruptedException ignored) {
-      }
-    }
-    if (isRunning()) {
-      // wrap the current buffer into a RaftClientRequestProto
-      final RaftClientRequestProto request = genRaftClientRequestProto(
-          clientId, leaderId, seqNum, content, false);
-      dataQueue.offer(request);
-      this.notifyAll();
-    } else {
-      throwException(this + " got closed.");
-    }
-  }
-
-  synchronized void flush() throws IOException {
-    checkState();
-    if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
-      return;
-    }
-    // wait for the pending Q to become empty
-    while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
-      try {
-        wait();
-      } catch (InterruptedException ignored) {
-      }
-    }
-    if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
-      throwException(this + " got closed before finishing flush");
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (!isRunning()) {
-      return;
-    }
-    flush();
-
-    running = RunningState.CLOSED;
-    senderThread.interrupt();
-    try {
-      senderThread.join();
-    } catch (InterruptedException ignored) {
-    }
-    proxyMap.close();
-  }
-
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "-" + clientId;
-  }
-
-  private class Sender extends Daemon {
-    @Override
-    public void run() {
-      while (isRunning()) {
-
-        synchronized (AppendStreamer.this) {
-          while (isRunning() && shouldWait()) {
-            try {
-              AppendStreamer.this.wait();
-            } catch (InterruptedException ignored) {
-            }
-          }
-          if (running == RunningState.RUNNING) {
-            RaftClientRequestProto next = dataQueue.poll();
-            leaderProxy.onNext(next);
-            ackQueue.offer(next);
-          }
-        }
-      }
-    }
-
-    private boolean shouldWait() {
-      // the sender should wait if any of the following is true
-      // 1) there is no data to send
-      // 2) there are too many outstanding pending requests
-      // 3) Error/NotLeaderException just happened, we're still waiting for
-      //    the first response to confirm the new leader
-      return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
-          running == RunningState.LOOK_FOR_LEADER;
-    }
-  }
-
-  /** the response handler for stream RPC */
-  private class ResponseHandler implements
-      RaftClientProtocolProxy.CloseableStreamObserver {
-    private final String targetId;
-    // once handled the first NotLeaderException or Error, the handler should
-    // be inactive and should not make any further action.
-    private volatile boolean active = true;
-
-    ResponseHandler(RaftPeer target) {
-      targetId = target.getId();
-    }
-
-    @Override
-    public String toString() {
-      return AppendStreamer.this + "-ResponseHandler-" + targetId;
-    }
-
-    @Override
-    public void onNext(RaftClientReplyProto reply) {
-      if (!active) {
-        return;
-      }
-      synchronized (AppendStreamer.this) {
-        RaftClientRequestProto pending = Preconditions.checkNotNull(
-            ackQueue.peek());
-        if (reply.getRpcReply().getSuccess()) {
-          Preconditions.checkState(pending.getRpcRequest().getSeqNum() ==
-              reply.getRpcReply().getSeqNum());
-          ackQueue.poll();
-          LOG.trace("{} received success ack for request {}", this,
-              pending.getRpcRequest());
-          // we've identified the correct leader
-          if (running == RunningState.LOOK_FOR_LEADER) {
-            running = RunningState.RUNNING;
-          }
-        } else {
-          // this may be a NotLeaderException
-          RaftClientReply r = toRaftClientReply(reply);
-          if (r.isNotLeader()) {
-            LOG.debug("{} received a NotLeaderException from {}", this,
-                r.getReplierId());
-            handleNotLeader(r.getNotLeaderException(), targetId);
-          }
-        }
-        AppendStreamer.this.notifyAll();
-      }
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      if (active) {
-        synchronized (AppendStreamer.this) {
-          handleError(t, this);
-          AppendStreamer.this.notifyAll();
-        }
-      }
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.info("{} onCompleted, pending requests #: {}", this,
-          ackQueue.size());
-    }
-
-    @Override // called by handleError and handleNotLeader
-    public void close() throws IOException {
-      active = false;
-    }
-  }
-
-  private void throwException(String msg) throws IOException {
-    if (running == RunningState.ERROR) {
-      throw exceptionAndRetry.getCombinedException();
-    } else {
-      throw new IOException(msg);
-    }
-  }
-
-  private void handleNotLeader(NotLeaderException nle,
-      String oldLeader) {
-    Preconditions.checkState(Thread.holdsLock(AppendStreamer.this));
-    // handle NotLeaderException: refresh leader and RaftConfiguration
-    refreshPeers(nle.getPeers());
-
-    refreshLeader(nle.getSuggestedLeader().getId(), oldLeader);
-  }
-
-  private void handleError(Throwable t, ResponseHandler handler) {
-    Preconditions.checkState(Thread.holdsLock(AppendStreamer.this));
-    final IOException e = RaftGrpcUtil.unwrapIOException(t);
-
-    exceptionAndRetry.addException(handler.targetId, e);
-    LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
-        handler, e, exceptionAndRetry.retryTimes.get(),
-        exceptionAndRetry.maxRetryTimes);
-
-    leaderProxy.onError();
-    if (exceptionAndRetry.shouldRetry()) {
-      refreshLeader(null, leaderId);
-    } else {
-      running = RunningState.ERROR;
-    }
-  }
-
-  private void refreshLeader(String suggestedLeader, String oldLeader) {
-    running = RunningState.LOOK_FOR_LEADER;
-    refreshLeaderProxy(suggestedLeader, oldLeader);
-    reQueuePendingRequests(leaderId);
-
-    final RaftClientRequestProto request = Preconditions.checkNotNull(
-        dataQueue.poll());
-    ackQueue.offer(request);
-    try {
-      Thread.sleep(exceptionAndRetry.retryInterval);
-    } catch (InterruptedException ignored) {
-    }
-    leaderProxy.onNext(request);
-  }
-
-  private void reQueuePendingRequests(String newLeader) {
-    if (isRunning()) {
-      // resend all the pending requests
-      while (!ackQueue.isEmpty()) {
-        RaftClientRequestProto oldRequest = ackQueue.pollLast();
-        RaftRpcRequestProto r = oldRequest.getRpcRequest();
-        RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder()
-            .setMessage(oldRequest.getMessage())
-            .setReadOnly(oldRequest.getReadOnly())
-            .setRpcRequest(toRaftRpcRequestProtoBuilder(
-                clientId, newLeader, r.getSeqNum()))
-            .build();
-        dataQueue.offerFirst(newRequest);
-      }
-    }
-  }
-
-  private void refreshPeers(RaftPeer[] newPeers) {
-    if (newPeers != null && newPeers.length > 0) {
-      // we only add new peers, we do not remove any peer even if it no longer
-      // belongs to the current raft conf
-      Arrays.stream(newPeers).forEach(peer -> {
-        peers.putIfAbsent(peer.getId(), peer);
-        proxyMap.putIfAbsent(peer);
-      });
-
-      LOG.debug("refreshed peers: {}", peers);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java
deleted file mode 100644
index a8372a3..0000000
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.client;
-
-import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.shaded.io.grpc.ManagedChannel;
-import org.apache.raft.shaded.io.grpc.ManagedChannelBuilder;
-import org.apache.raft.shaded.io.grpc.StatusRuntimeException;
-import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
-import 
org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
-import 
org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class RaftClientProtocolClient implements Closeable {
-  private final RaftPeer target;
-  private final ManagedChannel channel;
-  private final RaftClientProtocolServiceBlockingStub blockingStub;
-  private final RaftClientProtocolServiceStub asyncStub;
-
-  public RaftClientProtocolClient(RaftPeer target) {
-    this.target = target;
-    channel = ManagedChannelBuilder.forTarget(target.getAddress())
-        .usePlaintext(true).build();
-    blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
-    asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
-  }
-
-  @Override
-  public void close() {
-    channel.shutdownNow();
-  }
-
-  public RaftClientReplyProto setConfiguration(
-      SetConfigurationRequestProto request) throws IOException {
-    try {
-      return blockingStub.setConfiguration(request);
-    } catch (StatusRuntimeException e) {
-      // unwrap StatusRuntimeException
-      throw RaftGrpcUtil.unwrapException(e);
-    }
-  }
-
-  StreamObserver<RaftClientRequestProto> append(
-      StreamObserver<RaftClientReplyProto> responseHandler) {
-    return asyncStub.append(responseHandler);
-  }
-
-  public RaftPeer getTarget() {
-    return target;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java
deleted file mode 100644
index 01ec023..0000000
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolProxy.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.client;
-
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.function.Function;
-
-public class RaftClientProtocolProxy implements Closeable {
-  private final RaftClientProtocolClient proxy;
-  private final Function<RaftPeer, CloseableStreamObserver> 
responseHandlerCreation;
-  private RpcSession currentSession;
-
-  public RaftClientProtocolProxy(RaftPeer target,
-      Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) {
-    proxy = new RaftClientProtocolClient(target);
-    this.responseHandlerCreation = responseHandlerCreation;
-  }
-
-  @Override
-  public void close() throws IOException {
-    closeCurrentSession();
-    proxy.close();
-  }
-
-  @Override
-  public String toString() {
-    return "ProxyTo:" + proxy.getTarget();
-  }
-
-  public void closeCurrentSession() {
-    if (currentSession != null) {
-      currentSession.close();
-      currentSession = null;
-    }
-  }
-
-  public void onNext(RaftClientRequestProto request) {
-    if (currentSession == null) {
-      currentSession = new RpcSession(
-          responseHandlerCreation.apply(proxy.getTarget()));
-    }
-    currentSession.requestObserver.onNext(request);
-  }
-
-  public void onError() {
-    if (currentSession != null) {
-      currentSession.onError();
-    }
-  }
-
-  public interface CloseableStreamObserver
-      extends StreamObserver<RaftClientReplyProto>, Closeable {
-  }
-
-  class RpcSession implements Closeable {
-    private final StreamObserver<RaftClientRequestProto> requestObserver;
-    private final CloseableStreamObserver responseHandler;
-    private boolean hasError = false;
-
-    RpcSession(CloseableStreamObserver responseHandler) {
-      this.responseHandler = responseHandler;
-      this.requestObserver = proxy.append(responseHandler);
-    }
-
-    void onError() {
-      hasError = true;
-    }
-
-    @Override
-    public void close() {
-      if (!hasError) {
-        try {
-          requestObserver.onCompleted();
-        } catch (Exception ignored) {
-        }
-      }
-      try {
-        responseHandler.close();
-      } catch (IOException ignored) {
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
deleted file mode 100644
index 8f41bdc..0000000
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.client;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.client.impl.ClientProtoUtils;
-import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.protocol.RaftClientAsynchronousProtocol;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import 
org.apache.raft.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-
-public class RaftClientProtocolService extends 
RaftClientProtocolServiceImplBase {
-  static final Logger LOG = 
LoggerFactory.getLogger(RaftClientProtocolService.class);
-
-  private static class PendingAppend implements Comparable<PendingAppend> {
-    private final long seqNum;
-    private volatile RaftClientReply reply;
-
-    PendingAppend(long seqNum) {
-      this.seqNum = seqNum;
-    }
-
-    boolean isReady() {
-      return reply != null || this == COMPLETED;
-    }
-
-    void setReply(RaftClientReply reply) {
-      this.reply = reply;
-    }
-
-    @Override
-    public int compareTo(PendingAppend p) {
-      return seqNum == p.seqNum ? 0 : (seqNum < p.seqNum ? -1 : 1);
-    }
-
-    @Override
-    public String toString() {
-      return seqNum + ", reply:" + (reply == null ? "null" : reply.toString());
-    }
-  }
-  private static final PendingAppend COMPLETED = new 
PendingAppend(Long.MAX_VALUE);
-
-  private final String id;
-  private final RaftClientAsynchronousProtocol client;
-
-  public RaftClientProtocolService(String id, RaftClientAsynchronousProtocol 
client) {
-    this.id = id;
-    this.client = client;
-  }
-
-  @Override
-  public void setConfiguration(SetConfigurationRequestProto request,
-      StreamObserver<RaftClientReplyProto> responseObserver) {
-    try {
-      CompletableFuture<RaftClientReply> future = client.setConfigurationAsync(
-          ClientProtoUtils.toSetConfigurationRequest(request));
-      future.whenCompleteAsync((reply, exception) -> {
-        if (exception != null) {
-          responseObserver.onError(RaftGrpcUtil.wrapException(exception));
-        } else {
-          
responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply));
-          responseObserver.onCompleted();
-        }
-      });
-    } catch (Exception e) {
-      responseObserver.onError(RaftGrpcUtil.wrapException(e));
-    }
-  }
-
-  @Override
-  public StreamObserver<RaftClientRequestProto> append(
-      StreamObserver<RaftClientReplyProto> responseObserver) {
-    return new AppendRequestStreamObserver(responseObserver);
-  }
-
-  private class AppendRequestStreamObserver implements
-      StreamObserver<RaftClientRequestProto> {
-    private final List<PendingAppend> pendingList = new LinkedList<>();
-    private final StreamObserver<RaftClientReplyProto> responseObserver;
-
-    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
-      this.responseObserver = ro;
-    }
-
-    @Override
-    public void onNext(RaftClientRequestProto request) {
-      try {
-        PendingAppend p = new 
PendingAppend(request.getRpcRequest().getSeqNum());
-        synchronized (pendingList) {
-          pendingList.add(p);
-        }
-
-        CompletableFuture<RaftClientReply> future = 
client.submitClientRequestAsync(
-            ClientProtoUtils.toRaftClientRequest(request));
-        future.whenCompleteAsync((reply, exception) -> {
-          if (exception != null) {
-            // TODO: the exception may be from either raft or state machine.
-            // Currently we skip all the following responses when getting an
-            // exception from the state machine.
-            responseObserver.onError(RaftGrpcUtil.wrapException(exception));
-          } else {
-            final long replySeq = reply.getSeqNum();
-            synchronized (pendingList) {
-              Preconditions.checkState(!pendingList.isEmpty(),
-                  "PendingList is empty when handling onNext for seqNum %s",
-                  replySeq);
-              final long headSeqNum = pendingList.get(0).seqNum;
-              // we assume the seqNum is consecutive for a stream RPC call
-              final PendingAppend pendingForReply = pendingList.get(
-                  (int) (replySeq - headSeqNum));
-              Preconditions.checkState(pendingForReply != null &&
-                      pendingForReply.seqNum == replySeq,
-                  "pending for reply is: %s, the pending list: %s",
-                  pendingForReply, pendingList);
-              pendingForReply.setReply(reply);
-
-              if (headSeqNum == replySeq) {
-                Collection<PendingAppend> readySet = new ArrayList<>();
-                // if this is head, we send back all the ready responses
-                Iterator<PendingAppend> iter = pendingList.iterator();
-                PendingAppend pending;
-                while (iter.hasNext() && ((pending = iter.next()).isReady())) {
-                  readySet.add(pending);
-                  iter.remove();
-                }
-                sendReadyReplies(readySet);
-              }
-            }
-          }
-        });
-      } catch (Throwable e) {
-        LOG.info("{} got exception when handling client append request {}: {}",
-            id, request.getRpcRequest(), e);
-        responseObserver.onError(RaftGrpcUtil.wrapException(e));
-      }
-    }
-
-    private void sendReadyReplies(Collection<PendingAppend> readySet) {
-      readySet.forEach(ready -> {
-        Preconditions.checkState(ready.isReady());
-        if (ready == COMPLETED) {
-          responseObserver.onCompleted();
-        } else {
-          responseObserver.onNext(
-              ClientProtoUtils.toRaftClientReplyProto(ready.reply));
-        }
-      });
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      // for now we just log a msg
-      LOG.warn("{} onError: client Append cancelled", id, t);
-      synchronized (pendingList) {
-        pendingList.clear();
-      }
-    }
-
-    @Override
-    public void onCompleted() {
-      synchronized (pendingList) {
-        if (pendingList.isEmpty()) {
-          responseObserver.onCompleted();
-        } else {
-          pendingList.add(COMPLETED);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java
deleted file mode 100644
index 7351e1a..0000000
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientSenderWithGrpc.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.client;
-
-import org.apache.raft.client.RaftClientRequestSender;
-import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.shaded.io.grpc.StatusRuntimeException;
-import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.raft.util.PeerProxyMap;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.raft.client.impl.ClientProtoUtils.*;
-
-public class RaftClientSenderWithGrpc implements RaftClientRequestSender {
-  public static final Logger LOG = 
LoggerFactory.getLogger(RaftClientSenderWithGrpc.class);
-
-  private final PeerProxyMap<RaftClientProtocolClient> proxies
-      = new PeerProxyMap<>(RaftClientProtocolClient::new);
-
-  public RaftClientSenderWithGrpc(Collection<RaftPeer> peers) {
-    addServers(peers);
-  }
-
-  @Override
-  public RaftClientReply sendRequest(RaftClientRequest request)
-      throws IOException {
-    final String serverId = request.getReplierId();
-    final RaftClientProtocolClient proxy = proxies.getProxy(serverId);
-    if (request instanceof SetConfigurationRequest) {
-      SetConfigurationRequestProto setConf =
-          toSetConfigurationRequestProto((SetConfigurationRequest) request);
-      return toRaftClientReply(proxy.setConfiguration(setConf));
-    } else {
-      RaftClientRequestProto requestProto = toRaftClientRequestProto(request);
-      CompletableFuture<RaftClientReplyProto> replyFuture =
-          new CompletableFuture<>();
-      final StreamObserver<RaftClientRequestProto> requestObserver =
-          proxy.append(new StreamObserver<RaftClientReplyProto>() {
-            @Override
-            public void onNext(RaftClientReplyProto value) {
-              replyFuture.complete(value);
-            }
-
-            @Override
-            public void onError(Throwable t) {
-              // This implementation is used as RaftClientRequestSender. Retry
-              // logic on Exception is in RaftClient.
-              final IOException e;
-              if (t instanceof StatusRuntimeException) {
-                e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
-              } else {
-                e = RaftUtils.asIOException(t);
-              }
-              replyFuture.completeExceptionally(e);
-            }
-
-            @Override
-            public void onCompleted() {
-              if (!replyFuture.isDone()) {
-                replyFuture.completeExceptionally(
-                    new IOException("No reply for request " + request));
-              }
-            }
-          });
-      requestObserver.onNext(requestProto);
-      requestObserver.onCompleted();
-
-      // TODO: timeout support
-      try {
-        return toRaftClientReply(replyFuture.get());
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException(
-            "Interrupted while waiting for response of request " + request);
-      } catch (ExecutionException e) {
-        throw RaftUtils.toIOException(e);
-      }
-    }
-  }
-
-  @Override
-  public void addServers(Iterable<RaftPeer> servers) {
-    proxies.addPeers(servers);
-  }
-
-  @Override
-  public void close() throws IOException {
-    proxies.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java
deleted file mode 100644
index 7edcab9..0000000
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftOutputStream.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.client;
-
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.util.ProtoUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collection;
-
-import static 
org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT;
-import static 
org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
-
-public class RaftOutputStream extends OutputStream {
-  /** internal buffer */
-  private final byte buf[];
-  private int count;
-  private long seqNum = 0;
-  private final String clientId;
-  private final AppendStreamer streamer;
-
-  private boolean closed = false;
-
-  public RaftOutputStream(RaftProperties prop, String clientId,
-      Collection<RaftPeer> peers, String leaderId) {
-    final int bufferSize = prop.getInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY,
-        RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT);
-    buf = new byte[bufferSize];
-    count = 0;
-    this.clientId = clientId;
-    streamer = new AppendStreamer(prop, peers, leaderId, clientId);
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    checkClosed();
-    buf[count++] = (byte)b;
-    flushIfNecessary();
-  }
-
-  private void flushIfNecessary() throws IOException {
-    if(count == buf.length) {
-      flushToStreamer();
-    }
-  }
-
-  @Override
-  public void write(byte b[], int off, int len) throws IOException {
-    checkClosed();
-    if (off < 0 || len < 0 || off > b.length - len) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-
-    int total = 0;
-    while (total < len) {
-      int toWrite = Math.min(len - total, buf.length - count);
-      System.arraycopy(b, off + total, buf, count, toWrite);
-      count += toWrite;
-      total += toWrite;
-      flushIfNecessary();
-    }
-  }
-
-  private void flushToStreamer() throws IOException {
-    if (count > 0) {
-      streamer.write(ProtoUtils.toByteString(buf, 0, count), seqNum++);
-      count = 0;
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    checkClosed();
-    flushToStreamer();
-    streamer.flush();
-  }
-
-  @Override
-  public void close() throws IOException {
-    flushToStreamer();
-    streamer.close(); // streamer will flush
-    this.closed = true;
-  }
-
-  @Override
-  public String toString() {
-    return "RaftOutputStream-" + clientId;
-  }
-
-  private void checkClosed() throws IOException {
-    if (closed) {
-      throw new IOException(this.toString() + " was closed.");
-    }
-  }
-}

Reply via email to