Repository: incubator-ratis Updated Branches: refs/heads/master c7c4d1140 -> d7fb566c4
RATIS-28. Create RaftServerRpc using ServerFactory. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/d7fb566c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/d7fb566c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/d7fb566c Branch: refs/heads/master Commit: d7fb566c4a0571b92354f9bfb57cdb0a3cd315e4 Parents: c7c4d11 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Fri Feb 24 16:17:19 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Fri Feb 24 16:17:19 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/RaftConfigKeys.java | 42 ++++++++ .../java/org/apache/ratis/conf/ConfUtils.java | 21 ++-- .../java/org/apache/ratis/util/NetUtils.java | 16 ++- .../ratis/examples/RaftExamplesTestUtil.java | 1 - .../org/apache/ratis/grpc/RaftGRpcService.java | 33 ++---- .../ratis/grpc/server/GrpcServerFactory.java | 15 ++- .../ratis/grpc/MiniRaftClusterWithGRpc.java | 64 +++--------- .../apache/ratis/hadooprpc/HadoopFactory.java | 40 ++++++++ .../server/HadoopRpcServerConfigKeys.java | 2 +- .../hadooprpc/server/HadoopRpcService.java | 7 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 61 +++-------- .../org/apache/ratis/netty/NettyConfigKeys.java | 42 ++++++++ .../org/apache/ratis/netty/NettyFactory.java | 30 ++++++ .../ratis/netty/server/NettyRpcService.java | 10 +- .../ratis/netty/MiniRaftClusterWithNetty.java | 57 ++--------- .../org/apache/ratis/server/RaftServer.java | 11 +- .../ratis/server/RaftServerConfigKeys.java | 11 +- .../org/apache/ratis/server/RaftServerRpc.java | 23 +---- .../ratis/server/impl/ConfigurationManager.java | 11 +- .../ratis/server/impl/RaftServerImpl.java | 97 ++++++++---------- .../apache/ratis/server/impl/ServerFactory.java | 7 +- .../apache/ratis/server/impl/ServerState.java | 8 -- .../java/org/apache/ratis/MiniRaftCluster.java | 101 +++++++++---------- .../MiniRaftClusterWithSimulatedRpc.java | 44 ++++---- .../simulation/SimulatedClientRequestReply.java | 10 +- .../simulation/SimulatedRequestReply.java | 14 +-- .../server/simulation/SimulationFactory.java | 41 ++++++++ 27 files changed, 424 insertions(+), 395 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java new file mode 100644 index 0000000..f3a5942 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.ratis.conf.ConfUtils; + +import java.util.function.BiConsumer; +import java.util.function.BiFunction; + +public interface RaftConfigKeys { + String PREFIX = "raft"; + + abstract class Rpc { + public static final String PREFIX = RaftConfigKeys.PREFIX + ".rpc"; + + public static final String TYPE_KEY = PREFIX + ".type"; + public static final RpcType TYPE_DEFAULT = RpcType.GRPC; + + public static RpcType type(BiFunction<String, RpcType, RpcType> getRpcType) { + return ConfUtils.get(getRpcType, TYPE_KEY, TYPE_DEFAULT); + } + + public static void setType(BiConsumer<String, RpcType> setRpcType, RpcType type) { + ConfUtils.set(setRpcType, TYPE_KEY, type); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java index e43ac60..f3870e5 100644 --- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java @@ -27,10 +27,9 @@ public abstract class ConfUtils { return value; } - public static String getString( - BiFunction<String, String, String> getString, - String key, String defaultValue) { - final String value = getString.apply(key, defaultValue); + public static <T> T get(BiFunction<String, T, T> getString, + String key, T defaultValue) { + final T value = getString.apply(key, defaultValue); LOG.info(key + " = " + value); return value; } @@ -38,13 +37,17 @@ public abstract class ConfUtils { public static InetSocketAddress getInetSocketAddress( BiFunction<String, String, String> getString, String key, String defaultValue) { - return NetUtils.createSocketAddr(getString(getString, key, defaultValue)); + return NetUtils.createSocketAddr(get(getString, key, defaultValue)); + } + + public static void setInt(BiConsumer<String, Integer> setInt, + String key, int value) { + setInt.accept(key, value); + LOG.info("set " + key + " = " + value); } - public static void setString( - BiConsumer<String, String> setString, - String key, String value) { - setString.accept(key, value); + public static <T> void set(BiConsumer<String, T> set, String key, T value) { + set.accept(key, value); LOG.info("set " + key + " = " + value); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java index b6634b6..6f0bc2b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java @@ -21,10 +21,8 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.UnknownHostException; +import java.io.IOException; +import java.net.*; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -148,4 +146,14 @@ public abstract class NetUtils { } return addr; } + + public static InetSocketAddress createLocalServerAddress() { + try(final ServerSocket s = new ServerSocket()) { + s.setReuseAddress(true); + s.bind(null); + return (InetSocketAddress) s.getLocalSocketAddress(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java index 873c0da..f54b766 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; -import java.util.stream.Collectors; public class RaftExamplesTestUtil { public static final Logger LOG = LoggerFactory.getLogger(RaftExamplesTestUtil.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 473a5c6..22b295c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -18,7 +18,6 @@ package org.apache.ratis.grpc; import com.google.common.base.Preconditions; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.RpcType; import org.apache.ratis.grpc.client.RaftClientProtocolService; import org.apache.ratis.grpc.server.RaftServerProtocolClient; @@ -51,28 +50,7 @@ public class RaftGRpcService implements RaftServerRpc { RaftGRpcService.class.getSimpleName() + ".sendRequest"; public static class Builder extends RaftServerRpc.Builder<Builder,RaftGRpcService> { - private int maxMessageSize = RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT; - - private Builder() { - super(RAFT_GRPC_SERVER_PORT_DEFAULT); - } - - public int getMaxMessageSize() { - return maxMessageSize; - } - - public Builder setMaxMessageSize(int maxMessageSize) { - this.maxMessageSize = maxMessageSize; - return this; - } - - public Builder setFromRaftProperties(RaftProperties properties) { - setPort(properties.getInt(RAFT_GRPC_SERVER_PORT_KEY, - RAFT_GRPC_SERVER_PORT_DEFAULT)); - setMaxMessageSize(properties.getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY, - RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT)); - return this; - } + private Builder() {} @Override public Builder getThis() { @@ -81,7 +59,7 @@ public class RaftGRpcService implements RaftServerRpc { @Override public RaftGRpcService build() { - return new RaftGRpcService(getServer(), getPort(), getMaxMessageSize()); + return new RaftGRpcService(getServer()); } } @@ -95,6 +73,13 @@ public class RaftGRpcService implements RaftServerRpc { Collections.synchronizedMap(new HashMap<>()); private final RaftPeerId selfId; + private RaftGRpcService(RaftServer server) { + this(server, + server.getProperties().getInt(RAFT_GRPC_SERVER_PORT_KEY, + RAFT_GRPC_SERVER_PORT_DEFAULT), + server.getProperties().getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY, + RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT)); + } private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) { ServerBuilder serverBuilder = ServerBuilder.forPort(port); selfId = raftServer.getId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java index 09e3265..e280faf 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java @@ -17,11 +17,9 @@ */ package org.apache.ratis.grpc.server; -import org.apache.ratis.server.impl.FollowerInfo; -import org.apache.ratis.server.impl.LeaderState; -import org.apache.ratis.server.impl.LogAppender; -import org.apache.ratis.server.impl.ServerFactory; -import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.grpc.RaftGRpcService; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.*; public class GrpcServerFactory implements ServerFactory { @Override @@ -29,4 +27,11 @@ public class GrpcServerFactory implements ServerFactory { FollowerInfo f) { return new GRpcLogAppender(server, state, f); } + + @Override + public RaftServerRpc newRaftServerRpc(RaftServerImpl server) { + return RaftGRpcService.newBuilder() + .setServer(server) + .build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index bab8ee8..c0c4f6d 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -17,9 +17,10 @@ */ package org.apache.ratis.grpc; -import com.google.common.base.Preconditions; import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RpcType; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc; @@ -28,13 +29,8 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.util.NetUtils; -import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { public static final Factory<MiniRaftClusterWithGRpc> FACTORY @@ -42,6 +38,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { @Override public MiniRaftClusterWithGRpc newCluster( String[] ids, RaftProperties prop, boolean formatted) { + RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.GRPC); return new MiniRaftClusterWithGRpc(ids, prop, formatted); } }; @@ -51,22 +48,15 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties, boolean formatted) { - super(ids, new RaftProperties(properties), formatted); - init(initRpcServices(getServers(), properties)); + super(ids, properties, formatted); } - private static Map<RaftPeer, RaftGRpcService> initRpcServices( - Collection<RaftServerImpl> servers, RaftProperties prop) { - final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>(); - - for (RaftServerImpl s : servers) { - final RaftGRpcService rpc = RaftGRpcService.newBuilder() - .setFromRaftProperties(prop) - .setServer(s) - .build(); - peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); - } - return peerRpcs; + @Override + protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { + final RaftServerImpl s = super.newRaftServer(id, format); + s.getProperties().setInt( + RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(s)); + return s; } @Override @@ -75,40 +65,18 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { } @Override - protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServerImpl> newServers, boolean startService) - throws IOException { - final Map<RaftPeer, RaftGRpcService> peers = initRpcServices(newServers, properties); - for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) { - RaftServerImpl server = servers.get(entry.getKey().getId()); - server.setServerRpc(entry.getValue()); + protected Collection<RaftPeer> addNewPeers( + Collection<RaftServerImpl> newServers, boolean startService) { + final Collection<RaftPeer> peers = toRaftPeers(newServers); + for (RaftPeer p: peers) { + final RaftServerImpl server = servers.get(p.getId()); if (!startService) { BlockRequestHandlingInjection.getInstance().blockReplier(server.getId().toString()); } else { server.start(); } } - return new ArrayList<>(peers.keySet()); - } - - @Override - protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { - RaftServerImpl server = servers.get(peer.getId()); - int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort(); - int oldPort = properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, - RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT); - properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port); - final RaftGRpcService rpc = RaftGRpcService.newBuilder() - .setFromRaftProperties(properties) - .setServer(server) - .build(); - Preconditions.checkState( - rpc.getInetSocketAddress().toString().contains(peer.getAddress()), - "address in the raft conf: %s, address in rpc server: %s", - peer.getAddress(), rpc.getInetSocketAddress().toString()); - server.setServerRpc(rpc); - properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, oldPort); - return server; + return peers; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java new file mode 100644 index 0000000..a083f05 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.hadooprpc.server.HadoopRpcService; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerFactory; + +public class HadoopFactory extends ServerFactory.BaseFactory { + private Configuration conf; + + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public RaftServerRpc newRaftServerRpc(RaftServerImpl server) { + return HadoopRpcService.newBuilder() + .setServer(server) + .setConf(conf) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java index 4cdbdb3..bfdf05b 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java @@ -50,7 +50,7 @@ public interface HadoopRpcServerConfigKeys { public static void setAddress( BiConsumer<String, String> setString, String address) { - ConfUtils.setString(setString, ADDRESS_KEY, address); + ConfUtils.set(setString, ADDRESS_KEY, address); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index e05e2a8..74716be 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -53,9 +53,7 @@ public class HadoopRpcService implements RaftServerRpc { public static class Builder extends RaftServerRpc.Builder<Builder, HadoopRpcService> { private Configuration conf; - private Builder() { - super(0); - } + private Builder() {} public Configuration getConf() { if (conf == null) { @@ -117,7 +115,8 @@ public class HadoopRpcService implements RaftServerRpc { return ipcServerAddress; } - private RPC.Server newRpcServer(RaftServerProtocol serverProtocol, final Configuration conf) + private static RPC.Server newRpcServer( + RaftServerProtocol serverProtocol, final Configuration conf) throws IOException { final int handlerCount = HadoopRpcServerConfigKeys.Ipc.handlers(conf::getInt); final InetSocketAddress address = HadoopRpcServerConfigKeys.Ipc.address(conf::getTrimmed); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index f3ee3f6..157891f 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -17,26 +17,22 @@ */ package org.apache.ratis.hadooprpc; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RpcType; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender; import org.apache.ratis.hadooprpc.server.HadoopRpcServerConfigKeys; import org.apache.ratis.hadooprpc.server.HadoopRpcService; -import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.RaftServerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithHadoopRpc.class); @@ -55,6 +51,7 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { public MiniRaftClusterWithHadoopRpc newCluster( String[] ids, RaftProperties prop, Configuration conf, boolean formatted) { + RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.HADOOP); HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0"); return new MiniRaftClusterWithHadoopRpc(ids, prop, conf, formatted); } @@ -67,53 +64,27 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { private final Configuration hadoopConf; - private MiniRaftClusterWithHadoopRpc(String[] ids, RaftProperties properties, Configuration hadoopConf, boolean formatted) { super(ids, properties, formatted); this.hadoopConf = hadoopConf; - - init(initRpcServices(getServers(), hadoopConf)); - } - - private static Map<RaftPeer, HadoopRpcService> initRpcServices( - Collection<RaftServerImpl> servers, Configuration hadoopConf) { - final Map<RaftPeer, HadoopRpcService> peerRpcs = new HashMap<>(); - - for(RaftServerImpl s : servers) { - final HadoopRpcService rpc = HadoopRpcService.newBuilder() - .setServer(s) - .setConf(hadoopConf) - .build(); - peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); - } - return peerRpcs; + getServers().stream().forEach(s -> setConf(s)); } - @Override - protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { - Configuration hconf = new Configuration(hadoopConf); - HadoopRpcServerConfigKeys.Ipc.setAddress(hconf::set, peer.getAddress()); - - RaftServerImpl server = servers.get(peer.getId()); - final HadoopRpcService rpc = HadoopRpcService.newBuilder() - .setServer(server) - .setConf(hconf) - .build(); - Preconditions.checkState( - rpc.getInetSocketAddress().toString().contains(peer.getAddress()), - "address in the raft conf: %s, address in rpc server: %s", - peer.getAddress(), rpc.getInetSocketAddress()); - server.setServerRpc(rpc); - return server; + private void setConf(RaftServerImpl server) { + final Configuration conf = new Configuration(hadoopConf); + final String address = "0.0.0.0:" + getPort(server); + HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, address); + ((HadoopFactory)server.getFactory()).setConf(conf); } @Override - public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServerImpl> newServers, boolean startService) - throws IOException { - return addNewPeers(initRpcServices(newServers, hadoopConf), - newServers, startService); + protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { + final RaftServerImpl s = super.newRaftServer(id, format); + if (hadoopConf != null) { + setConf(s); + } + return s; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java new file mode 100644 index 0000000..07d131c --- /dev/null +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.conf.ConfUtils; + +import java.util.function.BiConsumer; +import java.util.function.BiFunction; + +public interface NettyConfigKeys { + String PREFIX = "raft.netty"; + + abstract class Server { + static String PREFIX = NettyConfigKeys.PREFIX + ".server"; + + public static String PORT_KEY = PREFIX + ".port"; + public static int PORT_DEFAULT = 0; + + public static int port(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, PORT_KEY, PORT_DEFAULT, 0, 65536); + } + + public static void setPort(BiConsumer<String, Integer> setString, int port) { + ConfUtils.setInt(setString, PORT_KEY, port); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java new file mode 100644 index 0000000..6265f02 --- /dev/null +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.netty.server.NettyRpcService; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerFactory; + +public class NettyFactory extends ServerFactory.BaseFactory { + @Override + public RaftServerRpc newRaftServerRpc(RaftServerImpl server) { + return NettyRpcService.newBuilder().setServer(server).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 5a2bac5..140cbb1 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -26,6 +26,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.RpcType; import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; @@ -58,9 +59,7 @@ public final class NettyRpcService implements RaftServerRpc { public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; public static class Builder extends RaftServerRpc.Builder<Builder, NettyRpcService> { - private Builder() { - super(0); - } + private Builder() {} @Override public Builder getThis() { @@ -69,7 +68,7 @@ public final class NettyRpcService implements RaftServerRpc { @Override public NettyRpcService build() { - return new NettyRpcService(getServer(), getPort()); + return new NettyRpcService(getServer()); } } @@ -98,7 +97,7 @@ public final class NettyRpcService implements RaftServerRpc { } /** Constructs a netty server with the given port. */ - private NettyRpcService(RaftServer server, int port) { + private NettyRpcService(RaftServer server) { this.server = server; this.id = server.getId(); @@ -117,6 +116,7 @@ public final class NettyRpcService implements RaftServerRpc { } }; + final int port = NettyConfigKeys.Server.port(server.getProperties()::getInt); channelFuture = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 4ddab6e..7167d0d 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -18,22 +18,16 @@ package org.apache.ratis.netty; import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RpcType; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.netty.client.NettyClientRequestSender; import org.apache.ratis.netty.server.NettyRpcService; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; -import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.util.NetUtils; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { public static final Factory<MiniRaftClusterWithNetty> FACTORY @@ -41,6 +35,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { @Override public MiniRaftClusterWithNetty newCluster( String[] ids, RaftProperties prop, boolean formatted) { + RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.NETTY); return new MiniRaftClusterWithNetty(ids, prop, formatted); } }; @@ -51,56 +46,16 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { private MiniRaftClusterWithNetty( String[] ids, RaftProperties properties, boolean formatted) { super(ids, properties, formatted); - init(initRpcServices(getServers(), getConf())); - } - - private static String getAddress(RaftPeerId id, RaftConfiguration conf) { - final RaftPeer peer = conf.getPeer(id); - if (peer != null) { - final String address = peer.getAddress(); - if (address != null) { - return address; - } - } - return "0.0.0.0:0"; - } - - private static NettyRpcService newNettyRpcService( - RaftServerImpl s, RaftConfiguration conf) { - final String address = getAddress(s.getId(), conf); - final int port = NetUtils.newInetSocketAddress(address).getPort(); - return NettyRpcService.newBuilder().setServer(s).setPort(port).build(); - } - - private static Map<RaftPeer, NettyRpcService> initRpcServices( - Collection<RaftServerImpl> servers, RaftConfiguration conf) { - final Map<RaftPeer, NettyRpcService> peerRpcs = new HashMap<>(); - - for (RaftServerImpl s : servers) { - final NettyRpcService rpc = newNettyRpcService(s, conf); - peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); - } - - return peerRpcs; } @Override - protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { - final RaftServerImpl s = servers.get(peer.getId()); - final NettyRpcService rpc = newNettyRpcService(s, conf); - s.setServerRpc(rpc); + protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { + final RaftServerImpl s = super.newRaftServer(id, format); + NettyConfigKeys.Server.setPort(s.getProperties()::setInt, getPort(s)); return s; } @Override - protected Collection<RaftPeer> addNewPeers( - Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers, - boolean startService) throws IOException { - return addNewPeers(initRpcServices(newServers, conf), - newServers, startService); - } - - @Override public RaftClientRequestSender getRaftClientRequestSender() { return new NettyClientRequestSender(getPeers()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index bfbd75f..3eed3c1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -18,11 +18,13 @@ package org.apache.ratis.server; import com.google.common.base.Preconditions; +import org.apache.ratis.RpcType; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; import org.apache.ratis.protocol.RaftClientProtocol; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.impl.ServerFactory; import org.apache.ratis.server.impl.ServerImplUtils; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.statemachine.StateMachine; @@ -31,13 +33,16 @@ import java.io.Closeable; import java.io.IOException; /** Raft server interface */ -public interface RaftServer extends Closeable, RaftServerProtocol, +public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, RaftClientProtocol, RaftClientAsynchronousProtocol { /** @return the server ID. */ RaftPeerId getId(); - /** Set server RPC service. */ - void setServerRpc(RaftServerRpc serverRpc); + /** @return the server properties. */ + RaftProperties getProperties(); + + /** @return the factory for creating server components. */ + ServerFactory getFactory(); /** Start this server. */ void start(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 8e6d54f..1203cdd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -18,21 +18,16 @@ package org.apache.ratis.server; import org.apache.ratis.RpcType; -import org.apache.ratis.util.NetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; public interface RaftServerConfigKeys { String PREFIX = "raft.server"; enum Factory { - NETTY("org.apache.ratis.server.impl.ServerFactory$BaseFactory"), + NETTY("org.apache.ratis.netty.NettyFactory"), GRPC("org.apache.ratis.grpc.server.GrpcServerFactory"), - HADOOP("org.apache.ratis.server.impl.ServerFactory$BaseFactory"), - SIMULATED("org.apache.ratis.server.impl.ServerFactory$BaseFactory"); + HADOOP("org.apache.ratis.hadooprpc.HadoopFactory"), + SIMULATED("org.apache.ratis.server.simulation.SimulationFactory"); public static String getKey(String rpcType) { return RaftServerConfigKeys.PREFIX + ".factory." + rpcType + ".class"; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java index e68c536..40a8363 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java @@ -22,8 +22,8 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.server.protocol.RaftServerProtocol; import java.io.Closeable; -import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Objects; /** * An server-side interface for supporting different RPC implementations @@ -33,15 +33,10 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl /** To build {@link RaftServerRpc} objects. */ abstract class Builder<B extends Builder, RPC extends RaftServerRpc> { private RaftServer server; - private int port; - - /** Construct a builder with the default port. */ - protected Builder(int defaultPort) { - this.port = defaultPort; - } public RaftServer getServer() { - return server; + return Objects.requireNonNull(server, + "The 'server' field is not initialized."); } public B setServer(RaftServer server) { @@ -49,19 +44,9 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl return getThis(); } - public int getPort() { - return port; - } - - /** Set the port for the server to listen to. */ - public B setPort(int port) { - this.port = port; - return getThis(); - } - protected abstract B getThis(); - public abstract RPC build() throws IOException; + public abstract RPC build(); } /** Start the RPC service. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java index 7769de1..c000238 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java @@ -29,7 +29,7 @@ import java.util.*; * entries. */ public class ConfigurationManager { - private RaftConfiguration initialConf; + private final RaftConfiguration initialConf; private final NavigableMap<Long, RaftConfiguration> configurations = new TreeMap<>(); /** @@ -39,15 +39,6 @@ public class ConfigurationManager { private RaftConfiguration currentConf; ConfigurationManager(RaftConfiguration initialConf) { - setInitialConf(initialConf); - } - - @VisibleForTesting - public synchronized void setInitialConf(RaftConfiguration initialConf) { - /** - * initialConf should actually be defined as "final". But for tests we want - * to change the initial configuration to reflect the correct port binding. - */ this.initialConf = initialConf; this.currentConf = initialConf; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 152d6a5..c5fe336 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,44 +17,18 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.util.LifeCycle.State.CLOSED; -import static org.apache.ratis.util.LifeCycle.State.CLOSING; -import static org.apache.ratis.util.LifeCycle.State.RUNNING; -import static org.apache.ratis.util.LifeCycle.State.STARTING; - -import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.RpcType; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.LeaderNotReadyException; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.NotLeaderException; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftException; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.ReconfigurationInProgressException; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; @@ -65,8 +39,18 @@ import org.apache.ratis.util.RaftUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; +import static org.apache.ratis.util.LifeCycle.State.*; public class RaftServerImpl implements RaftServer { public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class); @@ -82,6 +66,7 @@ public class RaftServerImpl implements RaftServer { LEADER, CANDIDATE, FOLLOWER } + private final RpcType rpcType; private final int minTimeoutMs; private final int maxTimeoutMs; @@ -100,14 +85,15 @@ public class RaftServerImpl implements RaftServer { /** used when the peer is leader */ private volatile LeaderState leaderState; - private RaftServerRpc serverRpc; + private final Supplier<RaftServerRpc> serverRpc; - private final Supplier<ServerFactory> factory ; + private final ServerFactory factory; RaftServerImpl(RaftPeerId id, StateMachine stateMachine, RaftConfiguration raftConf, RaftProperties properties) throws IOException { this.lifeCycle = new LifeCycle(id); + this.rpcType = RaftConfigKeys.Rpc.type(properties::getEnum); minTimeoutMs = properties.getInt( RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY, RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); @@ -119,12 +105,18 @@ public class RaftServerImpl implements RaftServer { this.properties = properties; this.stateMachine = stateMachine; this.state = new ServerState(id, raftConf, properties, this, stateMachine); - this.factory = RaftUtils.memoize( - () -> ServerFactory.Util.newServerFactory(getServerRpc().getRpcType(), properties)); + this.factory = ServerFactory.Util.newServerFactory(rpcType, properties); + this.serverRpc = RaftUtils.memoize(() -> initRaftServerRpc()); } - ServerFactory getFactory() { - return factory.get(); + @Override + public RpcType getRpcType() { + return rpcType; + } + + @Override + public ServerFactory getFactory() { + return factory; } int getMinTimeoutMs() { @@ -144,26 +136,18 @@ public class RaftServerImpl implements RaftServer { return this.stateMachine; } - /** - * Used by tests to set initial raft configuration with correct port bindings. - */ - @VisibleForTesting - public void setInitialConf(RaftConfiguration conf) { - this.state.setInitialConf(conf); - } - - @Override - public void setServerRpc(RaftServerRpc serverRpc) { - this.serverRpc = serverRpc; + private RaftServerRpc initRaftServerRpc() { + final RaftServerRpc rpc = getFactory().newRaftServerRpc(this); // add peers into rpc service RaftConfiguration conf = getRaftConf(); if (conf != null) { - serverRpc.addPeers(conf.getPeers()); + rpc.addPeers(conf.getPeers()); } + return rpc; } public RaftServerRpc getServerRpc() { - return serverRpc; + return serverRpc.get(); } @Override @@ -188,7 +172,7 @@ public class RaftServerImpl implements RaftServer { heartbeatMonitor = new FollowerState(this); heartbeatMonitor.start(); - serverRpc.start(); + getServerRpc().start(); lifeCycle.transition(RUNNING); } @@ -200,7 +184,7 @@ public class RaftServerImpl implements RaftServer { private void startInitializing() { role = Role.FOLLOWER; // do not start heartbeatMonitoring - serverRpc.start(); + getServerRpc().start(); } public ServerState getState() { @@ -224,7 +208,7 @@ public class RaftServerImpl implements RaftServer { shutdownElectionDaemon(); shutdownLeaderState(); - serverRpc.close(); + getServerRpc().close(); state.close(); } catch (Exception ignored) { LOG.warn("Failed to kill " + state.getSelfId(), ignored); @@ -831,6 +815,7 @@ public class RaftServerImpl implements RaftServer { return null; } + @Override public RaftProperties getProperties() { return this.properties; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java index 38caba7..2b7fb77 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java @@ -19,9 +19,12 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.RpcType; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.util.RaftUtils; +import java.io.IOException; import java.util.Objects; /** A factory interface for creating server components. */ @@ -29,7 +32,9 @@ public interface ServerFactory { /** Create a new {@link LogAppender}. */ LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f); - class BaseFactory implements ServerFactory { + RaftServerRpc newRaftServerRpc(RaftServerImpl server); + + abstract class BaseFactory implements ServerFactory { @Override public LogAppender newLogAppender( RaftServerImpl server, LeaderState state, FollowerInfo f) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 150b2c1..dd2d784 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -100,14 +100,6 @@ public class ServerState implements Closeable { lastApplied, prop); } - /** - * Used by tests to set initial raft configuration with correct port bindings. - */ - @VisibleForTesting - public void setInitialConf(RaftConfiguration initialConf) { - configurationManager.setInitialConf(initialConf); - } - private long initStatemachine(StateMachine sm, RaftProperties properties) throws IOException { sm.initialize(selfId, properties, storage); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index ce3cafc..afd6eca 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -24,18 +24,14 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.RaftServerRpc; -import org.apache.ratis.server.impl.DelayLocalExecutionInjection; -import org.apache.ratis.server.impl.LeaderState; -import org.apache.ratis.server.impl.RaftConfiguration; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.ServerImplUtils; +import org.apache.ratis.server.impl.*; import org.apache.ratis.server.storage.MemoryRaftLog; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.statemachine.BaseStateMachine; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.NetUtils; import org.apache.ratis.util.RaftUtils; import org.junit.Assert; import org.slf4j.Logger; @@ -43,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.*; import java.util.stream.Collectors; @@ -73,25 +70,30 @@ public abstract class MiniRaftCluster { super(ids, properties, formatted); } - protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException; - - @Override - protected void setPeerRpc() throws IOException { - for (RaftPeer p : conf.getPeers()) { - setPeerRpc(p); - } - } - @Override public void restartServer(String id, boolean format) throws IOException { super.restartServer(id, format); - setPeerRpc(conf.getPeer(new RaftPeerId(id))).start(); + getServer(id).start(); } @Override public void setBlockRequestsFrom(String src, boolean block) { RaftTestUtil.setBlockRequestsFrom(src, block); } + + public static int getPort(RaftServerImpl server) { + final int port = getPort(server.getId(), server.getState().getRaftConf()); + LOG.info(server.getId() + "(" + server.getRpcType() + "), port=" + port); + return port; + } + + public static int getPort(RaftPeerId id, RaftConfiguration conf) { + final RaftPeer peer = conf.getPeer(id); + final String address = peer != null? peer.getAddress(): null; + final InetSocketAddress inetAddress = address != null? + NetUtils.createSocketAddr(address): NetUtils.createLocalServerAddress(); + return inetAddress.getPort(); + } } public static class PeerChanges { @@ -112,7 +114,9 @@ public abstract class MiniRaftCluster { public static RaftConfiguration initConfiguration(String[] ids) { return RaftConfiguration.newBuilder() - .setConf(Arrays.stream(ids).map(id -> new RaftPeer(new RaftPeerId(id))) + .setConf(Arrays.stream(ids) + .map(id -> new RaftPeerId(id)) + .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())) .collect(Collectors.toList())) .build(); } @@ -154,16 +158,6 @@ public abstract class MiniRaftCluster { ExitUtils.disableSystemExit(); } - protected <RPC extends RaftServerRpc> void init(Map<RaftPeer, RPC> peers) { - LOG.info("peers = " + peers.keySet()); - conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build(); - for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) { - final RaftServerImpl server = servers.get(entry.getKey().getId()); - server.setInitialConf(conf); - server.setServerRpc(entry.getValue()); - } - } - public void start() { LOG.info("Starting " + getClass().getSimpleName()); servers.values().forEach(RaftServerImpl::start); @@ -188,11 +182,11 @@ public abstract class MiniRaftCluster { servers.put(id, newRaftServer(id, format)); } - setPeerRpc(); + initRpc(); start(); } - protected abstract void setPeerRpc() throws IOException; + protected void initRpc() {} public int getMaxTimeout() { return properties.getInt( @@ -204,20 +198,19 @@ public abstract class MiniRaftCluster { return conf; } - private RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { - final RaftServerImpl s; + protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { try { final String dirStr = testBaseDir + id; if (format) { formatDir(dirStr); } - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr); + final RaftProperties prop = new RaftProperties(properties); + prop.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr); final StateMachine stateMachine = getStateMachine4Test(properties); - s = ServerImplUtils.newRaftServer(id, stateMachine, conf, properties); + return ServerImplUtils.newRaftServer(id, stateMachine, conf, prop); } catch (IOException e) { throw new RuntimeException(e); } - return s; } static StateMachine getStateMachine4Test(RaftProperties properties) { @@ -230,23 +223,22 @@ public abstract class MiniRaftCluster { public abstract RaftClientRequestSender getRaftClientRequestSender(); - protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers( - Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers, - boolean startService) throws IOException { - for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) { - RaftServerImpl server = servers.get(entry.getKey().getId()); - server.setServerRpc(entry.getValue()); - } + public static Collection<RaftPeer> toRaftPeers( + Collection<RaftServerImpl> servers) { + return servers.stream() + .map(s -> new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress())) + .collect(Collectors.toList()); + } + + protected Collection<RaftPeer> addNewPeers( + Collection<RaftServerImpl> newServers, boolean startService) { + final Collection<RaftPeer> peers = toRaftPeers(newServers); if (startService) { newServers.forEach(RaftServerImpl::start); } - return new ArrayList<>(newPeers.keySet()); + return peers; } - protected abstract Collection<RaftPeer> addNewPeers( - Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers, - boolean startService) throws IOException; - public PeerChanges addNewPeers(int number, boolean startNewPeer) throws IOException { return addNewPeers(generateIds(number, servers.size()), startNewPeer); @@ -255,22 +247,19 @@ public abstract class MiniRaftCluster { public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException { LOG.info("Add new peers {}", Arrays.asList(ids)); - Collection<RaftPeer> newPeers = new ArrayList<>(ids.length); - for (String id : ids) { - newPeers.add(new RaftPeer(new RaftPeerId(id))); - } // create and add new RaftServers final List<RaftServerImpl> newServers = new ArrayList<>(ids.length); - for (RaftPeer p : newPeers) { - RaftServerImpl newServer = newRaftServer(p.getId(), true); - Preconditions.checkArgument(!servers.containsKey(p.getId())); - servers.put(p.getId(), newServer); + for (String id : ids) { + Preconditions.checkArgument(!servers.containsKey(id)); + + final RaftPeerId peerId = new RaftPeerId(id); + final RaftServerImpl newServer = newRaftServer(peerId, true); + servers.put(peerId, newServer); newServers.add(newServer); } - // for hadoop-rpc-enabled peer, we assign inetsocketaddress here - newPeers = addNewPeers(newPeers, newServers, startNewPeer); + final Collection<RaftPeer> newPeers = addNewPeers(newServers, startNewPeer); final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]); newPeers.addAll(conf.getPeers()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index 1745321..121159b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -18,6 +18,8 @@ package org.apache.ratis.server.simulation; import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.RpcType; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; @@ -39,6 +41,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { @Override public MiniRaftClusterWithSimulatedRpc newCluster( String[] ids, RaftProperties prop, boolean formatted) { + RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.SIMULATED); if (ThreadLocalRandom.current().nextBoolean()) { // turn off simulate latency half of the times. prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0); @@ -56,28 +59,24 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { initRpc(); } - private void initRpc() { - final Collection<RaftPeer> peers = getConf().getPeers(); + @Override + protected void initRpc() { final int simulateLatencyMs = properties.getInt( SimulatedRequestReply.SIMULATE_LATENCY_KEY, SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT); LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = " + simulateLatencyMs); - serverRequestReply = new SimulatedRequestReply<>(peers, simulateLatencyMs); - client2serverRequestReply = new SimulatedClientRequestReply(peers, - simulateLatencyMs); - - setRpcServers(getServers()); + serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs); + client2serverRequestReply = new SimulatedClientRequestReply(simulateLatencyMs); + getServers().stream().forEach(s -> initRpc(s)); + addPeersToRpc(toRaftPeers(getServers())); } - private void setRpcServers(Collection<RaftServerImpl> newServers) { - newServers.forEach(s -> s.setServerRpc( - new SimulatedServerRpc(s, serverRequestReply, client2serverRequestReply))); - } - - @Override - protected void setPeerRpc() { - initRpc(); + private void initRpc(RaftServerImpl s) { + if (serverRequestReply != null) { + ((SimulationFactory)s.getFactory()).initRpc( + serverRequestReply, client2serverRequestReply); + } } private void addPeersToRpc(Collection<RaftPeer> peers) { @@ -86,20 +85,25 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { } @Override + protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { + final RaftServerImpl s = super.newRaftServer(id, format); + initRpc(s); + return s; + } + + @Override public void restartServer(String id, boolean format) throws IOException { super.restartServer(id, format); RaftServerImpl s = getServer(id); addPeersToRpc(Collections.singletonList(conf.getPeer(new RaftPeerId(id)))); - s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply, - client2serverRequestReply)); s.start(); } @Override - public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServerImpl> newServers, boolean startService) { + public Collection<RaftPeer> addNewPeers( + Collection<RaftServerImpl> newServers, boolean startService) { + final Collection<RaftPeer> newPeers = toRaftPeers(newServers); addPeersToRpc(newPeers); - setRpcServers(newServers); if (startService) { newServers.forEach(RaftServerImpl::start); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java index 9302051..65fe7ad 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java @@ -22,15 +22,11 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; -import java.io.IOException; -import java.util.Collection; - -public class SimulatedClientRequestReply +class SimulatedClientRequestReply extends SimulatedRequestReply<RaftClientRequest, RaftClientReply> implements RaftClientRequestSender { - SimulatedClientRequestReply(Collection<RaftPeer> allPeers, - int simulateLatencyMs) { - super(allPeers, simulateLatencyMs); + SimulatedClientRequestReply(int simulateLatencyMs) { + super(simulateLatencyMs); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java index f11c626..95d3efa 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java @@ -18,7 +18,6 @@ package org.apache.ratis.server.simulation; import com.google.common.base.Preconditions; - import org.apache.ratis.RaftTestUtil; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftRpcMessage; @@ -27,7 +26,6 @@ import org.apache.ratis.util.RaftUtils; import org.apache.ratis.util.Timestamp; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -37,7 +35,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -public class SimulatedRequestReply<REQUEST extends RaftRpcMessage, +class SimulatedRequestReply<REQUEST extends RaftRpcMessage, REPLY extends RaftRpcMessage> { public static final String SIMULATE_LATENCY_KEY = SimulatedRequestReply.class.getName() + ".simulateLatencyMs"; @@ -105,15 +103,11 @@ public class SimulatedRequestReply<REQUEST extends RaftRpcMessage, } } - private final Map<String, EventQueue<REQUEST, REPLY>> queues; + private final Map<String, EventQueue<REQUEST, REPLY>> queues + = new ConcurrentHashMap<>(); private final int simulateLatencyMs; - SimulatedRequestReply(Collection<RaftPeer> allPeers, int simulateLatencyMs) { - queues = new ConcurrentHashMap<>(); - for (RaftPeer peer : allPeers) { - queues.put(peer.getId().toString(), new EventQueue<>()); - } - + SimulatedRequestReply(int simulateLatencyMs) { this.simulateLatencyMs = simulateLatencyMs; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java new file mode 100644 index 0000000..d0b3d38 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerFactory; + +import java.util.Objects; + +public class SimulationFactory extends ServerFactory.BaseFactory { + private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; + private SimulatedClientRequestReply client2serverRequestReply; + + public void initRpc( + SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, + SimulatedClientRequestReply client2serverRequestReply) { + this.serverRequestReply = Objects.requireNonNull(serverRequestReply); + this.client2serverRequestReply = Objects.requireNonNull(client2serverRequestReply); + } + + @Override + public RaftServerRpc newRaftServerRpc(RaftServerImpl server) { + return new SimulatedServerRpc(server, serverRequestReply, client2serverRequestReply); + } +}
