http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java new file mode 100644 index 0000000..910ec6e --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import static org.apache.ratis.util.ProtoUtils.toByteString; + +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collection; +import java.util.function.BooleanSupplier; +import java.util.function.IntSupplier; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.util.CheckedRunnable; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class RaftTestUtil { + static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class); + + public static RaftServerImpl waitForLeader(MiniRaftCluster cluster) + throws InterruptedException { + final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1; + LOG.info(cluster.printServers()); + RaftServerImpl leader = null; + for(int i = 0; leader == null && i < 10; i++) { + Thread.sleep(sleepTime); + leader = cluster.getLeader(); + } + LOG.info(cluster.printServers()); + return leader; + } + + public static RaftServerImpl waitForLeader(MiniRaftCluster cluster, + final String leaderId) throws InterruptedException { + LOG.info(cluster.printServers()); + for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) { + RaftServerImpl currLeader = cluster.getLeader(); + if (LOG.isDebugEnabled()) { + LOG.debug("try enforcing leader to " + leaderId + " but " + + (currLeader == null? "no leader for this round" + : "new leader is " + currLeader.getId())); + } + } + LOG.info(cluster.printServers()); + + final RaftServerImpl leader = cluster.getLeader(); + Assert.assertEquals(leaderId, leader.getId()); + return leader; + } + + public static String waitAndKillLeader(MiniRaftCluster cluster, + boolean expectLeader) throws InterruptedException { + final RaftServerImpl leader = waitForLeader(cluster); + if (!expectLeader) { + Assert.assertNull(leader); + } else { + Assert.assertNotNull(leader); + LOG.info("killing leader = " + leader); + cluster.killServer(leader.getId()); + } + return leader != null ? leader.getId() : null; + } + + public static boolean logEntriesContains(LogEntryProto[] entries, + SimpleMessage... expectedMessages) { + int idxEntries = 0; + int idxExpected = 0; + while (idxEntries < entries.length + && idxExpected < expectedMessages.length) { + if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(), + entries[idxEntries].getSmLogEntry().getData().toByteArray())) { + ++idxExpected; + } + ++idxEntries; + } + return idxExpected == expectedMessages.length; + } + + public static void assertLogEntries(Collection<RaftServerImpl> servers, + SimpleMessage... expectedMessages) { + final int size = servers.size(); + final long count = servers.stream() + .filter(RaftServerImpl::isAlive) + .map(s -> s.getState().getLog().getEntries(0, Long.MAX_VALUE)) + .filter(e -> logEntriesContains(e, expectedMessages)) + .count(); + if (2*count <= size) { + throw new AssertionError("Not in majority: size=" + size + + " but count=" + count); + } + } + + public static void assertLogEntries(LogEntryProto[] entries, long startIndex, + long expertedTerm, SimpleMessage... expectedMessages) { + Assert.assertEquals(expectedMessages.length, entries.length); + for(int i = 0; i < entries.length; i++) { + final LogEntryProto e = entries[i]; + Assert.assertEquals(expertedTerm, e.getTerm()); + Assert.assertEquals(startIndex + i, e.getIndex()); + Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(), + e.getSmLogEntry().getData().toByteArray()); + } + } + + public static class SimpleMessage implements Message { + public static SimpleMessage[] create(int numMessages) { + return create(numMessages, "m"); + } + + public static SimpleMessage[] create(int numMessages, String prefix) { + final SimpleMessage[] messages = new SimpleMessage[numMessages]; + for (int i = 0; i < messages.length; i++) { + messages[i] = new SimpleMessage(prefix + i); + } + return messages; + } + + final String messageId; + + public SimpleMessage(final String messageId) { + this.messageId = messageId; + } + + @Override + public String toString() { + return messageId; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj == null || !(obj instanceof SimpleMessage)) { + return false; + } else { + final SimpleMessage that = (SimpleMessage)obj; + return this.messageId.equals(that.messageId); + } + } + + @Override + public int hashCode() { + return messageId.hashCode(); + } + + @Override + public ByteString getContent() { + return toByteString(messageId.getBytes(Charset.forName("UTF-8"))); + } + } + + public static class SimpleOperation { + private final String op; + + public SimpleOperation(String op) { + Preconditions.checkArgument(op != null); + this.op = op; + } + + @Override + public String toString() { + return op; + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj instanceof SimpleOperation && + ((SimpleOperation) obj).op.equals(op)); + } + + @Override + public int hashCode() { + return op.hashCode(); + } + + public SMLogEntryProto getLogEntryContent() { + try { + return SMLogEntryProto.newBuilder() + .setData(toByteString(op.getBytes("UTF-8"))).build(); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + } + + public static File getTestDir(Class<?> caller) throws IOException { + File dir = new File(System.getProperty("test.build.data", "target/test/data") + + "/" + RandomStringUtils.randomAlphanumeric(10), + caller.getSimpleName()); + if (dir.exists() && !dir.isDirectory()) { + throw new IOException(dir + " already exists and is not a directory"); + } else if (!dir.exists() && !dir.mkdirs()) { + throw new IOException("Cannot create directory " + dir); + } + return dir; + } + + public static void block(BooleanSupplier isBlocked) throws InterruptedException { + for(; isBlocked.getAsBoolean(); ) { + Thread.sleep(RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); + } + } + + public static void delay(IntSupplier getDelayMs) throws InterruptedException { + final int t = getDelayMs.getAsInt(); + if (t > 0) { + Thread.sleep(t); + } + } + + public static <T extends Throwable> void attempt( + int n, long sleepMs, CheckedRunnable<T> runnable) + throws T, InterruptedException { + for(int i = 1; i <= n; i++) { + LOG.info("Attempt #" + i + "/" + n + ": sleep " + sleepMs + "ms"); + if (sleepMs > 0) { + Thread.sleep(sleepMs); + } + try { + runnable.run(); + return; + } catch (Throwable t) { + if (i == n) { + throw t; + } + LOG.warn("Attempt #" + i + "/" + n + ": Ignoring " + t + " and retry."); + } + } + } + + public static String changeLeader(MiniRaftCluster cluster, String oldLeader) + throws InterruptedException { + cluster.setBlockRequestsFrom(oldLeader, true); + String newLeader = oldLeader; + for(int i = 0; i < 10 && newLeader.equals(oldLeader); i++) { + newLeader = RaftTestUtil.waitForLeader(cluster).getId(); + } + cluster.setBlockRequestsFrom(oldLeader, false); + return newLeader; + } + + public static void blockQueueAndSetDelay(Collection<RaftServerImpl> servers, + DelayLocalExecutionInjection injection, String leaderId, int delayMs, + long maxTimeout) throws InterruptedException { + // block reqeusts sent to leader if delayMs > 0 + final boolean block = delayMs > 0; + LOG.debug("{} requests sent to leader {} and set {}ms delay for the others", + block? "Block": "Unblock", leaderId, delayMs); + if (block) { + BlockRequestHandlingInjection.getInstance().blockReplier(leaderId); + } else { + BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId); + } + + // delay RaftServerRequest for other servers + servers.stream().filter(s -> !s.getId().equals(leaderId)) + .forEach(s -> { + if (block) { + injection.setDelayMs(s.getId(), delayMs); + } else { + injection.removeDelay(s.getId()); + } + }); + + Thread.sleep(3 * maxTimeout); + } + + public static void setBlockRequestsFrom(String src, boolean block) { + if (block) { + BlockRequestHandlingInjection.getInstance().blockRequestor(src); + } else { + BlockRequestHandlingInjection.getInstance().unblockRequestor(src); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java new file mode 100644 index 0000000..a7f1b6d --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.CodeInjectionForTesting; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Inject code to block a server from handling incoming requests. */ +public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Code { + private static final BlockRequestHandlingInjection INSTANCE = + new BlockRequestHandlingInjection(); + + static { + CodeInjectionForTesting.put(RaftServerImpl.REQUEST_VOTE, INSTANCE); + CodeInjectionForTesting.put(RaftServerImpl.APPEND_ENTRIES, INSTANCE); + CodeInjectionForTesting.put(RaftServerImpl.INSTALL_SNAPSHOT, INSTANCE); + } + + public static BlockRequestHandlingInjection getInstance() { + return INSTANCE; + } + + private final Map<String, Boolean> requestors = new ConcurrentHashMap<>(); + private final Map<String, Boolean> repliers = new ConcurrentHashMap<>(); + + private BlockRequestHandlingInjection() {} + + public void blockRequestor(String requestor) { + requestors.put(requestor, true); + } + + public void unblockRequestor(String requestor) { + requestors.remove(requestor); + } + + public void blockReplier(String replier) { + repliers.put(replier, true); + } + + public void unblockReplier(String replier) { + repliers.remove(replier); + } + + public void unblockAll() { + requestors.clear(); + repliers.clear(); + } + + @Override + public boolean execute(String localId, String remoteId, Object... args) { + if (shouldBlock(localId, remoteId)) { + try { + RaftTestUtil.block(() -> shouldBlock(localId, remoteId)); + return true; + } catch (InterruptedException e) { + LOG.debug("Interrupted while blocking request handling from " + remoteId + + " to " + localId); + } + } + return false; + } + + private boolean shouldBlock(String localId, String remoteId) { + return repliers.containsKey(localId) || requestors.containsKey(remoteId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java new file mode 100644 index 0000000..8de2474 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.util.CodeInjectionForTesting; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** Inject code to delay particular servers. */ +public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Code { + private final Map<String, AtomicInteger> delays = new ConcurrentHashMap<>(); + + public DelayLocalExecutionInjection(String method) { + CodeInjectionForTesting.put(method, this); + } + + public void clear() { + delays.clear(); + } + + public void setDelayMs(String id, int delayMs) { + AtomicInteger d = delays.get(id); + if (d == null) { + delays.put(id, d = new AtomicInteger()); + } + d.set(delayMs); + } + + public void removeDelay(String id) { + delays.remove(id); + } + + @Override + public boolean execute(String localId, String remoteId, Object... args) { + final AtomicInteger d = delays.get(localId); + if (d == null) { + return false; + } + LOG.info("{} delay {} ms, args={}", localId, d.get(), + Arrays.toString(args)); + try { + RaftTestUtil.delay(d::get); + } catch (InterruptedException e) { + LOG.debug("Interrupted while delaying " + localId); + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java new file mode 100644 index 0000000..30b334f --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -0,0 +1,581 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import static java.util.Arrays.asList; +import static org.apache.ratis.MiniRaftCluster.logSyncDelay; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.MiniRaftCluster.PeerChanges; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.impl.RaftClientImpl; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.ReconfigurationInProgressException; +import org.apache.ratis.protocol.ReconfigurationTimeoutException; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.simulation.RequestHandler; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.util.RaftUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class RaftReconfigurationBaseTest { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + static final Logger LOG = LoggerFactory.getLogger(RaftReconfigurationBaseTest.class); + + protected static final RaftProperties prop = new RaftProperties(); + + @BeforeClass + public static void setup() { + // set a small gap for tests + prop.setInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, 10); + } + + public abstract MiniRaftCluster getCluster(int peerNum) throws IOException; + + private static int getStagingGap() { + return prop.getInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, + RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT); + } + + /** + * add 2 new peers (3 peers -> 5 peers), no leader change + */ + @Test + public void testAddPeers() throws Exception { + LOG.info("Start testAddPeers"); + MiniRaftCluster cluster = getCluster(3); + cluster.start(); + try { + RaftTestUtil.waitForLeader(cluster); + + // add new peers + RaftPeer[] allPeers = cluster.addNewPeers(2, true).allPeersInNewConf; + + // trigger setConfiguration + SetConfigurationRequest request = new SetConfigurationRequest("client", + cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); + LOG.info("Start changing the configuration: {}", request); + cluster.getLeader().setConfiguration(request); + + // wait for the new configuration to take effect + waitAndCheckNewConf(cluster, allPeers, 0, null); + } finally { + cluster.shutdown(); + } + } + + /** + * remove 2 peers (5 peers -> 3 peers), no leader change + */ + @Test + public void testRemovePeers() throws Exception { + LOG.info("Start testRemovePeers"); + MiniRaftCluster cluster = getCluster(5); + cluster.start(); + try { + RaftTestUtil.waitForLeader(cluster); + + // remove peers, leader still included in the new conf + RaftPeer[] allPeers = cluster + .removePeers(2, false, Collections.emptyList()).allPeersInNewConf; + + // trigger setConfiguration + SetConfigurationRequest request = new SetConfigurationRequest("client", + cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); + LOG.info("Start changing the configuration: {}", request); + cluster.getLeader().setConfiguration(request); + + // wait for the new configuration to take effect + waitAndCheckNewConf(cluster, allPeers, 2, null); + } finally { + cluster.shutdown(); + } + } + + /** + * 5 peers -> 5 peers, remove 2 old, add 2 new, no leader change + */ + @Test + public void testAddRemovePeers() throws Exception { + LOG.info("Start testAddRemovePeers"); + testAddRemovePeers(false); + } + + @Test + public void testLeaderStepDown() throws Exception { + LOG.info("Start testLeaderStepDown"); + testAddRemovePeers(true); + } + + private void testAddRemovePeers(boolean leaderStepdown) throws Exception { + MiniRaftCluster cluster = getCluster(5); + cluster.start(); + try { + RaftTestUtil.waitForLeader(cluster); + + PeerChanges change = cluster.addNewPeers(2, true); + RaftPeer[] allPeers = cluster.removePeers(2, leaderStepdown, + asList(change.newPeers)).allPeersInNewConf; + + // trigger setConfiguration + SetConfigurationRequest request = new SetConfigurationRequest("client", + cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); + LOG.info("Start changing the configuration: {}", request); + cluster.getLeader().setConfiguration(request); + + // wait for the new configuration to take effect + waitAndCheckNewConf(cluster, allPeers, 2, null); + } finally { + cluster.shutdown(); + } + } + + @Test(timeout = 30000) + public void testReconfTwice() throws Exception { + LOG.info("Start testReconfTwice"); + final MiniRaftCluster cluster = getCluster(3); + cluster.start(); + try { + RaftTestUtil.waitForLeader(cluster); + final String leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient("client", leaderId); + + // submit some msgs before reconf + for (int i = 0; i < getStagingGap() * 2; i++) { + RaftClientReply reply = client.send(new SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + + final AtomicBoolean reconf1 = new AtomicBoolean(false); + final AtomicBoolean reconf2 = new AtomicBoolean(false); + final AtomicReference<RaftPeer[]> finalPeers = new AtomicReference<>(null); + final AtomicReference<RaftPeer[]> deadPeers = new AtomicReference<>(null); + CountDownLatch latch = new CountDownLatch(1); + Thread clientThread = new Thread(() -> { + try { + PeerChanges c1 = cluster.addNewPeers(2, true); + LOG.info("Start changing the configuration: {}", + asList(c1.allPeersInNewConf)); + + RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf); + reconf1.set(reply.isSuccess()); + + PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers)); + finalPeers.set(c2.allPeersInNewConf); + deadPeers.set(c2.removedPeers); + + LOG.info("Start changing the configuration again: {}", + asList(c2.allPeersInNewConf)); + reply = client.setConfiguration(c2.allPeersInNewConf); + reconf2.set(reply.isSuccess()); + + latch.countDown(); + client.close(); + } catch (IOException ignored) { + } + }); + clientThread.start(); + + latch.await(); + Assert.assertTrue(reconf1.get()); + Assert.assertTrue(reconf2.get()); + waitAndCheckNewConf(cluster, finalPeers.get(), 2, null); + + // check configuration manager's internal state + // each reconf will generate two configurations: (old, new) and (new) + cluster.getServers().stream().filter(RaftServerImpl::isAlive) + .forEach(server -> { + ConfigurationManager confManager = + (ConfigurationManager) Whitebox.getInternalState(server.getState(), + "configurationManager"); + // each reconf will generate two configurations: (old, new) and (new) + Assert.assertEquals(5, confManager.numOfConf()); + }); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testReconfTimeout() throws Exception { + LOG.info("Start testReconfTimeout"); + // originally 3 peers + final MiniRaftCluster cluster = getCluster(3); + cluster.start(); + try { + RaftTestUtil.waitForLeader(cluster); + final String leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient("client", leaderId); + + PeerChanges c1 = cluster.addNewPeers(2, false); + + LOG.info("Start changing the configuration: {}", + asList(c1.allPeersInNewConf)); + Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional()); + + final RaftClientRequestSender sender = ((RaftClientImpl)client).getRequestSender(); + final SetConfigurationRequest request = new SetConfigurationRequest( + "client", leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf); + try { + sender.sendRequest(request); + Assert.fail("did not get expected exception"); + } catch (IOException e) { + Assert.assertTrue("Got exception " + e, + e instanceof ReconfigurationTimeoutException); + } + + // the two new peers have not started yet, the bootstrapping must timeout + LOG.info(cluster.printServers()); + + // resend the same request, make sure the server has correctly reset its + // state so that we still get timeout instead of in-progress exception + try { + sender.sendRequest(request); + Assert.fail("did not get expected exception"); + } catch (IOException e) { + Assert.assertTrue("Got exception " + e, + e instanceof ReconfigurationTimeoutException); + } + + // start the two new peers + LOG.info("Start new peers"); + for (RaftPeer np : c1.newPeers) { + cluster.startServer(np.getId()); + } + Assert.assertTrue(client.setConfiguration(c1.allPeersInNewConf).isSuccess()); + client.close(); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testBootstrapReconf() throws Exception { + LOG.info("Start testBootstrapReconf"); + // originally 3 peers + final MiniRaftCluster cluster = getCluster(3); + cluster.start(); + try { + RaftTestUtil.waitForLeader(cluster); + final String leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient("client", leaderId); + + // submit some msgs before reconf + for (int i = 0; i < getStagingGap() * 2; i++) { + RaftClientReply reply = client.send(new SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + + PeerChanges c1 = cluster.addNewPeers(2, true); + LOG.info("Start changing the configuration: {}", + asList(c1.allPeersInNewConf)); + final AtomicReference<Boolean> success = new AtomicReference<>(); + + Thread clientThread = new Thread(() -> { + try { + RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf); + success.set(reply.isSuccess()); + client.close(); + } catch (IOException ioe) { + LOG.error("FAILED", ioe); + } + }); + clientThread.start(); + + Thread.sleep(5000); + LOG.info(cluster.printServers()); + assertSuccess(success); + + final RaftLog leaderLog = cluster.getLeader().getState().getLog(); + for (RaftPeer newPeer : c1.newPeers) { + Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE), + cluster.getServer(newPeer.getId()).getState().getLog() + .getEntries(0, Long.MAX_VALUE)); + } + } finally { + cluster.shutdown(); + } + } + + /** + * kill the leader before reconfiguration finishes. Make sure the client keeps + * retrying. + */ + @Test + public void testKillLeaderDuringReconf() throws Exception { + LOG.info("Start testKillLeaderDuringReconf"); + // originally 3 peers + final MiniRaftCluster cluster = getCluster(3); + cluster.start(); + try { + RaftTestUtil.waitForLeader(cluster); + final String leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient("client", leaderId); + + PeerChanges c1 = cluster.addNewPeers(2, false); + PeerChanges c2 = cluster.removePeers(2, false, asList(c1.newPeers)); + + LOG.info("Start changing the configuration: {}", + asList(c2.allPeersInNewConf)); + final AtomicReference<Boolean> success = new AtomicReference<>(); + final AtomicBoolean clientRunning = new AtomicBoolean(true); + Thread clientThread = new Thread(() -> { + try { + boolean r = false; + while (clientRunning.get() && !r) { + r = client.setConfiguration(c2.allPeersInNewConf).isSuccess(); + } + success.set(r); + client.close(); + } catch (IOException ignored) { + } + }); + clientThread.start(); + + // the leader cannot generate the (old, new) conf, and it will keep + // bootstrapping the 2 new peers since they have not started yet + LOG.info(cluster.printServers()); + Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional()); + + // only the first empty entry got committed + final long committedIndex = cluster.getLeader().getState().getLog() + .getLastCommittedIndex(); + Assert.assertTrue("committedIndex is " + committedIndex, + committedIndex <= 1); + + LOG.info("kill the current leader"); + final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster, true); + LOG.info("start the two new peers: {}", Arrays.asList(c1.newPeers)); + for (RaftPeer np : c1.newPeers) { + cluster.startServer(np.getId()); + } + + Thread.sleep(3000); + // the client should get the NotLeaderException from the first leader, and + // will retry the same setConfiguration request + waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 2, + Collections.singletonList(oldLeaderId)); + clientRunning.set(false); + //Assert.assertTrue(success.get()); + } finally { + cluster.shutdown(); + } + } + + static void assertSuccess(final AtomicReference<Boolean> success) { + final String s = "success=" + success; + Assert.assertNotNull(s, success.get()); + Assert.assertTrue(s, success.get()); + } + + /** + * When a request's new configuration is the same with the current one, make + * sure we return success immediately and no log entry is recorded. + */ + @Test + public void testNoChangeRequest() throws Exception { + LOG.info("Start testNoChangeRequest"); + // originally 3 peers + final MiniRaftCluster cluster = getCluster(3); + try { + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + + final String leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient("client", leaderId); + client.send(new SimpleMessage("m")); + + final long committedIndex = cluster.getLeader().getState().getLog() + .getLastCommittedIndex(); + final RaftConfiguration confBefore = cluster.getLeader().getRaftConf(); + + // no real configuration change in the request + RaftClientReply reply = client.setConfiguration(cluster.getPeers() + .toArray(new RaftPeer[0])); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(committedIndex, cluster.getLeader().getState() + .getLog().getLastCommittedIndex()); + Assert.assertSame(confBefore, cluster.getLeader().getRaftConf()); + client.close(); + } finally { + cluster.shutdown(); + } + } + + /** + * Make sure a setConfiguration request is rejected if a configuration change + * is still in progress (i.e., has not been committed yet). + */ + @Test + public void testOverlappedSetConfRequests() throws Exception { + LOG.info("Start testOverlappedSetConfRequests"); + // originally 3 peers + final MiniRaftCluster cluster = getCluster(3); + try { + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + + final String leaderId = cluster.getLeader().getId(); + + RaftPeer[] newPeers = cluster.addNewPeers(2, true).allPeersInNewConf; + + // delay every peer's logSync so that the setConf request is delayed + cluster.getPeers() + .forEach(peer -> logSyncDelay.setDelayMs(peer.getId(), 1000)); + + final CountDownLatch latch = new CountDownLatch(1); + final RaftPeer[] peersInRequest2 = cluster.getPeers().toArray(new RaftPeer[0]); + AtomicBoolean caughtException = new AtomicBoolean(false); + new Thread(() -> { + try(final RaftClient client2 = cluster.createClient("client2", leaderId)) { + latch.await(); + LOG.info("client2 starts to change conf"); + final RaftClientRequestSender sender2 = ((RaftClientImpl)client2).getRequestSender(); + sender2.sendRequest(new SetConfigurationRequest( + "client2", leaderId, DEFAULT_SEQNUM, peersInRequest2)); + } catch (ReconfigurationInProgressException e) { + caughtException.set(true); + } catch (Exception e) { + LOG.warn("Got unexpected exception when client2 changes conf", e); + } + }).start(); + + AtomicBoolean confChanged = new AtomicBoolean(false); + new Thread(() -> { + try(final RaftClient client1 = cluster.createClient("client1", leaderId)) { + LOG.info("client1 starts to change conf"); + confChanged.set(client1.setConfiguration(newPeers).isSuccess()); + } catch (IOException e) { + LOG.warn("Got unexpected exception when client1 changes conf", e); + } + }).start(); + Thread.sleep(100); + latch.countDown(); + + for (int i = 0; i < 10 && !confChanged.get(); i++) { + Thread.sleep(1000); + } + Assert.assertTrue(confChanged.get()); + Assert.assertTrue(caughtException.get()); + } finally { + logSyncDelay.clear(); + cluster.shutdown(); + } + } + + /** + * Test a scenario where the follower truncates its log entries which causes + * configuration change. + */ + @Test + public void testRevertConfigurationChange() throws Exception { + LOG.info("Start testRevertConfigurationChange"); + // originally 3 peers + final MiniRaftCluster cluster = getCluster(5); + try { + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + + final String leaderId = cluster.getLeader().getId(); + + final RaftLog log = cluster.getServer(leaderId).getState().getLog(); + Thread.sleep(1000); + Assert.assertEquals(0, log.getLatestFlushedIndex()); + + // we block the incoming msg for the leader and block its requests to + // followers, so that we force the leader change and the old leader will + // not know + LOG.info("start blocking the leader"); + BlockRequestHandlingInjection.getInstance().blockReplier(leaderId); + cluster.setBlockRequestsFrom(leaderId, true); + + PeerChanges change = cluster.removePeers(1, false, new ArrayList<>()); + + AtomicBoolean gotNotLeader = new AtomicBoolean(false); + new Thread(() -> { + try(final RaftClient client = cluster.createClient("client1", leaderId)) { + LOG.info("client starts to change conf"); + final RaftClientRequestSender sender = ((RaftClientImpl)client).getRequestSender(); + RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest( + "client", leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf)); + if (reply.isNotLeader()) { + gotNotLeader.set(true); + } + } catch (IOException e) { + LOG.warn("Got unexpected exception when client1 changes conf", e); + } + }).start(); + + // wait till the old leader persist the new conf + for (int i = 0; i < 10 && log.getLatestFlushedIndex() < 1; i++) { + Thread.sleep(500); + } + Assert.assertEquals(1, log.getLatestFlushedIndex()); + Assert.assertEquals(CONFIGURATIONENTRY, + log.getLastEntry().getLogEntryBodyCase()); + + // unblock the old leader + BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId); + cluster.setBlockRequestsFrom(leaderId, false); + + // the client should get NotLeaderException + for (int i = 0; i < 10 && !gotNotLeader.get(); i++) { + Thread.sleep(500); + } + Assert.assertTrue(gotNotLeader.get()); + + // the old leader should have truncated the setConf from the log + boolean newState = false; + for (int i = 0; i < 10 && !newState; i++) { + Thread.sleep(500); + newState = log.getLastCommittedIndex() == 1 && + log.getLastEntry().getLogEntryBodyCase() != CONFIGURATIONENTRY; + } + Assert.assertTrue(newState); + } finally { + cluster.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java new file mode 100644 index 0000000..dc10bd3 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.statemachine.StateMachine; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + +public class RaftServerTestUtil { + static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class); + + public static void waitAndCheckNewConf(MiniRaftCluster cluster, + RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers) + throws Exception { + final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2); + RaftTestUtil.attempt(3, sleepMs, + () -> waitAndCheckNewConf(cluster, peers, deadPeers)); + } + private static void waitAndCheckNewConf(MiniRaftCluster cluster, + RaftPeer[] peers, Collection<String> deadPeers) + throws Exception { + LOG.info(cluster.printServers()); + Assert.assertNotNull(cluster.getLeader()); + + int numIncluded = 0; + int deadIncluded = 0; + final RaftConfiguration current = RaftConfiguration.newBuilder() + .setConf(peers).setLogEntryIndex(0).build(); + for (RaftServerImpl server : cluster.getServers()) { + if (deadPeers != null && deadPeers.contains(server.getId())) { + if (current.containsInConf(server.getId())) { + deadIncluded++; + } + continue; + } + if (current.containsInConf(server.getId())) { + numIncluded++; + Assert.assertTrue(server.getRaftConf().isStable()); + Assert.assertTrue(server.getRaftConf().hasNoChange(peers)); + } else { + Assert.assertFalse(server.getId() + " is still running: " + server, + server.isAlive()); + } + } + Assert.assertEquals(peers.length, numIncluded + deadIncluded); + } + + public static StateMachine getStateMachine(RaftServerImpl s) { + return s.getStateMachine(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java new file mode 100644 index 0000000..fa177af --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { + static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithSimulatedRpc.class); + + public static final Factory<MiniRaftClusterWithSimulatedRpc> FACTORY + = new Factory<MiniRaftClusterWithSimulatedRpc>() { + @Override + public MiniRaftClusterWithSimulatedRpc newCluster( + String[] ids, RaftProperties prop, boolean formatted) { + prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0); + return new MiniRaftClusterWithSimulatedRpc(ids, prop, formatted); + } + }; + + private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; + private SimulatedClientRequestReply client2serverRequestReply; + + public MiniRaftClusterWithSimulatedRpc(int numServers, + RaftProperties properties) { + this(generateIds(numServers, 0), properties, true); + } + + public MiniRaftClusterWithSimulatedRpc(String[] ids, + RaftProperties properties, boolean formatted) { + super(ids, properties, formatted); + initRpc(); + } + + private void initRpc() { + final Collection<RaftPeer> peers = getConf().getPeers(); + final int simulateLatencyMs = properties.getInt( + SimulatedRequestReply.SIMULATE_LATENCY_KEY, + SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT); + LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = " + + simulateLatencyMs); + serverRequestReply = new SimulatedRequestReply<>(peers, simulateLatencyMs); + client2serverRequestReply = new SimulatedClientRequestReply(peers, + simulateLatencyMs); + + setRpcServers(getServers()); + } + + private void setRpcServers(Collection<RaftServerImpl> newServers) { + newServers.forEach(s -> s.setServerRpc( + new SimulatedServerRpc(s, serverRequestReply, client2serverRequestReply))); + } + + @Override + protected void setPeerRpc() { + initRpc(); + } + + private void addPeersToRpc(Collection<RaftPeer> peers) { + serverRequestReply.addPeers(peers); + client2serverRequestReply.addPeers(peers); + } + + @Override + public void restartServer(String id, boolean format) throws IOException { + super.restartServer(id, format); + RaftServerImpl s = getServer(id); + addPeersToRpc(Collections.singletonList(conf.getPeer(id))); + s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply, + client2serverRequestReply)); + s.start(); + } + + @Override + public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, + Collection<RaftServerImpl> newServers, boolean startService) { + addPeersToRpc(newPeers); + setRpcServers(newServers); + if (startService) { + newServers.forEach(RaftServerImpl::start); + } + return newPeers; + } + + @Override + public RaftClientRequestSender getRaftClientRequestSender() { + return client2serverRequestReply; + } + + @Override + public void blockQueueAndSetDelay(String leaderId, int delayMs) + throws InterruptedException { + // block leader sendRequest if delayMs > 0 + final boolean block = delayMs > 0; + LOG.debug("{} leader queue {} and set {}ms delay for the other queues", + block? "Block": "Unblock", leaderId, delayMs); + serverRequestReply.getQueue(leaderId).blockSendRequestTo.set(block); + + // set delay takeRequest for the other queues + getServers().stream().filter(s -> !s.getId().equals(leaderId)) + .map(s -> serverRequestReply.getQueue(s.getId())) + .forEach(q -> q.delayTakeRequestTo.set(delayMs)); + + final long sleepMs = 3 * getMaxTimeout() / 2; + Thread.sleep(sleepMs); + } + + @Override + public void setBlockRequestsFrom(String src, boolean block) { + serverRequestReply.getQueue(src).blockTakeRequestFrom.set(block); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java new file mode 100644 index 0000000..a157524 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.protocol.RaftRpcMessage; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; + +import com.google.common.base.Preconditions; + +public class RaftServerReply extends RaftRpcMessage { + private final AppendEntriesReplyProto appendEntries; + private final RequestVoteReplyProto requestVote; + private final InstallSnapshotReplyProto installSnapshot; + + RaftServerReply(AppendEntriesReplyProto a) { + appendEntries = Preconditions.checkNotNull(a); + requestVote = null; + installSnapshot = null; + } + + RaftServerReply(RequestVoteReplyProto r) { + appendEntries = null; + requestVote = Preconditions.checkNotNull(r); + installSnapshot = null; + } + + RaftServerReply(InstallSnapshotReplyProto i) { + appendEntries = null; + requestVote = null; + installSnapshot = Preconditions.checkNotNull(i); + } + + boolean isAppendEntries() { + return appendEntries != null; + } + + boolean isRequestVote() { + return requestVote != null; + } + + boolean isInstallSnapshot() { + return installSnapshot != null; + } + + AppendEntriesReplyProto getAppendEntries() { + return appendEntries; + } + + RequestVoteReplyProto getRequestVote() { + return requestVote; + } + + InstallSnapshotReplyProto getInstallSnapshot() { + return installSnapshot; + } + + @Override + public boolean isRequest() { + return false; + } + + @Override + public String getRequestorId() { + if (isAppendEntries()) { + return appendEntries.getServerReply().getRequestorId(); + } else if (isRequestVote()) { + return requestVote.getServerReply().getRequestorId(); + } else { + return installSnapshot.getServerReply().getRequestorId(); + } + } + + @Override + public String getReplierId() { + if (isAppendEntries()) { + return appendEntries.getServerReply().getReplyId(); + } else if (isRequestVote()) { + return requestVote.getServerReply().getReplyId(); + } else { + return installSnapshot.getServerReply().getReplyId(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java new file mode 100644 index 0000000..fd73dff --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.protocol.RaftRpcMessage; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; + +class RaftServerRequest extends RaftRpcMessage { + private final AppendEntriesRequestProto appendEntries; + private final RequestVoteRequestProto requestVote; + private final InstallSnapshotRequestProto installSnapshot; + + RaftServerRequest(AppendEntriesRequestProto a) { + appendEntries = a; + requestVote = null; + installSnapshot = null; + } + + RaftServerRequest(RequestVoteRequestProto r) { + appendEntries = null; + requestVote = r; + installSnapshot = null; + } + + RaftServerRequest(InstallSnapshotRequestProto i) { + appendEntries = null; + requestVote = null; + installSnapshot = i; + } + + boolean isAppendEntries() { + return appendEntries != null; + } + + boolean isRequestVote() { + return requestVote != null; + } + + boolean isInstallSnapshot() { + return installSnapshot != null; + } + + AppendEntriesRequestProto getAppendEntries() { + return appendEntries; + } + + RequestVoteRequestProto getRequestVote() { + return requestVote; + } + + InstallSnapshotRequestProto getInstallSnapshot() { + return installSnapshot; + } + + @Override + public boolean isRequest() { + return true; + } + + @Override + public String getRequestorId() { + if (isAppendEntries()) { + return appendEntries.getServerRequest().getRequestorId(); + } else if (isRequestVote()) { + return requestVote.getServerRequest().getRequestorId(); + } else { + return installSnapshot.getServerRequest().getRequestorId(); + } + } + + @Override + public String getReplierId() { + if (isAppendEntries()) { + return appendEntries.getServerRequest().getReplyId(); + } else if (isRequestVote()) { + return requestVote.getServerRequest().getReplyId(); + } else { + return installSnapshot.getServerRequest().getReplyId(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java new file mode 100644 index 0000000..bd60a3b --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.protocol.RaftRpcMessage; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.ExitUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; + +public class RequestHandler<REQUEST extends RaftRpcMessage, + REPLY extends RaftRpcMessage> { + public static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class); + + interface HandlerInterface<REQUEST extends RaftRpcMessage, + REPLY extends RaftRpcMessage> { + + boolean isAlive(); + + REPLY handleRequest(REQUEST r) throws IOException; + } + + private final String serverId; + private final String name; + private final SimulatedRequestReply<REQUEST, REPLY> rpc; + private final HandlerInterface<REQUEST, REPLY> handlerImpl; + private final List<HandlerDaemon> daemons; + + RequestHandler(String serverId, String name, + SimulatedRequestReply<REQUEST, REPLY> rpc, + HandlerInterface<REQUEST, REPLY> handlerImpl, + int numHandlers) { + this.serverId = serverId; + this.name = name; + this.rpc = rpc; + this.handlerImpl = handlerImpl; + + this.daemons = new ArrayList<>(numHandlers); + for(int i = 0; i < numHandlers; i++) { + daemons.add(new HandlerDaemon(i)); + } + } + + void startDaemon() { + daemons.forEach(Thread::start); + } + + void shutdown() { + rpc.shutdown(serverId); + } + + void interruptAndJoinDaemon() throws InterruptedException { + daemons.forEach(Thread::interrupt); + for (Daemon d : daemons) { + d.join(); + } + } + + SimulatedRequestReply<REQUEST, REPLY> getRpc() { + return rpc; + } + + void handleRequest(REQUEST request) throws IOException { + final REPLY reply; + try { + reply = handlerImpl.handleRequest(request); + } catch (IOException ioe) { + LOG.debug("IOException for " + request, ioe); + rpc.sendReply(request, null, ioe); + return; + } + if (reply != null) { + rpc.sendReply(request, reply, null); + } + } + + /** + * A thread keep polling requests from the request queue. Used for simulation. + */ + class HandlerDaemon extends Daemon { + private final int id; + + HandlerDaemon(int id) { + this.id = id; + } + + @Override + public String toString() { + return serverId + "." + name + id; + } + + @Override + public void run() { + while (handlerImpl.isAlive()) { + try { + handleRequest(rpc.takeRequest(serverId)); + } catch (InterruptedIOException e) { + LOG.info(this + " is interrupted by " + e); + LOG.trace("TRACE", e); + break; + } catch (IOException e) { + LOG.error(this + " has " + e); + LOG.trace("TRACE", e); + } catch(Throwable t) { + if (!handlerImpl.isAlive()) { + LOG.info(this + " is stopped."); + break; + } + ExitUtils.terminate(1, this + " is terminating.", t, LOG); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java new file mode 100644 index 0000000..9302051 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; + +import java.io.IOException; +import java.util.Collection; + +public class SimulatedClientRequestReply + extends SimulatedRequestReply<RaftClientRequest, RaftClientReply> + implements RaftClientRequestSender { + SimulatedClientRequestReply(Collection<RaftPeer> allPeers, + int simulateLatencyMs) { + super(allPeers, simulateLatencyMs); + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + // do nothing + } + + @Override + public void close() { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java new file mode 100644 index 0000000..559c1e6 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import com.google.common.base.Preconditions; + +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftRpcMessage; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.RaftUtils; +import org.apache.ratis.util.Timestamp; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class SimulatedRequestReply<REQUEST extends RaftRpcMessage, + REPLY extends RaftRpcMessage> { + public static final String SIMULATE_LATENCY_KEY + = SimulatedRequestReply.class.getName() + ".simulateLatencyMs"; + public static final int SIMULATE_LATENCY_DEFAULT + = RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; + public static final long TIMEOUT = 3000L; + + private static class ReplyOrException<REPLY> { + private final REPLY reply; + private final IOException ioe; + + ReplyOrException(REPLY reply, IOException ioe) { + Preconditions.checkArgument(reply == null ^ ioe == null); + this.reply = reply; + this.ioe = ioe; + } + } + + static class EventQueue<REQUEST, REPLY> { + private final BlockingQueue<REQUEST> requestQueue + = new LinkedBlockingQueue<>(); + private final Map<REQUEST, ReplyOrException<REPLY>> replyMap + = new ConcurrentHashMap<>(); + + /** Block takeRequest for the requests sent from this server. */ + final AtomicBoolean blockTakeRequestFrom = new AtomicBoolean(); + /** Block sendRequest for the requests sent to this server. */ + final AtomicBoolean blockSendRequestTo = new AtomicBoolean(); + /** Delay takeRequest for the requests sent to this server. */ + final AtomicInteger delayTakeRequestTo = new AtomicInteger(); + /** Delay takeRequest for the requests sent from this server. */ + final AtomicInteger delayTakeRequestFrom = new AtomicInteger(); + + REPLY request(REQUEST request) throws InterruptedException, IOException { + requestQueue.put(request); + synchronized (this) { + final Timestamp startTime = new Timestamp(); + while (startTime.elapsedTimeMs() < TIMEOUT && + !replyMap.containsKey(request)) { + this.wait(TIMEOUT); // no need to be precise here + } + } + + if (!replyMap.containsKey(request)) { + throw new IOException("Timeout while waiting for reply of request " + + request); + } + final ReplyOrException<REPLY> re = replyMap.remove(request); + if (re.ioe != null) { + throw re.ioe; + } + return re.reply; + } + + REQUEST takeRequest() throws InterruptedException { + return requestQueue.take(); + } + + void reply(REQUEST request, REPLY reply, IOException ioe) + throws IOException { + replyMap.put(request, new ReplyOrException<>(reply, ioe)); + synchronized (this) { + this.notifyAll(); + } + } + } + + private final Map<String, EventQueue<REQUEST, REPLY>> queues; + private final int simulateLatencyMs; + + SimulatedRequestReply(Collection<RaftPeer> allPeers, int simulateLatencyMs) { + queues = new ConcurrentHashMap<>(); + for (RaftPeer peer : allPeers) { + queues.put(peer.getId(), new EventQueue<>()); + } + + this.simulateLatencyMs = simulateLatencyMs; + } + + EventQueue<REQUEST, REPLY> getQueue(String qid) { + return queues.get(qid); + } + + public REPLY sendRequest(REQUEST request) throws IOException { + final String qid = request.getReplierId(); + final EventQueue<REQUEST, REPLY> q = queues.get(qid); + if (q == null) { + throw new IOException("The peer " + qid + " is not alive."); + } + try { + RaftTestUtil.block(q.blockSendRequestTo::get); + return q.request(request); + } catch (InterruptedException e) { + throw RaftUtils.toInterruptedIOException("", e); + } + } + + public REQUEST takeRequest(String qid) throws IOException { + final EventQueue<REQUEST, REPLY> q = queues.get(qid); + if (q == null) { + throw new IOException("The RPC of " + qid + " has already shutdown."); + } + + final REQUEST request; + try { + // delay request for testing + RaftTestUtil.delay(q.delayTakeRequestTo::get); + + request = q.takeRequest(); + Preconditions.checkState(qid.equals(request.getReplierId())); + + // block request for testing + final EventQueue<REQUEST, REPLY> reqQ = queues.get(request.getRequestorId()); + if (reqQ != null) { + RaftTestUtil.delay(reqQ.delayTakeRequestFrom::get); + RaftTestUtil.block(reqQ.blockTakeRequestFrom::get); + } + } catch (InterruptedException e) { + throw RaftUtils.toInterruptedIOException("", e); + } + return request; + } + + public void sendReply(REQUEST request, REPLY reply, IOException ioe) + throws IOException { + if (reply != null) { + Preconditions.checkArgument( + request.getRequestorId().equals(reply.getRequestorId())); + Preconditions.checkArgument( + request.getReplierId().equals(reply.getReplierId())); + } + simulateLatency(); + final String qid = request.getReplierId(); + EventQueue<REQUEST, REPLY> q = queues.get(qid); + if (q != null) { + q.reply(request, reply, ioe); + } + } + + public void shutdown(String id) { + queues.remove(id); + } + + public void addPeers(Collection<RaftPeer> newPeers) { + for (RaftPeer peer : newPeers) { + queues.put(peer.getId(), new EventQueue<>()); + } + } + + private void simulateLatency() throws IOException { + if (simulateLatencyMs > 0) { + int waitExpetation = simulateLatencyMs / 10; + int waitHalfRange = waitExpetation / 3; + int randomSleepMs = ThreadLocalRandom.current().nextInt(2 * waitHalfRange) + + waitExpetation - waitHalfRange; + try { + Thread.sleep(randomSleepMs); + } catch (InterruptedException ie) { + throw RaftUtils.toInterruptedIOException("", ie); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java new file mode 100644 index 0000000..d40cf44 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +class SimulatedServerRpc implements RaftServerRpc { + static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class); + + private final RaftServerImpl server; + private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler; + private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler; + private final ExecutorService executor = Executors.newFixedThreadPool(3, + new ThreadFactoryBuilder().setDaemon(true).build()); + + SimulatedServerRpc(RaftServerImpl server, + SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, + SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { + this.server = server; + this.serverHandler = new RequestHandler<>(server.getId(), + "serverHandler", serverRequestReply, serverHandlerImpl, 3); + this.clientHandler = new RequestHandler<>(server.getId(), + "clientHandler", clientRequestReply, clientHandlerImpl, 3); + } + + @Override + public void start() { + serverHandler.startDaemon(); + clientHandler.startDaemon(); + } + + private void interruptAndJoin() throws InterruptedException { + clientHandler.interruptAndJoinDaemon(); + serverHandler.interruptAndJoinDaemon(); + } + + @Override + public void close() { + try { + interruptAndJoin(); + executor.shutdown(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + } + clientHandler.shutdown(); + serverHandler.shutdown(); + } + + @Override + public InetSocketAddress getInetSocketAddress() { + return null; + } + + @Override + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) + throws IOException { + RaftServerReply reply = serverHandler.getRpc() + .sendRequest(new RaftServerRequest(request)); + return reply.getAppendEntries(); + } + + @Override + public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) + throws IOException { + RaftServerReply reply = serverHandler.getRpc() + .sendRequest(new RaftServerRequest(request)); + return reply.getInstallSnapshot(); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) + throws IOException { + RaftServerReply reply = serverHandler.getRpc() + .sendRequest(new RaftServerRequest(request)); + return reply.getRequestVote(); + } + + @Override + public void addPeers(Iterable<RaftPeer> peers) { + // do nothing + } + + final RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply> serverHandlerImpl + = new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() { + @Override + public boolean isAlive() { + return server.isAlive(); + } + + @Override + public RaftServerReply handleRequest(RaftServerRequest r) + throws IOException { + if (r.isAppendEntries()) { + return new RaftServerReply(server.appendEntries(r.getAppendEntries())); + } else if (r.isRequestVote()) { + return new RaftServerReply(server.requestVote(r.getRequestVote())); + } else if (r.isInstallSnapshot()) { + return new RaftServerReply(server.installSnapshot(r.getInstallSnapshot())); + } else { + throw new IllegalStateException("unexpected state"); + } + } + }; + + final RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply> clientHandlerImpl + = new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() { + @Override + public boolean isAlive() { + return server.isAlive(); + } + + @Override + public RaftClientReply handleRequest(RaftClientRequest request) + throws IOException { + final CompletableFuture<RaftClientReply> future; + if (request instanceof SetConfigurationRequest) { + future = server.setConfigurationAsync((SetConfigurationRequest) request); + } else { + future = server.submitClientRequestAsync(request); + } + + future.whenCompleteAsync((reply, exception) -> { + try { + IOException e = null; + if (exception != null) { + e = exception instanceof IOException ? + (IOException) exception : new IOException(exception); + } + clientHandler.getRpc().sendReply(request, reply, e); + } catch (IOException e) { + LOG.warn("Failed to send reply {} for request {} due to exception {}", + reply, request, e); + } + }, executor); + return null; + } + }; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java new file mode 100644 index 0000000..412fb65 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftNotLeaderExceptionBaseTest; +import org.apache.ratis.conf.RaftProperties; + +import java.io.IOException; + +public class TestNotLeaderExceptionWithSimulation extends RaftNotLeaderExceptionBaseTest { + @Override + public MiniRaftCluster initCluster() throws IOException { + String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0); + return new MiniRaftClusterWithSimulatedRpc(s, new RaftProperties(), true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java new file mode 100644 index 0000000..b8bd679 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import java.io.IOException; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; + +public class TestRaftReconfigurationWithSimulatedRpc + extends RaftReconfigurationBaseTest { + @Override + public MiniRaftCluster getCluster(int peerNum) throws IOException { + return new MiniRaftClusterWithSimulatedRpc(peerNum, prop); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java new file mode 100644 index 0000000..f2d5cfb --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.statemachine.RaftSnapshotBaseTest; + +import java.io.IOException; + +public class TestRaftSnapshotWithSimulatedRpc extends RaftSnapshotBaseTest { + @Override + public MiniRaftCluster initCluster(int numServer, RaftProperties prop) + throws IOException { + return MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(numServer, prop, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java new file mode 100644 index 0000000..29ef6ed --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftBasicTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +public class TestRaftWithSimulatedRpc extends RaftBasicTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithSimulatedRpc cluster; + + public TestRaftWithSimulatedRpc() throws IOException { + final RaftProperties properties = getProperties(); + if (ThreadLocalRandom.current().nextBoolean()) { + // turn off simulate latency half of the times. + properties.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0); + } + cluster = new MiniRaftClusterWithSimulatedRpc(NUM_SERVERS, properties); + } + + @Override + public MiniRaftClusterWithSimulatedRpc getCluster() { + return cluster; + } +}
