RATIS-13. Add global unique ID for Raft Client. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/16eb8cc6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/16eb8cc6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/16eb8cc6 Branch: refs/heads/master Commit: 16eb8cc6be5dfe06ab3cdd67250af16bd790f6ff Parents: 58ab69a Author: Jing Zhao <[email protected]> Authored: Thu Feb 16 15:21:06 2017 -0800 Committer: Jing Zhao <[email protected]> Committed: Thu Feb 16 15:21:06 2017 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 18 ++--- .../ratis/client/impl/ClientImplUtils.java | 9 ++- .../ratis/client/impl/ClientProtoUtils.java | 49 ++++++++----- .../ratis/client/impl/RaftClientImpl.java | 27 +++---- .../org/apache/ratis/protocol/ClientId.java | 73 +++++++++++++++++++ .../ratis/protocol/NotLeaderException.java | 2 +- .../ratis/protocol/RaftClientMessage.java | 51 ++++++++++++++ .../apache/ratis/protocol/RaftClientReply.java | 26 +++---- .../ratis/protocol/RaftClientRequest.java | 30 +++----- .../org/apache/ratis/protocol/RaftPeer.java | 10 +-- .../org/apache/ratis/protocol/RaftPeerId.java | 74 ++++++++++++++++++++ .../apache/ratis/protocol/RaftRpcMessage.java | 14 ++-- .../ratis/protocol/SetConfigurationRequest.java | 4 +- .../ratis/util/CodeInjectionForTesting.java | 6 +- .../org/apache/ratis/util/PeerProxyMap.java | 7 +- .../java/org/apache/ratis/util/ProtoUtils.java | 5 +- .../java/org/apache/ratis/util/RaftUtils.java | 22 ++++++ .../java/org/apache/ratis/util/StringUtils.java | 22 ------ .../arithmetic/ArithmeticStateMachine.java | 9 +-- .../java/org/apache/ratis/TestBatchAppend.java | 11 ++- .../org/apache/ratis/TestRestartRaftPeer.java | 9 ++- .../examples/arithmetic/TestArithmetic.java | 7 +- .../TestRaftStateMachineException.java | 7 +- .../org/apache/ratis/grpc/RaftGRpcService.java | 20 +++--- .../ratis/grpc/client/AppendStreamer.java | 33 ++++----- .../grpc/client/RaftClientProtocolService.java | 5 +- .../grpc/client/RaftClientSenderWithGrpc.java | 3 +- .../ratis/grpc/client/RaftOutputStream.java | 8 ++- .../grpc/server/RaftServerProtocolService.java | 5 +- .../ratis/grpc/MiniRaftClusterWithGRpc.java | 9 ++- .../org/apache/ratis/grpc/TestRaftStream.java | 11 +-- .../client/HadoopClientRequestSender.java | 2 +- .../hadooprpc/server/HadoopRpcService.java | 16 ++--- .../netty/client/NettyClientRequestSender.java | 3 +- .../ratis/netty/server/NettyRpcService.java | 17 +++-- .../ratis/netty/MiniRaftClusterWithNetty.java | 3 +- ratis-proto-shaded/src/main/proto/Raft.proto | 10 +-- .../org/apache/ratis/server/RaftServer.java | 14 ++-- .../ratis/server/impl/LeaderElection.java | 5 +- .../apache/ratis/server/impl/LeaderState.java | 11 +-- .../ratis/server/impl/PeerConfiguration.java | 17 ++--- .../ratis/server/impl/RaftConfiguration.java | 18 ++--- .../ratis/server/impl/RaftServerImpl.java | 42 ++++++----- .../ratis/server/impl/ServerImplUtils.java | 5 +- .../ratis/server/impl/ServerProtoUtils.java | 38 +++++----- .../apache/ratis/server/impl/ServerState.java | 29 ++++---- .../ratis/server/storage/MemoryRaftLog.java | 5 +- .../apache/ratis/server/storage/RaftLog.java | 15 ++-- .../ratis/server/storage/SegmentedRaftLog.java | 13 ++-- .../ratis/server/storage/SnapshotManager.java | 5 +- .../ratis/statemachine/BaseStateMachine.java | 9 +-- .../apache/ratis/statemachine/StateMachine.java | 5 +- .../java/org/apache/ratis/MiniRaftCluster.java | 36 +++++----- .../java/org/apache/ratis/RaftBasicTests.java | 13 ++-- .../ratis/RaftNotLeaderExceptionBaseTest.java | 20 +++--- .../java/org/apache/ratis/RaftTestUtil.java | 20 +++--- .../impl/BlockRequestHandlingInjection.java | 9 +-- .../impl/DelayLocalExecutionInjection.java | 12 ++-- .../impl/RaftReconfigurationBaseTest.java | 58 +++++++-------- .../ratis/server/impl/RaftServerTestUtil.java | 6 +- .../MiniRaftClusterWithSimulatedRpc.java | 7 +- .../server/simulation/RaftServerReply.java | 14 ++-- .../server/simulation/RaftServerRequest.java | 14 ++-- .../simulation/SimulatedRequestReply.java | 5 +- .../server/simulation/SimulatedServerRpc.java | 4 +- .../server/storage/TestSegmentedRaftLog.java | 3 +- .../statemachine/RaftSnapshotBaseTest.java | 18 ++--- .../SimpleStateMachine4Testing.java | 6 +- .../ratis/statemachine/TestStateMachine.java | 3 +- 69 files changed, 676 insertions(+), 440 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 72dc1ca..41022e2 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -20,16 +20,17 @@ package org.apache.ratis.client; import com.google.common.base.Preconditions; import org.apache.ratis.client.impl.ClientImplUtils; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.Collection; -import java.util.concurrent.atomic.AtomicInteger; /** A client who sends requests to a raft service. */ public interface RaftClient extends Closeable { @@ -37,7 +38,7 @@ public interface RaftClient extends Closeable { long DEFAULT_SEQNUM = 0; /** @return the id of this client. */ - String getId(); + ClientId getId(); /** @return the request sender of this client. */ RaftClientRequestSender getRequestSender(); @@ -62,12 +63,10 @@ public interface RaftClient extends Closeable { /** To build {@link RaftClient} objects. */ class Builder { - private static final AtomicInteger COUNT = new AtomicInteger(); - - private String clientId = RaftClient.class.getSimpleName() + COUNT.incrementAndGet(); + private ClientId clientId; private RaftClientRequestSender requestSender; private Collection<RaftPeer> servers; - private String leaderId; + private RaftPeerId leaderId; private RaftProperties properties; private int retryInterval = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT; @@ -78,6 +77,9 @@ public interface RaftClient extends Closeable { Preconditions.checkNotNull(requestSender); Preconditions.checkNotNull(servers); + if (clientId == null) { + clientId = ClientId.createId(); + } if (leaderId == null) { leaderId = servers.iterator().next().getId(); //use the first peer } @@ -91,7 +93,7 @@ public interface RaftClient extends Closeable { } /** Set {@link RaftClient} ID. */ - public Builder setClientId(String clientId) { + public Builder setClientId(ClientId clientId) { this.clientId = clientId; return this; } @@ -103,7 +105,7 @@ public interface RaftClient extends Closeable { } /** Set leader ID. */ - public Builder setLeaderId(String leaderId) { + public Builder setLeaderId(RaftPeerId leaderId) { this.leaderId = leaderId; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index 542f600..134e610 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -19,15 +19,18 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import java.util.Collection; /** Client utilities for internal use. */ public class ClientImplUtils { - public static RaftClient newRaftClient( - String clientId, Collection< RaftPeer > peers, String leaderId, + public static RaftClient newRaftClient(ClientId clientId, + Collection<RaftPeer> peers, RaftPeerId leaderId, RaftClientRequestSender requestSender, int retryInterval) { - return new RaftClientImpl(clientId, peers, leaderId, requestSender, retryInterval); + return new RaftClientImpl(clientId, peers, leaderId, requestSender, + retryInterval); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 24bb4ec..b0e1d41 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -26,39 +26,48 @@ import java.util.Arrays; public class ClientProtoUtils { public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( - String requestorId, String replyId, long seqNum, boolean success) { + byte[] requestorId, byte[] replyId, long seqNum, boolean success) { return RaftRpcReplyProto.newBuilder() - .setRequestorId(requestorId).setReplyId(replyId).setSeqNum(seqNum) + .setRequestorId(ProtoUtils.toByteString(requestorId)) + .setReplyId(ProtoUtils.toByteString(replyId)) + .setSeqNum(seqNum) .setSuccess(success); } public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( - String requesterId, String replyId, long seqNum) { + byte[] requesterId, byte[] replyId, long seqNum) { return RaftRpcRequestProto.newBuilder() - .setRequestorId(requesterId).setReplyId(replyId).setSeqNum(seqNum); + .setRequestorId(ProtoUtils.toByteString(requesterId)) + .setReplyId(ProtoUtils.toByteString(replyId)) + .setSeqNum(seqNum); } public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) { - return new RaftClientRequest(p.getRpcRequest().getRequestorId(), - p.getRpcRequest().getReplyId(), p.getRpcRequest().getSeqNum(), + ClientId clientId = new ClientId( + p.getRpcRequest().getRequestorId().toByteArray()); + RaftPeerId serverId = new RaftPeerId( + p.getRpcRequest().getReplyId()); + return new RaftClientRequest(clientId, serverId, + p.getRpcRequest().getSeqNum(), toMessage(p.getMessage()), p.getReadOnly()); } public static RaftClientRequestProto toRaftClientRequestProto( RaftClientRequest request) { return RaftClientRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getRequestorId(), - request.getReplierId(), request.getSeqNum())) + .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(), + request.getServerId().toBytes(), request.getSeqNum())) .setMessage(toClientMessageEntryProto(request.getMessage())) .setReadOnly(request.isReadOnly()) .build(); } public static RaftClientRequestProto genRaftClientRequestProto( - String requestorId, String replierId, long seqNum, ByteString content, + ClientId clientId, RaftPeerId serverId, long seqNum, ByteString content, boolean readOnly) { return RaftClientRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder(requestorId, replierId, seqNum)) + .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(), + serverId.toBytes(), seqNum)) .setMessage(ClientMessageEntryProto.newBuilder().setContent(content)) .setReadOnly(readOnly) .build(); @@ -68,8 +77,8 @@ public class ClientProtoUtils { RaftClientReply reply) { final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder(); if (reply != null) { - b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getRequestorId(), - reply.getReplierId(), reply.getSeqNum(), reply.isSuccess())); + b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(), + reply.getServerId().toBytes(), reply.getSeqNum(), reply.isSuccess())); if (reply.getMessage() != null) { b.setMessage(toClientMessageEntryProto(reply.getMessage())); } @@ -96,9 +105,11 @@ public class ClientProtoUtils { ProtoUtils.toRaftPeer(replyProto.getSuggestedLeader()) : null; final RaftPeer[] peers = ProtoUtils.toRaftPeerArray( replyProto.getPeersInConfList()); - e = new NotLeaderException(rp.getReplyId(), suggestedLeader, peers); + e = new NotLeaderException(new RaftPeerId(rp.getReplyId()), + suggestedLeader, peers); } - return new RaftClientReply(rp.getRequestorId(), rp.getReplyId(), + return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()), + new RaftPeerId(rp.getReplyId()), rp.getSeqNum(), rp.getSuccess(), toMessage(replyProto.getMessage()), e); } @@ -115,15 +126,19 @@ public class ClientProtoUtils { SetConfigurationRequestProto p) { final RaftRpcRequestProto m = p.getRpcRequest(); final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList()); - return new SetConfigurationRequest(m.getRequestorId(), m.getReplyId(), + return new SetConfigurationRequest( + new ClientId(m.getRequestorId().toByteArray()), + new RaftPeerId(m.getReplyId()), p.getRpcRequest().getSeqNum(), peers); } public static SetConfigurationRequestProto toSetConfigurationRequestProto( SetConfigurationRequest request) { return SetConfigurationRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getRequestorId(), - request.getReplierId(), request.getSeqNum())) + .setRpcRequest(toRaftRpcRequestProtoBuilder( + request.getClientId().toBytes(), + request.getServerId().toBytes(), + request.getSeqNum())) .addAllPeers(ProtoUtils.toRaftPeerProtos( Arrays.asList(request.getPeersInNewConf()))) .build(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 2f5d450..d29eebe 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -21,7 +21,6 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.protocol.*; import org.apache.ratis.util.RaftUtils; -import org.apache.ratis.util.StringUtils; import java.io.IOException; import java.io.InterruptedIOException; @@ -34,16 +33,16 @@ import java.util.stream.Collectors; /** A client who sends requests to a raft service. */ final class RaftClientImpl implements RaftClient { - private final String clientId; + private final ClientId clientId; private final RaftClientRequestSender requestSender; - private final Map<String, RaftPeer> peers; + private final Map<RaftPeerId, RaftPeer> peers; private final int retryInterval; - private volatile String leaderId; + private volatile RaftPeerId leaderId; - RaftClientImpl( - String clientId, Collection<RaftPeer> peers, String leaderId, - RaftClientRequestSender requestSender, int retryInterval) { + RaftClientImpl(ClientId clientId, Collection<RaftPeer> peers, + RaftPeerId leaderId, RaftClientRequestSender requestSender, + int retryInterval) { this.clientId = clientId; this.requestSender = requestSender; this.peers = peers.stream().collect( @@ -53,7 +52,7 @@ final class RaftClientImpl implements RaftClient { } @Override - public String getId() { + public ClientId getId() { return clientId; } @@ -121,9 +120,10 @@ final class RaftClientImpl implements RaftClient { return null; } - private void handleNotLeaderException(RaftClientRequest request, NotLeaderException nle) { + private void handleNotLeaderException(RaftClientRequest request, + NotLeaderException nle) { refreshPeers(nle.getPeers()); - final String newLeader = nle.getSuggestedLeader() == null? null + final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null : nle.getSuggestedLeader().getId(); handleIOException(request, nle, newLeader); } @@ -139,11 +139,12 @@ final class RaftClientImpl implements RaftClient { } } - private void handleIOException(RaftClientRequest request, IOException ioe, String newLeader) { + private void handleIOException(RaftClientRequest request, IOException ioe, + RaftPeerId newLeader) { LOG.debug("{}: Failed with {}", clientId, ioe); - final String oldLeader = request.getReplierId(); + final RaftPeerId oldLeader = request.getServerId(); if (newLeader == null && oldLeader.equals(leaderId)) { - newLeader = StringUtils.next(oldLeader, peers.keySet()); + newLeader = RaftUtils.next(oldLeader, peers.keySet()); } if (newLeader != null && oldLeader.equals(leaderId)) { LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java new file mode 100644 index 0000000..ece7c50 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import com.google.common.base.Preconditions; + +import java.nio.ByteBuffer; +import java.util.UUID; + +/** + * Id of Raft client. Should be globally unique so that raft peers can use it + * to correctly identify retry requests from the same client. + */ +public class ClientId { + public static final int BYTE_LENGTH = 16; + + public static ClientId createId() { + UUID uuid = UUID.randomUUID(); + return new ClientId(uuid); + } + + private final UUID uuid; + + private ClientId(UUID id) { + Preconditions.checkNotNull(uuid = id); + } + + public ClientId(byte[] data) { + Preconditions.checkArgument(data != null && data.length == BYTE_LENGTH); + ByteBuffer buffer = ByteBuffer.wrap(data); + this.uuid = new UUID(buffer.getLong(), buffer.getLong()); + } + + public byte[] toBytes() { + ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]); + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + return buf.array(); + } + + @Override + public String toString() { + return uuid.toString(); + } + + + @Override + public boolean equals(Object other) { + return other == this || + (other instanceof ClientId && + uuid.equals(((ClientId) other).uuid)); + } + + @Override + public int hashCode() { + return uuid.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java index 1306290..062f5de 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java @@ -22,7 +22,7 @@ public class NotLeaderException extends RaftException { /** the client may need to update its RaftPeer list */ private final RaftPeer[] peers; - public NotLeaderException(String id, RaftPeer suggestedLeader, + public NotLeaderException(RaftPeerId id, RaftPeer suggestedLeader, RaftPeer[] peers) { super("Server " + id + " is not the leader (" + suggestedLeader + "). Request must be sent to leader."); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java new file mode 100644 index 0000000..f205f34 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public abstract class RaftClientMessage implements RaftRpcMessage { + private final ClientId clientId; + private final RaftPeerId serverId; + + public RaftClientMessage(ClientId clientId, RaftPeerId serverId) { + this.clientId = clientId; + this.serverId = serverId; + } + + @Override + public String getRequestorId() { + return clientId.toString(); + } + + @Override + public String getReplierId() { + return serverId.toString(); + } + + public ClientId getClientId() { + return clientId; + } + + public RaftPeerId getServerId() { + return serverId; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + clientId + "->" + serverId + ")"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 8c5cd75..459e0f4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -17,9 +17,10 @@ */ package org.apache.ratis.protocol; -public class RaftClientReply extends RaftRpcMessage { - private final String requestorId; - private final String replierId; +/** + * Reply from server to client + */ +public class RaftClientReply extends RaftClientMessage { private final boolean success; private final long seqNum; @@ -27,10 +28,9 @@ public class RaftClientReply extends RaftRpcMessage { private final NotLeaderException notLeaderException; private final Message message; - public RaftClientReply(String requestorId, String replierId, long seqNum, + public RaftClientReply(ClientId clientId, RaftPeerId serverId, long seqNum, boolean success, Message message, NotLeaderException notLeaderException) { - this.requestorId = requestorId; - this.replierId = replierId; + super(clientId, serverId); this.success = success; this.seqNum = seqNum; this.message = message; @@ -39,12 +39,12 @@ public class RaftClientReply extends RaftRpcMessage { public RaftClientReply(RaftClientRequest request, NotLeaderException notLeaderException) { - this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(), + this(request.getClientId(), request.getServerId(), request.getSeqNum(), false, null, notLeaderException); } public RaftClientReply(RaftClientRequest request, Message message) { - this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(), + this(request.getClientId(), request.getServerId(), request.getSeqNum(), true, message, null); } @@ -53,16 +53,6 @@ public class RaftClientReply extends RaftRpcMessage { return false; } - @Override - public String getRequestorId() { - return requestorId; - } - - @Override - public String getReplierId() { - return replierId; - } - public long getSeqNum() { return seqNum; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 90b648a..9b649c9 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -17,22 +17,22 @@ */ package org.apache.ratis.protocol; -public class RaftClientRequest extends RaftRpcMessage { - private final String requestorId; - private final String replierId; +/** + * Request from client to server + */ +public class RaftClientRequest extends RaftClientMessage { private final long seqNum; private final Message message; private final boolean readOnly; - public RaftClientRequest(String requestorId, String replierId, long seqNum, - Message message) { - this(requestorId, replierId, seqNum, message, false); + public RaftClientRequest(ClientId clientId, RaftPeerId serverId, + long seqNum, Message message) { + this(clientId, serverId, seqNum, message, false); } - public RaftClientRequest(String requestorId, String replierId, long seqNum, - Message message, boolean readOnly) { - this.requestorId = requestorId; - this.replierId = replierId; + public RaftClientRequest(ClientId clientId, RaftPeerId serverId, + long seqNum, Message message, boolean readOnly) { + super(clientId, serverId); this.seqNum = seqNum; this.message = message; this.readOnly = readOnly; @@ -43,16 +43,6 @@ public class RaftClientRequest extends RaftRpcMessage { return true; } - @Override - public String getRequestorId() { - return requestorId; - } - - @Override - public String getReplierId() { - return replierId; - } - public long getSeqNum() { return seqNum; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java index a32aaa0..cfa595c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java @@ -30,30 +30,30 @@ public class RaftPeer { public static final RaftPeer[] EMPTY_PEERS = {}; /** The id of the peer. */ - private final String id; + private final RaftPeerId id; /** The address of the peer. */ private final String address; /** Construct a peer with the given id and a null address. */ - public RaftPeer(String id) { + public RaftPeer(RaftPeerId id) { this(id, (String)null); } /** Construct a peer with the given id and address. */ - public RaftPeer(String id, InetSocketAddress address) { + public RaftPeer(RaftPeerId id, InetSocketAddress address) { this(id, address == null ? null : HostAndPort.fromParts(address.getAddress().getHostAddress(), address.getPort()).toString()); } /** Construct a peer with the given id and address. */ - public RaftPeer(String id, String address) { + public RaftPeer(RaftPeerId id, String address) { this.id = id; this.address = address; } /** @return The id of the peer. */ - public String getId() { + public RaftPeerId getId() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java new file mode 100644 index 0000000..55cbb6d --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import com.google.common.base.Preconditions; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * Id of Raft Peer. Should be globally unique. + */ +public class RaftPeerId { + public static RaftPeerId getRaftPeerId(String id) { + return id == null || id.isEmpty() ? null : new RaftPeerId(id); + } + + /** UTF-8 string as id */ + private final byte[] id; + + public RaftPeerId(String id) { + Preconditions.checkArgument(id != null && !id.isEmpty()); + this.id = id.getBytes(StandardCharsets.UTF_8); + } + + public RaftPeerId(byte[] id) { + this.id = id; + } + + public RaftPeerId(ByteString id) { + this(id.toByteArray()); + } + + /** + * @return id in byte[]. + */ + public byte[] toBytes() { + return id; + } + + @Override + public String toString() { + return new String(id, StandardCharsets.UTF_8); + } + + @Override + public boolean equals(Object other) { + return other == this || + (other instanceof RaftPeerId && + Arrays.equals(id, ((RaftPeerId) other).id)); + } + + @Override + public int hashCode() { + return Arrays.hashCode(id); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java index 82f1ebb..58aa9ee 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java @@ -17,17 +17,11 @@ */ package org.apache.ratis.protocol; -public abstract class RaftRpcMessage { +public interface RaftRpcMessage { - public abstract boolean isRequest(); + boolean isRequest(); - public abstract String getRequestorId(); + String getRequestorId(); - public abstract String getReplierId(); - - @Override - public String toString() { - return getClass().getSimpleName() + "(" + getRequestorId() - + (isRequest()? "->": "<-") + getReplierId() + ")"; - } + String getReplierId(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java index 84449d4..6bc34f4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java @@ -22,9 +22,9 @@ import java.util.Arrays; public class SetConfigurationRequest extends RaftClientRequest { private final RaftPeer[] peers; - public SetConfigurationRequest(String requestorId, String replierId, + public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId, long seqNum, RaftPeer[] peers) { - super(requestorId, replierId, seqNum, null); + super(clientId, serverId, seqNum, null); this.peers = peers; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java index 60b5daf..922948b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java @@ -39,7 +39,7 @@ public class CodeInjectionForTesting { * @param args other possible args * @return if the injected code is executed */ - boolean execute(String localId, String remoteId, Object... args); + boolean execute(Object localId, Object remoteId, Object... args); } private static final Map<String, Code> INJECTION_POINTS @@ -52,8 +52,8 @@ public class CodeInjectionForTesting { } /** Execute the injected code, if there is any. */ - public static boolean execute(String injectionPoint, String localId, - String remoteId, Object... args) { + public static boolean execute(String injectionPoint, Object localId, + Object remoteId, Object... args) { final Code code = INJECTION_POINTS.get(injectionPoint); if (code == null) { return false; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java index 9466719..6a07b77 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java @@ -20,6 +20,7 @@ package org.apache.ratis.util; import com.google.common.base.Preconditions; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { } } - private final Map<String, PeerAndProxy> peers = new ConcurrentHashMap<>(); + private final Map<RaftPeerId, PeerAndProxy> peers = new ConcurrentHashMap<>(); private final Object resetLock = new Object(); private final CheckedFunction<RaftPeer, PROXY, IOException> createProxy; @@ -86,7 +87,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { this.createProxy = this::createProxyImpl; } - public PROXY getProxy(String id) throws IOException { + public PROXY getProxy(RaftPeerId id) throws IOException { PeerAndProxy p = peers.get(id); if (p == null) { synchronized (resetLock) { @@ -108,7 +109,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { peers.putIfAbsent(p.getId(), new PeerAndProxy(p)); } - public void resetProxy(String id) { + public void resetProxy(RaftPeerId id) { synchronized (resetLock) { final PeerAndProxy pp = peers.remove(id); final RaftPeer peer = pp.getPeer(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 8dc822b..8d9c25e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.ServiceException; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; @@ -70,7 +71,7 @@ public class ProtoUtils { public static RaftPeerProto toRaftPeerProto(RaftPeer peer) { RaftPeerProto.Builder builder = RaftPeerProto.newBuilder() - .setId(peer.getId()); + .setId(toByteString(peer.getId().toBytes())); if (peer.getAddress() != null) { builder.setAddress(peer.getAddress()); } @@ -78,7 +79,7 @@ public class ProtoUtils { } public static RaftPeer toRaftPeer(RaftPeerProto p) { - return new RaftPeer(p.getId(), p.getAddress()); + return new RaftPeer(new RaftPeerId(p.getId()), p.getAddress()); } public static RaftPeer[] toRaftPeerArray(List<RaftPeerProto> protos) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java index 8621cef..0b3d24e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java @@ -27,6 +27,7 @@ import java.io.*; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -237,4 +238,25 @@ public abstract class RaftUtils { } } } + + /** + * @return the next element in the iteration right after the given element; + * if the given element is not in the iteration, return the first one + */ + public static <T> T next(final T given, final Iterable<T> iteration) { + Preconditions.checkNotNull(given); + Preconditions.checkNotNull(iteration); + final Iterator<T> i = iteration.iterator(); + Preconditions.checkArgument(i.hasNext()); + + final T first = i.next(); + for(T current = first; i.hasNext(); ) { + final T next = i.next(); + if (given.equals(current)) { + return next; + } + current = next; + } + return first; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index 511ebbe..f17ee93 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Interner; import com.google.common.collect.Interners; -import java.util.Iterator; import java.util.Locale; public class StringUtils { @@ -84,25 +83,4 @@ public class StringUtils { return defaultValue; } } - - /** - * @return the next string in the iteration right after the given string; - * if the given string is not in the iteration, return the first string. - */ - public static String next(final String given, final Iterable<String> iteration) { - Preconditions.checkNotNull(given); - Preconditions.checkNotNull(iteration); - final Iterator<String> i = iteration.iterator(); - Preconditions.checkArgument(i.hasNext()); - - final String first = i.next(); - for(String current = first; i.hasNext(); ) { - final String next = i.next(); - if (given.equals(current)) { - return next; - } - current = next; - } - return first; - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java index b241f3a..e400817 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java @@ -24,6 +24,7 @@ import org.apache.ratis.examples.arithmetic.expression.Expression; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; @@ -65,16 +66,16 @@ public class ArithmeticStateMachine extends BaseStateMachine { } @Override - public void initialize(String id, RaftProperties properties, RaftStorage raftStorage) - throws IOException { + public void initialize(RaftPeerId id, RaftProperties properties, + RaftStorage raftStorage) throws IOException { super.initialize(id, properties, raftStorage); this.storage.init(raftStorage); loadSnapshot(storage.getLatestSnapshot()); } @Override - public void reinitialize(String id, RaftProperties properties, RaftStorage storage) - throws IOException { + public void reinitialize(RaftPeerId id, RaftProperties properties, + RaftStorage storage) throws IOException { close(); this.initialize(id, properties, storage); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java b/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java index 15f65a9..71d21eb 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java @@ -18,12 +18,11 @@ package org.apache.ratis; import org.apache.log4j.Level; -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.examples.RaftExamplesTestUtil; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.simulation.RequestHandler; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; @@ -95,9 +94,9 @@ public class TestBatchAppend { private final SimpleMessage[] msgs; private final AtomicBoolean succeed = new AtomicBoolean(false); - Sender(String clientId, String leaderId, CountDownLatch latch, int numMsg) { + Sender(RaftPeerId leaderId, CountDownLatch latch, int numMsg) { this.latch = latch; - this.client = cluster.createClient(clientId, leaderId); + this.client = cluster.createClient(leaderId); msgs = generateMsgs(numMsg); } @@ -144,12 +143,12 @@ public class TestBatchAppend { final int numClients = 5; cluster.start(); RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); + final RaftPeerId leaderId = cluster.getLeader().getId(); // start several clients and write concurrently CountDownLatch latch = new CountDownLatch(1); final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients) - .map(i -> new Sender("c" + i, leaderId, latch, numMsgs)) + .map(i -> new Sender(leaderId, latch, numMsgs)) .collect(Collectors.toList()); senders.forEach(Thread::start); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java index 37b4064..60f06cd 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java @@ -18,12 +18,11 @@ package org.apache.ratis; import org.apache.log4j.Level; -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.examples.RaftExamplesTestUtil; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.simulation.RequestHandler; @@ -76,8 +75,8 @@ public class TestRestartRaftPeer { public void restartFollower() throws Exception { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); // write some messages final byte[] content = new byte[1024]; @@ -88,7 +87,7 @@ public class TestRestartRaftPeer { } // restart a follower - String followerId = cluster.getFollowers().get(0).getId(); + String followerId = cluster.getFollowers().get(0).getId().toString(); LOG.info("Restart follower {}", followerId); cluster.restartServer(followerId, false); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java index 44cf894..f4bcd0a 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java @@ -23,10 +23,9 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.examples.RaftExamplesTestUtil; -import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine; -import org.apache.ratis.examples.arithmetic.AssignmentMessage; import org.apache.ratis.examples.arithmetic.expression.*; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.RaftUtils; import org.junit.Assert; import org.junit.Test; @@ -54,8 +53,8 @@ public class TestArithmetic { public void testPythagorean() throws Exception { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("pythagorean", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); final Variable a = new Variable("a"); final Variable b = new Variable("b"); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index 1f885ea..e38e245 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -23,12 +23,11 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.examples.RaftExamplesTestUtil; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.simulation.RequestHandler; import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.statemachine.SimpleStateMachine4Testing; -import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.RaftUtils; import org.junit.Assert; import org.junit.Test; @@ -73,9 +72,9 @@ public class TestRaftStateMachineException { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); + final RaftPeerId leaderId = cluster.getLeader().getId(); - try(final RaftClient client = cluster.createClient("client", leaderId)) { + try(final RaftClient client = cluster.createClient(leaderId)) { client.send(new RaftTestUtil.SimpleMessage("m")); fail("Exception expected"); } catch (StateMachineException e) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 9ea23c3..48acf35 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -23,6 +23,7 @@ import org.apache.ratis.grpc.client.RaftClientProtocolService; import org.apache.ratis.grpc.server.RaftServerProtocolClient; import org.apache.ratis.grpc.server.RaftServerProtocolService; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.shaded.io.grpc.Server; @@ -89,9 +90,9 @@ public class RaftGRpcService implements RaftServerRpc { private final Server server; private final InetSocketAddress address; - private final Map<String, RaftServerProtocolClient> peers = + private final Map<RaftPeerId, RaftServerProtocolClient> peers = Collections.synchronizedMap(new HashMap<>()); - private final String selfId; + private final RaftPeerId selfId; private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) { ServerBuilder serverBuilder = ServerBuilder.forPort(port); @@ -118,14 +119,11 @@ public class RaftGRpcService implements RaftServerRpc { } catch (IOException e) { ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG); } - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - RaftGRpcService.this.close(); - System.err.println("*** server shut down"); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + RaftGRpcService.this.close(); + System.err.println("*** server shut down"); + })); } @Override @@ -162,7 +160,7 @@ public class RaftGRpcService implements RaftServerRpc { null, request); RaftServerProtocolClient target = Preconditions.checkNotNull( - peers.get(request.getServerRequest().getReplyId())); + peers.get(new RaftPeerId(request.getServerRequest().getReplyId()))); return target.requestVote(request); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index cb05b33..4f96c06 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -19,6 +19,8 @@ package org.apache.ratis.grpc.client; import com.google.common.base.Preconditions; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; @@ -32,7 +34,6 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.RaftUtils; -import org.apache.ratis.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ public class AppendStreamer implements Closeable { enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR} private static class ExceptionAndRetry { - private final Map<String, IOException> exceptionMap = new HashMap<>(); + private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>(); private final AtomicInteger retryTimes = new AtomicInteger(0); private final int maxRetryTimes; private final long retryInterval; @@ -70,7 +71,7 @@ public class AppendStreamer implements Closeable { TimeUnit.MILLISECONDS); } - void addException(String peer, IOException e) { + void addException(RaftPeerId peer, IOException e) { exceptionMap.put(peer, e); retryTimes.incrementAndGet(); } @@ -89,17 +90,17 @@ public class AppendStreamer implements Closeable { private final int maxPendingNum; private final PeerProxyMap<RaftClientProtocolProxy> proxyMap; - private final Map<String, RaftPeer> peers; - private String leaderId; + private final Map<RaftPeerId, RaftPeer> peers; + private RaftPeerId leaderId; private volatile RaftClientProtocolProxy leaderProxy; - private final String clientId; + private final ClientId clientId; private volatile RunningState running = RunningState.RUNNING; private final ExceptionAndRetry exceptionAndRetry; private final Sender senderThread; AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers, - String leaderId, String clientId) { + RaftPeerId leaderId, ClientId clientId) { this.clientId = clientId; maxPendingNum = prop.getInt( RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY, @@ -120,15 +121,15 @@ public class AppendStreamer implements Closeable { senderThread.start(); } - private synchronized void refreshLeaderProxy(String suggested, - String oldLeader) { + private synchronized void refreshLeaderProxy(RaftPeerId suggested, + RaftPeerId oldLeader) { if (suggested != null) { leaderId = suggested; } else { if (oldLeader == null) { leaderId = peers.keySet().iterator().next(); } else { - leaderId = StringUtils.next(oldLeader, peers.keySet()); + leaderId = RaftUtils.next(oldLeader, peers.keySet()); } } LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this, @@ -248,7 +249,7 @@ public class AppendStreamer implements Closeable { /** the response handler for stream RPC */ private class ResponseHandler implements RaftClientProtocolProxy.CloseableStreamObserver { - private final String targetId; + private final RaftPeerId targetId; // once handled the first NotLeaderException or Error, the handler should // be inactive and should not make any further action. private volatile boolean active = true; @@ -285,7 +286,7 @@ public class AppendStreamer implements Closeable { RaftClientReply r = toRaftClientReply(reply); if (r.isNotLeader()) { LOG.debug("{} received a NotLeaderException from {}", this, - r.getReplierId()); + r.getServerId()); handleNotLeader(r.getNotLeaderException(), targetId); } } @@ -324,7 +325,7 @@ public class AppendStreamer implements Closeable { } private void handleNotLeader(NotLeaderException nle, - String oldLeader) { + RaftPeerId oldLeader) { Preconditions.checkState(Thread.holdsLock(AppendStreamer.this)); // handle NotLeaderException: refresh leader and RaftConfiguration refreshPeers(nle.getPeers()); @@ -349,7 +350,7 @@ public class AppendStreamer implements Closeable { } } - private void refreshLeader(String suggestedLeader, String oldLeader) { + private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) { running = RunningState.LOOK_FOR_LEADER; refreshLeaderProxy(suggestedLeader, oldLeader); reQueuePendingRequests(leaderId); @@ -364,7 +365,7 @@ public class AppendStreamer implements Closeable { leaderProxy.onNext(request); } - private void reQueuePendingRequests(String newLeader) { + private void reQueuePendingRequests(RaftPeerId newLeader) { if (isRunning()) { // resend all the pending requests while (!ackQueue.isEmpty()) { @@ -374,7 +375,7 @@ public class AppendStreamer implements Closeable { .setMessage(oldRequest.getMessage()) .setReadOnly(oldRequest.getReadOnly()) .setRpcRequest(toRaftRpcRequestProtoBuilder( - clientId, newLeader, r.getSeqNum())) + clientId.toBytes(), newLeader.toBytes(), r.getSeqNum())) .build(); dataQueue.offerFirst(newRequest); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java index bb212e1..99d8778 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -19,6 +19,7 @@ package org.apache.ratis.grpc.client; import com.google.common.base.Preconditions; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; @@ -65,10 +66,10 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase } private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE); - private final String id; + private final RaftPeerId id; private final RaftClientAsynchronousProtocol client; - public RaftClientProtocolService(String id, RaftClientAsynchronousProtocol client) { + public RaftClientProtocolService(RaftPeerId id, RaftClientAsynchronousProtocol client) { this.id = id; this.client = client; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java index 6d9e11f..50b05da 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; @@ -54,7 +55,7 @@ public class RaftClientSenderWithGrpc implements RaftClientRequestSender { @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { - final String serverId = request.getReplierId(); + final RaftPeerId serverId = request.getServerId(); final RaftClientProtocolClient proxy = proxies.getProxy(serverId); if (request instanceof SetConfigurationRequest) { SetConfigurationRequestProto setConf = http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java index a3905f8..8f0e183 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java @@ -25,7 +25,9 @@ import java.io.OutputStream; import java.util.Collection; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.ProtoUtils; public class RaftOutputStream extends OutputStream { @@ -33,13 +35,13 @@ public class RaftOutputStream extends OutputStream { private final byte buf[]; private int count; private long seqNum = 0; - private final String clientId; + private final ClientId clientId; private final AppendStreamer streamer; private boolean closed = false; - public RaftOutputStream(RaftProperties prop, String clientId, - Collection<RaftPeer> peers, String leaderId) { + public RaftOutputStream(RaftProperties prop, ClientId clientId, + Collection<RaftPeer> peers, RaftPeerId leaderId) { final int bufferSize = prop.getInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT); buf = new byte[bufferSize]; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java index 08e6a51..3e5ae0d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java @@ -18,6 +18,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; @@ -33,10 +34,10 @@ import org.slf4j.LoggerFactory; public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase { public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class); - private final String id; + private final RaftPeerId id; private final RaftServerProtocol server; - public RaftServerProtocolService(String id, RaftServerProtocol server) { + public RaftServerProtocolService(RaftPeerId id, RaftServerProtocol server) { this.id = id; this.server = server; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index 7a996eb..85829e5 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -23,11 +23,10 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.RaftGRpcService; -import org.apache.ratis.grpc.RaftGrpcConfigKeys; import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc; import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.LogAppenderFactory; @@ -101,7 +100,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { RaftServerImpl server = servers.get(entry.getKey().getId()); server.setServerRpc(entry.getValue()); if (!startService) { - BlockRequestHandlingInjection.getInstance().blockReplier(server.getId()); + BlockRequestHandlingInjection.getInstance().blockReplier(server.getId().toString()); } else { server.start(); } @@ -130,9 +129,9 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { } @Override - public void startServer(String id) { + public void startServer(RaftPeerId id) { super.startServer(id); - BlockRequestHandlingInjection.getInstance().unblockReplier(id); + BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index f4c8d27..99e98c6 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -18,6 +18,7 @@ package org.apache.ratis.grpc; import org.apache.log4j.Level; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; @@ -98,8 +99,8 @@ public class TestRaftStream { RaftServerImpl leader = waitForLeader(cluster); int count = 1; - try (RaftOutputStream out = new RaftOutputStream(prop, "writer-1", - cluster.getPeers(), leader.getId())) { + try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), + cluster.getPeers(), leader.getId())) { for (int i = 0; i < 500; i++) { // generate 500 requests out.write(genContent(count++)); } @@ -138,7 +139,7 @@ public class TestRaftStream { cluster.start(); RaftServerImpl leader = waitForLeader(cluster); - RaftOutputStream out = new RaftOutputStream(prop, "writer", + RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), cluster.getPeers(), leader.getId()); int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072}; @@ -217,7 +218,7 @@ public class TestRaftStream { cluster.start(); RaftServerImpl leader = waitForLeader(cluster); - RaftOutputStream out = new RaftOutputStream(prop, "writer", + RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), cluster.getPeers(), leader.getId()); byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2]; @@ -283,7 +284,7 @@ public class TestRaftStream { new Thread(() -> { LOG.info("Writer thread starts"); int count = 0; - try (RaftOutputStream out = new RaftOutputStream(prop, "writer", + try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(), cluster.getPeers(), leader.getId())) { while (running.get()) { out.write(toBytes(count++)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java index aac7c31..116be19 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java @@ -40,7 +40,7 @@ public class HadoopClientRequestSender implements RaftClientRequestSender { @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { - final String serverId = request.getReplierId(); + final RaftPeerId serverId = request.getServerId(); final RaftClientProtocolClientSideTranslatorPB proxy = proxies.getProxy(serverId); try { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index 4d69797..eb93c5c 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.hadooprpc.server; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; import org.apache.hadoop.ipc.RPC; @@ -26,6 +25,7 @@ import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB; import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; import org.apache.ratis.protocol.RaftClientProtocol; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; @@ -81,7 +81,7 @@ public class HadoopRpcService implements RaftServerRpc { return new Builder(); } - private final String id; + private final RaftPeerId id; private final RPC.Server ipcServer; private final InetSocketAddress ipcServerAddress; @@ -160,11 +160,11 @@ public class HadoopRpcService implements RaftServerRpc { @Override public AppendEntriesReplyProto appendEntries( AppendEntriesRequestProto request) throws IOException { - Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); final RaftServerProtocolPB proxy = proxies.getProxy( - request.getServerRequest().getReplyId()).getProtocol(); + new RaftPeerId(request.getServerRequest().getReplyId())) + .getProtocol(); try { return proxy.appendEntries(null, request); } catch (ServiceException se) { @@ -175,11 +175,11 @@ public class HadoopRpcService implements RaftServerRpc { @Override public InstallSnapshotReplyProto installSnapshot( InstallSnapshotRequestProto request) throws IOException { - Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); final RaftServerProtocolPB proxy = proxies.getProxy( - request.getServerRequest().getReplyId()).getProtocol(); + new RaftPeerId(request.getServerRequest().getReplyId())) + .getProtocol(); try { return proxy.installSnapshot(null, request); } catch (ServiceException se) { @@ -190,11 +190,11 @@ public class HadoopRpcService implements RaftServerRpc { @Override public RequestVoteReplyProto requestVote( RequestVoteRequestProto request) throws IOException { - Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); final RaftServerProtocolPB proxy = proxies.getProxy( - request.getServerRequest().getReplyId()).getProtocol(); + new RaftPeerId(request.getServerRequest().getReplyId())) + .getProtocol(); try { return proxy.requestVote(null, request); } catch (ServiceException se) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java index 38d806b..1604b5c 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.netty.client; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; @@ -40,7 +41,7 @@ public class NettyClientRequestSender implements RaftClientRequestSender { @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { - final String serverId = request.getReplierId(); + final RaftPeerId serverId = request.getServerId(); final NettyRpcProxy proxy = proxies.getProxy(serverId); final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index b3f2efb..a659665 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -26,6 +26,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; @@ -78,7 +79,7 @@ public final class NettyRpcService implements RaftServerRpc { private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); private final RaftServer server; - private final String id; + private final RaftPeerId id; private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -209,10 +210,11 @@ public final class NettyRpcService implements RaftServerRpc { private static RaftNettyServerReplyProto toRaftNettyServerReplyProto( RaftRpcRequestProto request, IOException e) { - final RaftRpcReplyProto.Builder rpcReply = ClientProtoUtils.toRaftRpcReplyProtoBuilder( - request.getRequestorId(), - request.getReplyId(), - request.getSeqNum(), false); + final RaftRpcReplyProto.Builder rpcReply = RaftRpcReplyProto.newBuilder() + .setRequestorId(request.getRequestorId()) + .setReplyId(request.getReplyId()) + .setSeqNum(request.getSeqNum()) + .setSuccess(false); final RaftNettyExceptionReplyProto.Builder ioe = RaftNettyExceptionReplyProto.newBuilder() .setRpcReply(rpcReply) .setException(ProtoUtils.toByteString(e)); @@ -221,7 +223,6 @@ public final class NettyRpcService implements RaftServerRpc { @Override public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { - Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() @@ -233,7 +234,6 @@ public final class NettyRpcService implements RaftServerRpc { @Override public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException { - Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() @@ -245,7 +245,6 @@ public final class NettyRpcService implements RaftServerRpc { @Override public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { - Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() @@ -258,7 +257,7 @@ public final class NettyRpcService implements RaftServerRpc { private RaftNettyServerReplyProto sendRaftNettyServerRequestProto( RaftRpcRequestProto request, RaftNettyServerRequestProto proto) throws IOException { - final String id = request.getReplyId(); + final RaftPeerId id = new RaftPeerId(request.getReplyId()); final NettyRpcProxy p = proxies.getProxy(id); try { return p.send(request, proto); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 32ed98b..f642ee5 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -24,6 +24,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.netty.client.NettyClientRequestSender; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerImpl; @@ -57,7 +58,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { init(initRpcServices(getServers(), getConf())); } - private static String getAddress(String id, RaftConfiguration conf) { + private static String getAddress(RaftPeerId id, RaftConfiguration conf) { final RaftPeer peer = conf.getPeer(id); if (peer != null) { final String address = peer.getAddress(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 80c4b8c..182a905 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -22,7 +22,7 @@ option java_generate_equals_and_hash = true; package ratis.common; message RaftPeerProto { - string id = 1; // id of the peer + bytes id = 1; // id of the peer string address = 2; // e.g. IP address, hostname etc. } @@ -58,14 +58,14 @@ message TermIndexProto { } message RaftRpcRequestProto { - string requestorId = 1; - string replyId = 2; + bytes requestorId = 1; + bytes replyId = 2; uint64 seqNum = 3; } message RaftRpcReplyProto { - string requestorId = 1; - string replyId = 2; + bytes requestorId = 1; + bytes replyId = 2; uint64 seqNum = 3; bool success = 4; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index f3f63ae..bfbd75f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -22,19 +22,19 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; import org.apache.ratis.protocol.RaftClientProtocol; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.ServerImplUtils; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.statemachine.StateMachine; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; /** Raft server interface */ public interface RaftServer extends Closeable, RaftServerProtocol, RaftClientProtocol, RaftClientAsynchronousProtocol { /** @return the server ID. */ - String getId(); + RaftPeerId getId(); /** Set server RPC service. */ void setServerRpc(RaftServerRpc serverRpc); @@ -55,9 +55,7 @@ public interface RaftServer extends Closeable, RaftServerProtocol, /** To build {@link RaftServer} objects. */ class Builder { - private static final AtomicInteger COUNT = new AtomicInteger(); - - private String serverId = RaftServer.class.getSimpleName() + COUNT.incrementAndGet(); + private RaftPeerId serverId; private StateMachine stateMachine; private Iterable<RaftPeer> peers; private RaftProperties properties; @@ -67,12 +65,14 @@ public interface RaftServer extends Closeable, RaftServerProtocol, Preconditions.checkNotNull(stateMachine); Preconditions.checkNotNull(peers); Preconditions.checkNotNull(properties); + Preconditions.checkNotNull(serverId); - return ServerImplUtils.newRaftServer(serverId, stateMachine, peers, properties); + return ServerImplUtils.newRaftServer(serverId, stateMachine, peers, + properties); } /** Set the server ID. */ - public Builder setServerId(String serverId) { + public Builder setServerId(RaftPeerId serverId) { this.serverId = serverId; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 7b3845a..f2010e4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; @@ -205,7 +206,7 @@ class LeaderElection extends Daemon { final List<RequestVoteReplyProto> responses = new ArrayList<>(); final List<Exception> exceptions = new ArrayList<>(); int waitForNum = submitted; - Collection<String> votedPeers = new ArrayList<>(); + Collection<RaftPeerId> votedPeers = new ArrayList<>(); while (waitForNum > 0 && running && server.isCandidate()) { final long waitTime = -timeout.elapsedTimeMs(); if (waitTime <= 0) { @@ -229,7 +230,7 @@ class LeaderElection extends Daemon { exceptions, r.getTerm()); } if (r.getServerReply().getSuccess()) { - votedPeers.add(r.getServerReply().getReplyId()); + votedPeers.add(new RaftPeerId(r.getServerReply().getReplyId())); if (conf.hasMajority(votedPeers, server.getId())) { return logAndReturn(Result.PASSED, responses, exceptions, -1); }
