Repository: incubator-ratis Updated Branches: refs/heads/master a783abd0d -> be4c414e0
RATIS-115. RaftLogWorker still may throw IllegalStateException. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/be4c414e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/be4c414e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/be4c414e Branch: refs/heads/master Commit: be4c414e05d39c34148782a8860df2695170dc19 Parents: a783abd Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Thu Sep 28 11:28:11 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Thu Sep 28 11:28:11 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/ExitUtils.java | 6 ++ .../ratis/server/impl/ServerProtoUtils.java | 5 +- .../apache/ratis/server/storage/LogSegment.java | 21 +++-- .../ratis/server/storage/RaftLogCache.java | 81 ++++++++++++-------- .../ratis/server/storage/RaftLogWorker.java | 19 ++++- .../ratis/server/storage/SegmentedRaftLog.java | 19 ++++- .../java/org/apache/ratis/MiniRaftCluster.java | 6 +- .../java/org/apache/ratis/RaftBasicTests.java | 17 +++- .../server/storage/RaftStorageTestUtils.java | 6 ++ .../ratis/server/storage/TestRaftLogCache.java | 2 +- 10 files changed, 132 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java index 4404344..47d3550 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java @@ -56,6 +56,12 @@ public class ExitUtils { return firstExitException != null; } + public static void assertNotTerminated() { + if (ExitUtils.isTerminated()) { + throw new AssertionError("Unexpected exited.", getFirstExitException()); + } + } + /** Disable the use of {@link System#exit(int)} for testing. */ public static void disableSystemExit() { systemExitDisabled = true; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index bca6cbb..d660f76 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -56,7 +56,10 @@ public class ServerProtoUtils { return TermIndex.toString(entry.getTerm(), entry.getIndex()); } - private static String toLogEntryString(LogEntryProto entry) { + public static String toLogEntryString(LogEntryProto entry) { + if (entry == null) { + return null; + } final ByteString clientId = entry.getClientId(); return toTermIndexString(entry) + entry.getLogEntryBodyCase() + ", " + (clientId.isEmpty()? "<empty clientId>": ClientId.valueOf(clientId)) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java index ff7f353..61fda5c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -29,12 +29,7 @@ import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -176,6 +171,20 @@ class LogSegment implements Comparable<Long> { storage.getStorageDir().getClosedLogFile(startIndex, endIndex); } + public String toDebugString() { + final StringBuilder b = new StringBuilder() + .append("startIndex=").append(startIndex) + .append(", endIndex=").append(endIndex) + .append(", numOfEntries=").append(numOfEntries()) + .append(", isOpen? ").append(isOpen) + .append(", file=").append(getSegmentFile()); + records.stream().map(LogRecord::getTermIndex).forEach( + ti -> b.append(" ").append(ti).append(", cache=") + .append(ServerProtoUtils.toLogEntryString(entryCache.get(ti))) + ); + return b.toString(); + } + private volatile boolean isOpen; private long totalSize; private final long startIndex; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 2ae0660..e1ff84b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -17,17 +17,8 @@ */ package org.apache.ratis.server.storage; -import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.function.Consumer; - import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.protocol.TermIndex; @@ -35,8 +26,15 @@ import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidation import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; - import org.apache.ratis.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.function.Consumer; + +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; /** * In-memory RaftLog Cache. Currently we provide a simple implementation that @@ -44,6 +42,8 @@ import org.apache.ratis.util.Preconditions; * requires external lock protection. */ class RaftLogCache { + public static final Logger LOG = LoggerFactory.getLogger(RaftLogCache.class); + static class SegmentFileInfo { final long startIndex; // start index of the segment final long endIndex; // original end index @@ -59,6 +59,13 @@ class RaftLogCache { this.targetLength = targetLength; this.newEndIndex = newEndIndex; } + + @Override + public String toString() { + return "(" + startIndex + ", " + endIndex + + ") isOpen? " + isOpen + ", length=" + targetLength + + ", newEndIndex=" + newEndIndex; + } } static class TruncationSegments { @@ -72,11 +79,14 @@ class RaftLogCache { this.toTruncate = toTruncate; } - int getDeletionSize() { - return toDelete == null ? 0 : toDelete.length; + @Override + public String toString() { + return "toTruncate: " + toTruncate + + "\n toDelete: " + Arrays.toString(toDelete); } } + private final String name; private volatile LogSegment openSegment; private final List<LogSegment> closedSegments; private final RaftStorage storage; @@ -84,7 +94,8 @@ class RaftLogCache { private final int maxCachedSegments; private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault(); - RaftLogCache(RaftStorage storage, RaftProperties properties) { + RaftLogCache(RaftPeerId selfId, RaftStorage storage, RaftProperties properties) { + this.name = selfId + "-" + getClass().getSimpleName(); this.storage = storage; maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties); closedSegments = new ArrayList<>(); @@ -118,10 +129,6 @@ class RaftLogCache { } } - private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) { - return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex(); - } - private LogSegment getLastClosedSegment() { return closedSegments.isEmpty() ? null : closedSegments.get(closedSegments.size() - 1); @@ -129,24 +136,37 @@ class RaftLogCache { private void validateAdding(LogSegment segment) { final LogSegment lastClosed = getLastClosedSegment(); - if (!segment.isOpen()) { - Preconditions.assertTrue(lastClosed == null || - areConsecutiveSegments(lastClosed, segment)); - } else { - Preconditions.assertTrue(openSegment == null && - (lastClosed == null || areConsecutiveSegments(lastClosed, segment))); + if (lastClosed != null) { + Preconditions.assertTrue(!lastClosed.isOpen()); + Preconditions.assertTrue(lastClosed.getEndIndex() + 1 == segment.getStartIndex()); } } void addSegment(LogSegment segment) { validateAdding(segment); if (segment.isOpen()) { - openSegment = segment; + setOpenSegment(segment); } else { closedSegments.add(segment); } } + void addOpenSegment(long startIndex) { + setOpenSegment(LogSegment.newOpenSegment(storage, startIndex)); + } + + private void setOpenSegment(LogSegment openSegment) { + LOG.trace("{}: setOpenSegment to {}", name, openSegment); + Preconditions.assertTrue(this.openSegment == null); + this.openSegment = Objects.requireNonNull(openSegment); + } + + private void clearOpenSegment() { + LOG.trace("{}: clearOpenSegment {}", name, openSegment); + Objects.requireNonNull(openSegment); + this.openSegment = null; + } + LogSegment getOpenSegment() { return openSegment; } @@ -160,10 +180,9 @@ class RaftLogCache { final long nextIndex = openSegment.getEndIndex() + 1; openSegment.close(); closedSegments.add(openSegment); + clearOpenSegment(); if (createNewOpen) { - openSegment = LogSegment.newOpenSegment(storage, nextIndex); - } else { - openSegment = null; + addOpenSegment(nextIndex); } } @@ -273,7 +292,7 @@ class RaftLogCache { openSegment.clear(); SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), oldEnd, true, 0, openSegment.getEndIndex()); - openSegment = null; + clearOpenSegment(); return info; } @@ -296,7 +315,7 @@ class RaftLogCache { oldEnd, true, openSegment.getTotalSize(), openSegment.getEndIndex()); closedSegments.add(openSegment); - openSegment = null; + clearOpenSegment(); return new TruncationSegments(info, Collections.emptyList()); } } @@ -389,7 +408,7 @@ class RaftLogCache { void clear() { if (openSegment != null) { openSegment.clear(); - openSegment = null; + clearOpenSegment(); } closedSegments.forEach(LogSegment::clear); closedSegments.clear(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 3d60c3d..1858de6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; import org.apache.ratis.server.storage.SegmentedRaftLog.Task; @@ -262,6 +263,11 @@ class RaftLogWorker implements Runnable { long getEndIndex() { return entry.getIndex(); } + + @Override + public String toString() { + return super.toString() + ": " + ServerProtoUtils.toLogEntryString(entry); + } } private class FinalizeLogSegment extends Task { @@ -281,7 +287,8 @@ class RaftLogWorker implements Runnable { .getOpenLogFile(segmentToClose.getStartIndex()); LOG.info("{} finalizing log segment {}", name, openFile); Preconditions.assertTrue(openFile.exists(), - "File %s does not exist.", openFile); + () -> name + ": File " + openFile + " does not exist, segmentToClose=" + + segmentToClose.toDebugString()); if (segmentToClose.numOfEntries() > 0) { // finalize the current open segment File dstFile = storage.getStorageDir().getClosedLogFile( @@ -299,6 +306,11 @@ class RaftLogWorker implements Runnable { long getEndIndex() { return segmentToClose.getEndIndex(); } + + @Override + public String toString() { + return super.toString() + ": " + segmentToClose.toDebugString(); + } } private class StartLogSegment extends Task { @@ -384,6 +396,11 @@ class RaftLogWorker implements Runnable { } return RaftServerConstants.INVALID_LOG_INDEX; } + + @Override + public String toString() { + return super.toString() + ": " + segments; + } } long getFlushedIndex() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index f964a77..9a4f55e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; @@ -91,7 +92,7 @@ public class SegmentedRaftLog extends RaftLog { @Override public String toString() { - return getClass().getSimpleName() + "-" + getEndIndex(); + return getClass().getSimpleName() + ":" + getEndIndex(); } } private static final ThreadLocal<Task> myTask = new ThreadLocal<>(); @@ -109,7 +110,7 @@ public class SegmentedRaftLog extends RaftLog { this.server = server; this.storage = storage; segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - cache = new RaftLogCache(storage, properties); + cache = new RaftLogCache(selfId, storage, properties); fileLogWorker = new RaftLogWorker(selfId, server, storage, properties); lastCommitted.set(lastIndexInSnapshot); } @@ -245,11 +246,15 @@ public class SegmentedRaftLog extends RaftLog { @Override void appendEntry(LogEntryProto entry) { checkLogState(); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: appendEntry {}", server.getId(), + ServerProtoUtils.toLogEntryString(entry)); + } try(AutoCloseableLock writeLock = writeLock()) { final LogSegment currentOpenSegment = cache.getOpenSegment(); if (currentOpenSegment == null) { - cache.addSegment(LogSegment.newOpenSegment(storage, entry.getIndex())); - fileLogWorker.startLogSegment(getNextIndex()); + cache.addOpenSegment(entry.getIndex()); + fileLogWorker.startLogSegment(entry.getIndex()); } else if (isSegmentFull(currentOpenSegment, entry)) { cache.rollOpenSegment(true); fileLogWorker.rollLogSegment(currentOpenSegment); @@ -303,6 +308,12 @@ public class SegmentedRaftLog extends RaftLog { if (storedEntry.getTerm() != entries[index].getTerm()) { // we should truncate from the storedEntry's index truncateIndex = storedEntry.getIndex(); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: truncate to {}, index={}, ti={}, storedEntry={}, entries={}", + server.getId(), truncateIndex, index, + ServerProtoUtils.toTermIndex(entries[index]), storedEntry, + ServerProtoUtils.toString(entries)); + } break; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 4423fdb..b5cdaef 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -455,11 +455,7 @@ public abstract class MiniRaftCluster { LOG.info("************************************************************** "); getServerAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close); - if (ExitUtils.isTerminated()) { - LOG.error("Test resulted in an unexpected exit", - ExitUtils.getFirstExitException()); - throw new AssertionError("Test resulted in an unexpected exit"); - } + ExitUtils.assertNotTerminated(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 5464d4b..9875845 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -22,10 +22,11 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.storage.RaftStorageTestUtils; +import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.After; @@ -88,6 +89,20 @@ public abstract class RaftBasicTests extends BaseTest { } @Test + public void testChangeLeader() throws Exception { + RaftStorageTestUtils.setRaftLogWorkerLogLevel(Level.TRACE); + LOG.info("Running testChangeLeader"); + final MiniRaftCluster cluster = getCluster(); + + RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId(); + for(int i = 0; i < 10; i++) { + leader = RaftTestUtil.changeLeader(cluster, leader); + ExitUtils.assertNotTerminated(); + } + RaftStorageTestUtils.setRaftLogWorkerLogLevel(Level.INFO); + } + + @Test public void testBasicAppendEntries() throws Exception { LOG.info("Running testBasicAppendEntries"); final MiniRaftCluster cluster = getCluster(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java index 55ae7f3..cc9d76d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java @@ -17,13 +17,19 @@ */ package org.apache.ratis.server.storage; +import org.apache.log4j.Level; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.LogUtils; import java.util.function.Consumer; public interface RaftStorageTestUtils { + static void setRaftLogWorkerLogLevel(Level level) { + LogUtils.setLogLevel(RaftLogWorker.LOG, level); + } + static void printLog(RaftLog log, Consumer<String> println) { if (log == null) { println.accept("log == null"); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/be4c414e/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java index c76fd4d..42e63ea 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -40,7 +40,7 @@ public class TestRaftLogCache { @Before public void setup() { - cache = new RaftLogCache(null, prop); + cache = new RaftLogCache(null, null, prop); } private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
