Repository: incubator-ratis Updated Branches: refs/heads/master 392fa5141 -> fdbbdf98c
RATIS-32. Rename RaftClientRequestSender to RaftClientRpc. Contributed by Tsz Wo Nicholas Sze Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/fdbbdf98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/fdbbdf98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/fdbbdf98 Branch: refs/heads/master Commit: fdbbdf98c0d5db82ba6fc2e78893bfd006040abe Parents: 392fa51 Author: Mingliang Liu <[email protected]> Authored: Tue Feb 28 15:46:18 2017 -0800 Committer: Mingliang Liu <[email protected]> Committed: Tue Feb 28 15:46:18 2017 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/ClientFactory.java | 4 +- .../org/apache/ratis/client/RaftClient.java | 16 +-- .../ratis/client/RaftClientRequestSender.java | 34 ------ .../org/apache/ratis/client/RaftClientRpc.java | 34 ++++++ .../ratis/client/impl/ClientImplUtils.java | 11 +- .../ratis/client/impl/RaftClientImpl.java | 20 ++-- .../java/org/apache/ratis/grpc/GrpcFactory.java | 6 +- .../apache/ratis/grpc/client/GrpcClientRpc.java | 111 +++++++++++++++++++ .../grpc/client/RaftClientSenderWithGrpc.java | 111 ------------------- .../apache/ratis/hadooprpc/HadoopFactory.java | 6 +- .../client/HadoopClientRequestSender.java | 68 ------------ .../ratis/hadooprpc/client/HadoopClientRpc.java | 68 ++++++++++++ .../org/apache/ratis/netty/NettyFactory.java | 6 +- .../netty/client/NettyClientRequestSender.java | 64 ----------- .../ratis/netty/client/NettyClientRpc.java | 64 +++++++++++ .../java/org/apache/ratis/MiniRaftCluster.java | 2 +- .../ratis/RaftNotLeaderExceptionBaseTest.java | 6 +- .../impl/RaftReconfigurationBaseTest.java | 10 +- .../MiniRaftClusterWithSimulatedRpc.java | 4 +- .../simulation/SimulatedClientRequestReply.java | 41 ------- .../server/simulation/SimulatedClientRpc.java | 41 +++++++ .../ratis/server/simulation/SimulatedRpc.java | 6 +- 22 files 
changed, 366 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java index b775319..6407ba8 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java @@ -30,6 +30,6 @@ public interface ClientFactory extends RpcFactory { + "; rpc type is " + rpcFactory.getRpcType()); } - /** Create a {@link RaftClientRequestSender}. */ - RaftClientRequestSender newRaftClientRequestSender(); + /** Create a {@link RaftClientRpc}. */ + RaftClientRpc newRaftClientRpc(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 4f86d40..8fbffd3 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -36,8 +36,8 @@ public interface RaftClient extends Closeable { /** @return the id of this client. */ ClientId getId(); - /** @return the request sender of this client. */ - RaftClientRequestSender getRequestSender(); + /** @return the client rpc. */ + RaftClientRpc getClientRpc(); /** * Send the given message to the raft service. @@ -60,7 +60,7 @@ public interface RaftClient extends Closeable { /** To build {@link RaftClient} objects. 
*/ class Builder { private ClientId clientId; - private RaftClientRequestSender requestSender; + private RaftClientRpc clientRpc; private Collection<RaftPeer> servers; private RaftPeerId leaderId; private RaftProperties properties; @@ -79,9 +79,9 @@ public interface RaftClient extends Closeable { RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT); } return ClientImplUtils.newRaftClient(clientId, - Objects.requireNonNull(servers, "The 'server' field is not initialized."), + Objects.requireNonNull(servers, "The 'servers' field is not initialized."), leaderId, - Objects.requireNonNull(requestSender, "The 'requestSender' field is not initialized."), + Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."), retryInterval); } @@ -103,9 +103,9 @@ public interface RaftClient extends Closeable { return this; } - /** Set {@link RaftClientRequestSender}. */ - public Builder setRequestSender(RaftClientRequestSender requestSender) { - this.requestSender = requestSender; + /** Set {@link RaftClientRpc}. */ + public Builder setClientRpc(RaftClientRpc clientRpc) { + this.clientRpc = clientRpc; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java deleted file mode 100644 index b2541e1..0000000 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.client; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeer; - -/** Send requests to a raft service. */ -public interface RaftClientRequestSender extends Closeable { - /** Send a request. */ - RaftClientReply sendRequest(RaftClientRequest request) throws IOException; - - /** Add the information of the given raft servers */ - void addServers(Iterable<RaftPeer> servers); -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java new file mode 100644 index 0000000..ca1864b --- /dev/null +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.client; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; + +/** The client side rpc of a raft service. */ +public interface RaftClientRpc extends Closeable { + /** Send a request. */ + RaftClientReply sendRequest(RaftClientRequest request) throws IOException; + + /** Add the information of the given raft servers */ + void addServers(Iterable<RaftPeer> servers); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index 134e610..85901db 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -18,7 +18,7 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.protocol.ClientId; import 
org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -27,10 +27,9 @@ import java.util.Collection; /** Client utilities for internal use. */ public class ClientImplUtils { - public static RaftClient newRaftClient(ClientId clientId, - Collection<RaftPeer> peers, RaftPeerId leaderId, - RaftClientRequestSender requestSender, int retryInterval) { - return new RaftClientImpl(clientId, peers, leaderId, requestSender, - retryInterval); + public static RaftClient newRaftClient( + ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId, + RaftClientRpc clientRpc, int retryInterval) { + return new RaftClientImpl(clientId, peers, leaderId, clientRpc, retryInterval); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index ff49ab6..3a6fd58 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -18,7 +18,7 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.protocol.*; import org.apache.ratis.util.RaftUtils; @@ -31,22 +31,22 @@ import java.util.function.Supplier; /** A client who sends requests to a raft service. 
*/ final class RaftClientImpl implements RaftClient { private final ClientId clientId; - private final RaftClientRequestSender requestSender; + private final RaftClientRpc clientRpc; private final Collection<RaftPeer> peers; private final int retryInterval; private volatile RaftPeerId leaderId; RaftClientImpl(ClientId clientId, Collection<RaftPeer> peers, - RaftPeerId leaderId, RaftClientRequestSender requestSender, + RaftPeerId leaderId, RaftClientRpc clientRpc, int retryInterval) { this.clientId = clientId; - this.requestSender = requestSender; + this.clientRpc = clientRpc; this.peers = peers; this.leaderId = leaderId != null? leaderId : peers.iterator().next().getId(); this.retryInterval = retryInterval; - requestSender.addServers(peers); + clientRpc.addServers(peers); } @Override @@ -102,7 +102,7 @@ final class RaftClientImpl implements RaftClient { private RaftClientReply sendRequest(RaftClientRequest request) throws StateMachineException { try { - RaftClientReply reply = requestSender.sendRequest(request); + RaftClientReply reply = clientRpc.sendRequest(request); if (reply.isNotLeader()) { handleNotLeaderException(request, reply.getNotLeaderException()); return null; @@ -131,7 +131,7 @@ final class RaftClientImpl implements RaftClient { peers.clear(); peers.addAll(newPeers); // also refresh the rpc proxies for these peers - requestSender.addServers(newPeers); + clientRpc.addServers(newPeers); } } @@ -149,12 +149,12 @@ final class RaftClientImpl implements RaftClient { } @Override - public RaftClientRequestSender getRequestSender() { - return requestSender; + public RaftClientRpc getClientRpc() { + return clientRpc; } @Override public void close() throws IOException { - requestSender.close(); + clientRpc.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java ---------------------------------------------------------------------- diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java index df69490..3ae2602 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -18,7 +18,7 @@ package org.apache.ratis.grpc; import org.apache.ratis.client.ClientFactory; -import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc; +import org.apache.ratis.grpc.client.GrpcClientRpc; import org.apache.ratis.grpc.server.GRpcLogAppender; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.*; @@ -43,7 +43,7 @@ public class GrpcFactory implements ServerFactory, ClientFactory { } @Override - public RaftClientSenderWithGrpc newRaftClientRequestSender() { - return new RaftClientSenderWithGrpc(); + public GrpcClientRpc newRaftClientRpc() { + return new GrpcClientRpc(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java new file mode 100644 index 0000000..3f7343a --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.*; +import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.util.PeerProxyMap; +import org.apache.ratis.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.ratis.client.impl.ClientProtoUtils.*; + +public class GrpcClientRpc implements RaftClientRpc { + public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); + + private final PeerProxyMap<RaftClientProtocolClient> proxies + = new PeerProxyMap<>(RaftClientProtocolClient::new); + + @Override + public RaftClientReply sendRequest(RaftClientRequest request) + throws IOException { + final RaftPeerId serverId = request.getServerId(); + final RaftClientProtocolClient proxy = proxies.getProxy(serverId); + if (request instanceof SetConfigurationRequest) { + SetConfigurationRequestProto setConf = + toSetConfigurationRequestProto((SetConfigurationRequest) request); + return 
toRaftClientReply(proxy.setConfiguration(setConf)); + } else { + RaftClientRequestProto requestProto = toRaftClientRequestProto(request); + CompletableFuture<RaftClientReplyProto> replyFuture = + new CompletableFuture<>(); + final StreamObserver<RaftClientRequestProto> requestObserver = + proxy.append(new StreamObserver<RaftClientReplyProto>() { + @Override + public void onNext(RaftClientReplyProto value) { + replyFuture.complete(value); + } + + @Override + public void onError(Throwable t) { + // This implementation is used as RaftClientRpc. Retry + // logic on Exception is in RaftClient. + final IOException e; + if (t instanceof StatusRuntimeException) { + e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); + } else { + e = RaftUtils.asIOException(t); + } + replyFuture.completeExceptionally(e); + } + + @Override + public void onCompleted() { + if (!replyFuture.isDone()) { + replyFuture.completeExceptionally( + new IOException("No reply for request " + request)); + } + } + }); + requestObserver.onNext(requestProto); + requestObserver.onCompleted(); + + // TODO: timeout support + try { + return toRaftClientReply(replyFuture.get()); + } catch (InterruptedException e) { + throw new InterruptedIOException( + "Interrupted while waiting for response of request " + request); + } catch (ExecutionException e) { + throw RaftUtils.toIOException(e); + } + } + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + proxies.addPeers(servers); + } + + @Override + public void close() throws IOException { + proxies.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java deleted file mode 100644 
index 9a0eca3..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.client; - -import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.*; -import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.ratis.util.PeerProxyMap; -import org.apache.ratis.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static org.apache.ratis.client.impl.ClientProtoUtils.*; - -public class RaftClientSenderWithGrpc implements RaftClientRequestSender { - public static final Logger LOG = 
LoggerFactory.getLogger(RaftClientSenderWithGrpc.class); - - private final PeerProxyMap<RaftClientProtocolClient> proxies - = new PeerProxyMap<>(RaftClientProtocolClient::new); - - @Override - public RaftClientReply sendRequest(RaftClientRequest request) - throws IOException { - final RaftPeerId serverId = request.getServerId(); - final RaftClientProtocolClient proxy = proxies.getProxy(serverId); - if (request instanceof SetConfigurationRequest) { - SetConfigurationRequestProto setConf = - toSetConfigurationRequestProto((SetConfigurationRequest) request); - return toRaftClientReply(proxy.setConfiguration(setConf)); - } else { - RaftClientRequestProto requestProto = toRaftClientRequestProto(request); - CompletableFuture<RaftClientReplyProto> replyFuture = - new CompletableFuture<>(); - final StreamObserver<RaftClientRequestProto> requestObserver = - proxy.append(new StreamObserver<RaftClientReplyProto>() { - @Override - public void onNext(RaftClientReplyProto value) { - replyFuture.complete(value); - } - - @Override - public void onError(Throwable t) { - // This implementation is used as RaftClientRequestSender. Retry - // logic on Exception is in RaftClient. 
- final IOException e; - if (t instanceof StatusRuntimeException) { - e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); - } else { - e = RaftUtils.asIOException(t); - } - replyFuture.completeExceptionally(e); - } - - @Override - public void onCompleted() { - if (!replyFuture.isDone()) { - replyFuture.completeExceptionally( - new IOException("No reply for request " + request)); - } - } - }); - requestObserver.onNext(requestProto); - requestObserver.onCompleted(); - - // TODO: timeout support - try { - return toRaftClientReply(replyFuture.get()); - } catch (InterruptedException e) { - throw new InterruptedIOException( - "Interrupted while waiting for response of request " + request); - } catch (ExecutionException e) { - throw RaftUtils.toIOException(e); - } - } - } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() throws IOException { - proxies.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java index 9ff493f..7b9e20f 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java @@ -19,7 +19,7 @@ package org.apache.ratis.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.ratis.client.ClientFactory; -import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender; +import org.apache.ratis.hadooprpc.client.HadoopClientRpc; import org.apache.ratis.hadooprpc.server.HadoopRpcService; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.RaftServerImpl; @@ -46,7 +46,7 @@ public class 
HadoopFactory extends ServerFactory.BaseFactory implements ClientFa } @Override - public HadoopClientRequestSender newRaftClientRequestSender() { - return new HadoopClientRequestSender(conf); + public HadoopClientRpc newRaftClientRpc() { + return new HadoopClientRpc(conf); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java deleted file mode 100644 index 1a10dab..0000000 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ratis.hadooprpc.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.protocol.*; -import org.apache.ratis.util.PeerProxyMap; - -import java.io.IOException; - -public class HadoopClientRequestSender implements RaftClientRequestSender { - - private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies; - - public HadoopClientRequestSender(final Configuration conf) { - this.proxies = new PeerProxyMap<>( - p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); - } - - @Override - public RaftClientReply sendRequest(RaftClientRequest request) - throws IOException { - final RaftPeerId serverId = request.getServerId(); - final RaftClientProtocolClientSideTranslatorPB proxy = - proxies.getProxy(serverId); - try { - if (request instanceof SetConfigurationRequest) { - return proxy.setConfiguration((SetConfigurationRequest) request); - } else { - return proxy.submitClientRequest(request); - } - } catch (RemoteException e) { - throw e.unwrapRemoteException( - StateMachineException.class, - ReconfigurationTimeoutException.class, - ReconfigurationInProgressException.class, - RaftException.class, - LeaderNotReadyException.class); - } - } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() { - proxies.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java new file mode 100644 index 0000000..25c0ecd --- /dev/null +++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.protocol.*; +import org.apache.ratis.util.PeerProxyMap; + +import java.io.IOException; + +public class HadoopClientRpc implements RaftClientRpc { + + private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies; + + public HadoopClientRpc(final Configuration conf) { + this.proxies = new PeerProxyMap<>( + p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); + } + + @Override + public RaftClientReply sendRequest(RaftClientRequest request) + throws IOException { + final RaftPeerId serverId = request.getServerId(); + final RaftClientProtocolClientSideTranslatorPB proxy = + proxies.getProxy(serverId); + try { + if (request instanceof SetConfigurationRequest) { + return proxy.setConfiguration((SetConfigurationRequest) request); + } else { + return proxy.submitClientRequest(request); + } + } catch (RemoteException e) { + throw 
e.unwrapRemoteException( + StateMachineException.class, + ReconfigurationTimeoutException.class, + ReconfigurationInProgressException.class, + RaftException.class, + LeaderNotReadyException.class); + } + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + proxies.addPeers(servers); + } + + @Override + public void close() { + proxies.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java index fb27eaa..525b991 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java @@ -18,7 +18,7 @@ package org.apache.ratis.netty; import org.apache.ratis.client.ClientFactory; -import org.apache.ratis.netty.client.NettyClientRequestSender; +import org.apache.ratis.netty.client.NettyClientRpc; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.RaftServerImpl; @@ -36,7 +36,7 @@ public class NettyFactory extends ServerFactory.BaseFactory implements ClientFac } @Override - public NettyClientRequestSender newRaftClientRequestSender() { - return new NettyClientRequestSender(); + public NettyClientRpc newRaftClientRpc() { + return new NettyClientRpc(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java deleted file mode 
100644 index 5b36fde..0000000 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.netty.client; - -import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.netty.NettyRpcProxy; -import org.apache.ratis.protocol.*; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; - -import java.io.IOException; - -public class NettyClientRequestSender implements RaftClientRequestSender { - private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap(); - - @Override - public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { - final RaftPeerId serverId = request.getServerId(); - final NettyRpcProxy proxy = proxies.getProxy(serverId); - - final RaftNettyServerRequestProto.Builder b = 
RaftNettyServerRequestProto.newBuilder(); - final RaftRpcRequestProto rpcRequest; - if (request instanceof SetConfigurationRequest) { - final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto( - (SetConfigurationRequest)request); - b.setSetConfigurationRequest(proto); - rpcRequest = proto.getRpcRequest(); - } else { - final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request); - b.setRaftClientRequest(proto); - rpcRequest = proto.getRpcRequest(); - } - return ClientProtoUtils.toRaftClientReply( - proxy.send(rpcRequest, b.build()).getRaftClientReply()); - } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() { - proxies.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java new file mode 100644 index 0000000..74afddc --- /dev/null +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty.client; + +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.netty.NettyRpcProxy; +import org.apache.ratis.protocol.*; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; + +import java.io.IOException; + +public class NettyClientRpc implements RaftClientRpc { + private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap(); + + @Override + public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { + final RaftPeerId serverId = request.getServerId(); + final NettyRpcProxy proxy = proxies.getProxy(serverId); + + final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); + final RaftRpcRequestProto rpcRequest; + if (request instanceof SetConfigurationRequest) { + final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto( + (SetConfigurationRequest)request); + b.setSetConfigurationRequest(proto); + rpcRequest = proto.getRpcRequest(); + } else { + final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request); + b.setRaftClientRequest(proto); + rpcRequest = proto.getRpcRequest(); + } + return ClientProtoUtils.toRaftClientReply( + proxy.send(rpcRequest, 
b.build()).getRaftClientReply()); + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + proxies.addPeers(servers); + } + + @Override + public void close() { + proxies.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 751854c..9c566a9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -394,7 +394,7 @@ public abstract class MiniRaftCluster { return RaftClient.newBuilder() .setServers(conf.getPeers()) .setLeaderId(leaderId) - .setRequestSender(clientFactory.newRaftClientRequestSender()) + .setClientRpc(clientFactory.newRaftClientRpc()) .setProperties(properties) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java index fcb0bc2..a5d1127 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java @@ -20,7 +20,7 @@ package org.apache.ratis; import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import 
org.apache.ratis.protocol.RaftClientReply; @@ -87,7 +87,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId); Assert.assertNotEquals(leaderId, newLeader); - RaftClientRequestSender rpc = client.getRequestSender(); + RaftClientRpc rpc = client.getClientRpc(); reply= null; for (int i = 0; reply == null && i < 10; i++) { try { @@ -133,7 +133,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { } LOG.info(cluster.printServers()); - RaftClientRequestSender rpc = client.getRequestSender(); + RaftClientRpc rpc = client.getClientRpc(); RaftClientReply reply = null; // it is possible that the remote peer's rpc server is not ready. need retry for (int i = 0; reply == null && i < 10; i++) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 5b46af8..3017634 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -23,7 +23,7 @@ import org.apache.ratis.MiniRaftCluster.PeerChanges; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; @@ -253,7 +253,7 @@ public abstract class RaftReconfigurationBaseTest { asList(c1.allPeersInNewConf)); 
Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional()); - final RaftClientRequestSender sender = client.getRequestSender(); + final RaftClientRpc sender = client.getClientRpc(); final SetConfigurationRequest request = new SetConfigurationRequest( client.getId(), leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf); try { @@ -470,7 +470,7 @@ public abstract class RaftReconfigurationBaseTest { try(final RaftClient client2 = cluster.createClient(leaderId)) { latch.await(); LOG.info("client2 starts to change conf"); - final RaftClientRequestSender sender2 = client2.getRequestSender(); + final RaftClientRpc sender2 = client2.getClientRpc(); sender2.sendRequest(new SetConfigurationRequest( client2.getId(), leaderId, DEFAULT_SEQNUM, peersInRequest2)); } catch (ReconfigurationInProgressException e) { @@ -534,7 +534,7 @@ public abstract class RaftReconfigurationBaseTest { new Thread(() -> { try(final RaftClient client = cluster.createClient(leaderId)) { LOG.info("client starts to change conf"); - final RaftClientRequestSender sender = client.getRequestSender(); + final RaftClientRpc sender = client.getClientRpc(); RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest( client.getId(), leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf)); if (reply.isNotLeader()) { @@ -595,7 +595,7 @@ public abstract class RaftReconfigurationBaseTest { AtomicBoolean success = new AtomicBoolean(false); new Thread(() -> { final RaftClient client = cluster.createClient(leaderId); - final RaftClientRequestSender sender = client.getRequestSender(); + final RaftClientRpc sender = client.getClientRpc(); final RaftClientRequest request = new RaftClientRequest(client.getId(), leaderId, 0, new SimpleMessage("test")); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index 11c4c0a..e33d64f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -49,7 +49,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { }; private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; - private SimulatedClientRequestReply client2serverRequestReply; + private SimulatedClientRpc client2serverRequestReply; private MiniRaftClusterWithSimulatedRpc(String[] ids, RaftProperties properties, boolean formatted) { @@ -65,7 +65,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = " + simulateLatencyMs); serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs); - client2serverRequestReply = new SimulatedClientRequestReply(simulateLatencyMs); + client2serverRequestReply = new SimulatedClientRpc(simulateLatencyMs); getServers().stream().forEach(s -> initRpc(s)); addPeersToRpc(toRaftPeers(getServers())); ((SimulatedRpc.Factory)clientFactory).initRpc( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java deleted file mode 100644 index 65fe7ad..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java +++ /dev/null @@ -1,41 +0,0 @@ -/** 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.simulation; - -import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeer; - -class SimulatedClientRequestReply - extends SimulatedRequestReply<RaftClientRequest, RaftClientReply> - implements RaftClientRequestSender { - SimulatedClientRequestReply(int simulateLatencyMs) { - super(simulateLatencyMs); - } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - // do nothing - } - - @Override - public void close() { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java new file mode 100644 index 0000000..a62ec16 --- /dev/null +++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; + +class SimulatedClientRpc + extends SimulatedRequestReply<RaftClientRequest, RaftClientReply> + implements RaftClientRpc { + SimulatedClientRpc(int simulateLatencyMs) { + super(simulateLatencyMs); + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + // do nothing + } + + @Override + public void close() { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java index 9d855c3..67193b4 100644 --- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java @@ -41,11 +41,11 @@ class SimulatedRpc implements RpcType { static class Factory extends ServerFactory.BaseFactory implements ClientFactory { private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; - private SimulatedClientRequestReply client2serverRequestReply; + private SimulatedClientRpc client2serverRequestReply; public void initRpc( SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, - SimulatedClientRequestReply client2serverRequestReply) { + SimulatedClientRpc client2serverRequestReply) { this.serverRequestReply = Objects.requireNonNull(serverRequestReply); this.client2serverRequestReply = Objects.requireNonNull(client2serverRequestReply); } @@ -58,7 +58,7 @@ class SimulatedRpc implements RpcType { } @Override - public SimulatedClientRequestReply newRaftClientRequestSender() { + public SimulatedClientRpc newRaftClientRpc() { return Objects.requireNonNull(client2serverRequestReply); }
