Repository: incubator-ratis Updated Branches: refs/heads/master 85d8e025f -> c59425465
RATIS-373. Tolerate the last partially written log entry. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c5942546 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c5942546 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c5942546 Branch: refs/heads/master Commit: c59425465d531b57f17fdba35b1567076e4be306 Parents: 85d8e02 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Oct 26 17:42:11 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Oct 26 17:42:11 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/FileUtils.java | 3 +- .../java/org/apache/ratis/util/IOUtils.java | 3 +- .../org/apache/ratis/util/OpenCloseState.java | 103 +++++++++++++++++++ .../ratis/server/storage/LogOutputStream.java | 11 +- .../apache/ratis/server/storage/LogReader.java | 19 +++- .../apache/ratis/server/storage/RaftLog.java | 23 +++-- .../ratis/server/storage/RaftLogCache.java | 6 +- .../server/storage/RaftStorageDirectory.java | 11 +- .../ratis/server/storage/SegmentedRaftLog.java | 4 +- .../java/org/apache/ratis/MiniRaftCluster.java | 16 +-- .../ratis/TestRaftServerSlownessDetection.java | 3 + .../apache/ratis/server/ServerRestartTests.java | 95 +++++++++++++---- .../ratis/server/simulation/RequestHandler.java | 5 +- .../server/storage/TestRaftLogSegment.java | 61 ++++++++--- .../statemachine/RaftSnapshotBaseTest.java | 2 +- .../SimpleStateMachine4Testing.java | 4 +- 16 files changed, 302 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java index 3171d4e..d7f722b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java @@ -31,12 +31,13 @@ public interface FileUtils { Logger LOG = LoggerFactory.getLogger(FileUtils.class); static void truncateFile(File f, long target) throws IOException { + final long original = f.length(); LogUtils.runAndLog(LOG, () -> { try (FileOutputStream out = new FileOutputStream(f, true)) { out.getChannel().truncate(target); }}, - () -> "FileOutputStream.getChannel().truncate " + f + " to target length " + target); + () -> "FileOutputStream.getChannel().truncate " + f + " length: " + original + " -> " + target); } static OutputStream createNewFile(Path p) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java index c560bc6..8559239 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java @@ -97,7 +97,8 @@ public interface IOUtils { for(int toRead = len; toRead > 0; ) { final int ret = in.read(buf, off, toRead); if (ret < 0) { - throw new IOException( "Premature EOF from inputStream"); + final int read = len - toRead; + throw new EOFException("Premature EOF: read length is " + len + " but encountered EOF at " + read); } toRead -= ret; off += ret; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java new file mode 100644 index 0000000..c6ccbd3 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The state of objects that can be unopened, opened, closed or exception. + */ +public class OpenCloseState { + private static class OpenTrace extends Throwable { + OpenTrace(String message) { + super(message); + } + } + + private static class CloseTrace extends Throwable { + CloseTrace(String message) { + super(message); + } + } + + private final String name; + private final Throwable initTrace; + /** + * The referenced {@link Throwable} indicates the state: + * InitTrace: unopened + * OpenTrace: opened + * CloseTrace: closed + * Other {@link Throwable} subclass: exception + */ + private final AtomicReference<Throwable> state; + + public OpenCloseState(String name) { + this.name = name; + this.initTrace = new Throwable("Initialize " + name); + this.state = new AtomicReference<>(initTrace); + } + + /** + * Assert this is open. + */ + public void assertOpen() { + final Throwable t = state.get(); + if (!(t instanceof OpenTrace)) { + final String s = name + " is expected to be opened but it is " + toString(t); + throw new IllegalArgumentException(s, t); + } + } + + /** + * Transit to open state. + * The method is NOT idempotent. + */ + public void open() throws IOException { + final OpenTrace openTrace = new OpenTrace("Open " + name); + final Throwable t = state.updateAndGet(previous -> previous == initTrace? openTrace: previous); + if (t != openTrace) { + throw new IOException("Failed to open " + name +" since it is " + toString(t)); + } + } + + /** + * Transit to close state. + * The method is idempotent. + */ + public void close() throws IOException { + final Throwable t = state.updateAndGet( + previous -> previous instanceof OpenTrace? new CloseTrace("Close "+ name): previous); + // If t is CloseTrace, t is either the previous or the newly created object. + if (!(t instanceof CloseTrace)) { + throw new IOException("Failed to close " + name + " since it is " + toString(t)); + } + } + + @Override + public String toString() { + return toString(state.get()); + } + + private String toString(Throwable t) { + return t == initTrace? "UNOPENED" + : t instanceof OpenTrace? "OPENED" + : t instanceof CloseTrace? "CLOSED" + : t.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java index a09b90d..118a3e8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java @@ -92,9 +92,16 @@ public class LogOutputStream implements Closeable { } /** + * Write the given entry to this output stream. + * * Format: - * LogEntryProto's protobuf - * 4-byte checksum of the above protobuf + * (1) The serialized size of the entry. + * (2) The entry. + * (3) 4-byte checksum of the entry. + * + * Size in bytes to be written: + * (size to encode n) + n + (checksum size), + * where n is the entry serialized size and the checksum size is 4. */ public void write(LogEntryProto entry) throws IOException { final int serialized = entry.getSerializedSize(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java index ce149dd..735e8a3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java @@ -25,12 +25,15 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.PureJavaCrc32C; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.zip.Checksum; public class LogReader implements Closeable { + static final Logger LOG = LoggerFactory.getLogger(LogReader.class); /** * InputStream wrapper that keeps track of the current stream position. * @@ -121,12 +124,14 @@ public class LogReader implements Closeable { private static final int maxOpSize = 32 * 1024 * 1024; + private final File file; private final LimitedInputStream limiter; private final DataInputStream in; private byte[] temp = new byte[4096]; private final Checksum checksum; LogReader(File file) throws FileNotFoundException { + this.file = file; this.limiter = new LimitedInputStream( new BufferedInputStream(new FileInputStream(file))); in = new DataInputStream(limiter); @@ -153,6 +158,16 @@ public class LogReader implements Closeable { LogEntryProto readEntry() throws IOException { try { return decodeEntry(); + } catch (EOFException eof) { + in.reset(); + // The last entry is partially written. + // It is okay to ignore it since this entry is never committed in this server. + if (LOG.isWarnEnabled()) { + LOG.warn("Ignoring the last partial written log entry in " + file + ": " + eof); + } else if (LOG.isTraceEnabled()) { + LOG.trace("Ignoring the last partial written log entry in " + file , eof); + } + return null; } catch (IOException e) { in.reset(); @@ -286,7 +301,7 @@ public class LogReader implements Closeable { } @Override - public void close() throws IOException { - IOUtils.cleanup(null, in); + public void close() { + IOUtils.cleanup(LOG, in); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index effc027..8b88b31 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -29,13 +29,13 @@ import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.OpenCloseState; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -53,6 +53,7 @@ public abstract class RaftLog implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class); public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync"; + /** * The largest committed index. Note the last committed log may be included * in the latest snapshot file. @@ -63,11 +64,12 @@ public abstract class RaftLog implements Closeable { private final int maxBufferSize; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private volatile boolean isOpen = false; + private final OpenCloseState state; public RaftLog(RaftPeerId selfId, int maxBufferSize) { this.selfId = selfId; this.maxBufferSize = maxBufferSize; + this.state = new OpenCloseState(getName()); } public long getLastCommittedIndex() { @@ -75,8 +77,7 @@ public abstract class RaftLog implements Closeable { } public void checkLogState() { - Preconditions.assertTrue(isOpen, - () -> getSelfId() + ": The RaftLog has not been opened or has been closed"); + state.assertOpen(); } /** @@ -182,7 +183,7 @@ public abstract class RaftLog implements Closeable { public void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException { - isOpen = true; + state.open(); } public abstract long getStartIndex(); @@ -293,11 +294,7 @@ public abstract class RaftLog implements Closeable { @Override public String toString() { - if (!isOpen) { - return "Closed log"; - } - TermIndex last = getLastEntryTermIndex(); - return last == null ? "null" : Collections.singletonList(last).toString(); + return getName() + ":" + state; } public static class Metadata { @@ -336,13 +333,17 @@ public abstract class RaftLog implements Closeable { @Override public void close() throws IOException { - isOpen = false; + state.close(); } public RaftPeerId getSelfId() { return selfId; } + public String getName() { + return selfId + "-" + getClass().getSimpleName(); + } + /** * Holds proto entry along with future which contains read state machine data */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 0cb1047..25291c8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -105,10 +105,10 @@ class RaftLogCache { return maxCachedSegments; } - void loadSegment(LogPathAndIndex pi, boolean isOpen, boolean keepEntryInCache, + void loadSegment(LogPathAndIndex pi, boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer) throws IOException { - LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(), - pi.startIndex, pi.endIndex, isOpen, keepEntryInCache, logConsumer); + LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), + pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, logConsumer); addSegment(logSegment); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java index c2feabb..2242934 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.storage; +import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.util.AtomicFileOutputStream; import org.apache.ratis.util.FileUtils; import org.slf4j.Logger; @@ -61,7 +62,7 @@ public class RaftStorageDirectory { } public static class LogPathAndIndex { - public final Path path; + private final Path path; public final long startIndex; public final long endIndex; @@ -71,6 +72,14 @@ public class RaftStorageDirectory { this.endIndex = endIndex; } + public Path getPath() { + return path; + } + + public boolean isOpen() { + return endIndex == RaftServerConstants.INVALID_LOG_INDEX; + } + @Override public String toString() { return path + "-" + startIndex + "-" + endIndex; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index c16c478..0dc7cea 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -20,7 +20,6 @@ package org.apache.ratis.server.storage; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; @@ -141,7 +140,6 @@ public class SegmentedRaftLog extends RaftLog { List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles(); int i = 0; for (LogPathAndIndex pi : paths) { - boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX; // During the initial loading, we can only confirm the committed // index based on the snapshot. This means if a log segment is not kept // in cache after the initial loading, later we have to load its content @@ -150,7 +148,7 @@ public class SegmentedRaftLog extends RaftLog { // so that during the initial loading we can apply part of the log // entries to the state machine boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments(); - cache.loadSegment(pi, isOpen, keepEntryInCache, logConsumer); + cache.loadSegment(pi, keepEntryInCache, logConsumer); } // if the largest index is smaller than the last index in snapshot, we do http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 2d28896..3dd0612 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -204,15 +204,17 @@ public abstract class MiniRaftCluster implements Closeable { /** * start a stopped server again. */ - public void restartServer(RaftPeerId newId, boolean format) throws IOException { - restartServer(newId, group, format); + public RaftServerImpl restartServer(RaftPeerId newId, boolean format) throws IOException { + return restartServer(newId, group, format); } - public void restartServer(RaftPeerId newId, RaftGroup group, boolean format) throws IOException { + public RaftServerImpl restartServer(RaftPeerId newId, RaftGroup group, boolean format) throws IOException { killServer(newId); servers.remove(newId); - putNewServer(newId, group, format).start(); + final RaftServerProxy proxy = putNewServer(newId, group, format); + proxy.start(); + return group == null? null: proxy.getImpl(group.getGroupId()); } public void restart(boolean format) throws IOException { @@ -579,18 +581,18 @@ public abstract class MiniRaftCluster implements Closeable { LOG.info(printServers()); final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), Daemon::new); + getServers().forEach(proxy -> executor.submit(proxy::close)); try { - getServers().forEach(proxy -> executor.submit(proxy::close)); + executor.shutdown(); // just wait for a few seconds executor.awaitTermination(5, TimeUnit.SECONDS); } catch(InterruptedException e) { LOG.warn("shutdown interrupted", e); - } finally { - executor.shutdownNow(); } timer.cancel(); ExitUtils.assertNotTerminated(); + LOG.info(getClass().getSimpleName() + " shutdown completed"); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java index e109598..96a164e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java @@ -31,6 +31,7 @@ import org.apache.ratis.util.TimeDuration; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; @@ -41,6 +42,8 @@ import java.util.concurrent.TimeUnit; /** * Test Raft Server Slowness detection and notification to Leader's statemachine. */ +//TODO: fix StateMachine.notifySlowness(..); see RATIS-370 +@Ignore public class TestRaftServerSlownessDetection extends BaseTest { static { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java index 5353caa..95ac858 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java @@ -25,23 +25,30 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.impl.ServerState; import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.SizeInBytes; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.Arrays; +import java.io.File; +import java.nio.file.Path; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Test restarting raft peers. @@ -49,15 +56,14 @@ import java.util.List; public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> extends BaseTest implements MiniRaftCluster.Factory.Get<CLUSTER> { - static { + { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); } static final int NUM_SERVERS = 3; - @Before - public void setup() { + { final RaftProperties prop = getProperties(); prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); @@ -75,15 +81,11 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> cluster.start(); RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); // write some messages - final byte[] content = new byte[1024]; - Arrays.fill(content, (byte)1); - final SimpleMessage message = new SimpleMessage(new String(content)); - for(int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } + final AtomicInteger messageCount = new AtomicInteger(); + final Supplier<Message> newMessage = () -> new SimpleMessage("m" + messageCount.getAndIncrement()); + writeSomething(newMessage, cluster); // restart a follower RaftPeerId followerId = cluster.getFollowers().get(0).getId(); @@ -91,10 +93,8 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> cluster.restartServer(followerId, false); // write some more messages - for(int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } - client.close(); + writeSomething(newMessage, cluster); + final int truncatedMessageIndex = messageCount.get() - 1; final long leaderLastIndex = cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex(); // make sure the restarted follower can catchup @@ -103,8 +103,63 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> 10, 500, "follower catchup", LOG); // make sure the restarted peer's log segments is correct - cluster.restartServer(followerId, false); - Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog() - .getLastEntryTermIndex().getIndex() >= leaderLastIndex); + final RaftServerImpl follower = cluster.restartServer(followerId, false); + final RaftLog followerLog = follower.getState().getLog(); + final long followerLastIndex = followerLog.getLastEntryTermIndex().getIndex(); + Assert.assertTrue(followerLastIndex >= leaderLastIndex); + + final File followerOpenLogFile = getOpenLogFile(follower); + final File leaderOpenLogFile = getOpenLogFile(cluster.getRaftServerImpl(leaderId)); + + // shutdown all servers + cluster.getServers().forEach(RaftServerProxy::close); + + // truncate log and + assertTruncatedLog(followerId, followerOpenLogFile, followerLastIndex, cluster); + assertTruncatedLog(leaderId, leaderOpenLogFile, leaderLastIndex, cluster); + + // restart and write something. + cluster.restart(false); + writeSomething(newMessage, cluster); + + // restart again and check messages. + cluster.restart(false); + try(final RaftClient client = cluster.createClient()) { + for(int i = 0; i < messageCount.get(); i++) { + if (i != truncatedMessageIndex) { + final Message m = new SimpleMessage("m" + i); + final RaftClientReply reply = client.sendReadOnly(m); + Assert.assertTrue(reply.isSuccess()); + LOG.info("query {}: {} {}", m, reply, LogEntryProto.parseFrom(reply.getMessage().getContent())); + } + } + } + } + + static void writeSomething(Supplier<Message> newMessage, MiniRaftCluster cluster) throws Exception { + try(final RaftClient client = cluster.createClient()) { + // write some messages + for(int i = 0; i < 10; i++) { + Assert.assertTrue(client.send(newMessage.get()).isSuccess()); + } + } + } + + static void assertTruncatedLog(RaftPeerId id, File openLogFile, long lastIndex, MiniRaftCluster cluster) throws Exception { + // truncate log + FileUtils.truncateFile(openLogFile, openLogFile.length() - 1); + final RaftServerImpl server = cluster.restartServer(id, false); + // the last index should be one less than before + Assert.assertEquals(lastIndex - 1, server.getState().getLog().getLastEntryTermIndex().getIndex()); + server.getProxy().close(); + } + + static File getOpenLogFile(RaftServerImpl server) throws Exception { + final List<Path> openLogs = server.getState().getStorage().getStorageDir().getLogSegmentFiles().stream() + .filter(LogPathAndIndex::isOpen) + .map(LogPathAndIndex::getPath) + .collect(Collectors.toList()); + Assert.assertEquals(1, openLogs.size()); + return openLogs.get(0).toFile(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java index 5c12ef4..802797f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java @@ -77,7 +77,7 @@ public class RequestHandler<REQUEST extends RaftRpcMessage, void interruptAndJoinDaemon() throws InterruptedException { daemons.forEach(Thread::interrupt); for (Daemon d : daemons) { - d.join(); + d.join(1000); } } @@ -118,6 +118,9 @@ public class RequestHandler<REQUEST extends RaftRpcMessage, public void run() { while (handlerImpl.isAlive()) { try { + if (Thread.interrupted()) { + throw new InterruptedException(this + " was interrupted previously."); + } handleRequest(rpc.takeRequest(getServerId())); } catch (InterruptedIOException e) { LOG.info(this + " is interrupted by " + e); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index a6150aa..bc2339d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -26,7 +26,9 @@ import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; +import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TraditionalBinaryPrefix; import org.junit.After; @@ -42,6 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; import static org.apache.ratis.server.storage.LogSegment.getEntrySize; @@ -75,26 +78,47 @@ public class TestRaftLogSegment extends BaseTest { } } - private File prepareLog(boolean isOpen, long start, int size, long term) + File prepareLog(boolean isOpen, long startIndex, int numEntries, long term, boolean isLastEntryPartiallyWritten) throws IOException { + if (!isOpen) { + Preconditions.assertTrue(!isLastEntryPartiallyWritten, "For closed log, the last entry cannot be partially written."); + } RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - File file = isOpen ? storage.getStorageDir().getOpenLogFile(start) : - storage.getStorageDir().getClosedLogFile(start, start + size - 1); + final File file = isOpen ? + storage.getStorageDir().getOpenLogFile(startIndex) : + storage.getStorageDir().getClosedLogFile(startIndex, startIndex + numEntries - 1); - LogEntryProto[] entries = new LogEntryProto[size]; + final LogEntryProto[] entries = new LogEntryProto[numEntries]; try (LogOutputStream out = new LogOutputStream(file, false, segmentMaxSize, preallocatedSize, bufferSize)) { - for (int i = 0; i < size; i++) { + for (int i = 0; i < entries.length; i++) { SimpleOperation op = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + start); + entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex); out.write(entries[i]); } } + + if (isLastEntryPartiallyWritten) { + final int entrySize = size(entries[entries.length - 1]); + final int truncatedEntrySize = ThreadLocalRandom.current().nextInt(entrySize - 1) + 1; + // 0 < truncatedEntrySize < entrySize + final long fileLength = file.length(); + final long truncatedFileLength = fileLength - (entrySize - truncatedEntrySize); + LOG.info("truncate last entry: entry(size={}, truncated={}), file(length={}, truncated={})", + entrySize, truncatedEntrySize, fileLength, truncatedFileLength); + FileUtils.truncateFile(file, truncatedFileLength); + } + storage.close(); return file; } - private void checkLogSegment(LogSegment segment, long start, long end, + static int size(LogEntryProto entry) { + final int n = entry.getSerializedSize(); + return CodedOutputStream.computeUInt32SizeNoTag(n) + n + 4; + } + + static void checkLogSegment(LogSegment segment, long start, long end, boolean isOpen, long totalSize, long term) throws Exception { Assert.assertEquals(start, segment.getStartIndex()); Assert.assertEquals(end, segment.getEndIndex()); @@ -117,27 +141,38 @@ public class TestRaftLogSegment extends BaseTest { @Test public void testLoadLogSegment() throws Exception { - testLoadSegment(true); + testLoadSegment(true, false); + } + + @Test + public void testLoadLogSegmentLastEntryPartiallyWritten() throws Exception { + testLoadSegment(true, true); } @Test public void testLoadCache() throws Exception { - testLoadSegment(false); + testLoadSegment(false, false); + } + + @Test + public void testLoadCacheLastEntryPartiallyWritten() throws Exception { + testLoadSegment(false, true); } - private void testLoadSegment(boolean loadInitial) throws Exception { + private void testLoadSegment(boolean loadInitial, boolean isLastEntryPartiallyWritten) throws Exception { // load an open segment - File openSegmentFile = prepareLog(true, 0, 100, 0); + final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten); RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0, INVALID_LOG_INDEX, true, loadInitial, null); - checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0); + final int delta = isLastEntryPartiallyWritten? 1: 0; + checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0); storage.close(); // for open segment we currently always keep log entries in the memory Assert.assertEquals(0, openSegment.getLoadingTimes()); // load a closed segment (1000-1099) - File closedSegmentFile = prepareLog(false, 1000, 100, 1); + final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false); LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile, 1000, 1099, false, loadInitial, null); checkLogSegment(closedSegment, 1000, 1099, false, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 0a5e38d..cbb1ee2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -189,7 +189,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { // delete the log segments from the leader for (LogPathAndIndex path : logs) { - FileUtils.deleteFile(path.path.toFile()); + FileUtils.delete(path.getPath()); } // restart the peer http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 313e713..2c6883e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -151,7 +151,9 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { private void put(LogEntryProto entry) { final LogEntryProto previous = indexMap.put(entry.getIndex(), entry); Preconditions.assertNull(previous, "previous"); - dataMap.put(entry.getStateMachineLogEntry().getLogData().toStringUtf8(), entry); + final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); + dataMap.put(s, entry); + LOG.info("put {}, {} -> {}", entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry)); } @Override
