Repository: incubator-ratis
Updated Branches:
  refs/heads/master cce89d059 -> 079ad4876
RATIS-401. StateMachineDataPolicy.getFromFuture should throw an exception if all retries have failed.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/079ad487
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/079ad487
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/079ad487

Branch: refs/heads/master
Commit: 079ad4876bd92e6acd43667c5ade610311a5e4c1
Parents: cce89d0
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Thu Nov 8 16:59:40 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Thu Nov 8 16:59:40 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/JavaUtils.java |  2 +-
 .../ratis/server/storage/RaftLogWorker.java |  6 +++
 .../server/storage/TestSegmentedRaftLog.java | 52 +++++++++++++++++---
 3 files changed, 52 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/079ad487/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index b855b2a..95fcf35 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -139,7 +139,7 @@ public interface JavaUtils {
           throw t;
         }
         if (log != null && log.isWarnEnabled()) {
-          log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
+          log.warn("FAILED \"" + name + "\", attempt #" + i + "/" + numAttempts
               + ": " + t + ", sleep " + sleepTime + " and then retry.", t);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/079ad487/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 82b0b49..8e1c855 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -60,6 +61,7 @@ class RaftLogWorker implements Runnable {
       this.sync = RaftServerConfigKeys.Log.StateMachineData.sync(properties);
       this.syncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
       this.syncTimeoutRetry = RaftServerConfigKeys.Log.StateMachineData.syncTimeoutRetry(properties);
+      Preconditions.assertTrue(syncTimeoutRetry >= -1);
     }
 
     boolean isSync() {
@@ -68,14 +70,18 @@ class RaftLogWorker implements Runnable {
 
     void getFromFuture(CompletableFuture<?> future, Supplier<Object> getName) throws IOException {
       Preconditions.assertTrue(isSync());
+      TimeoutIOException lastException = null;
       for(int retry = 0; syncTimeoutRetry == -1 || retry <= syncTimeoutRetry; retry++) {
         try {
           IOUtils.getFromFuture(future, getName, syncTimeout);
           return;
         } catch(TimeoutIOException e) {
           LOG.warn("Timeout " + retry + (syncTimeoutRetry == -1? "/~": "/" + syncTimeoutRetry), e);
+          lastException = e;
         }
       }
+      Objects.requireNonNull(lastException, "lastException == null");
+      throw lastException;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/079ad487/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 8083b62..bcbfa73 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -22,6 +22,7 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
@@ -31,11 +32,14 @@ import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -175,7 +180,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
-  List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist,
+  static List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist,
       Supplier<String> stringSupplier) {
     List<LogEntryProto> eList = new ArrayList<>();
     for (SegmentRange range : slist) {
@@ -184,17 +189,21 @@ public class TestSegmentedRaftLog extends BaseTest {
     return eList;
   }
 
-  List<LogEntryProto> prepareLogEntries(SegmentRange range,
+  static List<LogEntryProto> prepareLogEntries(SegmentRange range,
       Supplier<String> stringSupplier, boolean hasStataMachineData, List<LogEntryProto> eList) {
     for(long index = range.start; index <= range.end; index++) {
-      SimpleOperation m = stringSupplier == null?
-          new SimpleOperation("m" + index, hasStataMachineData):
-          new SimpleOperation(stringSupplier.get(), hasStataMachineData);
-      eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index));
+      eList.add(prepareLogEntry(range.term, index, stringSupplier, hasStataMachineData));
     }
     return eList;
   }
 
+  static LogEntryProto prepareLogEntry(long term, long index, Supplier<String> stringSupplier, boolean hasStataMachineData) {
+    final SimpleOperation m = stringSupplier == null?
+        new SimpleOperation("m" + index, hasStataMachineData):
+        new SimpleOperation(stringSupplier.get(), hasStataMachineData);
+    return ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), term, index);
+  }
+
   /**
    * Append entry one by one and check if log state is correct.
    */
@@ -447,6 +456,35 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
+  @Test
+  public void testSegmentedRaftLogStateMachineDataTimeoutIOException() throws Exception {
+    RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
+    final TimeDuration syncTimeout = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+    RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout(properties, syncTimeout);
+    final int numRetries = 2;
+    RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, numRetries);
+    ExitUtils.disableSystemExit();
+
+    final LogEntryProto entry = prepareLogEntry(0, 0, null, true);
+    final StateMachine sm = new BaseStateMachine() {
+      @Override
+      public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
+        return new CompletableFuture<>(); // the future never completes
+      }
+    };
+
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.appendEntry(entry); // RaftLogWorker should catch TimeoutIOException
+
+      JavaUtils.attempt(() -> {
+        final ExitUtils.ExitException exitException = ExitUtils.getFirstExitException();
+        Objects.requireNonNull(exitException, "exitException == null");
+        Assert.assertEquals(TimeoutIOException.class, exitException.getCause().getClass());
+      }, 3*numRetries, syncTimeout, "RaftLogWorker should catch TimeoutIOException and exit", LOG);
+    }
+  }
+
   static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {
     final Thread t = new Thread(() -> raftLog.appendEntry(entry));
     t.start();
