Repository: incubator-ratis Updated Branches: refs/heads/master 89e64f212 -> fb702de7c
RATIS-96. LeaderState computeLastCommitted may throw ArrayIndexOutOfBoundsException. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/fb702de7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/fb702de7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/fb702de7 Branch: refs/heads/master Commit: fb702de7cbccbaf0a2aa9dd1badb04f74194d73b Parents: 89e64f2 Author: Jing Zhao <[email protected]> Authored: Mon Jul 31 23:33:03 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Mon Jul 31 23:33:03 2017 -0700 ---------------------------------------------------------------------- .../ratis/client/impl/RaftClientImpl.java | 4 +- .../org/apache/ratis/protocol/RaftGroup.java | 2 +- .../org/apache/ratis/util/CollectionUtils.java | 6 +- .../java/org/apache/ratis/util/JavaUtils.java | 44 +++++++ .../java/org/apache/ratis/util/LifeCycle.java | 13 +- .../ratis/grpc/client/AppendStreamer.java | 3 + .../apache/ratis/server/impl/LeaderState.java | 23 +++- .../ratis/server/impl/RaftServerImpl.java | 25 ++-- .../apache/ratis/server/impl/ServerState.java | 8 +- .../ratis/server/storage/RaftStorage.java | 15 ++- .../java/org/apache/ratis/MiniRaftCluster.java | 53 ++++++-- .../java/org/apache/ratis/RaftTestUtil.java | 14 ++- .../server/impl/ReinitializationBaseTest.java | 125 ++++++++++++++++++- 13 files changed, 282 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 1082f38..c7ad935 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -113,10 +113,8 @@ final class RaftClientImpl implements RaftClient { throws InterruptedIOException, StateMachineException { for(;;) { final RaftClientRequest request = supplier.get(); - LOG.debug("{}: {}", clientId, request); final RaftClientReply reply = sendRequest(request); if (reply != null) { - LOG.debug("{}: {}", clientId, reply); return reply; } @@ -133,6 +131,7 @@ final class RaftClientImpl implements RaftClient { private RaftClientReply sendRequest(RaftClientRequest request) throws StateMachineException { + LOG.debug("{}: {}", clientId, request); RaftClientReply reply = null; try { reply = clientRpc.sendRequest(request); @@ -140,6 +139,7 @@ final class RaftClientImpl implements RaftClient { handleIOException(request, ioe, null); } if (reply != null) { + LOG.debug("{}: {}", clientId, reply); if (reply.isNotLeader()) { handleNotLeaderException(request, reply.getNotLeaderException()); return null; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java index 3ec0fdc..6e870b1 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java @@ -50,6 +50,6 @@ public class RaftGroup { @Override public String toString() { - return groupId + ":" + Arrays.asList(peers); + return groupId + ":" + peers; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java index d8eb674..5ea8d4f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java @@ -59,11 +59,7 @@ public interface CollectionUtils { .filter(e -> !given.equals(e)) .collect(Collectors.toList()); final int size = list.size(); - if (size == 0) { - throw new IllegalArgumentException( - "All elements in the iteration equals to the given element."); - } - return list.get(ThreadLocalRandom.current().nextInt(size)); + return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size)); } static <INPUT, OUTPUT> Iterable<OUTPUT> as( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 5da2012..0664aec 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -23,9 +23,11 @@ package org.apache.ratis.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import java.util.function.Supplier; /** * General Java utility methods. @@ -60,4 +62,46 @@ public interface JavaUtils { } consumer.accept(t); } + + /** + * Create a memoized supplier which gets a value by invoking the initializer once + * and then keeps returning the same value as its supplied results. + * + * @param initializer to supply at most one non-null value. + * @param <T> The supplier result type. + * @return a memoized supplier which is thread-safe. + */ + static <T> Supplier<T> memoize(Supplier<T> initializer) { + Objects.requireNonNull(initializer, "initializer == null"); + return new Supplier<T>() { + private volatile T value = null; + + @Override + public T get() { + T v = value; + if (v == null) { + synchronized (this) { + v = value; + if (v == null) { + v = value = Objects.requireNonNull(initializer.get(), + "initializer.get() returns null"); + } + } + } + return v; + } + }; + } + + Supplier<ThreadGroup> ROOT_THREAD_GROUP = memoize(() -> { + for (ThreadGroup g = Thread.currentThread().getThreadGroup(), p;; g = p) { + if ((p = g.getParent()) == null) { + return g; + } + } + }); + + static ThreadGroup getRootThreadGroup() { + return ROOT_THREAD_GROUP.get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java index 5246aba..c6ace9e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java @@ -23,6 +23,7 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,11 +153,17 @@ public class LifeCycle { /** Assert if the current state equals to one of the expected states. */ public void assertCurrentState(State... expected) { + assertCurrentState((n, c) -> new IllegalStateException("STATE MISMATCHED: In " + + n + ", current state " + c + " is not one of the expected states " + + Arrays.toString(expected)), expected); + } + + /** Assert if the current state equals to one of the expected states. */ + public <T extends Throwable> void assertCurrentState( + BiFunction<String, State, T> newThrowable, State... expected) throws T { final State c = getCurrentState(); if (!c.isOneOf(expected)) { - throw new IllegalStateException("STATE MISMATCHED: In " + name - + ", current state " + c + " is not one of the expected states " - + Arrays.toString(expected)); + throw newThrowable.apply(name, c); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 5d01235..e7d2cd0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -118,6 +118,9 @@ public class AppendStreamer implements Closeable { leaderId = peers.keySet().iterator().next(); } else { leaderId = CollectionUtils.random(oldLeader, peers.keySet()); + if (leaderId == null) { + leaderId = oldLeader; + } } } LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index e9784bb..0415aab 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -397,8 +397,14 @@ public class LeaderState { private void updateLastCommitted() { final RaftPeerId selfId = server.getId(); final RaftConfiguration conf = server.getRaftConf(); - long majorityInNewConf = computeLastCommitted(voterLists.get(0), - conf.containsInConf(selfId)); + + final List<FollowerInfo> followers = voterLists.get(0); + final boolean includeSelf = conf.containsInConf(selfId); + if (followers.isEmpty() && !includeSelf) { + return; + } + + final long majorityInNewConf = computeLastCommitted(followers, includeSelf); final long oldLastCommitted = raftLog.getLastCommittedIndex(); final TermIndex[] entriesToCommit; if (!conf.isTransitional()) { @@ -409,8 +415,13 @@ public class LeaderState { Math.max(majorityInNewConf, oldLastCommitted) + 1); server.getState().updateStatemachine(majorityInNewConf, currentTerm); } else { // configuration is in transitional state - long majorityInOldConf = computeLastCommitted(voterLists.get(1), - conf.containsInOldConf(selfId)); + final List<FollowerInfo> oldFollowers = voterLists.get(1); + final boolean includeSelfInOldConf = conf.containsInOldConf(selfId); + if (oldFollowers.isEmpty() && !includeSelfInOldConf) { + return; + } + + final long majorityInOldConf = computeLastCommitted(oldFollowers, includeSelfInOldConf); final long majority = Math.min(majorityInNewConf, majorityInOldConf); entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, Math.max(majority, oldLastCommitted) + 1); @@ -477,6 +488,10 @@ public class LeaderState { private long computeLastCommitted(List<FollowerInfo> followers, boolean includeSelf) { final int length = includeSelf ? followers.size() + 1 : followers.size(); + if (length == 0) { + throw new IllegalArgumentException("followers.size() == " + + followers.size() + " and includeSelf == " + includeSelf); + } final long[] indices = new long[length]; for (int i = 0; i < followers.size(); i++) { indices[i] = followers.get(i).getMatchIndex(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0f8e692..1a73cba 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -25,7 +25,6 @@ import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.ratis.shaded.com.google.common.base.Supplier; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; @@ -41,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; @@ -124,7 +124,7 @@ public class RaftServerImpl implements RaftServerProtocol, maxTimeoutMs - minTimeoutMs + 1); } - RaftGroupId getGroupId() { + public RaftGroupId getGroupId() { return groupId; } @@ -318,7 +318,8 @@ public class RaftServerImpl implements RaftServerProtocol, @Override public String toString() { - return role + " " + state + " " + lifeCycle.getCurrentState(); + return String.format("%8s ", role) + groupId + " " + state + + " " + lifeCycle.getCurrentState(); } /** @@ -367,14 +368,20 @@ public class RaftServerImpl implements RaftServerProtocol, peers.toArray(new RaftPeer[peers.size()])); } + private void assertLifeCycleState(LifeCycle.State... expected) throws IOException { + lifeCycle.assertCurrentState((n, c) -> new IOException("Server " + n + + " is not " + Arrays.asList(expected) + ": current state is " + c), + expected); + } + /** * Handle a normal update request from client. */ private CompletableFuture<RaftClientReply> appendTransaction( RaftClientRequest request, TransactionContext context, - RetryCache.CacheEntry cacheEntry) throws RaftException { + RetryCache.CacheEntry cacheEntry) throws IOException { LOG.debug("{}: receive client request({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); + assertLifeCycleState(RUNNING); CompletableFuture<RaftClientReply> reply; final PendingRequest pending; @@ -486,7 +493,7 @@ public class RaftServerImpl implements RaftServerProtocol, public CompletableFuture<RaftClientReply> setConfigurationAsync( SetConfigurationRequest request) throws IOException { LOG.debug("{}: receive setConfiguration({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); + assertLifeCycleState(RUNNING); CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null); if (reply != null) { return reply; @@ -560,7 +567,7 @@ public class RaftServerImpl implements RaftServerProtocol, candidateId, candidateTerm, candidateLastEntry); LOG.debug("{}: receive requestVote({}, {}, {})", getId(), candidateId, candidateTerm, candidateLastEntry); - lifeCycle.assertCurrentState(RUNNING); + assertLifeCycleState(RUNNING); boolean voteGranted = false; boolean shouldShutdown = false; @@ -661,7 +668,7 @@ public class RaftServerImpl implements RaftServerProtocol, + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing + ServerProtoUtils.toString(entries)); - lifeCycle.assertCurrentState(STARTING, RUNNING); + assertLifeCycleState(STARTING, RUNNING); try { validateEntries(leaderTerm, previous, entries); @@ -760,7 +767,7 @@ public class RaftServerImpl implements RaftServerProtocol, leaderId, request); LOG.debug("{}: receive installSnapshot({})", getId(), request); - lifeCycle.assertCurrentState(STARTING, RUNNING); + assertLifeCycleState(STARTING, RUNNING); final long currentTerm; final long leaderTerm = request.getLeaderTerm(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 53363cd..5ab1517 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -30,7 +30,6 @@ import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; -import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import java.io.Closeable; @@ -85,7 +84,7 @@ public class ServerState implements Closeable { RaftConfiguration initialConf = RaftConfiguration.newBuilder() .setConf(group.getPeers()).build(); configurationManager = new ConfigurationManager(initialConf); - storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR); + storage = new RaftStorage(prop, group.getGroupId(), RaftServerConstants.StartupOption.REGULAR); snapshotManager = new SnapshotManager(storage, id); long lastApplied = initStatemachine(stateMachine, prop); @@ -231,10 +230,7 @@ public class ServerState implements Closeable { // leader and term later return true; } - Preconditions.assertTrue(this.leaderId.equals(leaderId), - "selfId:%s, this.leaderId:%s, received leaderId:%s", - selfId, this.leaderId, leaderId); - return true; + return this.leaderId.equals(leaderId); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java index db8a196..0b38a31 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.storage; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState; @@ -44,9 +45,17 @@ public class RaftStorage implements Closeable { public RaftStorage(RaftProperties prop, RaftServerConstants.StartupOption option) throws IOException { - final String dir = RaftServerConfigKeys.storageDir(prop); - storageDir = new RaftStorageDirectory( - new File(FileUtils.stringAsURI(dir).getPath())); + this(prop, null, option); + } + + public RaftStorage(RaftProperties prop, RaftGroupId groupId, RaftServerConstants.StartupOption option) + throws IOException { + final String dirStr = RaftServerConfigKeys.storageDir(prop); + File dir = new File(FileUtils.stringAsURI(dirStr).getPath()); + if (groupId != null) { + dir = new File(dir, groupId.toString()); + } + storageDir = new RaftStorageDirectory(dir); if (option == RaftServerConstants.StartupOption.FORMAT) { if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) { throw new IOException("Cannot format " + storageDir); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 577ae16..becbd0d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -100,11 +100,11 @@ public abstract class MiniRaftCluster { } public static RaftGroup initRaftGroup(Collection<String> ids) { - List<RaftPeer> peers = ids.stream() + final RaftPeer[] peers = ids.stream() .map(RaftPeerId::valueOf) .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())) - .collect(Collectors.toList()); - return new RaftGroup(RaftGroupId.createId(), peers.toArray(new RaftPeer[peers.size()])); + .toArray(RaftPeer[]::new); + return new RaftGroup(RaftGroupId.createId(), peers); } private static String getBaseDirectory() { @@ -227,7 +227,7 @@ public abstract class MiniRaftCluster { return ReflectionUtils.newInstance(smClass); } - public static Collection<RaftPeer> toRaftPeers( + public static List<RaftPeer> toRaftPeers( Collection<RaftServerProxy> servers) { return servers.stream() .map(MiniRaftCluster::toRaftPeer) @@ -308,11 +308,28 @@ public abstract class MiniRaftCluster { } public String printServers() { - StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n"); - for (RaftServer s : servers.values()) { - b.append(" "); - b.append(s).append("\n"); + return printServers(null); + } + + public String printServers(RaftGroupId groupId) { + final StringBuilder b = new StringBuilder("printing "); + if (groupId != null) { + b.append(groupId); + } else { + b.append("ALL groups"); } + getServers().stream().filter( + s -> { + if (groupId == null) { + return true; + } + try { + return groupId.equals(s.getImpl().getGroupId()); + } catch (IOException e) { + return false; + } + }) + .forEach(s -> b.append("\n ").append(s)); return b.toString(); } @@ -332,10 +349,20 @@ public abstract class MiniRaftCluster { } public RaftServerImpl getLeader() { + return getLeader((RaftGroupId)null); + } + + public RaftServerImpl getLeader(RaftGroupId groupId) { + Stream<RaftServerImpl> stream = getServerAliveStream(); + if (groupId != null) { + stream = stream.filter(s -> groupId.equals(s.getGroupId())); + } + return getLeader(stream); + } + + static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) { final List<RaftServerImpl> leaders = new ArrayList<>(); - getServerAliveStream() - .filter(RaftServerImpl::isLeader) - .forEach(s -> { + serverAliveStream.filter(RaftServerImpl::isLeader).forEach(s -> { if (leaders.isEmpty()) { leaders.add(s); } else { @@ -352,7 +379,7 @@ public abstract class MiniRaftCluster { if (leaders.isEmpty()) { return null; } else if (leaders.size() > 1) { - throw new IllegalStateException(printServers() + leaders + throw new IllegalStateException(leaders + ", leaders.size() = " + leaders.size() + " > 1"); } return leaders.get(0); @@ -388,7 +415,7 @@ public abstract class MiniRaftCluster { return servers.get(id); } - public Collection<RaftPeer> getPeers() { + public List<RaftPeer> getPeers() { return toRaftPeers(getServers()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 9086289..ec39073 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -18,6 +18,7 @@ package org.apache.ratis; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; @@ -64,22 +65,27 @@ public class RaftTestUtil { } public static RaftServerImpl waitForLeader( - MiniRaftCluster cluster, boolean tolerateMultipleLeaders) + MiniRaftCluster cluster, boolean tolerateMultipleLeaders) throws InterruptedException { + return waitForLeader(cluster, tolerateMultipleLeaders, null); + } + + public static RaftServerImpl waitForLeader( + MiniRaftCluster cluster, boolean tolerateMultipleLeaders, RaftGroupId groupId) throws InterruptedException { final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1; - LOG.info(cluster.printServers()); + LOG.info(cluster.printServers(groupId)); RaftServerImpl leader = null; for(int i = 0; leader == null && i < 10; i++) { Thread.sleep(sleepTime); try { - leader = cluster.getLeader(); + leader = cluster.getLeader(groupId); } catch(IllegalStateException e) { if (!tolerateMultipleLeaders) { throw e; } } } - LOG.info(cluster.printServers()); + LOG.info(cluster.printServers(groupId)); return leader; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java index 22e0564..20b8680 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java @@ -21,11 +21,13 @@ import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.Assert; import org.junit.Test; @@ -34,14 +36,18 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public abstract class ReinitializationBaseTest { static { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.TRACE); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + LogUtils.setLogLevel(ConfUtils.LOG, Level.OFF); } static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class); @@ -59,7 +65,8 @@ public abstract class ReinitializationBaseTest { LOG.info("Start testReinitialize" + cluster.printServers()); // Start server with an empty conf - RaftGroup group = new RaftGroup(RaftGroupId.createId(), new RaftPeer[0]); + final RaftGroupId groupId = RaftGroupId.createId(); + final RaftGroup group = new RaftGroup(groupId, RaftPeer.EMPTY_PEERS); final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0)) .map(RaftPeerId::valueOf).collect(Collectors.toList()); @@ -77,9 +84,121 @@ public abstract class ReinitializationBaseTest { final RaftPeer[] peers = cluster.getPeers().toArray(RaftPeer.EMPTY_PEERS); for(RaftPeer p : peers) { final RaftClient client = cluster.createClient(p.getId(), - new RaftGroup(RaftGroupId.createId(), new RaftPeer[]{p})); + new RaftGroup(groupId, new RaftPeer[]{p})); client.reinitialize(peers, p.getId()); } Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + cluster.shutdown(); + } + + @Test + public void testReinitialize5Nodes() throws Exception { + final int[] idIndex = {3, 4, 5}; + runTestReinitializeMultiGroups(idIndex, 0); + } + + @Test + public void testReinitialize7Nodes() throws Exception { + final int[] idIndex = {1, 6, 7}; + runTestReinitializeMultiGroups(idIndex, 1); + } + + @Test + public void testReinitialize9Nodes() throws Exception { + final int[] idIndex = {5, 8, 9}; + runTestReinitializeMultiGroups(idIndex, 0); + } + + private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception { + printThreadCount(null, "init"); + final MiniRaftCluster cluster = getCluster(0); + + if (chosen < 0) { + chosen = ThreadLocalRandom.current().nextInt(idIndex.length); + } + final String type = cluster.getClass().getSimpleName() + + Arrays.toString(idIndex) + "chosen=" + chosen; + LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers()); + + // Start server with an empty conf + final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId(), RaftPeer.EMPTY_PEERS); + + final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0)) + .map(RaftPeerId::valueOf).collect(Collectors.toList()); + ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true)); + LOG.info("putNewServer: " + cluster.printServers()); + + cluster.start(); + LOG.info("start: " + cluster.printServers()); + + // Make sure that there are no leaders. + TimeUnit.SECONDS.sleep(1); + Assert.assertNull(cluster.getLeader()); + + // Reinitialize servers to three groups + final List<RaftPeer> allPeers = cluster.getPeers(); + Collections.sort(allPeers, Comparator.comparing(p -> p.getId().toString())); + final RaftGroup[] groups = new RaftGroup[idIndex.length]; + for (int i = 0; i < idIndex.length; i++) { + final RaftGroupId gid = RaftGroupId.createId(); + final int previous = i == 0 ? 0 : idIndex[i - 1]; + final RaftPeer[] peers = allPeers.subList(previous, idIndex[i]).toArray(RaftPeer.EMPTY_PEERS); + groups[i] = new RaftGroup(gid, peers); + + LOG.info(i + ") starting " + groups[i]); + for(RaftPeer p : peers) { + try(final RaftClient client = cluster.createClient(p.getId(), groups[i])) { + client.reinitialize(peers, p.getId()); + } + } + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid)); + } + printThreadCount(type, "start groups"); + LOG.info("start groups: " + cluster.printServers()); + + // randomly close two of the groups (i.e. reinitialize to empty peers) + LOG.info("chosen = " + chosen + ", " + groups[chosen]); + + for (int i = 0; i < groups.length; i++) { + if (i != chosen) { + final RaftGroup g = groups[i]; + LOG.info(i + ") close " + cluster.printServers(g.getGroupId())); + for(RaftPeer p : g.getPeers()) { + try (final RaftClient client = cluster.createClient(p.getId(), g)) { + client.reinitialize(RaftPeer.EMPTY_PEERS, p.getId()); + } + } + } + } + printThreadCount(type, "close groups"); + LOG.info("close groups: " + cluster.printServers()); + + // update chosen group to use all the peers + final RaftPeer[] array = allPeers.toArray(RaftPeer.EMPTY_PEERS); + for(int i = 0; i < groups.length; i++) { + LOG.info(i + ") update " + cluster.printServers(groups[i].getGroupId())); + if (i == chosen) { + try (final RaftClient client = cluster.createClient(null, groups[i])) { + client.setConfiguration(array); + } + } else { + for(RaftPeer p : groups[i].getPeers()) { + try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) { + client.reinitialize(array, p.getId()); + } + } + } + } + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + LOG.info("update groups: " + cluster.printServers()); + printThreadCount(type, "update groups"); + + cluster.shutdown(); + printThreadCount(type, "shutdown"); + } + + static void printThreadCount(String type, String label) { + System.out.println("| " + type + " | " + label + " | " + + JavaUtils.getRootThreadGroup().activeCount() + " |"); } }
