Repository: incubator-ratis
Updated Branches:
  refs/heads/master ee262d7e3 -> e46aee2c8
RATIS-112. testRevertConfigurationChange may fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e46aee2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e46aee2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e46aee2c

Branch: refs/heads/master
Commit: e46aee2c87e85fe7317f2e807cdf97da0be31ddc
Parents: ee262d7
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Mon Aug 28 14:03:24 2017 -0700
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Mon Aug 28 14:03:24 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/util/CheckedSupplier.java |  8 +++
 .../java/org/apache/ratis/util/JavaUtils.java  | 64 ++++++++++++++++++++
 .../ratis/server/impl/ServerImplUtils.java     | 14 ++++-
 .../java/org/apache/ratis/RaftTestUtil.java    | 23 -------
 .../impl/RaftReconfigurationBaseTest.java      | 51 +++++++++-------
 .../ratis/server/impl/RaftServerTestUtil.java  |  5 +-
 .../server/storage/RaftStorageTestUtils.java   | 55 +++++++++++++++++
 7 files changed, 172 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
index 0c9de31..06abe4c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
@@ -27,4 +27,12 @@ public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
 * except that this method is declared with a throws-clause.
*/ OUTPUT get() throws THROWABLE; + + static <THROWABLE extends Throwable> CheckedSupplier<?, THROWABLE> valueOf( + CheckedRunnable<THROWABLE> runnable) { + return () -> { + runnable.run(); + return null; + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 0664aec..c78f999 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Supplier; @@ -104,4 +105,67 @@ public interface JavaUtils { static ThreadGroup getRootThreadGroup() { return ROOT_THREAD_GROUP.get(); } + + /** Attempt to get a return value from the given supplier multiple times. 
*/ + static <RETURN, THROWABLE extends Throwable> RETURN attempt( + CheckedSupplier<RETURN, THROWABLE> supplier, + int numAttempts, long sleepMs, String name, Logger log) + throws THROWABLE, InterruptedException { + Objects.requireNonNull(supplier, "supplier == null"); + Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0"); + Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0"); + + for(int i = 1; i <= numAttempts; i++) { + try { + return supplier.get(); + } catch (Throwable t) { + if (i == numAttempts) { + throw t; + } + if (log != null && log.isWarnEnabled()) { + log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts + + ": " + t + ", sleep " + sleepMs + "ms and then retry."); + } + } + + if (sleepMs > 0) { + Thread.sleep(sleepMs); + } + } + throw new IllegalStateException("BUG: this line should be unreachable."); + } + + /** Attempt to run the given op multiple times. */ + static <THROWABLE extends Throwable> void attempt( + CheckedRunnable<THROWABLE> op, int numAttempts, long sleepMs, String name, Logger log) + throws THROWABLE, InterruptedException { + attempt(CheckedSupplier.valueOf(op), numAttempts, sleepMs, name, log); + } + + /** Attempt to wait the given condition to return true multiple times. 
*/ + static void attempt( + BooleanSupplier condition, int numAttempts, long sleepMs, String name, Logger log) + throws InterruptedException { + Objects.requireNonNull(condition, "condition == null"); + Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0"); + Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0"); + + for(int i = 1; i <= numAttempts; i++) { + if (condition.getAsBoolean()) { + return; + } + if (log != null && log.isWarnEnabled()) { + log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts + + ": sleep " + sleepMs + "ms and then retry."); + } + if (sleepMs > 0) { + Thread.sleep(sleepMs); + } + } + + if (!condition.getAsBoolean()) { + throw new IllegalStateException("Failed " + name + " for " + numAttempts + " attempts."); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index bcbab9a..544ed13 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -23,6 +23,8 @@ import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; import java.io.IOException; @@ -31,7 +33,17 @@ public class ServerImplUtils { public static RaftServerProxy newRaftServer( RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftProperties properties, Parameters parameters) throws IOException { - return new RaftServerProxy(id, stateMachine, 
group, properties, parameters); + try { + // attempt multiple times to avoid temporary bind exception + return JavaUtils.attempt( + () -> new RaftServerProxy(id, stateMachine, group, properties, parameters), + 5, 500L, "newRaftServer", RaftServerImpl.LOG); + } catch (InterruptedException e) { + throw IOUtils.toInterruptedIOException( + "Interrupted when creating RaftServer " + id + ", " + group, e); + } catch (IOException e) { + throw new IOException("Failed to create RaftServer " + id + ", " + group, e); + } } public static TermIndex newTermIndex(long term, long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 46ff7d8..c3972a1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -31,19 +31,16 @@ import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.ratis.util.CheckedRunnable; import org.apache.ratis.util.JavaUtils; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; import java.util.function.BooleanSupplier; import java.util.function.IntSupplier; @@ -257,26 +254,6 @@ public interface RaftTestUtil { } } - static <T extends Throwable> void attempt( - int n, long sleepMs, CheckedRunnable<T> runnable) - throws T, 
InterruptedException { - for(int i = 1; i <= n; i++) { - LOG.info("Attempt #" + i + "/" + n + ": sleep " + sleepMs + "ms"); - if (sleepMs > 0) { - Thread.sleep(sleepMs); - } - try { - runnable.run(); - return; - } catch (Throwable t) { - if (i == n) { - throw t; - } - LOG.warn("Attempt #" + i + "/" + n + ": Ignoring " + t + " and retry."); - } - } - } - static RaftPeerId changeLeader(MiniRaftCluster cluster, RaftPeerId oldLeader) throws InterruptedException { cluster.setBlockRequestsFrom(oldLeader.toString(), true); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 34ade52..316377b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -28,8 +28,9 @@ import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftStorageTestUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.Assert; import org.junit.BeforeClass; @@ -48,6 +49,7 @@ import static java.util.Arrays.asList; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; +import static 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.NOOP; public abstract class RaftReconfigurationBaseTest extends BaseTest { static { @@ -511,6 +513,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { @Test public void testRevertConfigurationChange() throws Exception { LOG.info("Start testRevertConfigurationChange"); + RaftLog log2 = null; final MiniRaftCluster cluster = getCluster(5); try { cluster.start(); @@ -520,8 +523,8 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { final RaftPeerId leaderId = leader.getId(); final RaftLog log = leader.getState().getLog(); + log2 = log; Thread.sleep(1000); - Assert.assertEquals(0, log.getLatestFlushedIndex()); // we block the incoming msg for the leader and block its requests to // followers, so that we force the leader change and the old leader will @@ -533,7 +536,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { PeerChanges change = cluster.removePeers(1, false, new ArrayList<>()); AtomicBoolean gotNotLeader = new AtomicBoolean(false); - new Thread(() -> { + final Thread clientThread = new Thread(() -> { try(final RaftClient client = cluster.createClient(leaderId)) { LOG.info("client starts to change conf"); final RaftClientRpc sender = client.getClientRpc(); @@ -545,37 +548,41 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { } catch (IOException e) { LOG.warn("Got unexpected exception when client1 changes conf", e); } - }).start(); + }); + clientThread.start(); + + // find CONFIGURATIONENTRY, there may be NOOP before and after it. 
+ final long confIndex = JavaUtils.attempt(() -> { + final long last = log.getLastEntryTermIndex().getIndex(); + for (long i = 1; i <= last; i++) { + if (log.get(i).getLogEntryBodyCase() == CONFIGURATIONENTRY) { + return i; + } + } + throw new Exception("CONFIGURATIONENTRY not found: last=" + last); + }, 10, 500, "confIndex", LOG); // wait till the old leader persist the new conf - for (int i = 0; i < 10 && log.getLatestFlushedIndex() < 1; i++) { - Thread.sleep(500); - } - Assert.assertEquals(1, log.getLatestFlushedIndex()); - TermIndex last = log.getLastEntryTermIndex(); - Assert.assertEquals(CONFIGURATIONENTRY, - log.get(last.getIndex()).getLogEntryBodyCase()); + JavaUtils.attempt(() -> log.getLatestFlushedIndex() >= confIndex, + 10, 500L, "FLUSH", LOG); + final long committed = log.getLastCommittedIndex(); + Assert.assertTrue(committed < confIndex); // unblock the old leader BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString()); cluster.setBlockRequestsFrom(leaderId.toString(), false); // the client should get NotLeaderException - for (int i = 0; i < 10 && !gotNotLeader.get(); i++) { - Thread.sleep(500); - } + clientThread.join(5000); Assert.assertTrue(gotNotLeader.get()); // the old leader should have truncated the setConf from the log - boolean newState = false; - for (int i = 0; i < 10 && !newState; i++) { - Thread.sleep(500); - TermIndex lastTermIndex = log.getLastEntryTermIndex(); - newState = log.getLastCommittedIndex() == 1 && - log.get(lastTermIndex.getIndex()).getLogEntryBodyCase() != CONFIGURATIONENTRY; - } - Assert.assertTrue(newState); + JavaUtils.attempt(() -> log.getLastCommittedIndex() >= confIndex, + 10, 500L, "COMMIT", LOG); + Assert.assertEquals(NOOP, log.get(confIndex).getLogEntryBodyCase()); + log2 = null; } finally { + RaftStorageTestUtils.printLog(log2, s -> LOG.info(s)); cluster.shutdown(); } } 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 62c68bf..909685f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -21,6 +21,7 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.util.JavaUtils; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +35,8 @@ public class RaftServerTestUtil { RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers) throws Exception { final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2); - RaftTestUtil.attempt(3, sleepMs, - () -> waitAndCheckNewConf(cluster, peers, deadPeers)); + JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, peers, deadPeers), + 3, sleepMs, "waitAndCheckNewConf", LOG); } private static void waitAndCheckNewConf(MiniRaftCluster cluster, RaftPeer[] peers, Collection<String> deadPeers) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java new file mode 100644 index 0000000..55ae7f3 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java 
@@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.util.AutoCloseableLock; + +import java.util.function.Consumer; + +public interface RaftStorageTestUtils { + static void printLog(RaftLog log, Consumer<String> println) { + if (log == null) { + println.accept("log == null"); + return; + } + + final TermIndex last; + final long flushed, committed; + try(AutoCloseableLock readlock = log.readLock()) { + last = log.getLastEntryTermIndex(); + flushed = log.getLatestFlushedIndex(); + committed = log.getLastCommittedIndex(); + } + final StringBuilder b = new StringBuilder(); + for(long i = 0; i <= last.getIndex(); i++) { + b.setLength(0); + b.append(i == flushed? 'f': ' '); + b.append(i == committed? 'c': ' '); + b.append(String.format("%3d: ", i)); + try { + final RaftProtos.LogEntryProto entry = log.get(i); + b.append(entry != null? entry.getLogEntryBodyCase(): null); + } catch (RaftLogIOException e) { + b.append(e); + } + println.accept(b.toString()); + } + } +}
