RATIS-89. Add raft group and raft group ID. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/466fc2c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/466fc2c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/466fc2c3 Branch: refs/heads/master Commit: 466fc2c3a8fc984b3f94e939045fc4d8a7b0ae0c Parents: ddb82cd Author: Jing Zhao <[email protected]> Authored: Tue Jun 13 19:42:53 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Tue Jun 13 19:42:53 2017 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 9 ++- .../ratis/client/impl/ClientImplUtils.java | 5 +- .../ratis/client/impl/ClientProtoUtils.java | 37 +++++++---- .../ratis/client/impl/RaftClientImpl.java | 13 ++-- .../org/apache/ratis/protocol/ClientId.java | 42 ++---------- .../ratis/protocol/RaftClientMessage.java | 13 +++- .../apache/ratis/protocol/RaftClientReply.java | 15 +++-- .../ratis/protocol/RaftClientRequest.java | 8 +-- .../org/apache/ratis/protocol/RaftGroup.java | 55 ++++++++++++++++ .../org/apache/ratis/protocol/RaftGroupId.java | 41 ++++++++++++ .../java/org/apache/ratis/protocol/RaftId.java | 67 ++++++++++++++++++++ .../org/apache/ratis/protocol/RaftPeerId.java | 3 +- .../apache/ratis/protocol/RaftRpcMessage.java | 2 + .../ratis/protocol/ReinitializeRequest.java | 14 ++-- .../ratis/protocol/SetConfigurationRequest.java | 4 +- .../java/org/apache/ratis/util/ProtoUtils.java | 1 + .../TestRaftStateMachineException.java | 4 +- .../ratis/grpc/client/AppendStreamer.java | 14 ++-- .../ratis/grpc/client/RaftOutputStream.java | 5 +- .../ratis/grpc/MiniRaftClusterWithGRpc.java | 7 +- .../org/apache/ratis/grpc/TestRaftStream.java | 9 ++- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 8 +-- .../ratis/netty/MiniRaftClusterWithNetty.java | 7 +- ratis-proto-shaded/src/main/proto/Raft.proto | 11 ++++ .../org/apache/ratis/server/RaftServer.java | 8 +-- .../ratis/server/impl/RaftServerImpl.java | 40 +++++++----- .../ratis/server/impl/RaftServerProxy.java | 22 +++---- .../ratis/server/impl/ServerImplUtils.java | 15 +---- .../ratis/server/impl/ServerProtoUtils.java | 29 +++++---- .../apache/ratis/server/impl/ServerState.java | 7 +- .../java/org/apache/ratis/MiniRaftCluster.java | 67 +++++++++++--------- .../ratis/RaftNotLeaderExceptionBaseTest.java | 6 +- .../org/apache/ratis/RaftRetryCacheTests.java | 9 +-- .../impl/RaftReconfigurationBaseTest.java | 14 ++-- .../server/impl/ReinitializationBaseTest.java | 15 +++-- .../MiniRaftClusterWithSimulatedRpc.java | 7 +- .../server/simulation/RaftServerReply.java | 12 ++++ .../server/simulation/RaftServerRequest.java | 12 ++++ .../statemachine/RaftSnapshotBaseTest.java | 2 +- 39 files changed, 433 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 956a6de..1a7faf6 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.Objects; /** A client who sends requests to a raft service. */ @@ -64,7 +63,7 @@ public interface RaftClient extends Closeable { class Builder { private ClientId clientId; private RaftClientRpc clientRpc; - private Collection<RaftPeer> servers; + private RaftGroup group; private RaftPeerId leaderId; private RaftProperties properties; private TimeDuration retryInterval = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT; @@ -80,7 +79,7 @@ public interface RaftClient extends Closeable { retryInterval = RaftClientConfigKeys.Rpc.timeout(properties); } return ClientImplUtils.newRaftClient(clientId, - Objects.requireNonNull(servers, "The 'servers' field is not initialized."), + Objects.requireNonNull(group, "The 'servers' field is not initialized."), leaderId, Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."), retryInterval); @@ -93,8 +92,8 @@ public interface RaftClient extends Closeable { } /** Set servers. */ - public Builder setServers(Collection<RaftPeer> servers) { - this.servers = servers; + public Builder setRaftGroup(RaftGroup group) { + this.group = group; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index 882aa41..07b07b0 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -19,6 +19,7 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; @@ -29,8 +30,8 @@ import java.util.Collection; /** Client utilities for internal use. */ public class ClientImplUtils { public static RaftClient newRaftClient( - ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId, + ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval) { - return new RaftClientImpl(clientId, peers, leaderId, clientRpc, retryInterval); + return new RaftClientImpl(clientId, group, leaderId, clientRpc, retryInterval); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index ff49109..146622b 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -30,27 +30,31 @@ import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.Exce public class ClientProtoUtils { public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( - byte[] requestorId, byte[] replyId, long callId, boolean success) { + byte[] requestorId, byte[] replyId, byte[] groupId, long callId, boolean success) { return RaftRpcReplyProto.newBuilder() .setRequestorId(ProtoUtils.toByteString(requestorId)) .setReplyId(ProtoUtils.toByteString(replyId)) + .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(groupId))) .setCallId(callId) .setSuccess(success); } public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( - byte[] requesterId, byte[] replyId, long callId) { + byte[] requesterId, byte[] replyId, byte[] raftGroupId, long callId) { return RaftRpcRequestProto.newBuilder() .setRequestorId(ProtoUtils.toByteString(requesterId)) .setReplyId(ProtoUtils.toByteString(replyId)) + .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(raftGroupId))) .setCallId(callId); } public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) { ClientId clientId = new ClientId( p.getRpcRequest().getRequestorId().toByteArray()); + RaftGroupId groupId = + new RaftGroupId(p.getRpcRequest().getRaftGroupId().getId().toByteArray()); RaftPeerId serverId = RaftPeerId.valueOf(p.getRpcRequest().getReplyId()); - return new RaftClientRequest(clientId, serverId, + return new RaftClientRequest(clientId, serverId, groupId, p.getRpcRequest().getCallId(), toMessage(p.getMessage()), p.getReadOnly()); } @@ -59,18 +63,19 @@ public class ClientProtoUtils { RaftClientRequest request) { return RaftClientRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(), - request.getServerId().toBytes(), request.getCallId())) + request.getServerId().toBytes(), request.getRaftGroupId().toBytes(), + request.getCallId())) .setMessage(toClientMessageEntryProto(request.getMessage())) .setReadOnly(request.isReadOnly()) .build(); } public static RaftClientRequestProto genRaftClientRequestProto( - ClientId clientId, RaftPeerId serverId, long callId, ByteString content, - boolean readOnly) { + ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, + ByteString content, boolean readOnly) { return RaftClientRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(), - serverId.toBytes(), callId)) + serverId.toBytes(), groupId.toBytes(), callId)) .setMessage(ClientMessageEntryProto.newBuilder().setContent(content)) .setReadOnly(readOnly) .build(); @@ -81,7 +86,8 @@ public class ClientProtoUtils { final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder(); if (reply != null) { b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(), - reply.getServerId().toBytes(), reply.getCallId(), reply.isSuccess())); + reply.getServerId().toBytes(), reply.getRaftGroupId().toBytes(), + reply.getCallId(), reply.isSuccess())); if (reply.getMessage() != null) { b.setMessage(toClientMessageEntryProto(reply.getMessage())); } @@ -128,9 +134,11 @@ public class ClientProtoUtils { smeProto.getExceptionClassName(), smeProto.getErrorMsg(), smeProto.getStacktrace()); } - return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()), - RaftPeerId.valueOf(rp.getReplyId()), - rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e); + ClientId clientId = new ClientId(rp.getRequestorId().toByteArray()); + RaftGroupId groupId = new RaftGroupId(rp.getRaftGroupId().getId().toByteArray()); + return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), + groupId, rp.getCallId(), rp.getSuccess(), + toMessage(replyProto.getMessage()), e); } private static StateMachineException wrapStateMachineException( @@ -171,6 +179,7 @@ public class ClientProtoUtils { return new SetConfigurationRequest( new ClientId(m.getRequestorId().toByteArray()), RaftPeerId.valueOf(m.getReplyId()), + new RaftGroupId(m.getRaftGroupId().getId().toByteArray()), p.getRpcRequest().getCallId(), peers); } @@ -180,6 +189,7 @@ public class ClientProtoUtils { .setRpcRequest(toRaftRpcRequestProtoBuilder( request.getClientId().toBytes(), request.getServerId().toBytes(), + request.getRaftGroupId().toBytes(), request.getCallId())) .addAllPeers(ProtoUtils.toRaftPeerProtos( Arrays.asList(request.getPeersInNewConf()))) @@ -193,6 +203,7 @@ public class ClientProtoUtils { return new ReinitializeRequest( new ClientId(m.getRequestorId().toByteArray()), RaftPeerId.valueOf(m.getReplyId()), + new RaftGroupId(m.getRaftGroupId().getId().toByteArray()), p.getRpcRequest().getCallId(), peers); } @@ -202,9 +213,9 @@ public class ClientProtoUtils { .setRpcRequest(toRaftRpcRequestProtoBuilder( request.getClientId().toBytes(), request.getServerId().toBytes(), + request.getRaftGroupId().toBytes(), request.getCallId())) - .addAllPeers(ProtoUtils.toRaftPeerProtos( - Arrays.asList(request.getPeersInNewConf()))) + .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInGroup().getPeers())) .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 75ef2a4..c6571ae 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -26,6 +26,7 @@ import org.apache.ratis.protocol.*; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; @@ -43,16 +44,18 @@ final class RaftClientImpl implements RaftClient { private final ClientId clientId; private final RaftClientRpc clientRpc; private final Collection<RaftPeer> peers; + private final RaftGroupId groupId; private final TimeDuration retryInterval; private volatile RaftPeerId leaderId; - RaftClientImpl(ClientId clientId, Collection<RaftPeer> peers, + RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval) { this.clientId = clientId; this.clientRpc = clientRpc; - this.peers = peers; + this.peers = new ArrayList<>(group.getPeers()); + this.groupId = group.getGroupId(); this.leaderId = leaderId != null? leaderId : peers.iterator().next().getId(); this.retryInterval = retryInterval; @@ -77,7 +80,7 @@ final class RaftClientImpl implements RaftClient { private RaftClientReply send(Message message, boolean readOnly) throws IOException { final long callId = nextCallId(); return sendRequestWithRetry(() -> new RaftClientRequest( - clientId, leaderId, callId, message, readOnly)); + clientId, leaderId, groupId, callId, message, readOnly)); } @Override @@ -87,7 +90,7 @@ final class RaftClientImpl implements RaftClient { // also refresh the rpc proxies for these peers addServers(peersInNewConf); return sendRequestWithRetry(() -> new SetConfigurationRequest( - clientId, leaderId, callId, peersInNewConf)); + clientId, leaderId, groupId, callId, peersInNewConf)); } @Override @@ -96,7 +99,7 @@ final class RaftClientImpl implements RaftClient { final long callId = nextCallId(); addServers(peersInNewConf); return sendRequest(new ReinitializeRequest( - clientId, server, callId, peersInNewConf)); + clientId, server, groupId, callId, peersInNewConf)); } private void addServers(RaftPeer[] peersInNewConf) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java index 310e207..2af6558 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java @@ -17,60 +17,28 @@ */ package org.apache.ratis.protocol; -import org.apache.ratis.util.Preconditions; - -import java.nio.ByteBuffer; -import java.util.Objects; import java.util.UUID; /** * Id of Raft client. Should be globally unique so that raft peers can use it * to correctly identify retry requests from the same client. */ -public class ClientId { - public static final int BYTE_LENGTH = 16; - +public class ClientId extends RaftId { public static ClientId createId() { UUID uuid = UUID.randomUUID(); return new ClientId(uuid); } - private final UUID uuid; - - private ClientId(UUID id) { - this.uuid = Objects.requireNonNull(id, "id == null"); - } - public ClientId(byte[] data) { - Objects.requireNonNull(data, "data == null"); - Preconditions.assertTrue(data.length == BYTE_LENGTH, - "data.length = %s != BYTE_LENGTH = %s", data.length, BYTE_LENGTH); - ByteBuffer buffer = ByteBuffer.wrap(data); - this.uuid = new UUID(buffer.getLong(), buffer.getLong()); + super(data); } - public byte[] toBytes() { - ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]); - buf.putLong(uuid.getMostSignificantBits()); - buf.putLong(uuid.getLeastSignificantBits()); - return buf.array(); + private ClientId(UUID uuid) { + super(uuid); } @Override public String toString() { - return uuid.toString(); - } - - - @Override - public boolean equals(Object other) { - return other == this || - (other instanceof ClientId && - uuid.equals(((ClientId) other).uuid)); - } - - @Override - public int hashCode() { - return uuid.hashCode(); + return "client-" + super.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java index f205f34..49f2b6f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java @@ -20,10 +20,13 @@ package org.apache.ratis.protocol; public abstract class RaftClientMessage implements RaftRpcMessage { private final ClientId clientId; private final RaftPeerId serverId; + private final RaftGroupId groupId; - public RaftClientMessage(ClientId clientId, RaftPeerId serverId) { + public RaftClientMessage(ClientId clientId, RaftPeerId serverId, + RaftGroupId groupId) { this.clientId = clientId; this.serverId = serverId; + this.groupId = groupId; } @Override @@ -45,7 +48,13 @@ public abstract class RaftClientMessage implements RaftRpcMessage { } @Override + public RaftGroupId getRaftGroupId() { + return groupId; + } + + @Override public String toString() { - return getClass().getSimpleName() + "(" + clientId + "->" + serverId + ")"; + return getClass().getSimpleName() + "(" + clientId + "->" + serverId + + ") in raft group " + groupId; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 7179505..60dc6c1 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -32,9 +32,10 @@ public class RaftClientReply extends RaftClientMessage { private final RaftException exception; private final Message message; - public RaftClientReply(ClientId clientId, RaftPeerId serverId, long callId, - boolean success, Message message, RaftException exception) { - super(clientId, serverId); + public RaftClientReply(ClientId clientId, RaftPeerId serverId, + RaftGroupId groupId, long callId, boolean success, Message message, + RaftException exception) { + super(clientId, serverId, groupId); this.success = success; this.callId = callId; this.message = message; @@ -43,13 +44,13 @@ public class RaftClientReply extends RaftClientMessage { public RaftClientReply(RaftClientRequest request, RaftException exception) { - this(request.getClientId(), request.getServerId(), request.getCallId(), - false, null, exception); + this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), + request.getCallId(), false, null, exception); } public RaftClientReply(RaftClientRequest request, Message message) { - this(request.getClientId(), request.getServerId(), request.getCallId(), - true, message, null); + this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), + request.getCallId(), true, message, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 898c166..41bdb2e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -26,13 +26,13 @@ public class RaftClientRequest extends RaftClientMessage { private final boolean readOnly; public RaftClientRequest(ClientId clientId, RaftPeerId serverId, - long callId, Message message) { - this(clientId, serverId, callId, message, false); + RaftGroupId groupId, long callId, Message message) { + this(clientId, serverId, groupId, callId, message, false); } public RaftClientRequest(ClientId clientId, RaftPeerId serverId, - long callId, Message message, boolean readOnly) { - super(clientId, serverId); + RaftGroupId groupId, long callId, Message message, boolean readOnly) { + super(clientId, serverId, groupId); this.callId = callId; this.message = message; this.readOnly = readOnly; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java new file mode 100644 index 0000000..3ec0fdc --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import org.apache.ratis.util.Preconditions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Description of a raft group. It has a globally unique ID and a group of raft + * peers. + */ +public class RaftGroup { + /** UTF-8 string as id */ + private final RaftGroupId groupId; + /** The group of raft peers */ + private final List<RaftPeer> peers; + + public RaftGroup(RaftGroupId groupId, RaftPeer[] peers) { + Preconditions.assertTrue(peers != null); + this.groupId = groupId; + this.peers = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(peers))); + } + + public RaftGroupId getGroupId() { + return groupId; + } + + public List<RaftPeer> getPeers() { + return peers; + } + + @Override + public String toString() { + return groupId + ":" + Arrays.asList(peers); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java new file mode 100644 index 0000000..0cb3a07 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.util.UUID; + +public class RaftGroupId extends RaftId { + + public static RaftGroupId createId() { + UUID uuid = UUID.randomUUID(); + return new RaftGroupId(uuid); + } + + protected RaftGroupId(UUID id) { + super(id); + } + + public RaftGroupId(byte[] data) { + super(data); + } + + @Override + public String toString() { + return "group-" + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java new file mode 100644 index 0000000..86d7ccc --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import org.apache.ratis.util.Preconditions; + +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.UUID; + +public abstract class RaftId { + public static final int BYTE_LENGTH = 16; + + private final UUID uuid; + + protected RaftId(UUID id) { + this.uuid = Objects.requireNonNull(id, "id == null"); + } + + public RaftId(byte[] data) { + Objects.requireNonNull(data, "data == null"); + Preconditions.assertTrue(data.length == BYTE_LENGTH, + "data.length = %s != BYTE_LENGTH = %s", data.length, BYTE_LENGTH); + ByteBuffer buffer = ByteBuffer.wrap(data); + this.uuid = new UUID(buffer.getLong(), buffer.getLong()); + } + + public byte[] toBytes() { + ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]); + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + return buf.array(); + } + + @Override + public String toString() { + return uuid.toString(); + } + + + @Override + public boolean equals(Object other) { + return other == this || + (other instanceof RaftId && + uuid.equals(((RaftId) other).uuid)); + } + + @Override + public int hashCode() { + return uuid.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java index 3fe3d0e..4e1a5d8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java @@ -39,7 +39,7 @@ public class RaftPeerId { } public static RaftPeerId valueOf(String id) { - return stringMap.computeIfAbsent(id, key -> new RaftPeerId(key)); + return stringMap.computeIfAbsent(id, RaftPeerId::new); } public static RaftPeerId getRaftPeerId(String id) { @@ -56,6 +56,7 @@ public class RaftPeerId { Preconditions.assertTrue(!id.isEmpty(), "id is an empty string."); this.id = id.getBytes(StandardCharsets.UTF_8); } + private RaftPeerId(byte[] id) { this.id = Objects.requireNonNull(id, "id == null"); Preconditions.assertTrue(id.length > 0, "id is an empty array."); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java index 58aa9ee..70727af 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java @@ -24,4 +24,6 @@ public interface RaftRpcMessage { String getRequestorId(); String getReplierId(); + + RaftGroupId getRaftGroupId(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java index 0a89340..b69c845 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java @@ -20,20 +20,20 @@ package org.apache.ratis.protocol; import java.util.Arrays; public class ReinitializeRequest extends RaftClientRequest { - private final RaftPeer[] peers; + private final RaftGroup group; public ReinitializeRequest(ClientId clientId, RaftPeerId serverId, - long callId, RaftPeer[] peers) { - super(clientId, serverId, callId, null); - this.peers = peers; + RaftGroupId groupId, long callId, RaftPeer[] peers) { + super(clientId, serverId, groupId, callId, null); + this.group = new RaftGroup(groupId, peers); } - public RaftPeer[] getPeersInNewConf() { - return peers; + public RaftGroup getPeersInGroup() { + return group; } @Override public String toString() { - return super.toString() + ", peers:" + Arrays.asList(getPeersInNewConf()); + return super.toString() + ", peers:" + Arrays.asList(getPeersInGroup()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java index 77d545c..83be197 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java @@ -23,8 +23,8 @@ public class SetConfigurationRequest extends RaftClientRequest { private final RaftPeer[] peers; public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId, - long callId, RaftPeer[] peers) { - super(clientId, serverId, callId, null); + RaftGroupId groupId, long callId, RaftPeer[] peers) { + super(clientId, serverId, groupId, callId, null); this.peers = peers; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 527c4e8..7d73251 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.com.google.protobuf.ByteString; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index b16bf67..5845ca9 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -116,7 +116,7 @@ public class TestRaftStateMachineException { final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - callId, new RaftTestUtil.SimpleMessage("message")); + cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertFalse(reply.isSuccess()); Assert.assertNotNull(reply.getStateMachineException()); @@ -163,7 +163,7 @@ public class TestRaftStateMachineException { final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - callId, new RaftTestUtil.SimpleMessage("message")); + cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertTrue(reply.hasStateMachineException()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index f9ff00a..75d1dd7 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -86,8 +86,9 @@ public class AppendStreamer implements Closeable { private volatile RunningState running = RunningState.RUNNING; private final ExceptionAndRetry exceptionAndRetry; private final Sender senderThread; + private final RaftGroupId groupId; - AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers, + AppendStreamer(RaftProperties prop, RaftGroup group, RaftPeerId leaderId, ClientId clientId) { this.clientId = clientId; maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop); @@ -95,11 +96,12 @@ public class AppendStreamer implements Closeable { ackQueue = new ConcurrentLinkedDeque<>(); exceptionAndRetry = new ExceptionAndRetry(prop); - this.peers = peers.stream().collect( + this.groupId = group.getGroupId(); + this.peers = group.getPeers().stream().collect( Collectors.toMap(RaftPeer::getId, Function.identity())); proxyMap = new PeerProxyMap<>( raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new)); - proxyMap.addPeers(peers); + proxyMap.addPeers(group.getPeers()); refreshLeaderProxy(leaderId, null); senderThread = new Sender(); @@ -155,7 +157,7 @@ public class AppendStreamer implements Closeable { if (isRunning()) { // wrap the current buffer into a RaftClientRequestProto final RaftClientRequestProto request = genRaftClientRequestProto( - clientId, leaderId, seqNum, content, false); + clientId, leaderId, groupId, seqNum, content, false); dataQueue.offer(request); this.notifyAll(); } else { @@ -361,8 +363,8 @@ public class AppendStreamer implements Closeable { RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder() .setMessage(oldRequest.getMessage()) .setReadOnly(oldRequest.getReadOnly()) - .setRpcRequest(toRaftRpcRequestProtoBuilder( - clientId.toBytes(), newLeader.toBytes(), r.getCallId())) + .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(), + newLeader.toBytes(), groupId.toBytes(), r.getCallId())) .build(); dataQueue.offerFirst(newRequest); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java index 33d3d22..09d57a0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java @@ -20,6 +20,7 @@ package org.apache.ratis.grpc.client; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.ProtoUtils; @@ -40,12 +41,12 @@ public class RaftOutputStream extends OutputStream { private boolean closed = false; public RaftOutputStream(RaftProperties prop, ClientId clientId, - Collection<RaftPeer> peers, RaftPeerId leaderId) { + RaftGroup group, RaftPeerId leaderId) { final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt(); buf = new byte[bufferSize]; count = 0; this.clientId = clientId; - streamer = new AppendStreamer(prop, peers, leaderId, clientId); + streamer = new AppendStreamer(prop, group, leaderId, clientId); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index 1cf55ad..a5992b1 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -21,6 +21,7 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; @@ -49,10 +50,10 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { @Override protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftPeerId id, StateMachine stateMachine, RaftGroup group, RaftProperties properties) throws IOException { - GrpcConfigKeys.Server.setPort(properties, getPort(id, conf)); - return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null); + GrpcConfigKeys.Server.setPort(properties, getPort(id, group)); + return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index 1efe4d3..c67572b 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -57,7 +57,6 @@ public class TestRaftStream { private MiniRaftClusterWithGRpc cluster; - @After public void tearDown() { if (cluster != null) { @@ -91,7 +90,7 @@ public class TestRaftStream { final long seed = r.nextLong(); r.setSeed(seed); try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), - cluster.getPeers(), leader.getId())) { + cluster.getGroup(), leader.getId())) { for (int i = 0; i < 500; i++) { // generate 500 requests out.write(toBytes(r.nextInt())); } @@ -128,7 +127,7 @@ public class TestRaftStream { RaftServerImpl leader = waitForLeader(cluster); RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), - cluster.getPeers(), leader.getId()); + cluster.getGroup(), leader.getId()); int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072}; ByteValue[] values = new ByteValue[lengths.length]; @@ -207,7 +206,7 @@ public class TestRaftStream { RaftServerImpl leader = waitForLeader(cluster); RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), - cluster.getPeers(), leader.getId()); + cluster.getGroup(), leader.getId()); byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2]; Arrays.fill(b1, (byte) 1); @@ -273,7 +272,7 @@ public class TestRaftStream { LOG.info("Writer thread starts"); int count = 0; try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), - cluster.getPeers(), leader.getId())) { + cluster.getGroup(), leader.getId())) { while (running.get()) { out.write(toBytes(count++)); Thread.sleep(10); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index c52c81d..bc420f4 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -23,10 +23,10 @@ import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.hadooprpc.server.HadoopRpcService; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; -import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.impl.ServerImplUtils; import org.apache.ratis.statemachine.StateMachine; @@ -73,13 +73,13 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { @Override protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftPeerId id, StateMachine stateMachine, RaftGroup group, RaftProperties properties) throws IOException { final Configuration hconf = new Configuration(hadoopConf); - final String address = "0.0.0.0:" + getPort(id, conf); + final String address = "0.0.0.0:" + getPort(id, group); HadoopConfigKeys.Ipc.setAddress(hconf, address); - return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, + return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, HadoopFactory.newRaftParameters(hconf)); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 02a7493..e6b19f8 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -22,6 +22,7 @@ import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.netty.server.NettyRpcService; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.*; @@ -48,10 +49,10 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { @Override protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftPeerId id, StateMachine stateMachine, RaftGroup group, RaftProperties properties) throws IOException { - NettyConfigKeys.Server.setPort(properties, getPort(id, conf)); - return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null); + NettyConfigKeys.Server.setPort(properties, getPort(id, group)); + return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index b53181f..ed145df 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -26,6 +26,15 @@ message RaftPeerProto { string address = 2; // e.g. IP address, hostname etc. } +message RaftGroupIdProto { + bytes id = 1; +} + +message RaftGroupProto { + RaftGroupIdProto groupId = 1; + repeated RaftPeerProto peers = 2; +} + message RaftConfigurationProto { repeated RaftPeerProto peers = 1; // the peers in the current or new conf repeated RaftPeerProto oldPeers = 2; // the peers in the old conf @@ -66,6 +75,7 @@ message RaftRpcRequestProto { bytes requestorId = 1; bytes replyId = 2; uint64 callId = 3; + RaftGroupIdProto raftGroupId = 4; } message RaftRpcReplyProto { @@ -73,6 +83,7 @@ message RaftRpcReplyProto { bytes replyId = 2; uint64 callId = 3; bool success = 4; + RaftGroupIdProto raftGroupId = 5; } message FileChunkProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index 0899dd1..d5a46f9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -61,7 +61,7 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, class Builder { private RaftPeerId serverId; private StateMachine stateMachine; - private Iterable<RaftPeer> peers; + private RaftGroup group; private RaftProperties properties; private Parameters parameters; @@ -69,8 +69,8 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, public RaftServer build() throws IOException { return ServerImplUtils.newRaftServer( Objects.requireNonNull(serverId, "The 'serverId' field is not initialized."), + Objects.requireNonNull(group, "The 'peers' field is not initialized."), Objects.requireNonNull(stateMachine, "The 'stateMachine' is not initialized."), - Objects.requireNonNull(peers, "The 'peers' field is not initialized."), Objects.requireNonNull(properties, "The 'properties' field is not initialized."), parameters); } @@ -88,8 +88,8 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, } /** Set all the peers (including the server being built) in the Raft cluster. */ - public Builder setPeers(Iterable<RaftPeer> peers) { - this.peers = peers; + public Builder setPeers(RaftGroup group) { + this.group = group; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 9685fbc..88cf240 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -82,16 +82,18 @@ public class RaftServerImpl implements RaftServerProtocol, private final RetryCache retryCache; - RaftServerImpl(RaftPeerId id, RaftServerProxy proxy, - RaftConfiguration raftConf, RaftProperties properties) - throws IOException { + private final RaftGroupId groupId; + + RaftServerImpl(RaftPeerId id, RaftGroup group, RaftServerProxy proxy, + RaftProperties properties) throws IOException { + this.groupId = group.getGroupId(); this.lifeCycle = new LifeCycle(id); minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS); maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS); Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs, "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); this.proxy = proxy; - this.state = new ServerState(id, raftConf, properties, this, proxy.getStateMachine()); + this.state = new ServerState(id, group, properties, this, proxy.getStateMachine()); this.retryCache = initRetryCache(properties); } @@ -122,6 +124,10 @@ public class RaftServerImpl implements RaftServerProtocol, maxTimeoutMs - minTimeoutMs + 1); } + RaftGroupId getGroupId() { + return groupId; + } + StateMachine getStateMachine() { return proxy.getStateMachine(); } @@ -582,7 +588,7 @@ public class RaftServerImpl implements RaftServerProtocol, shouldShutdown = true; } reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(), - voteGranted, state.getCurrentTerm(), shouldShutdown); + groupId, voteGranted, state.getCurrentTerm(), shouldShutdown); if (LOG.isDebugEnabled()) { LOG.debug("{} replies to vote request: {}. Peer's state: {}", getId(), ProtoUtils.toString(reply), state); @@ -671,7 +677,7 @@ public class RaftServerImpl implements RaftServerProtocol, currentTerm = state.getCurrentTerm(); if (!recognized) { final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, NOT_LEADER); + leaderId, getId(), groupId, currentTerm, nextIndex, NOT_LEADER); if (LOG.isDebugEnabled()) { LOG.debug("{}: do not recognize leader. Reply: {}", getId(), ProtoUtils.toString(reply)); @@ -697,7 +703,7 @@ public class RaftServerImpl implements RaftServerProtocol, // last index should have been committed. if (previous != null && !containPrevious(previous)) { final AppendEntriesReplyProto reply = - ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), + ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), groupId, currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY); if (LOG.isDebugEnabled()) { LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", @@ -710,7 +716,7 @@ public class RaftServerImpl implements RaftServerProtocol, state.updateConfiguration(entries); state.updateStatemachine(leaderCommit, currentTerm); } - if (entries != null && entries.length > 0) { + if (entries.length > 0) { try { state.getLog().logSync(); } catch (InterruptedException e) { @@ -727,7 +733,7 @@ public class RaftServerImpl implements RaftServerProtocol, } } final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, SUCCESS); + leaderId, getId(), groupId, currentTerm, nextIndex, SUCCESS); logAppendEntries(isHeartbeat, () -> getId() + ": succeeded to handle AppendEntries. Reply: " + ServerProtoUtils.toString(reply)); @@ -767,7 +773,7 @@ public class RaftServerImpl implements RaftServerProtocol, currentTerm = state.getCurrentTerm(); if (!recognized) { final InstallSnapshotReplyProto reply = ServerProtoUtils - .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm, + .toInstallSnapshotReplyProto(leaderId, getId(), groupId, currentTerm, request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); LOG.debug("{}: do not recognize leader for installing snapshot." + " Reply: {}", getId(), reply); @@ -804,14 +810,14 @@ public class RaftServerImpl implements RaftServerProtocol, LOG.info("{}: successfully install the whole snapshot-{}", getId(), lastIncludedIndex); } - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), + return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), groupId, currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS); } AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm, RaftPeerId targetId, TermIndex previous, List<LogEntryProto> entries, boolean initializing) { - return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, + return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, groupId, leaderTerm, entries, state.getLog().getLastCommittedIndex(), initializing, previous); } @@ -822,15 +828,15 @@ public class RaftServerImpl implements RaftServerProtocol, OptionalLong totalSize = snapshot.getFiles().stream() .mapToLong(FileInfo::getFileSize).reduce(Long::sum); assert totalSize.isPresent(); - return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, + return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, groupId, requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(), chunks, totalSize.getAsLong(), done); } synchronized RequestVoteRequestProto createRequestVoteRequest( RaftPeerId targetId, long term, TermIndex lastEntry) { - return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term, - lastEntry); + return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, + groupId, term, lastEntry); } public synchronized void submitLocalSyncEvent() { @@ -857,13 +863,13 @@ public class RaftServerImpl implements RaftServerProtocol, stateMachineFuture.whenComplete((reply, exception) -> { final RaftClientReply r; if (exception == null) { - r = new RaftClientReply(clientId, serverId, callId, true, reply, null); + r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null); } else { // the exception is coming from the state machine. wrap it into the // reply as a StateMachineException final StateMachineException e = new StateMachineException( getId().toString(), exception); - r = new RaftClientReply(clientId, serverId, callId, false, null, e); + r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e); } // update retry cache cacheEntry.updateResult(r); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 5afb737..366a74c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -50,7 +50,7 @@ public class RaftServerProxy implements RaftServer { private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>(); RaftServerProxy(RaftPeerId id, StateMachine stateMachine, - RaftConfiguration raftConf, RaftProperties properties, Parameters parameters) + RaftGroup group, RaftProperties properties, Parameters parameters) throws IOException { this.id = id; this.properties = properties; @@ -59,20 +59,20 @@ public class RaftServerProxy implements RaftServer { final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters)); - this.impl = CompletableFuture.completedFuture(initImpl(raftConf)); - this.serverRpc = initRaftServerRpc(factory, this, raftConf); + this.impl = CompletableFuture.completedFuture(initImpl(group)); + this.serverRpc = initRaftServerRpc(factory, this, group); } - private RaftServerImpl initImpl(RaftConfiguration raftConf) throws IOException { - return new RaftServerImpl(id, this, raftConf, properties); + private RaftServerImpl initImpl(RaftGroup group) throws IOException { + return new RaftServerImpl(id, group, this, properties); } private static RaftServerRpc initRaftServerRpc( - ServerFactory factory, RaftServer server, RaftConfiguration raftConf) { + ServerFactory factory, RaftServer server, RaftGroup group) { final RaftServerRpc rpc = factory.newRaftServerRpc(server); // add peers into rpc service - if (raftConf != null) { - rpc.addPeers(raftConf.getPeers()); + if (group != null) { + rpc.addPeers(group.getPeers()); } return rpc; } @@ -167,11 +167,9 @@ public class RaftServerProxy implements RaftServer { impl = new CompletableFuture<>(); JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown); - final RaftConfiguration newConf = RaftConfiguration.newBuilder() - .setConf(request.getPeersInNewConf()).build(); final RaftServerImpl newImpl; try { - newImpl = initImpl(newConf); + newImpl = initImpl(request.getPeersInGroup()); } catch (IOException ioe) { final RaftException re = new RaftException( "Failed to reinitialize, request=" + request, ioe); @@ -180,7 +178,7 @@ public class RaftServerProxy implements RaftServer { return new RaftClientReply(request, re); } - getServerRpc().addPeers(newConf.getPeers()); + getServerRpc().addPeers(request.getPeersInGroup().getPeers()); newImpl.start(); impl.complete(newImpl); return new RaftClientReply(request, (Message) null); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 7934bbb..3c617f1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -19,9 +19,8 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.statemachine.StateMachine; @@ -29,18 +28,10 @@ import java.io.IOException; /** Server utilities for internal use. */ public class ServerImplUtils { - public static RaftServer newRaftServer( - RaftPeerId id, StateMachine stateMachine, Iterable<RaftPeer> peers, - RaftProperties properties, Parameters parameters) throws IOException { - return newRaftServer(id, stateMachine, - RaftConfiguration.newBuilder().setConf(peers).build(), - properties, parameters); - } - public static RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftProperties properties, Parameters parameters) throws IOException { - return new RaftServerProxy(id, stateMachine, conf, properties, parameters); + return new RaftServerProxy(id, stateMachine, group, properties, parameters); } public static TermIndex newTermIndex(long term, long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index ffd4378..5b11599 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos; @@ -108,20 +109,20 @@ public class ServerProtoUtils { } public static RequestVoteReplyProto toRequestVoteReplyProto( - RaftPeerId requestorId, RaftPeerId replyId, boolean success, long term, - boolean shouldShutdown) { + RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, + boolean success, long term, boolean shouldShutdown) { final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder(); b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder( - requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID, success)) + requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, success)) .setTerm(term) .setShouldShutdown(shouldShutdown); return b.build(); } public static RequestVoteRequestProto toRequestVoteRequestProto( - RaftPeerId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) { + RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, TermIndex lastEntry) { RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils - .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID); + .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID); final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder() .setServerRequest(rpb) .setCandidateTerm(term); @@ -132,10 +133,10 @@ public class ServerProtoUtils { } public static InstallSnapshotReplyProto toInstallSnapshotReplyProto( - RaftPeerId requestorId, RaftPeerId replyId, long term, int requestIndex, - InstallSnapshotResult result) { + RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, + long term, int requestIndex, InstallSnapshotResult result) { final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS); + replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS); final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto .newBuilder().setServerReply(rb).setTerm(term).setResult(result) .setRequestIndex(requestIndex); @@ -143,13 +144,13 @@ public class ServerProtoUtils { } public static InstallSnapshotRequestProto toInstallSnapshotRequestProto( - RaftPeerId requestorId, RaftPeerId replyId, String requestId, int requestIndex, + RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, String requestId, int requestIndex, long term, TermIndex lastTermIndex, List<FileChunkProto> chunks, long totalSize, boolean done) { return InstallSnapshotRequestProto.newBuilder() .setServerRequest( ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_CALLID)) + replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID)) .setRequestId(requestId) .setRequestIndex(requestIndex) // .setRaftConfiguration() TODO: save and pass RaftConfiguration @@ -161,10 +162,10 @@ public class ServerProtoUtils { } public static AppendEntriesReplyProto toAppendEntriesReplyProto( - RaftPeerId requestorId, RaftPeerId replyId, long term, + RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) { RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS); + replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS); final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder(); b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex) .setResult(appendResult); @@ -172,14 +173,14 @@ public class ServerProtoUtils { } public static AppendEntriesRequestProto toAppendEntriesRequestProto( - RaftPeerId requestorId, RaftPeerId replyId, long leaderTerm, + RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long leaderTerm, List<LogEntryProto> entries, long leaderCommit, boolean initializing, TermIndex previous) { final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto .newBuilder() .setServerRequest( ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_CALLID)) + replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID)) .setLeaderTerm(leaderTerm) .setLeaderCommit(leaderCommit) .setInitializing(initializing); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index d595691..53363cd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -19,6 +19,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServerConfigKeys; @@ -76,12 +77,14 @@ public class ServerState implements Closeable { */ private TermIndex latestInstalledSnapshot; - ServerState(RaftPeerId id, RaftConfiguration conf, RaftProperties prop, + ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop, RaftServerImpl server, StateMachine stateMachine) throws IOException { this.selfId = id; this.server = server; - configurationManager = new ConfigurationManager(conf); + RaftConfiguration initialConf = RaftConfiguration.newBuilder() + .setConf(group.getPeers()).build(); + configurationManager = new ConfigurationManager(initialConf); storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR); snapshotManager = new SnapshotManager(storage, id); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 83fcf54..0ef7439 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -21,6 +21,8 @@ import org.apache.ratis.client.ClientFactory; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; @@ -75,9 +77,10 @@ public abstract class MiniRaftCluster { RaftTestUtil.setBlockRequestsFrom(src, block); } - public static int getPort(RaftPeerId id, RaftConfiguration conf) { - final RaftPeer peer = conf.getPeer(id); - final String address = peer != null? peer.getAddress(): null; + public static int getPort(RaftPeerId id, RaftGroup group) { + final List<RaftPeer> peers = group.getPeers().stream() + .filter(raftPeer -> raftPeer.getId().equals(id)).collect(Collectors.toList()); + final String address = peers.isEmpty() ? null : peers.get(0).getAddress(); final InetSocketAddress inetAddress = address != null? NetUtils.createSocketAddr(address): NetUtils.createLocalServerAddress(); return inetAddress.getPort(); @@ -96,13 +99,12 @@ public abstract class MiniRaftCluster { } } - public static RaftConfiguration initConfiguration(Collection<String> ids) { - return RaftConfiguration.newBuilder() - .setConf(ids.stream() - .map(id -> RaftPeerId.valueOf(id)) - .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())) - .collect(Collectors.toList())) - .build(); + public static RaftGroup initRaftGroup(Collection<String> ids) { + List<RaftPeer> peers = ids.stream() + .map(RaftPeerId::valueOf) + .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())) + .collect(Collectors.toList()); + return new RaftGroup(RaftGroupId.createId(), peers.toArray(new RaftPeer[peers.size()])); } private static String getBaseDirectory() { @@ -125,14 +127,14 @@ public abstract class MiniRaftCluster { } protected final ClientFactory clientFactory; - protected RaftConfiguration conf; + protected RaftGroup group; protected final RaftProperties properties; protected final Parameters parameters; private final String testBaseDir; protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>(); protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { - this.conf = initConfiguration(Arrays.asList(ids)); + this.group = initRaftGroup(Arrays.asList(ids)); this.properties = new RaftProperties(properties); this.parameters = parameters; @@ -146,17 +148,17 @@ public abstract class MiniRaftCluster { public MiniRaftCluster initServers() { if (servers.isEmpty()) { - putNewServers(CollectionUtils.as(conf.getPeers(), RaftPeer::getId), true); + putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true); } return this; } private RaftServerProxy putNewServer(RaftPeerId id, boolean format) { - return putNewServer(id, conf, format); + return putNewServer(id, group, format); } - public RaftServerProxy putNewServer(RaftPeerId id, RaftConfiguration raftConf, boolean format) { - final RaftServerProxy s = newRaftServer(id, raftConf, format); + public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) { + final RaftServerProxy s = newRaftServer(id, group, format); Preconditions.assertTrue(servers.put(id, s) == null); return s; } @@ -197,11 +199,8 @@ public abstract class MiniRaftCluster { return RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS); } - public RaftConfiguration getConf() { - return conf; - } - - private RaftServerProxy newRaftServer(RaftPeerId id, RaftConfiguration raftConf, boolean format) { + private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, + boolean format) { try { final String dirStr = testBaseDir + id; if (format) { @@ -210,14 +209,14 @@ public abstract class MiniRaftCluster { final RaftProperties prop = new RaftProperties(properties); RaftServerConfigKeys.setStorageDir(prop, dirStr); final StateMachine stateMachine = getStateMachine4Test(properties); - return newRaftServer(id, stateMachine, raftConf, prop); + return newRaftServer(id, stateMachine, group, prop); } catch (IOException e) { throw new RuntimeException(e); } } protected abstract RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, + RaftPeerId id, StateMachine stateMachine, RaftGroup group, RaftProperties properties) throws IOException; static StateMachine getStateMachine4Test(RaftProperties properties) { @@ -259,9 +258,9 @@ public abstract class MiniRaftCluster { final Collection<RaftPeer> newPeers = toRaftPeers(newServers); final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]); - newPeers.addAll(conf.getPeers()); - conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build(); + newPeers.addAll(group.getPeers()); RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]); + group = new RaftGroup(group.getGroupId(), p); return new PeerChanges(p, np, new RaftPeer[0]); } @@ -280,7 +279,7 @@ public abstract class MiniRaftCluster { */ public PeerChanges removePeers(int number, boolean removeLeader, Collection<RaftPeer> excluded) { - Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers()); + Collection<RaftPeer> peers = new ArrayList<>(group.getPeers()); List<RaftPeer> removedPeers = new ArrayList<>(number); if (removeLeader) { final RaftPeer leader = toRaftPeer(getLeader()); @@ -298,8 +297,8 @@ public abstract class MiniRaftCluster { removed++; } } - conf = RaftConfiguration.newBuilder().setConf(peers).setLogEntryIndex(0).build(); RaftPeer[] p = peers.toArray(new RaftPeer[peers.size()]); + group = new RaftGroup(group.getGroupId(), p); return new PeerChanges(p, new RaftPeer[0], removedPeers.toArray(new RaftPeer[removedPeers.size()])); } @@ -393,13 +392,17 @@ public abstract class MiniRaftCluster { return toRaftPeers(getServers()); } + public RaftGroup getGroup() { + return group; + } + public RaftClient createClient(RaftPeerId leaderId) { - return createClient(leaderId, conf.getPeers()); + return createClient(leaderId, group); } - public RaftClient createClient(RaftPeerId leaderId, Collection<RaftPeer> servers) { + public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) { return RaftClient.newBuilder() - .setServers(servers) + .setRaftGroup(group) .setLeaderId(leaderId) .setClientRpc(clientFactory.newRaftClientRpc()) .setProperties(properties) @@ -451,4 +454,8 @@ public abstract class MiniRaftCluster { /** Block/unblock the requests sent from the given source. */ public abstract void setBlockRequestsFrom(String src, boolean block); + + public RaftGroupId getGroupId() { + return group.getGroupId(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java index 83c88f5..43c9fff 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java @@ -104,7 +104,8 @@ public abstract class RaftNotLeaderExceptionBaseTest { for (int i = 0; reply == null && i < 10; i++) { try { reply = rpc.sendRequest( - new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID, + new RaftClientRequest(ClientId.createId(), leaderId, + cluster.getGroupId(), DEFAULT_CALLID, new SimpleMessage("m2"))); } catch (IOException ignored) { Thread.sleep(1000); @@ -151,7 +152,8 @@ public abstract class RaftNotLeaderExceptionBaseTest { for (int i = 0; reply == null && i < 10; i++) { try { reply = rpc.sendRequest( - new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID, + new RaftClientRequest(ClientId.createId(), leaderId, + cluster.getGroupId(), DEFAULT_CALLID, new SimpleMessage("m1"))); } catch (IOException ignored) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java index a42d1bd..4317be1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java @@ -88,7 +88,7 @@ public abstract class RaftRetryCacheTests { final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - callId, new RaftTestUtil.SimpleMessage("message")); + cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertEquals(callId, reply.getCallId()); Assert.assertTrue(reply.isSuccess()); @@ -132,7 +132,7 @@ public abstract class RaftRetryCacheTests { RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - callId, new RaftTestUtil.SimpleMessage("message")); + cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertEquals(callId, reply.getCallId()); Assert.assertTrue(reply.isSuccess()); @@ -144,7 +144,8 @@ public abstract class RaftRetryCacheTests { asList(change.newPeers)).allPeersInNewConf; // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest( - client.getId(), cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); + client.getId(), cluster.getLeader().getId(), cluster.getGroupId(), + DEFAULT_CALLID, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -152,7 +153,7 @@ public abstract class RaftRetryCacheTests { final RaftPeerId newLeaderId = cluster.getLeader().getId(); Assert.assertNotEquals(leaderId, newLeaderId); // same clientId and callId in the request - r = new RaftClientRequest(client.getId(), newLeaderId, + r = new RaftClientRequest(client.getId(), newLeaderId, cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message")); for (int i = 0; i < 10; i++) { try { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 20e66e6..2a80b37 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -92,7 +92,7 @@ public abstract class RaftReconfigurationBaseTest { // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest(clientId, - cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); + cluster.getLeader().getId(), cluster.getGroupId(), DEFAULT_CALLID, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -120,7 +120,7 @@ public abstract class RaftReconfigurationBaseTest { // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest(clientId, - cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); + cluster.getLeader().getId(), cluster.getGroupId(), DEFAULT_CALLID, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -158,7 +158,7 @@ public abstract class RaftReconfigurationBaseTest { // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest(clientId, - cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); + cluster.getLeader().getId(), cluster.getGroupId(), DEFAULT_CALLID, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -254,7 +254,7 @@ public abstract class RaftReconfigurationBaseTest { final RaftClientRpc sender = client.getClientRpc(); final SetConfigurationRequest request = new SetConfigurationRequest( - client.getId(), leaderId, DEFAULT_CALLID, c1.allPeersInNewConf); + client.getId(), leaderId, cluster.getGroupId(), DEFAULT_CALLID, c1.allPeersInNewConf); try { sender.sendRequest(request); Assert.fail("did not get expected exception"); @@ -472,7 +472,7 @@ public abstract class RaftReconfigurationBaseTest { LOG.info("client2 starts to change conf"); final RaftClientRpc sender2 = client2.getClientRpc(); sender2.sendRequest(new SetConfigurationRequest( - client2.getId(), leaderId, DEFAULT_CALLID, peersInRequest2)); + client2.getId(), leaderId, cluster.getGroupId(), DEFAULT_CALLID, peersInRequest2)); } catch (ReconfigurationInProgressException e) { caughtException.set(true); } catch (Exception e) { @@ -537,7 +537,7 @@ public abstract class RaftReconfigurationBaseTest { LOG.info("client starts to change conf"); final RaftClientRpc sender = client.getClientRpc(); RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest( - client.getId(), leaderId, DEFAULT_CALLID, change.allPeersInNewConf)); + client.getId(), leaderId, cluster.getGroupId(), DEFAULT_CALLID, change.allPeersInNewConf)); if (reply.isNotLeader()) { gotNotLeader.set(true); } @@ -601,7 +601,7 @@ public abstract class RaftReconfigurationBaseTest { final RaftClientRpc sender = client.getClientRpc(); final RaftClientRequest request = new RaftClientRequest(client.getId(), - leaderId, 0, new SimpleMessage("test")); + leaderId, cluster.getGroupId(), 0, new SimpleMessage("test")); while (!success.get()) { try { RaftClientReply reply = sender.sendRequest(request);
