Repository: incubator-ratis Updated Branches: refs/heads/master 58ab69a80 -> 16eb8cc6b
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index e4b2889..35d120c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -46,6 +46,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.ReconfigurationTimeoutException; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.server.storage.RaftLog; @@ -418,12 +419,12 @@ public class LeaderState { } } - boolean isBootStrappingPeer(String peerId) { + boolean isBootStrappingPeer(RaftPeerId peerId) { return inStagingState() && getStagingState().contains(peerId); } private void updateLastCommitted() { - final String selfId = server.getId(); + final RaftPeerId selfId = server.getId(); final RaftConfiguration conf = server.getRaftConf(); long majorityInNewConf = computeLastCommitted(voterLists.get(0), conf.containsInConf(selfId)); @@ -551,12 +552,12 @@ public class LeaderState { } private class ConfigurationStagingState { - private final Map<String, RaftPeer> newPeers; + private final Map<RaftPeerId, RaftPeer> newPeers; private final PeerConfiguration newConf; ConfigurationStagingState(Collection<RaftPeer> newPeers, PeerConfiguration newConf) { - Map<String, RaftPeer> map = new HashMap<>(); + Map<RaftPeerId, RaftPeer> map = new HashMap<>(); for (RaftPeer peer : newPeers) { map.put(peer.getId(), peer); } @@ -577,7 +578,7 @@ public class LeaderState { return newPeers.values(); } - boolean contains(String peerId) { + boolean contains(RaftPeerId peerId) { return newPeers.containsKey(peerId); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java index b532303..0b5c8af 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import java.util.*; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; /** * The peer configuration of a raft cluster. @@ -29,11 +30,11 @@ import org.apache.ratis.protocol.RaftPeer; * The objects of this class are immutable. */ class PeerConfiguration { - private final Map<String, RaftPeer> peers; + private final Map<RaftPeerId, RaftPeer> peers; PeerConfiguration(Iterable<RaftPeer> peers) { Preconditions.checkNotNull(peers); - Map<String, RaftPeer> map = new HashMap<>(); + Map<RaftPeerId, RaftPeer> map = new HashMap<>(); for(RaftPeer p : peers) { map.put(p.getId(), p); } @@ -54,17 +55,17 @@ class PeerConfiguration { return peers.values().toString(); } - RaftPeer getPeer(String id) { + RaftPeer getPeer(RaftPeerId id) { return peers.get(id); } - boolean contains(String id) { + boolean contains(RaftPeerId id) { return peers.containsKey(id); } - List<RaftPeer> getOtherPeers(String selfId) { + List<RaftPeer> getOtherPeers(RaftPeerId selfId) { List<RaftPeer> others = new ArrayList<>(); - for (Map.Entry<String, RaftPeer> entry : peers.entrySet()) { + for (Map.Entry<RaftPeerId, RaftPeer> entry : peers.entrySet()) { if (!selfId.equals(entry.getValue().getId())) { others.add(entry.getValue()); } @@ -72,13 +73,13 @@ class PeerConfiguration { return others; } - boolean hasMajority(Collection<String> others, String selfId) { + boolean hasMajority(Collection<RaftPeerId> others, RaftPeerId selfId) { Preconditions.checkArgument(!others.contains(selfId)); int num = 0; if (contains(selfId)) { num++; } - for (String other : others) { + for (RaftPeerId other : others) { if (contains(other)) { num++; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java index 8fdd628..50acbaf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java @@ -24,6 +24,7 @@ import java.util.*; import java.util.concurrent.ThreadLocalRandom; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; /** * The configuration of the raft cluster. @@ -150,23 +151,24 @@ public class RaftConfiguration { return oldConf == null; } - boolean containsInConf(String peerId) { + boolean containsInConf(RaftPeerId peerId) { return conf.contains(peerId); } - boolean containsInOldConf(String peerId) { + boolean containsInOldConf(RaftPeerId peerId) { return oldConf != null && oldConf.contains(peerId); } - boolean contains(String peerId) { - return containsInConf(peerId) && (oldConf == null || containsInOldConf(peerId)); + boolean contains(RaftPeerId peerId) { + return containsInConf(peerId) && + (oldConf == null || containsInOldConf(peerId)); } /** * @return the peer corresponding to the given id; * or return null if the peer is not in this configuration. */ - public RaftPeer getPeer(String id) { + public RaftPeer getPeer(RaftPeerId id) { if (id == null) { return null; } @@ -193,7 +195,7 @@ public class RaftConfiguration { * @return all the peers other than the given self id from the conf, * and the old conf if it exists. */ - public Collection<RaftPeer> getOtherPeers(String selfId) { + public Collection<RaftPeer> getOtherPeers(RaftPeerId selfId) { Collection<RaftPeer> others = conf.getOtherPeers(selfId); if (oldConf != null) { oldConf.getOtherPeers(selfId).stream() @@ -204,7 +206,7 @@ public class RaftConfiguration { } /** @return true if the self id together with the others are in the majority. */ - boolean hasMajority(Collection<String> others, String selfId) { + boolean hasMajority(Collection<RaftPeerId> others, RaftPeerId selfId) { Preconditions.checkArgument(!others.contains(selfId)); return conf.hasMajority(others, selfId) && (oldConf == null || oldConf.hasMajority(others, selfId)); @@ -243,7 +245,7 @@ public class RaftConfiguration { return peers; } - RaftPeer getRandomPeer(String exclusiveId) { + RaftPeer getRandomPeer(RaftPeerId exclusiveId) { final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId); if (peers.isEmpty()) { return null; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 135ae68..30bfa04 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -36,6 +36,7 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftException; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.ReconfigurationInProgressException; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.server.RaftServer; @@ -101,7 +102,7 @@ public class RaftServerImpl implements RaftServer { private final LogAppenderFactory appenderFactory; - RaftServerImpl(String id, StateMachine stateMachine, + RaftServerImpl(RaftPeerId id, StateMachine stateMachine, RaftConfiguration raftConf, RaftProperties properties) throws IOException { this.lifeCycle = new LifeCycle(id); @@ -212,7 +213,7 @@ public class RaftServerImpl implements RaftServer { } @Override - public String getId() { + public RaftPeerId getId() { return getState().getSelfId(); } @@ -360,7 +361,7 @@ public class RaftServerImpl implements RaftServer { if (lifeCycle.getCurrentState() != RUNNING) { return new NotLeaderException(getId(), null, null); } - String leaderId = state.getLeaderId(); + RaftPeerId leaderId = state.getLeaderId(); if (leaderId == null || leaderId.equals(state.getSelfId())) { // No idea about who is the current leader. Or the peer is the current // leader, but it is about to step down @@ -439,8 +440,9 @@ public class RaftServerImpl implements RaftServer { return waitForReply(getId(), request, submitClientRequestAsync(request)); } - private static RaftClientReply waitForReply(String id, RaftClientRequest request, - CompletableFuture<RaftClientReply> future) throws IOException { + private static RaftClientReply waitForReply(RaftPeerId id, + RaftClientRequest request, CompletableFuture<RaftClientReply> future) + throws IOException { try { return future.get(); } catch (InterruptedException e) { @@ -522,7 +524,7 @@ public class RaftServerImpl implements RaftServer { * 3. candidate id is not included in conf * 4. candidate's last entry's index < conf's index */ - private boolean shouldSendShutdown(String candidateId, + private boolean shouldSendShutdown(RaftPeerId candidateId, TermIndex candidateLastEntry) { return isLeader() && getRaftConf().isStable() @@ -535,12 +537,13 @@ public class RaftServerImpl implements RaftServer { @Override public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException { - final String candidateId = r.getServerRequest().getRequestorId(); + final RaftPeerId candidateId = + new RaftPeerId(r.getServerRequest().getRequestorId()); return requestVote(candidateId, r.getCandidateTerm(), ServerProtoUtils.toTermIndex(r.getCandidateLastEntry())); } - private RequestVoteReplyProto requestVote(String candidateId, + private RequestVoteReplyProto requestVote(RaftPeerId candidateId, long candidateTerm, TermIndex candidateLastEntry) throws IOException { CodeInjectionForTesting.execute(REQUEST_VOTE, getId(), candidateId, candidateTerm, candidateLastEntry); @@ -619,16 +622,17 @@ public class RaftServerImpl implements RaftServer { .toArray(new LogEntryProto[r.getEntriesCount()]); final TermIndex previous = r.hasPreviousLog() ? ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; - return appendEntries(r.getServerRequest().getRequestorId(), + return appendEntries(new RaftPeerId(r.getServerRequest().getRequestorId()), r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), entries); } - private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm, + private AppendEntriesReplyProto appendEntries(RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException { CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, entries); + leaderId, leaderTerm, previous, leaderCommit, initializing, + entries); if (LOG.isDebugEnabled()) { LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(), leaderId, leaderTerm, previous, leaderCommit, initializing, @@ -722,8 +726,10 @@ public class RaftServerImpl implements RaftServer { @Override public InstallSnapshotReplyProto installSnapshot( InstallSnapshotRequestProto request) throws IOException { - final String leaderId = request.getServerRequest().getRequestorId(); - CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); + final RaftPeerId leaderId = + new RaftPeerId(request.getServerRequest().getRequestorId()); + CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), + leaderId, request); LOG.debug("{}: receive installSnapshot({})", getId(), request); lifeCycle.assertCurrentState(STARTING, RUNNING); @@ -780,7 +786,7 @@ public class RaftServerImpl implements RaftServer { } AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm, - String targetId, TermIndex previous, List<LogEntryProto> entries, + RaftPeerId targetId, TermIndex previous, List<LogEntryProto> entries, boolean initializing) { return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, leaderTerm, entries, state.getLog().getLastCommittedIndex(), @@ -788,8 +794,8 @@ public class RaftServerImpl implements RaftServer { } synchronized InstallSnapshotRequestProto createInstallSnapshotRequest( - String targetId, String requestId, int requestIndex, SnapshotInfo snapshot, - List<FileChunkProto> chunks, boolean done) { + RaftPeerId targetId, String requestId, int requestIndex, + SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) { OptionalLong totalSize = snapshot.getFiles().stream() .mapToLong(FileInfo::getFileSize).reduce(Long::sum); assert totalSize.isPresent(); @@ -798,8 +804,8 @@ public class RaftServerImpl implements RaftServer { chunks, totalSize.getAsLong(), done); } - synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId, - long term, TermIndex lastEntry) { + synchronized RequestVoteRequestProto createRequestVoteRequest( + RaftPeerId targetId, long term, TermIndex lastEntry) { return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term, lastEntry); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index c6d650f..5248906 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.impl; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.statemachine.StateMachine; @@ -29,7 +30,7 @@ import java.io.IOException; /** Server utilities for internal use. */ public class ServerImplUtils { public static RaftServer newRaftServer( - String id, StateMachine stateMachine, + RaftPeerId id, StateMachine stateMachine, Iterable<RaftPeer> peers, RaftProperties properties) throws IOException { return newRaftServer(id, stateMachine, RaftConfiguration.newBuilder().setConf(peers).build(), @@ -37,7 +38,7 @@ public class ServerImplUtils { } public static RaftServerImpl newRaftServer( - String id, StateMachine stateMachine, + RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException { return new RaftServerImpl(id, stateMachine, conf, properties); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 4d4371d..846fbe2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; @@ -106,21 +108,23 @@ public class ServerProtoUtils { } public static RequestVoteReplyProto toRequestVoteReplyProto( - String requestorId, String replyId, boolean success, long term, + RaftPeerId requestorId, RaftPeerId replyId, boolean success, long term, boolean shouldShutdown) { final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder(); - b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, replyId, - DEFAULT_SEQNUM, success)) + b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder( + requestorId.toBytes(), replyId.toBytes(), DEFAULT_SEQNUM, success)) .setTerm(term) .setShouldShutdown(shouldShutdown); return b.build(); } public static RequestVoteRequestProto toRequestVoteRequestProto( - String requestorId, String replyId, long term, TermIndex lastEntry) { + RaftPeerId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) { + RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils + .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), + DEFAULT_SEQNUM); final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder() - .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM)) + .setServerRequest(rpb) .setCandidateTerm(term); if (lastEntry != null) { b.setCandidateLastEntry(toTermIndexProto(lastEntry)); @@ -129,10 +133,10 @@ public class ServerProtoUtils { } public static InstallSnapshotReplyProto toInstallSnapshotReplyProto( - String requestorId, String replyId, long term, int requestIndex, + RaftPeerId requestorId, RaftPeerId replyId, long term, int requestIndex, InstallSnapshotResult result) { - final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, - replyId, DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS); + final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), + replyId.toBytes(), DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS); final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto .newBuilder().setServerReply(rb).setTerm(term).setResult(result) .setRequestIndex(requestIndex); @@ -140,12 +144,13 @@ public class ServerProtoUtils { } public static InstallSnapshotRequestProto toInstallSnapshotRequestProto( - String requestorId, String replyId, String requestId, int requestIndex, + RaftPeerId requestorId, RaftPeerId replyId, String requestId, int requestIndex, long term, TermIndex lastTermIndex, List<FileChunkProto> chunks, long totalSize, boolean done) { return InstallSnapshotRequestProto.newBuilder() .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM)) + ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), + replyId.toBytes(), DEFAULT_SEQNUM)) .setRequestId(requestId) .setRequestIndex(requestIndex) // .setRaftConfiguration() TODO: save and pass RaftConfiguration @@ -157,10 +162,10 @@ public class ServerProtoUtils { } public static AppendEntriesReplyProto toAppendEntriesReplyProto( - String requestorId, String replyId, long term, + RaftPeerId requestorId, RaftPeerId replyId, long term, long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) { - RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, - replyId, DEFAULT_SEQNUM, appendResult == SUCCESS); + RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), + replyId.toBytes(), DEFAULT_SEQNUM, appendResult == SUCCESS); final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder(); b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex) .setResult(appendResult); @@ -168,13 +173,14 @@ public class ServerProtoUtils { } public static AppendEntriesRequestProto toAppendEntriesRequestProto( - String requestorId, String replyId, long leaderTerm, + RaftPeerId requestorId, RaftPeerId replyId, long leaderTerm, List<LogEntryProto> entries, long leaderCommit, boolean initializing, TermIndex previous) { final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto .newBuilder() .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM)) + ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), + replyId.toBytes(), DEFAULT_SEQNUM)) .setLeaderTerm(leaderTerm) .setLeaderCommit(leaderCommit) .setInitializing(initializing); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 8608fc4..150b2c1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -24,6 +24,7 @@ import java.io.Closeable; import java.io.IOException; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.MemoryRaftLog; import org.apache.ratis.server.storage.RaftLog; @@ -44,7 +45,7 @@ import com.google.common.base.Preconditions; * Common states of a raft peer. Protected by RaftServer's lock. */ public class ServerState implements Closeable { - private final String selfId; + private final RaftPeerId selfId; private final RaftServerImpl server; /** Raft log */ private final RaftLog log; @@ -65,11 +66,11 @@ public class ServerState implements Closeable { * The server ID of the leader for this term. Null means either there is * no leader for this term yet or this server does not know who it is yet. */ - private String leaderId; + private RaftPeerId leaderId; /** * Candidate that this peer granted vote for in current term (or null if none) */ - private String votedFor; + private RaftPeerId votedFor; /** * Latest installed snapshot for this server. This maybe different than StateMachine's latest @@ -78,8 +79,9 @@ public class ServerState implements Closeable { */ private TermIndex latestInstalledSnapshot; - ServerState(String id, RaftConfiguration conf, RaftProperties prop, - RaftServerImpl server, StateMachine stateMachine) throws IOException { + ServerState(RaftPeerId id, RaftConfiguration conf, RaftProperties prop, + RaftServerImpl server, StateMachine stateMachine) + throws IOException { this.selfId = id; this.server = server; configurationManager = new ConfigurationManager(conf); @@ -133,8 +135,8 @@ public class ServerState implements Closeable { * note we do not apply log entries to the state machine here since we do not * know whether they have been committed. */ - private RaftLog initLog(String id, RaftProperties prop, RaftServerImpl server, - long lastIndexInSnapshot) throws IOException { + private RaftLog initLog(RaftPeerId id, RaftProperties prop, + RaftServerImpl server, long lastIndexInSnapshot) throws IOException { final RaftLog log; if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY, RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) { @@ -152,8 +154,7 @@ public class ServerState implements Closeable { } @VisibleForTesting - - public String getSelfId() { + public RaftPeerId getSelfId() { return this.selfId; } @@ -165,7 +166,7 @@ public class ServerState implements Closeable { currentTerm = term; } - String getLeaderId() { + RaftPeerId getLeaderId() { return leaderId; } @@ -194,12 +195,12 @@ public class ServerState implements Closeable { /** * Vote for a candidate and update the local state. */ - void grantVote(String candidateId) { + void grantVote(RaftPeerId candidateId) { votedFor = candidateId; leaderId = null; } - void setLeader(String leaderId) { + void setLeader(RaftPeerId leaderId) { this.leaderId = leaderId; } @@ -220,7 +221,7 @@ public class ServerState implements Closeable { * If accept, update the current state. * @return true if the check passes */ - boolean recognizeLeader(String leaderId, long leaderTerm) { + boolean recognizeLeader(RaftPeerId leaderId, long leaderTerm) { if (leaderTerm < currentTerm) { return false; } else if (leaderTerm > currentTerm || this.leaderId == null) { @@ -238,7 +239,7 @@ public class ServerState implements Closeable { /** * Check if the candidate's term is acceptable */ - boolean recognizeCandidate(String candidateId, + boolean recognizeCandidate(RaftPeerId candidateId, long candidateTerm) { if (candidateTerm > currentTerm) { return true; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index 8a275ec..2ba37e1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.storage; import java.util.ArrayList; import java.util.List; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.ServerProtoUtils; @@ -35,7 +36,7 @@ import com.google.common.base.Preconditions; public class MemoryRaftLog extends RaftLog { private final List<LogEntryProto> entries = new ArrayList<>(); - public MemoryRaftLog(String selfId) { + public MemoryRaftLog(RaftPeerId selfId) { super(selfId); } @@ -167,7 +168,7 @@ public class MemoryRaftLog extends RaftLog { } @Override - public void writeMetadata(long term, String votedFor) { + public void writeMetadata(long term, RaftPeerId votedFor) { // do nothing } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 05307f2..f0c7b60 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; @@ -55,12 +56,12 @@ public abstract class RaftLog implements Closeable { */ protected final AtomicLong lastCommitted = new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX); - private final String selfId; + private final RaftPeerId selfId; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private volatile boolean isOpen = false; - public RaftLog(String selfId) { + public RaftLog(RaftPeerId selfId) { this.selfId = selfId; } @@ -236,7 +237,7 @@ public abstract class RaftLog implements Closeable { * in the RaftPeer's lock. Later we can use an IO task queue to enforce the * order. */ - public abstract void writeMetadata(long term, String votedFor) + public abstract void writeMetadata(long term, RaftPeerId votedFor) throws IOException; public abstract Metadata loadMetadata() throws IOException; @@ -249,15 +250,15 @@ public abstract class RaftLog implements Closeable { } public static class Metadata { - private final String votedFor; + private final RaftPeerId votedFor; private final long term; - public Metadata(String votedFor, long term) { + public Metadata(RaftPeerId votedFor, long term) { this.votedFor = votedFor; this.term = term; } - public String getVotedFor() { + public RaftPeerId getVotedFor() { return votedFor; } @@ -287,7 +288,7 @@ public abstract class RaftLog implements Closeable { isOpen = false; } - public String getSelfId() { + public RaftPeerId getSelfId() { return selfId; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 9b2932c..870d802 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -27,6 +27,7 @@ import java.util.List; import org.apache.commons.io.Charsets; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; @@ -101,8 +102,9 @@ public class SegmentedRaftLog extends RaftLog { private final RaftLogWorker fileLogWorker; private final long segmentMaxSize; - public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage, - long lastIndexInSnapshot, RaftProperties properties) throws IOException { + public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server, + RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) + throws IOException { super(selfId); this.storage = storage; this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, @@ -295,13 +297,14 @@ public class SegmentedRaftLog extends RaftLog { * This operation is protected by the RaftServer's lock */ @Override - public void writeMetadata(long term, String votedFor) throws IOException { - storage.getMetaFile().set(term, votedFor); + public void writeMetadata(long term, RaftPeerId votedFor) throws IOException { + storage.getMetaFile().set(term, votedFor != null ? votedFor.toString() : null); } @Override public Metadata loadMetadata() throws IOException { - return new Metadata(storage.getMetaFile().getVotedFor(), + return new Metadata( + RaftPeerId.getRaftPeerId(storage.getMetaFile().getVotedFor()), storage.getMetaFile().getTerm()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index 73b2af9..ca5525b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.channels.FileChannel; import org.apache.ratis.io.MD5Hash; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.statemachine.SnapshotInfo; @@ -44,9 +45,9 @@ public class SnapshotManager { private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); private final RaftStorage storage; - private final String selfId; + private final RaftPeerId selfId; - public SnapshotManager(RaftStorage storage, String selfId) + public SnapshotManager(RaftStorage storage, RaftPeerId selfId) throws IOException { this.storage = storage; this.selfId = selfId; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java index 397a12b..2a845c1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java @@ -26,6 +26,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.storage.RaftStorage; @@ -48,8 +49,8 @@ public class BaseStateMachine implements StateMachine { } @Override - public void initialize(String id, RaftProperties properties, RaftStorage storage) - throws IOException { + public void initialize(RaftPeerId id, RaftProperties properties, + RaftStorage storage) throws IOException { lifeCycle.setName(getClass().getSimpleName() + ":" + id); this.properties = properties; this.storage = storage; @@ -80,8 +81,8 @@ public class BaseStateMachine implements StateMachine { } @Override - public void reinitialize(String id, RaftProperties properties, RaftStorage storage) - throws IOException { + public void reinitialize(RaftPeerId id, RaftProperties properties, + RaftStorage storage) throws IOException { } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index e377aa7..2649dcb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.storage.RaftStorage; @@ -42,7 +43,7 @@ public interface StateMachine extends Closeable { * responsible reading the latest snapshot from the file system (if any) and initialize itself * with the latest term and index there including all the edits. */ - void initialize(String id, RaftProperties properties, RaftStorage storage) + void initialize(RaftPeerId id, RaftProperties properties, RaftStorage storage) throws IOException; /** @@ -62,7 +63,7 @@ public interface StateMachine extends Closeable { * state machine is responsible reading the latest snapshot from the file system (if any) and * initialize itself with the latest term and index there including all the edits. */ - void reinitialize(String id, RaftProperties properties, RaftStorage storage) + void reinitialize(RaftPeerId id, RaftProperties properties, RaftStorage storage) throws IOException; /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index d824014..993d861 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; @@ -84,7 +85,7 @@ public abstract class MiniRaftCluster { @Override public void restartServer(String id, boolean format) throws IOException { super.restartServer(id, format); - setPeerRpc(conf.getPeer(id)).start(); + setPeerRpc(conf.getPeer(new RaftPeerId(id))).start(); } @Override @@ -111,7 +112,8 @@ public abstract class MiniRaftCluster { public static RaftConfiguration initConfiguration(String[] ids) { return RaftConfiguration.newBuilder() - .setConf(Arrays.stream(ids).map(RaftPeer::new).collect(Collectors.toList())) + .setConf(Arrays.stream(ids).map(id -> new RaftPeer(new RaftPeerId(id))) + .collect(Collectors.toList())) .build(); } @@ -137,7 +139,7 @@ public abstract class MiniRaftCluster { protected RaftConfiguration conf; protected final RaftProperties properties; private final String testBaseDir; - protected final Map<String, RaftServerImpl> servers = + protected final Map<RaftPeerId, RaftServerImpl> servers = Collections.synchronizedMap(new LinkedHashMap<>()); public MiniRaftCluster(String[] ids, RaftProperties properties, @@ -171,16 +173,17 @@ public abstract class MiniRaftCluster { * start a stopped server again. */ public void restartServer(String id, boolean format) throws IOException { - killServer(id); - servers.remove(id); - servers.put(id, newRaftServer(id, format)); + final RaftPeerId newId = new RaftPeerId(id); + killServer(newId); + servers.remove(newId); + servers.put(newId, newRaftServer(newId, format)); } public final void restart(boolean format) throws IOException { servers.values().stream().filter(RaftServerImpl::isAlive) .forEach(RaftServerImpl::close); - List<String> idList = new ArrayList<>(servers.keySet()); - for (String id : idList) { + List<RaftPeerId> idList = new ArrayList<>(servers.keySet()); + for (RaftPeerId id : idList) { servers.remove(id); servers.put(id, newRaftServer(id, format)); } @@ -201,7 +204,7 @@ public abstract class MiniRaftCluster { return conf; } - private RaftServerImpl newRaftServer(String id, boolean format) { + private RaftServerImpl newRaftServer(RaftPeerId id, boolean format) { final RaftServerImpl s; try { final String dirStr = testBaseDir + id; @@ -254,7 +257,7 @@ public abstract class MiniRaftCluster { LOG.info("Add new peers {}", Arrays.asList(ids)); Collection<RaftPeer> newPeers = new ArrayList<>(ids.length); for (String id : ids) { - newPeers.add(new RaftPeer(id)); + newPeers.add(new RaftPeer(new RaftPeerId(id))); } // create and add new RaftServers @@ -276,7 +279,7 @@ public abstract class MiniRaftCluster { return new PeerChanges(p, np, new RaftPeer[0]); } - public void startServer(String id) { + public void startServer(RaftPeerId id) { RaftServerImpl server = servers.get(id); assert server != null; server.start(); @@ -315,7 +318,7 @@ public abstract class MiniRaftCluster { removedPeers.toArray(new RaftPeer[removedPeers.size()])); } - public void killServer(String id) { + public void killServer(RaftPeerId id) { servers.get(id).close(); } @@ -370,9 +373,9 @@ public abstract class MiniRaftCluster { return leaders.get(0); } - public boolean isLeader(String leaderId) throws InterruptedException { + boolean isLeader(String leaderId) throws InterruptedException { final RaftServerImpl leader = getLeader(); - return leader != null && leader.getId().equals(leaderId); + return leader != null && leader.getId().toString().equals(leaderId); } public List<RaftServerImpl> getFollowers() { @@ -386,7 +389,7 @@ public abstract class MiniRaftCluster { } public RaftServerImpl getServer(String id) { - return servers.get(id); + return servers.get(new RaftPeerId(id)); } public Collection<RaftPeer> getPeers() { @@ -395,9 +398,8 @@ public abstract class MiniRaftCluster { .collect(Collectors.toList()); } - public RaftClient createClient(String clientId, String leaderId) { + public RaftClient createClient(RaftPeerId leaderId) { return RaftClient.newBuilder() - .setClientId(clientId) .setServers(conf.getPeers()) .setLeaderId(leaderId) .setRequestSender(getRaftClientRequestSender()) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 4ec78b9..9e3897d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -20,6 +20,7 @@ package org.apache.ratis; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; import org.junit.*; import org.junit.rules.Timeout; @@ -82,12 +83,12 @@ public abstract class RaftBasicTests { final MiniRaftCluster cluster = getCluster(); RaftServerImpl leader = waitForLeader(cluster); final long term = leader.getState().getCurrentTerm(); - final String killed = cluster.getFollowers().get(3).getId(); + final RaftPeerId killed = cluster.getFollowers().get(3).getId(); cluster.killServer(killed); LOG.info(cluster.printServers()); final SimpleMessage[] messages = SimpleMessage.create(10); - try(final RaftClient client = cluster.createClient("client", null)) { + try(final RaftClient client = cluster.createClient(null)) { for (SimpleMessage message : messages) { client.send(message); } @@ -120,7 +121,7 @@ public abstract class RaftBasicTests { Client4TestWithLoad(RaftClient client, int numMessages) { this.client = client; - this.messages = SimpleMessage.create(numMessages, client.getId()); + this.messages = SimpleMessage.create(numMessages, client.getId().toString()); } boolean isRunning() { @@ -155,7 +156,7 @@ public abstract class RaftBasicTests { final List<Client4TestWithLoad> clients = Stream.iterate(0, i -> i+1).limit(numClients) - .map(i -> cluster.createClient(String.valueOf((char)('a' + i)), null)) + .map(i -> cluster.createClient(null)) .map(c -> new Client4TestWithLoad(c, numMessages)) .collect(Collectors.toList()); clients.forEach(Thread::start); @@ -176,9 +177,9 @@ public abstract class RaftBasicTests { RaftServerImpl leader = cluster.getLeader(); if (leader != null) { - final String oldLeader = leader.getId(); + final RaftPeerId oldLeader = leader.getId(); LOG.info("Block all requests sent by leader " + oldLeader); - String newLeader = RaftTestUtil.changeLeader(cluster, oldLeader); + RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader); LOG.info("Changed leader from " + oldLeader + " to " + newLeader); Assert.assertFalse(newLeader.equals(oldLeader)); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java index 3ac3bf4..dbc32f0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java @@ -21,9 +21,11 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.simulation.RequestHandler; import org.apache.ratis.server.storage.RaftLog; @@ -74,14 +76,14 @@ public abstract class RaftNotLeaderExceptionBaseTest { @Test public void testHandleNotLeaderException() throws Exception { RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); RaftClientReply reply = client.send(new SimpleMessage("m1")); Assert.assertTrue(reply.isSuccess()); // enforce leader change - String newLeader = RaftTestUtil.changeLeader(cluster, leaderId); + RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId); Assert.assertNotEquals(leaderId, newLeader); RaftClientRequestSender rpc = client.getRequestSender(); @@ -89,7 +91,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { for (int i = 0; reply == null && i < 10; i++) { try { reply = rpc.sendRequest( - new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM, + new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM, new SimpleMessage("m2"))); } catch (IOException ignored) { Thread.sleep(1000); @@ -110,11 +112,11 @@ public abstract class RaftNotLeaderExceptionBaseTest { public void testNotLeaderExceptionWithReconf() throws Exception { Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster)); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); // enforce leader change - String newLeader = RaftTestUtil.changeLeader(cluster, leaderId); + RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId); Assert.assertNotEquals(leaderId, newLeader); // also add two new peers @@ -124,7 +126,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { // trigger setConfiguration LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf)); - try(final RaftClient c2 = cluster.createClient("client2", newLeader)) { + try(final RaftClient c2 = cluster.createClient(newLeader)) { RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf); Assert.assertTrue(reply.isSuccess()); } @@ -136,7 +138,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { for (int i = 0; reply == null && i < 10; i++) { try { reply = rpc.sendRequest( - new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM, + new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM, new SimpleMessage("m1"))); } catch (IOException ignored) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 910ec6e..6b2e34c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -30,6 +30,8 @@ import java.util.function.IntSupplier; import org.apache.commons.lang.RandomStringUtils; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; @@ -74,7 +76,7 @@ public class RaftTestUtil { LOG.info(cluster.printServers()); final RaftServerImpl leader = cluster.getLeader(); - Assert.assertEquals(leaderId, leader.getId()); + Assert.assertEquals(leaderId, leader.getId().toString()); return leader; } @@ -88,7 +90,7 @@ public class RaftTestUtil { LOG.info("killing leader = " + leader); cluster.killServer(leader.getId()); } - return leader != null ? leader.getId() : null; + return leader != null ? leader.getId().toString() : null; } public static boolean logEntriesContains(LogEntryProto[] entries, @@ -259,14 +261,14 @@ public class RaftTestUtil { } } - public static String changeLeader(MiniRaftCluster cluster, String oldLeader) + public static RaftPeerId changeLeader(MiniRaftCluster cluster, RaftPeerId oldLeader) throws InterruptedException { - cluster.setBlockRequestsFrom(oldLeader, true); - String newLeader = oldLeader; + cluster.setBlockRequestsFrom(oldLeader.toString(), true); + RaftPeerId newLeader = oldLeader; for(int i = 0; i < 10 && newLeader.equals(oldLeader); i++) { newLeader = RaftTestUtil.waitForLeader(cluster).getId(); } - cluster.setBlockRequestsFrom(oldLeader, false); + cluster.setBlockRequestsFrom(oldLeader.toString(), false); return newLeader; } @@ -284,12 +286,12 @@ public class RaftTestUtil { } // delay RaftServerRequest for other servers - servers.stream().filter(s -> !s.getId().equals(leaderId)) + servers.stream().filter(s -> !s.getId().toString().equals(leaderId)) .forEach(s -> { if (block) { - injection.setDelayMs(s.getId(), delayMs); + injection.setDelayMs(s.getId().toString(), delayMs); } else { - injection.removeDelay(s.getId()); + injection.removeDelay(s.getId().toString()); } }); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java index a7f1b6d..3870e82 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java @@ -18,8 +18,8 @@ package org.apache.ratis.server.impl; import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.StringUtils; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -66,7 +66,7 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co } @Override - public boolean execute(String localId, String remoteId, Object... args) { + public boolean execute(Object localId, Object remoteId, Object... args) { if (shouldBlock(localId, remoteId)) { try { RaftTestUtil.block(() -> shouldBlock(localId, remoteId)); @@ -79,7 +79,8 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co return false; } - private boolean shouldBlock(String localId, String remoteId) { - return repliers.containsKey(localId) || requestors.containsKey(remoteId); + private boolean shouldBlock(Object localId, Object remoteId) { + return (localId != null && repliers.containsKey(localId.toString())) || + (remoteId != null && requestors.containsKey(remoteId.toString())); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java index 8de2474..1818722 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java @@ -50,17 +50,21 @@ public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Cod } @Override - public boolean execute(String localId, String remoteId, Object... args) { - final AtomicInteger d = delays.get(localId); + public boolean execute(Object localId, Object remoteId, Object... args) { + if (localId == null) { + return false; + } + final String localIdStr = localId.toString(); + final AtomicInteger d = delays.get(localIdStr); if (d == null) { return false; } - LOG.info("{} delay {} ms, args={}", localId, d.get(), + LOG.info("{} delay {} ms, args={}", localIdStr, d.get(), Arrays.toString(args)); try { RaftTestUtil.delay(d::get); } catch (InterruptedException e) { - LOG.debug("Interrupted while delaying " + localId); + LOG.debug("Interrupted while delaying " + localIdStr); } return true; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 847678a..e07f5cb 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -60,6 +60,8 @@ public abstract class RaftReconfigurationBaseTest { static final Logger LOG = LoggerFactory.getLogger(RaftReconfigurationBaseTest.class); protected static final RaftProperties prop = new RaftProperties(); + + private static final ClientId clientId = ClientId.createId(); @BeforeClass public static void setup() { @@ -89,7 +91,7 @@ public abstract class RaftReconfigurationBaseTest { RaftPeer[] allPeers = cluster.addNewPeers(2, true).allPeersInNewConf; // trigger setConfiguration - SetConfigurationRequest request = new SetConfigurationRequest("client", + SetConfigurationRequest request = new SetConfigurationRequest(clientId, cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -117,7 +119,7 @@ public abstract class RaftReconfigurationBaseTest { .removePeers(2, false, Collections.emptyList()).allPeersInNewConf; // trigger setConfiguration - SetConfigurationRequest request = new SetConfigurationRequest("client", + SetConfigurationRequest request = new SetConfigurationRequest(clientId, cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -155,7 +157,7 @@ public abstract class RaftReconfigurationBaseTest { asList(change.newPeers)).allPeersInNewConf; // trigger setConfiguration - SetConfigurationRequest request = new SetConfigurationRequest("client", + SetConfigurationRequest request = new SetConfigurationRequest(clientId, cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -174,8 +176,8 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); try { RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); // submit some msgs before reconf for (int i = 0; i < getStagingGap() * 2; i++) { @@ -241,8 +243,8 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); try { RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); PeerChanges c1 = cluster.addNewPeers(2, false); @@ -252,7 +254,7 @@ public abstract class RaftReconfigurationBaseTest { final RaftClientRequestSender sender = client.getRequestSender(); final SetConfigurationRequest request = new SetConfigurationRequest( - "client", leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf); + client.getId(), leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf); try { sender.sendRequest(request); Assert.fail("did not get expected exception"); @@ -294,8 +296,8 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); try { RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); // submit some msgs before reconf for (int i = 0; i < getStagingGap() * 2; i++) { @@ -326,7 +328,7 @@ public abstract class RaftReconfigurationBaseTest { final RaftLog leaderLog = cluster.getLeader().getState().getLog(); for (RaftPeer newPeer : c1.newPeers) { Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE), - cluster.getServer(newPeer.getId()).getState().getLog() + cluster.getServer(newPeer.getId().toString()).getState().getLog() .getEntries(0, Long.MAX_VALUE)); } } finally { @@ -346,8 +348,8 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); try { RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); PeerChanges c1 = cluster.addNewPeers(2, false); PeerChanges c2 = cluster.removePeers(2, false, asList(c1.newPeers)); @@ -418,8 +420,8 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient("client", leaderId); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); client.send(new SimpleMessage("m")); final long committedIndex = cluster.getLeader().getState().getLog() @@ -452,24 +454,24 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); + final RaftPeerId leaderId = cluster.getLeader().getId(); RaftPeer[] newPeers = cluster.addNewPeers(2, true).allPeersInNewConf; // delay every peer's logSync so that the setConf request is delayed cluster.getPeers() - .forEach(peer -> logSyncDelay.setDelayMs(peer.getId(), 1000)); + .forEach(peer -> logSyncDelay.setDelayMs(peer.getId().toString(), 1000)); final CountDownLatch latch = new CountDownLatch(1); final RaftPeer[] peersInRequest2 = cluster.getPeers().toArray(new RaftPeer[0]); AtomicBoolean caughtException = new AtomicBoolean(false); new Thread(() -> { - try(final RaftClient client2 = cluster.createClient("client2", leaderId)) { + try(final RaftClient client2 = cluster.createClient(leaderId)) { latch.await(); LOG.info("client2 starts to change conf"); final RaftClientRequestSender sender2 = client2.getRequestSender(); sender2.sendRequest(new SetConfigurationRequest( - "client2", leaderId, DEFAULT_SEQNUM, peersInRequest2)); + client2.getId(), leaderId, DEFAULT_SEQNUM, peersInRequest2)); } catch (ReconfigurationInProgressException e) { caughtException.set(true); } catch (Exception e) { @@ -479,7 +481,7 @@ public abstract class RaftReconfigurationBaseTest { AtomicBoolean confChanged = new AtomicBoolean(false); new Thread(() -> { - try(final RaftClient client1 = cluster.createClient("client1", leaderId)) { + try(final RaftClient client1 = cluster.createClient(leaderId)) { LOG.info("client1 starts to change conf"); confChanged.set(client1.setConfiguration(newPeers).isSuccess()); } catch (IOException e) { @@ -513,9 +515,9 @@ public abstract class RaftReconfigurationBaseTest { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); + final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftLog log = cluster.getServer(leaderId).getState().getLog(); + final RaftLog log = cluster.getServer(leaderId.toString()).getState().getLog(); Thread.sleep(1000); Assert.assertEquals(0, log.getLatestFlushedIndex()); @@ -523,18 +525,18 @@ public abstract class RaftReconfigurationBaseTest { // followers, so that we force the leader change and the old leader will // not know LOG.info("start blocking the leader"); - BlockRequestHandlingInjection.getInstance().blockReplier(leaderId); - cluster.setBlockRequestsFrom(leaderId, true); + BlockRequestHandlingInjection.getInstance().blockReplier(leaderId.toString()); + cluster.setBlockRequestsFrom(leaderId.toString(), true); PeerChanges change = cluster.removePeers(1, false, new ArrayList<>()); AtomicBoolean gotNotLeader = new AtomicBoolean(false); new Thread(() -> { - try(final RaftClient client = cluster.createClient("client1", leaderId)) { + try(final RaftClient client = cluster.createClient(leaderId)) { LOG.info("client starts to change conf"); final RaftClientRequestSender sender = client.getRequestSender(); RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest( - "client", leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf)); + client.getId(), leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf)); if (reply.isNotLeader()) { gotNotLeader.set(true); } @@ -552,8 +554,8 @@ public abstract class RaftReconfigurationBaseTest { log.getLastEntry().getLogEntryBodyCase()); // unblock the old leader - BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId); - cluster.setBlockRequestsFrom(leaderId, false); + BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString()); + cluster.setBlockRequestsFrom(leaderId.toString(), false); // the client should get NotLeaderException for (int i = 0; i < 10 && !gotNotLeader.get(); i++) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 3cf2ef6..012cfd7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -20,10 +20,6 @@ package org.apache.ratis.server.impl; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.RaftConfiguration; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.statemachine.StateMachine; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +47,7 @@ public class RaftServerTestUtil { final RaftConfiguration current = RaftConfiguration.newBuilder() .setConf(peers).setLogEntryIndex(0).build(); for (RaftServerImpl server : cluster.getServers()) { - if (deadPeers != null && deadPeers.contains(server.getId())) { + if (deadPeers != null && deadPeers.contains(server.getId().toString())) { if (current.containsInConf(server.getId())) { deadIncluded++; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index fa177af..6d26383 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -21,6 +21,7 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +90,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { public void restartServer(String id, boolean format) throws IOException { super.restartServer(id, format); RaftServerImpl s = getServer(id); - addPeersToRpc(Collections.singletonList(conf.getPeer(id))); + addPeersToRpc(Collections.singletonList(conf.getPeer(new RaftPeerId(id)))); s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply, client2serverRequestReply)); s.start(); @@ -121,8 +122,8 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { serverRequestReply.getQueue(leaderId).blockSendRequestTo.set(block); // set delay takeRequest for the other queues - getServers().stream().filter(s -> !s.getId().equals(leaderId)) - .map(s -> serverRequestReply.getQueue(s.getId())) + getServers().stream().filter(s -> !s.getId().toString().equals(leaderId)) + .map(s -> serverRequestReply.getQueue(s.getId().toString())) .forEach(q -> q.delayTakeRequestTo.set(delayMs)); final long sleepMs = 3 * getMaxTimeout() / 2; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java index a157524..a399f5f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java @@ -24,7 +24,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; import com.google.common.base.Preconditions; -public class RaftServerReply extends RaftRpcMessage { +public class RaftServerReply implements RaftRpcMessage { private final AppendEntriesReplyProto appendEntries; private final RequestVoteReplyProto requestVote; private final InstallSnapshotReplyProto installSnapshot; @@ -79,22 +79,22 @@ public class RaftServerReply extends RaftRpcMessage { @Override public String getRequestorId() { if (isAppendEntries()) { - return appendEntries.getServerReply().getRequestorId(); + return appendEntries.getServerReply().getRequestorId().toStringUtf8(); } else if (isRequestVote()) { - return requestVote.getServerReply().getRequestorId(); + return requestVote.getServerReply().getRequestorId().toStringUtf8(); } else { - return installSnapshot.getServerReply().getRequestorId(); + return installSnapshot.getServerReply().getRequestorId().toStringUtf8(); } } @Override public String getReplierId() { if (isAppendEntries()) { - return appendEntries.getServerReply().getReplyId(); + return appendEntries.getServerReply().getReplyId().toStringUtf8(); } else if (isRequestVote()) { - return requestVote.getServerReply().getReplyId(); + return requestVote.getServerReply().getReplyId().toStringUtf8(); } else { - return installSnapshot.getServerReply().getReplyId(); + return installSnapshot.getServerReply().getReplyId().toStringUtf8(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java index fd73dff..366df84 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java @@ -22,7 +22,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; -class RaftServerRequest extends RaftRpcMessage { +class RaftServerRequest implements RaftRpcMessage { private final AppendEntriesRequestProto appendEntries; private final RequestVoteRequestProto requestVote; private final InstallSnapshotRequestProto installSnapshot; @@ -77,22 +77,22 @@ class RaftServerRequest extends RaftRpcMessage { @Override public String getRequestorId() { if (isAppendEntries()) { - return appendEntries.getServerRequest().getRequestorId(); + return appendEntries.getServerRequest().getRequestorId().toStringUtf8(); } else if (isRequestVote()) { - return requestVote.getServerRequest().getRequestorId(); + return requestVote.getServerRequest().getRequestorId().toStringUtf8(); } else { - return installSnapshot.getServerRequest().getRequestorId(); + return installSnapshot.getServerRequest().getRequestorId().toStringUtf8(); } } @Override public String getReplierId() { if (isAppendEntries()) { - return appendEntries.getServerRequest().getReplyId(); + return appendEntries.getServerRequest().getReplyId().toStringUtf8(); } else if (isRequestVote()) { - return requestVote.getServerRequest().getReplyId(); + return requestVote.getServerRequest().getReplyId().toStringUtf8(); } else { - return installSnapshot.getServerRequest().getReplyId(); + return installSnapshot.getServerRequest().getReplyId().toStringUtf8(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java index 559c1e6..f11c626 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java @@ -27,6 +27,7 @@ import org.apache.ratis.util.RaftUtils; import org.apache.ratis.util.Timestamp; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -110,7 +111,7 @@ public class SimulatedRequestReply<REQUEST extends RaftRpcMessage, SimulatedRequestReply(Collection<RaftPeer> allPeers, int simulateLatencyMs) { queues = new ConcurrentHashMap<>(); for (RaftPeer peer : allPeers) { - queues.put(peer.getId(), new EventQueue<>()); + queues.put(peer.getId().toString(), new EventQueue<>()); } this.simulateLatencyMs = simulateLatencyMs; @@ -182,7 +183,7 @@ public class SimulatedRequestReply<REQUEST extends RaftRpcMessage, public void addPeers(Collection<RaftPeer> newPeers) { for (RaftPeer peer : newPeers) { - queues.put(peer.getId(), new EventQueue<>()); + queues.put(peer.getId().toString(), new EventQueue<>()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index d40cf44..09d8493 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -54,9 +54,9 @@ class SimulatedServerRpc implements RaftServerRpc { SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { this.server = server; - this.serverHandler = new RequestHandler<>(server.getId(), + this.serverHandler = new RequestHandler<>(server.getId().toString(), "serverHandler", serverRequestReply, serverHandlerImpl, 3); - this.clientHandler = new RequestHandler<>(server.getId(), + this.clientHandler = new RequestHandler<>(server.getId().toString(), "clientHandler", clientRequestReply, clientHandlerImpl, 3); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index ce595c5..08f671f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -33,6 +33,7 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftServerConstants; @@ -51,7 +52,7 @@ public class TestSegmentedRaftLog { RaftUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG); } - private static final String peerId = "s0"; + private static final RaftPeerId peerId = new RaftPeerId("s0"); private static class SegmentRange { final long start; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 6854b42..5dea3b7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -23,7 +23,6 @@ import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import org.apache.log4j.Level; @@ -32,7 +31,9 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerTestUtil; @@ -115,9 +116,9 @@ public abstract class RaftSnapshotBaseTest { @Test public void testRestartPeer() throws Exception { RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); + final RaftPeerId leaderId = cluster.getLeader().getId(); int i = 0; - try(final RaftClient client = cluster.createClient("client", leaderId)) { + try(final RaftClient client = cluster.createClient(leaderId)) { for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { RaftClientReply reply = client.send(new SimpleMessage("m" + i)); Assert.assertTrue(reply.isSuccess()); @@ -151,13 +152,13 @@ public abstract class RaftSnapshotBaseTest { */ @Test public void testBasicInstallSnapshot() throws Exception { - List<LogPathAndIndex> logs = new ArrayList<>(); + List<LogPathAndIndex> logs; try { RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); + final RaftPeerId leaderId = cluster.getLeader().getId(); int i = 0; - try(final RaftClient client = cluster.createClient("client", leaderId)) { + try(final RaftClient client = cluster.createClient(leaderId)) { for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { RaftClientReply reply = client.send(new SimpleMessage("m" + i)); Assert.assertTrue(reply.isSuccess()); @@ -192,8 +193,7 @@ public abstract class RaftSnapshotBaseTest { assertLeaderContent(cluster); // generate some more traffic - try(final RaftClient client = cluster.createClient("client", - cluster.getLeader().getId())) { + try(final RaftClient client = cluster.createClient(cluster.getLeader().getId())) { Assert.assertTrue(client.send(new SimpleMessage("test")).isSuccess()); } @@ -201,7 +201,7 @@ public abstract class RaftSnapshotBaseTest { MiniRaftCluster.PeerChanges change = cluster.addNewPeers( new String[]{"s3", "s4"}, true); // trigger setConfiguration - SetConfigurationRequest request = new SetConfigurationRequest("client", + SetConfigurationRequest request = new SetConfigurationRequest(ClientId.createId(), cluster.getLeader().getId(), DEFAULT_SEQNUM, change.allPeersInNewConf); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 952c40b..74d10ba 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -31,9 +31,9 @@ import org.apache.ratis.io.MD5Hash; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogInputStream; @@ -96,7 +96,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } @Override - public synchronized void initialize(String id, RaftProperties properties, + public synchronized void initialize(RaftPeerId id, RaftProperties properties, RaftStorage raftStorage) throws IOException { LOG.info("Initializing " + getClass().getSimpleName() + ":" + id); lifeCycle.startAndTransition(() -> { @@ -119,7 +119,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } @Override - public synchronized void reinitialize(String id, RaftProperties properties, + public synchronized void reinitialize(RaftPeerId id, RaftProperties properties, RaftStorage storage) throws IOException { LOG.info("Reinitializing " + getClass().getSimpleName() + ":" + id); initialize(id, properties, storage); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/16eb8cc6/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index bac2c38..4e3391e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -41,7 +41,6 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.util.RaftUtils; @@ -163,7 +162,7 @@ public class TestStateMachine { int numTrx = 100; final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx); - try(final RaftClient client = cluster.createClient("client", null)) { + try(final RaftClient client = cluster.createClient(null)) { for (RaftTestUtil.SimpleMessage message : messages) { client.send(message); }
