Repository: incubator-ratis Updated Branches: refs/heads/master c2032801a -> 9ce1783d3
RATIS-100. Fix bugs for running multiple raft groups with a state machine. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4ed8f727 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4ed8f727 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4ed8f727 Branch: refs/heads/master Commit: 4ed8f72716a931835a28dbd62ec5898a97d99450 Parents: c203280 Author: Jing Zhao <[email protected]> Authored: Mon Sep 4 14:54:15 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Mon Sep 4 14:54:15 2017 -0700 ---------------------------------------------------------------------- .../ratis/protocol/GroupMismatchException.java | 28 ++++++++ .../org/apache/ratis/TestMultiRaftGroup.java | 76 ++++++++++++++++++++ .../ratis/server/impl/PendingRequests.java | 7 +- .../ratis/server/impl/RaftServerImpl.java | 12 ++++ .../ratis/server/impl/StateMachineUpdater.java | 4 ++ .../java/org/apache/ratis/MiniRaftCluster.java | 4 ++ .../java/org/apache/ratis/RaftBasicTests.java | 4 +- .../server/impl/ReinitializationBaseTest.java | 36 +++++++--- .../ratis/statemachine/TestStateMachine.java | 2 +- 9 files changed, 157 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java new file mode 100644 index 0000000..af60825 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupMismatchException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * This exception indicates that the group id in the request does not match + * server's group id. + */ +public class GroupMismatchException extends RaftException { + public GroupMismatchException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java new file mode 100644 index 0000000..8d3966e --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + + +import org.apache.log4j.Level; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.examples.RaftExamplesTestUtil; +import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine; +import org.apache.ratis.examples.arithmetic.TestArithmetic; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ReinitializationBaseTest; +import org.apache.ratis.util.CheckedBiConsumer; +import org.apache.ratis.util.LogUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +@RunWith(Parameterized.class) +public class TestMultiRaftGroup extends BaseTest { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.TRACE); + } + + @Parameterized.Parameters + public static Collection<Object[]> data() throws IOException { + return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class, 0); + } + + @Parameterized.Parameter + public MiniRaftCluster cluster; + + @Test + public void testMultiRaftGroup() throws Exception { + runTestMultiRaftGroup(3, 6, 9, 12, 15); + } + + private void runTestMultiRaftGroup(int... idIndex) throws Exception { + runTestMultiRaftGroup(idIndex, -1); + } + + private final AtomicInteger start = new AtomicInteger(3); + private final int count = 10; + + private void runTestMultiRaftGroup(int[] idIndex, int chosen) throws Exception { + + final CheckedBiConsumer<MiniRaftCluster, RaftGroup, IOException> checker = (cluster, group) -> { + try (final RaftClient client = cluster.createClient(group)) { + TestArithmetic.runTestPythagorean(client, start.getAndAdd(2*count), count); + } + }; + + ReinitializationBaseTest.runTestReinitializeMultiGroups( + cluster, idIndex, chosen, checker); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index 98bc0a7..b7b8a9e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -24,7 +24,6 @@ import org.slf4j.Logger; import java.io.IOException; import java.util.Collection; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -45,7 +44,10 @@ class PendingRequests { TransactionContext entry) { // externally synced for now Preconditions.assertTrue(!request.isReadOnly()); - Preconditions.assertTrue(last == null || index == last.getIndex() + 1); + if (last != null && !(last.getRequest() instanceof SetConfigurationRequest)) { + Preconditions.assertTrue(index == last.getIndex() + 1, + () -> "index = " + index + " != last.getIndex() + 1, last=" + last); + } return add(index, request, entry); } @@ -60,6 +62,7 @@ class PendingRequests { PendingRequest addConfRequest(SetConfigurationRequest request) { Preconditions.assertTrue(pendingSetConf == null); pendingSetConf = new PendingRequest(request); + last = pendingSetConf; return pendingSetConf; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 14acfb4..4463914 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -398,6 +398,14 @@ public class RaftServerImpl implements RaftServerProtocol, expected); } + void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException { + if (!groupId.equals(requestorGroupId)) { + throw new GroupMismatchException(getId() + + ": The group (" + requestorGroupId + ") of requestor " + requestorId + + " does not match the group (" + groupId + ") of the server " + getId()); + } + } + /** * Handle a normal update request from client. */ @@ -595,6 +603,7 @@ public class RaftServerImpl implements RaftServerProtocol, LOG.debug("{}: receive requestVote({}, {}, {}, {})", getId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry); assertLifeCycleState(RUNNING); + assertGroup(candidateId, candidateGroupId); boolean voteGranted = false; boolean shouldShutdown = false; @@ -699,6 +708,7 @@ public class RaftServerImpl implements RaftServerProtocol, + initializing + ServerProtoUtils.toString(entries)); assertLifeCycleState(STARTING, RUNNING); + assertGroup(leaderId, leaderGroupId); try { validateEntries(leaderTerm, previous, entries); @@ -792,11 +802,13 @@ public class RaftServerImpl implements RaftServerProtocol, InstallSnapshotRequestProto request) throws IOException { final RaftRpcRequestProto r = request.getServerRequest(); final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId()); + final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId()); CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); LOG.debug("{}: receive installSnapshot({})", getId(), request); assertLifeCycleState(STARTING, RUNNING); + assertGroup(leaderId, leaderGroupId); final long currentTerm; final long leaderTerm = request.getLeaderTerm(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 7ed6731..9ef6ce7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -86,6 +86,10 @@ class StateMachineUpdater implements Runnable { state = State.STOP; updater.interrupt(); try { + updater.join(); + } catch (InterruptedException ignored) { + } + try { stateMachine.close(); } catch (IOException ignored) { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index b527a58..79cb9bb 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -422,6 +422,10 @@ public abstract class MiniRaftCluster { return createClient(null, group); } + public RaftClient createClient(RaftGroup g) { + return createClient(null, g); + } + public RaftClient createClient(RaftPeerId leaderId) { return createClient(leaderId, group); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index b227e47..7e08809 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -80,7 +80,7 @@ public abstract class RaftBasicTests extends BaseTest { LOG.info(cluster.printServers()); final SimpleMessage[] messages = SimpleMessage.create(10); - try(final RaftClient client = cluster.createClient(null)) { + try(final RaftClient client = cluster.createClient()) { for (SimpleMessage message : messages) { client.send(message); } @@ -149,7 +149,7 @@ public abstract class RaftBasicTests extends BaseTest { final List<Client4TestWithLoad> clients = Stream.iterate(0, i -> i+1).limit(numClients) - .map(i -> cluster.createClient(null)) + .map(i -> cluster.createClient()) .map(c -> new Client4TestWithLoad(c, numMessages)) .collect(Collectors.toList()); clients.forEach(Thread::start); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java index 5bc8dbe..d9068d1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java @@ -27,10 +27,13 @@ import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.CheckedBiConsumer; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; @@ -42,7 +45,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public abstract class ReinitializationBaseTest extends BaseTest { - static { + static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class); + + { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } @@ -78,7 +83,7 @@ public abstract class ReinitializationBaseTest extends BaseTest { // Reinitialize servers final RaftGroup newGroup = new RaftGroup(groupId, cluster.getPeers()); - final RaftClient client = cluster.createClient(null, newGroup); + final RaftClient client = cluster.createClient(newGroup); for(RaftPeer p : newGroup.getPeers()) { client.reinitialize(newGroup, p.getId()); } @@ -106,8 +111,15 @@ public abstract class ReinitializationBaseTest extends BaseTest { private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception { printThreadCount(null, "init"); - final MiniRaftCluster cluster = getCluster(0); + runTestReinitializeMultiGroups(getCluster(0), idIndex, chosen, NOOP); + } + + static final CheckedBiConsumer<MiniRaftCluster, RaftGroup, RuntimeException> NOOP = (c, g) -> {}; + public static <T extends Throwable> void runTestReinitializeMultiGroups( + MiniRaftCluster cluster, int[] idIndex, int chosen, + CheckedBiConsumer<MiniRaftCluster, RaftGroup, T> checker) + throws IOException, InterruptedException, T { if (chosen < 0) { chosen = ThreadLocalRandom.current().nextInt(idIndex.length); } @@ -147,6 +159,7 @@ public abstract class ReinitializationBaseTest extends BaseTest { } } Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid)); + checker.accept(cluster, groups[i]); } printThreadCount(type, "start groups"); LOG.info("start groups: " + cluster.printServers()); @@ -171,22 +184,23 @@ public abstract class ReinitializationBaseTest extends BaseTest { // update chosen group to use all the peers final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId()); - final RaftPeer[] array = allPeers.toArray(RaftPeer.EMPTY_PEERS); for(int i = 0; i < groups.length; i++) { - LOG.info(i + ") update " + cluster.printServers(groups[i].getGroupId())); - if (i == chosen) { - try (final RaftClient client = cluster.createClient(null, groups[i])) { - client.setConfiguration(array); - } - } else { - for(RaftPeer p : groups[i].getPeers()) { + if (i != chosen) { + LOG.info(i + ") reinitialize: " + cluster.printServers(groups[i].getGroupId())); + for (RaftPeer p : groups[i].getPeers()) { try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) { client.reinitialize(newGroup, p.getId()); } } } } + LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId())); + try (final RaftClient client = cluster.createClient(groups[chosen])) { + client.setConfiguration(allPeers.toArray(RaftPeer.EMPTY_PEERS)); + } + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + checker.accept(cluster, groups[chosen]); LOG.info("update groups: " + cluster.printServers()); printThreadCount(type, "update groups"); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4ed8f727/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index f41e764..73ce69d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -156,7 +156,7 @@ public class TestStateMachine extends BaseTest { int numTrx = 100; final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx); - try(final RaftClient client = cluster.createClient(null)) { + try(final RaftClient client = cluster.createClient()) { for (RaftTestUtil.SimpleMessage message : messages) { client.send(message); }
