Repository: incubator-ratis
Updated Branches:
  refs/heads/master 718fa9ea3 -> 8ac50a721


RATIS-22. Use a factory method to create RaftClientRequestSender.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8ac50a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8ac50a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8ac50a72

Branch: refs/heads/master
Commit: 8ac50a721505076dc3a7f7dc5416ed27a3273e8a
Parents: 718fa9e
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Tue Feb 28 14:39:22 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Tue Feb 28 14:39:22 2017 -0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/ClientFactory.java  | 35 ++++++++++++++
 .../org/apache/ratis/client/RaftClient.java     | 21 +++------
 .../ratis/client/impl/RaftClientImpl.java       | 24 ++++------
 .../org/apache/ratis/rpc/SupportedRpcType.java  |  2 +-
 .../java/org/apache/ratis/util/RaftUtils.java   | 17 +++++++
 .../java/org/apache/ratis/grpc/GrpcFactory.java | 49 ++++++++++++++++++++
 .../grpc/client/RaftClientSenderWithGrpc.java   | 19 ++------
 .../ratis/grpc/server/GrpcServerFactory.java    | 42 -----------------
 .../ratis/grpc/MiniRaftClusterWithGRpc.java     |  7 ---
 .../apache/ratis/hadooprpc/HadoopFactory.java   |  9 +++-
 .../client/HadoopClientRequestSender.java       |  5 +-
 .../hadooprpc/MiniRaftClusterWithHadoopRpc.java |  8 +---
 .../org/apache/ratis/netty/NettyFactory.java    |  9 +++-
 .../netty/client/NettyClientRequestSender.java  | 16 ++-----
 .../ratis/netty/MiniRaftClusterWithNetty.java   |  7 ---
 .../java/org/apache/ratis/MiniRaftCluster.java  | 11 +++--
 .../MiniRaftClusterWithSimulatedRpc.java        |  9 +---
 .../ratis/server/simulation/SimulatedRpc.java   | 12 ++++-
 18 files changed, 165 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
new file mode 100644
index 0000000..b775319
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client;
+
+import org.apache.ratis.rpc.RpcFactory;
+
+/** A factory interface for creating client components. */
+public interface ClientFactory extends RpcFactory {
+  static ClientFactory cast(RpcFactory rpcFactory) {
+    if (rpcFactory instanceof ClientFactory) {
+      return (ClientFactory)rpcFactory;
+    }
+    throw new ClassCastException("Cannot cast " + rpcFactory.getClass()
+        + " to " + ClientFactory.class
+        + "; rpc type is " + rpcFactory.getRpcType());
+  }
+
+  /** Create a {@link RaftClientRequestSender}. */
+  RaftClientRequestSender newRaftClientRequestSender();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 41022e2..4f86d40 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -17,20 +17,16 @@
  */
 package org.apache.ratis.client;
 
-import com.google.common.base.Preconditions;
 import org.apache.ratis.client.impl.ClientImplUtils;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Objects;
 
 /** A client who sends requests to a raft service. */
 public interface RaftClient extends Closeable {
@@ -74,22 +70,19 @@ public interface RaftClient extends Closeable {
 
     /** @return a {@link RaftClient} object. */
     public RaftClient build() {
-      Preconditions.checkNotNull(requestSender);
-      Preconditions.checkNotNull(servers);
-
       if (clientId == null) {
         clientId = ClientId.createId();
       }
-      if (leaderId == null) {
-        leaderId = servers.iterator().next().getId(); //use the first peer
-      }
       if (properties != null) {
         retryInterval = properties.getInt(
             RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_KEY,
             RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT);
       }
-      return ClientImplUtils.newRaftClient(clientId, servers, leaderId,
-          requestSender, retryInterval);
+      return ClientImplUtils.newRaftClient(clientId,
+          Objects.requireNonNull(servers, "The 'server' field is not 
initialized."),
+          leaderId,
+          Objects.requireNonNull(requestSender, "The 'requestSender' field is 
not initialized."),
+          retryInterval);
     }
 
     /** Set {@link RaftClient} ID. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index d29eebe..ff49ab6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -26,16 +26,13 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Map;
-import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 /** A client who sends requests to a raft service. */
 final class RaftClientImpl implements RaftClient {
   private final ClientId clientId;
   private final RaftClientRequestSender requestSender;
-  private final Map<RaftPeerId, RaftPeer> peers;
+  private final Collection<RaftPeer> peers;
   private final int retryInterval;
 
   private volatile RaftPeerId leaderId;
@@ -45,10 +42,11 @@ final class RaftClientImpl implements RaftClient {
       int retryInterval) {
     this.clientId = clientId;
     this.requestSender = requestSender;
-    this.peers = peers.stream().collect(
-        Collectors.toMap(RaftPeer::getId, Function.identity()));
+    this.peers = peers;
     this.leaderId = leaderId != null? leaderId : 
peers.iterator().next().getId();
     this.retryInterval = retryInterval;
+
+    requestSender.addServers(peers);
   }
 
   @Override
@@ -122,20 +120,18 @@ final class RaftClientImpl implements RaftClient {
 
   private void handleNotLeaderException(RaftClientRequest request,
       NotLeaderException nle) {
-    refreshPeers(nle.getPeers());
+    refreshPeers(Arrays.asList(nle.getPeers()));
     final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
         : nle.getSuggestedLeader().getId();
     handleIOException(request, nle, newLeader);
   }
 
-  private void refreshPeers(RaftPeer[] newPeers) {
-    if (newPeers != null && newPeers.length > 0) {
+  private void refreshPeers(Collection<RaftPeer> newPeers) {
+    if (newPeers != null && newPeers.size() > 0) {
       peers.clear();
-      for (RaftPeer p : newPeers) {
-        peers.put(p.getId(), p);
-      }
+      peers.addAll(newPeers);
       // also refresh the rpc proxies for these peers
-      requestSender.addServers(Arrays.asList(newPeers));
+      requestSender.addServers(newPeers);
     }
   }
 
@@ -144,7 +140,7 @@ final class RaftClientImpl implements RaftClient {
     LOG.debug("{}: Failed with {}", clientId, ioe);
     final RaftPeerId oldLeader = request.getServerId();
     if (newLeader == null && oldLeader.equals(leaderId)) {
-      newLeader = RaftUtils.next(oldLeader, peers.keySet());
+      newLeader = RaftUtils.next(oldLeader, RaftUtils.as(peers, 
RaftPeer::getId));
     }
     if (newLeader != null && oldLeader.equals(leaderId)) {
       LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, 
newLeader);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
index d222495..dcba59b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
@@ -23,7 +23,7 @@ import org.apache.ratis.util.RaftUtils;
 /** The RPC types supported. */
 public enum SupportedRpcType implements RpcType {
   NETTY("org.apache.ratis.netty.NettyFactory"),
-  GRPC("org.apache.ratis.grpc.server.GrpcServerFactory"),
+  GRPC("org.apache.ratis.grpc.GrpcFactory"),
   HADOOP("org.apache.ratis.hadooprpc.HadoopFactory");
 
   /** Same as {@link #valueOf(String)} except that this method is case 
insensitive. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
index 5f62f47..7f5703d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
@@ -34,6 +34,7 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 public abstract class RaftUtils {
@@ -305,4 +306,20 @@ public abstract class RaftUtils {
     }
     return first;
   }
+
+  public static <INPUT, OUTPUT> Iterable<OUTPUT> as(
+      Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) {
+    final Iterator<INPUT> i = iteration.iterator();
+    return () -> new Iterator<OUTPUT>() {
+      @Override
+      public boolean hasNext() {
+        return i.hasNext();
+      }
+
+      @Override
+      public OUTPUT next() {
+        return converter.apply(i.next());
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
new file mode 100644
index 0000000..df69490
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc;
+import org.apache.ratis.grpc.server.GRpcLogAppender;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.impl.*;
+
+public class GrpcFactory implements ServerFactory, ClientFactory {
+  @Override
+  public SupportedRpcType getRpcType() {
+    return SupportedRpcType.GRPC;
+  }
+
+  @Override
+  public LogAppender newLogAppender(RaftServerImpl server, LeaderState state,
+                                    FollowerInfo f) {
+    return new GRpcLogAppender(server, state, f);
+  }
+
+  @Override
+  public RaftGRpcService newRaftServerRpc(RaftServerImpl server) {
+    return RaftGRpcService.newBuilder()
+        .setServer(server)
+        .build();
+  }
+
+  @Override
+  public RaftClientSenderWithGrpc newRaftClientRequestSender() {
+    return new RaftClientSenderWithGrpc();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
index 50b05da..9a0eca3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
@@ -17,41 +17,32 @@
  */
 package org.apache.ratis.grpc.client;
 
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.client.RaftClientRequestSender;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.util.PeerProxyMap;
 import org.apache.ratis.util.RaftUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.ratis.client.impl.ClientProtoUtils.*;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.ratis.client.impl.ClientProtoUtils.*;
+
 public class RaftClientSenderWithGrpc implements RaftClientRequestSender {
   public static final Logger LOG = 
LoggerFactory.getLogger(RaftClientSenderWithGrpc.class);
 
   private final PeerProxyMap<RaftClientProtocolClient> proxies
       = new PeerProxyMap<>(RaftClientProtocolClient::new);
 
-  public RaftClientSenderWithGrpc(Collection<RaftPeer> peers) {
-    addServers(peers);
-  }
-
   @Override
   public RaftClientReply sendRequest(RaftClientRequest request)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
deleted file mode 100644
index 9c1d6f0..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.grpc.RaftGRpcService;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
-
-public class GrpcServerFactory implements ServerFactory {
-  @Override
-  public SupportedRpcType getRpcType() {
-    return SupportedRpcType.GRPC;
-  }
-
-  @Override
-  public LogAppender newLogAppender(RaftServerImpl server, LeaderState state,
-                                    FollowerInfo f) {
-    return new GRpcLogAppender(server, state, f);
-  }
-
-  @Override
-  public RaftGRpcService newRaftServerRpc(RaftServerImpl server) {
-    return RaftGRpcService.newBuilder()
-        .setServer(server)
-        .build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index 186cac6..4c7d74d 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -20,9 +20,7 @@ package org.apache.ratis.grpc;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -60,11 +58,6 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
   }
 
   @Override
-  public RaftClientRequestSender getRaftClientRequestSender() {
-    return new RaftClientSenderWithGrpc(getPeers());
-  }
-
-  @Override
   protected Collection<RaftPeer> addNewPeers(
       Collection<RaftServerImpl> newServers, boolean startService) {
     final Collection<RaftPeer> peers = toRaftPeers(newServers);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index 063858a..9ff493f 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -18,12 +18,14 @@
 package org.apache.ratis.hadooprpc;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender;
 import org.apache.ratis.hadooprpc.server.HadoopRpcService;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
 
-public class HadoopFactory extends ServerFactory.BaseFactory {
+public class HadoopFactory extends ServerFactory.BaseFactory implements 
ClientFactory {
   private Configuration conf;
 
   public void setConf(Configuration conf) {
@@ -42,4 +44,9 @@ public class HadoopFactory extends ServerFactory.BaseFactory {
         .setConf(conf)
         .build();
   }
+
+  @Override
+  public HadoopClientRequestSender newRaftClientRequestSender() {
+    return new HadoopClientRequestSender(conf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
index 918a191..1a10dab 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
@@ -24,17 +24,14 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.util.PeerProxyMap;
 
 import java.io.IOException;
-import java.util.Collection;
 
 public class HadoopClientRequestSender implements RaftClientRequestSender {
 
   private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies;
 
-  public HadoopClientRequestSender(
-      Collection<RaftPeer> peers, final Configuration conf) {
+  public HadoopClientRequestSender(final Configuration conf) {
     this.proxies  = new PeerProxyMap<>(
         p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), 
conf));
-    proxies.addPeers(peers);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index c09c300..76acafd 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -21,9 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender;
 import org.apache.ratis.hadooprpc.server.HadoopRpcServerConfigKeys;
 import org.apache.ratis.hadooprpc.server.HadoopRpcService;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -69,6 +67,7 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
     super(ids, properties, formatted);
     this.hadoopConf = hadoopConf;
     getServers().stream().forEach(s -> setConf(s));
+    ((HadoopFactory)clientFactory).setConf(hadoopConf);
   }
 
   private void setConf(RaftServerImpl server) {
@@ -88,11 +87,6 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
   }
 
   @Override
-  public RaftClientRequestSender getRaftClientRequestSender() {
-    return new HadoopClientRequestSender(getPeers(), hadoopConf);
-  }
-
-  @Override
   public void blockQueueAndSetDelay(String leaderId, int delayMs)
       throws InterruptedException {
     RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index ac71f44..fb27eaa 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -17,12 +17,14 @@
  */
 package org.apache.ratis.netty;
 
+import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.netty.client.NettyClientRequestSender;
 import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
 
-public class NettyFactory extends ServerFactory.BaseFactory {
+public class NettyFactory extends ServerFactory.BaseFactory implements 
ClientFactory {
   @Override
   public SupportedRpcType getRpcType() {
     return SupportedRpcType.NETTY;
@@ -32,4 +34,9 @@ public class NettyFactory extends ServerFactory.BaseFactory {
   public NettyRpcService newRaftServerRpc(RaftServerImpl server) {
     return NettyRpcService.newBuilder().setServer(server).build();
   }
+
+  @Override
+  public NettyClientRequestSender newRaftClientRequestSender() {
+    return new NettyClientRequestSender();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
index 1604b5c..5b36fde 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
@@ -17,28 +17,20 @@
  */
 package org.apache.ratis.netty.client;
 
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.netty.NettyRpcProxy;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
 import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
-import org.apache.ratis.client.RaftClientRequestSender;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.netty.NettyRpcProxy;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.SetConfigurationRequest;
 
 import java.io.IOException;
 
 public class NettyClientRequestSender implements RaftClientRequestSender {
   private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap();
 
-  public NettyClientRequestSender(Iterable<RaftPeer> servers) {
-    addServers(servers);
-  }
-
   @Override
   public RaftClientReply sendRequest(RaftClientRequest request) throws 
IOException {
     final RaftPeerId serverId = request.getServerId();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index d6d1dc9..29857dd 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -20,9 +20,7 @@ package org.apache.ratis.netty;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.netty.client.NettyClientRequestSender;
 import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -56,11 +54,6 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
   }
 
   @Override
-  public RaftClientRequestSender getRaftClientRequestSender() {
-    return new NettyClientRequestSender(getPeers());
-  }
-
-  @Override
   protected void blockQueueAndSetDelay(String leaderId, int delayMs)
       throws InterruptedException {
     RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index afd6eca..751854c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -18,11 +18,12 @@
 package org.apache.ratis;
 
 import com.google.common.base.Preconditions;
+import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.*;
 import org.apache.ratis.server.storage.MemoryRaftLog;
@@ -140,6 +141,7 @@ public abstract class MiniRaftCluster {
     return ids;
   }
 
+  protected final ClientFactory clientFactory;
   protected RaftConfiguration conf;
   protected final RaftProperties properties;
   private final String testBaseDir;
@@ -150,6 +152,9 @@ public abstract class MiniRaftCluster {
       boolean formatted) {
     this.conf = initConfiguration(ids);
     this.properties = new RaftProperties(properties);
+
+    final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
+    this.clientFactory = ClientFactory.cast(rpcType.newFactory(properties));
     this.testBaseDir = getBaseDirectory();
 
     conf.getPeers().forEach(
@@ -221,8 +226,6 @@ public abstract class MiniRaftCluster {
     return RaftUtils.newInstance(smClass);
   }
 
-  public abstract RaftClientRequestSender getRaftClientRequestSender();
-
   public static Collection<RaftPeer> toRaftPeers(
       Collection<RaftServerImpl> servers) {
     return servers.stream()
@@ -391,7 +394,7 @@ public abstract class MiniRaftCluster {
     return RaftClient.newBuilder()
         .setServers(conf.getPeers())
         .setLeaderId(leaderId)
-        .setRequestSender(getRaftClientRequestSender())
+        .setRequestSender(clientFactory.newRaftClientRequestSender())
         .setProperties(properties)
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index c978bbe..11c4c0a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -19,11 +19,9 @@ package org.apache.ratis.server.simulation;
 
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +68,8 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
     client2serverRequestReply = new SimulatedClientRequestReply(simulateLatencyMs);
     getServers().stream().forEach(s -> initRpc(s));
     addPeersToRpc(toRaftPeers(getServers()));
+    ((SimulatedRpc.Factory)clientFactory).initRpc(
+        serverRequestReply, client2serverRequestReply);
   }
 
   private void initRpc(RaftServerImpl s) {
@@ -111,11 +111,6 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
   }
 
   @Override
-  public RaftClientRequestSender getRaftClientRequestSender() {
-    return client2serverRequestReply;
-  }
-
-  @Override
   public void blockQueueAndSetDelay(String leaderId, int delayMs)
       throws InterruptedException {
     // block leader sendRequest if delayMs > 0

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index ec85661..9d855c3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.simulation;
 
+import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.rpc.RpcFactory;
 import org.apache.ratis.rpc.RpcType;
@@ -38,7 +39,7 @@ class SimulatedRpc implements RpcType {
     return new Factory();
   }
 
-  static class Factory extends ServerFactory.BaseFactory {
+  static class Factory extends ServerFactory.BaseFactory implements ClientFactory {
     private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
     private SimulatedClientRequestReply client2serverRequestReply;
 
@@ -51,7 +52,14 @@ class SimulatedRpc implements RpcType {
 
     @Override
     public SimulatedServerRpc newRaftServerRpc(RaftServerImpl server) {
-      return new SimulatedServerRpc(server, serverRequestReply, client2serverRequestReply);
+      return new SimulatedServerRpc(server,
+          Objects.requireNonNull(serverRequestReply),
+          Objects.requireNonNull(client2serverRequestReply));
+    }
+
+    @Override
+    public SimulatedClientRequestReply newRaftClientRequestSender() {
+      return Objects.requireNonNull(client2serverRequestReply);
     }
 
     @Override

Reply via email to