Repository: incubator-ratis Updated Branches: refs/heads/master b901b3a5a -> 4d723a2c7
RATIS-377. Tolerate partially written log header. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4d723a2c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4d723a2c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4d723a2c Branch: refs/heads/master Commit: 4d723a2c73b353f5228b067635383e03a697f563 Parents: b901b3a Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Oct 29 13:46:31 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Oct 29 13:46:31 2018 +0800 ---------------------------------------------------------------------- .../apache/ratis/io/CorruptedFileException.java | 27 +++++++ .../org/apache/ratis/util/CheckedConsumer.java | 37 +++++++++ .../org/apache/ratis/util/OpenCloseState.java | 38 +++++++-- .../java/org/apache/ratis/util/StringUtils.java | 5 ++ .../apache/ratis/server/impl/LogAppender.java | 4 + .../ratis/server/impl/RaftServerConstants.java | 1 - .../ratis/server/storage/LogInputStream.java | 55 ++++++------- .../ratis/server/storage/LogOutputStream.java | 6 +- .../apache/ratis/server/storage/LogReader.java | 44 ++++++++--- .../apache/ratis/server/storage/LogSegment.java | 29 ++++--- .../ratis/server/storage/RaftLogCache.java | 4 +- .../ratis/server/storage/SegmentedRaftLog.java | 3 - .../server/storage/SegmentedRaftLogFormat.java | 82 ++++++++++++++++++++ .../apache/ratis/server/ServerRestartTests.java | 54 ++++++++++++- .../apache/ratis/server/TestRaftLogMetrics.java | 18 +++-- .../server/storage/TestRaftLogReadWrite.java | 4 +- .../server/storage/TestRaftLogSegment.java | 10 +-- .../server/storage/TestSegmentedRaftLog.java | 13 ++++ 18 files changed, 354 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java b/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java new file mode 100644 index 0000000..3cf525b --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.io; + +import java.io.File; +import java.io.IOException; + +public class CorruptedFileException extends IOException { + public CorruptedFileException(File file, String message) { + super("File " + file + " (exist? " + file.exists() + ", length=" + file.length() + ") is corrupted: " + message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java new file mode 100644 index 0000000..0532ea1 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +/** Consumer with a throws-clause. */ +@FunctionalInterface +public interface CheckedConsumer<INPUT, THROWABLE extends Throwable> { + /** + * The same as {@link java.util.function.Consumer#accept(Object)} + * except that this method is declared with a throws-clause. + */ + void accept(INPUT input) throws THROWABLE; + + /** @return a {@link CheckedFunction} with {@link Void} return type. */ + static <INPUT, THROWABLE extends Throwable> CheckedFunction<INPUT, Void, THROWABLE> asCheckedFunction( + CheckedConsumer<INPUT, THROWABLE> consumer) { + return input -> { + consumer.accept(input); + return null; + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java index c6ccbd3..7847c21 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java @@ -64,6 +64,22 @@ public class OpenCloseState { } } + public boolean isUnopened() { + return state.get() == initTrace; + } + + public boolean isOpened() { + return state.get() instanceof OpenTrace; + } + + public boolean isClosed() { + return state.get() instanceof CloseTrace; + } + + public Throwable getThrowable() { + return state.get(); + } + /** * Transit to open state. * The method is NOT idempotent. @@ -76,17 +92,27 @@ public class OpenCloseState { } } + private boolean readyToClose(Throwable t) { + return t == initTrace || t instanceof OpenTrace; + } + /** * Transit to close state. * The method is idempotent. + * + * @return true if the state is transited to close as a result of the invocation of this method. + * Otherwise, return false since it is already closed. + * + * @throws IOException if the current state is not allowed to transit to close. */ - public void close() throws IOException { - final Throwable t = state.updateAndGet( - previous -> previous instanceof OpenTrace? new CloseTrace("Close "+ name): previous); - // If t is CloseTrace, t is either the previous or the newly created object. - if (!(t instanceof CloseTrace)) { - throw new IOException("Failed to close " + name + " since it is " + toString(t)); + public boolean close() throws IOException { + final Throwable previous = state.getAndUpdate(prev -> readyToClose(prev)? new CloseTrace("Close "+ name): prev); + if (readyToClose(previous)) { + return true; + } else if (previous instanceof CloseTrace) { + return false; // already closed } + throw new IOException("Failed to close " + name + " since it is " + toString(previous)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index 6a9442e..be797a0 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -88,6 +88,11 @@ public class StringUtils { return bytes2HexString(ByteBuffer.wrap(bytes)); } + public static String bytes2HexString(byte[] bytes, int offset, int length) { + Objects.requireNonNull(bytes, "bytes == null"); + return bytes2HexString(ByteBuffer.wrap(bytes, offset, length)); + } + public static String bytes2HexString(ByteBuffer bytes) { Objects.requireNonNull(bytes, "bytes == null"); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 0542f8b..02e0f56 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -44,6 +44,7 @@ import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX import static org.apache.ratis.util.LifeCycle.State.CLOSED; import static org.apache.ratis.util.LifeCycle.State.CLOSING; import static org.apache.ratis.util.LifeCycle.State.EXCEPTION; +import static org.apache.ratis.util.LifeCycle.State.NEW; import static org.apache.ratis.util.LifeCycle.State.RUNNING; import static org.apache.ratis.util.LifeCycle.State.STARTING; @@ -117,6 +118,9 @@ public class LogAppender { } public void stopAppender() { + if (lifeCycle.compareAndTransition(NEW, CLOSED)) { + return; + } lifeCycle.transition(CLOSING); daemon.interrupt(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java index 35e3df8..a2b7057 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java @@ -19,7 +19,6 @@ package org.apache.ratis.server.impl; public interface RaftServerConstants { long INVALID_LOG_INDEX = -1; - byte LOG_TERMINATE_BYTE = 0; long DEFAULT_CALLID = 0; long DEFAULT_SEQNUM = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java index 22d7949..6eb1e38 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java @@ -23,9 +23,11 @@ import java.io.Closeable; import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.util.Optional; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.OpenCloseState; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,17 +59,11 @@ public class LogInputStream implements Closeable { } } - private enum State { - UNINIT, - OPEN, - CLOSED - } - private final File logFile; private final long startIndex; private final long endIndex; private final boolean isOpen; - private State state = State.UNINIT; + private final OpenCloseState state; private LogReader reader; public LogInputStream(File log, long startIndex, long endIndex, @@ -82,20 +78,19 @@ public class LogInputStream implements Closeable { this.startIndex = startIndex; this.endIndex = endIndex; this.isOpen = isOpen; + this.state = new OpenCloseState(getName()); } private void init() throws IOException { - Preconditions.assertTrue(state == State.UNINIT); + state.open(); try { - reader = new LogReader(logFile); - // read the log header - String header = reader.readLogHeader(); - Preconditions.assertTrue(SegmentedRaftLog.HEADER_STR.equals(header), - "Corrupted log header: %s", header); - state = State.OPEN; + final LogReader r = new LogReader(logFile); + if (r.verifyHeader()) { + reader = r; + } } finally { if (reader == null) { - state = State.CLOSED; + state.close(); } } } @@ -113,19 +108,18 @@ public class LogInputStream implements Closeable { } public LogEntryProto nextEntry() throws IOException { - LogEntryProto entry = null; - switch (state) { - case UNINIT: + if (state.isUnopened()) { try { init(); } catch (Throwable e) { LOG.error("caught exception initializing " + this, e); throw IOUtils.asIOException(e); } - Preconditions.assertTrue(state != State.UNINIT); - return nextEntry(); - case OPEN: - entry = reader.readEntry(); + } + + Preconditions.assertTrue(!state.isUnopened()); + if (state.isOpened()) { + final LogEntryProto entry = reader.readEntry(); if (entry != null) { long index = entry.getIndex(); if (!isOpen() && index >= endIndex) { @@ -142,20 +136,20 @@ public class LogInputStream implements Closeable { } } } - break; - case CLOSED: - break; // return null + return entry; + } else if (state.isClosed()) { + return null; } - return entry; + throw new IOException("Failed to get next entry from " + this, state.getThrowable()); } long scanNextEntry() throws IOException { - Preconditions.assertTrue(state == State.OPEN); + state.assertOpen(); return reader.scanEntry(); } long getPosition() { - if (state == State.OPEN) { + if (state.isOpened()) { return reader.getPos(); } else { return 0; @@ -164,10 +158,9 @@ public class LogInputStream implements Closeable { @Override public void close() throws IOException { - if (state == State.OPEN) { - reader.close(); + if (state.close()) { + Optional.ofNullable(reader).ifPresent(LogReader::close); } - state = State.CLOSED; } boolean isOpen() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java index 118a3e8..46da9da 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java @@ -17,9 +17,9 @@ */ package org.apache.ratis.server.storage; -import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.CheckedConsumer; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.PureJavaCrc32C; @@ -43,7 +43,7 @@ public class LogOutputStream implements Closeable { fill = ByteBuffer.allocateDirect(BUFFER_SIZE); fill.position(0); for (int i = 0; i < fill.capacity(); i++) { - fill.put(RaftServerConstants.LOG_TERMINATE_BYTE); + fill.put(SegmentedRaftLogFormat.getTerminator()); } } @@ -136,7 +136,7 @@ public class LogOutputStream implements Closeable { preallocatedPos = 0; preallocate(); // preallocate file - out.write(SegmentedRaftLog.HEADER_BYTES); + SegmentedRaftLogFormat.applyHeaderTo(CheckedConsumer.asCheckedFunction(out::write)); flush(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java index 735e8a3..b15b72f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.storage; +import org.apache.ratis.io.CorruptedFileException; import org.apache.ratis.protocol.ChecksumException; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.thirdparty.com.google.protobuf.CodedInputStream; @@ -25,11 +26,11 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.PureJavaCrc32C; +import org.apache.ratis.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; -import java.nio.charset.StandardCharsets; import java.util.zip.Checksum; public class LogReader implements Closeable { @@ -138,13 +139,37 @@ public class LogReader implements Closeable { checksum = new PureJavaCrc32C(); } - String readLogHeader() throws IOException { - byte[] header = new byte[SegmentedRaftLog.HEADER_BYTES.length]; - int num = in.read(header); - if (num < header.length) { - throw new EOFException("EOF before reading a complete log header"); + /** + * Read header from the log file: + * + * (1) The header in file is verified successfully. + * Then, return true. + * + * (2) The header in file is partially written. + * Then, return false. + * + * (3) The header in file is corrupted or there is some other {@link IOException}. + * Then, throw an exception. + */ + boolean verifyHeader() throws IOException { + final int headerLength = SegmentedRaftLogFormat.getHeaderLength(); + final int readLength = in.read(temp, 0, headerLength); + Preconditions.assertTrue(readLength <= headerLength); + final int matchLength = SegmentedRaftLogFormat.matchHeader(temp, 0, readLength); + Preconditions.assertTrue(matchLength <= readLength); + + if (readLength == headerLength && matchLength == readLength) { + // The header is matched successfully + return true; + } else if (SegmentedRaftLogFormat.isTerminator(temp, matchLength, readLength - matchLength)) { + // The header is partially written + return false; } - return new String(header, StandardCharsets.UTF_8); + // The header is corrupted + throw new CorruptedFileException(file, "Log header mismatched: expected header length=" + + SegmentedRaftLogFormat.getHeaderLength() + ", read length=" + readLength + ", match length=" + matchLength + + ", header in file=" + StringUtils.bytes2HexString(temp, 0, readLength) + + ", expected header=" + SegmentedRaftLogFormat.applyHeaderTo(StringUtils::bytes2HexString)); } /** @@ -196,13 +221,12 @@ public class LogReader implements Closeable { int numRead = -1, idx = 0; while (true) { try { - numRead = -1; numRead = in.read(temp); if (numRead == -1) { return; } for (idx = 0; idx < numRead; idx++) { - if (temp[idx] != RaftServerConstants.LOG_TERMINATE_BYTE) { + if (!SegmentedRaftLogFormat.isTerminator(temp[idx])) { throw new IOException("Read extra bytes after the terminator!"); } } @@ -243,7 +267,7 @@ public class LogReader implements Closeable { // Each log entry starts with a var-int. Thus a valid entry's first byte // should not be 0. So if the terminate byte is 0, we should hit the end // of the segment. - if (nextByte == RaftServerConstants.LOG_TERMINATE_BYTE) { + if (SegmentedRaftLogFormat.isTerminator(nextByte)) { verifyTerminator(); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java index 5f61864..a74d6a8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -25,6 +25,8 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -45,6 +47,8 @@ import java.util.function.Consumer; * This class will be protected by the RaftServer's lock. */ class LogSegment implements Comparable<Long> { + static final Logger LOG = LoggerFactory.getLogger(LogSegment.class); + static long getEntrySize(LogEntryProto entry) { final int serialized = ServerProtoUtils.removeStateMachineData(entry).getSerializedSize(); return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4; @@ -103,12 +107,11 @@ class LogSegment implements Comparable<Long> { return new LogSegment(storage, false, start, end); } - private static void readSegmentFile(File file, long start, long end, + private static int readSegmentFile(File file, long start, long end, boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws IOException { + int count = 0; try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) { - LogEntryProto next; - LogEntryProto prev = null; - while ((next = in.nextEntry()) != null) { + for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) { if (prev != null) { Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1, "gap between entry %s and entry %s", prev, next); @@ -117,9 +120,10 @@ class LogSegment implements Comparable<Long> { if (entryConsumer != null) { entryConsumer.accept(next); } - prev = next; + count++; } } + return count; } static LogSegment loadSegment(RaftStorage storage, File file, @@ -130,15 +134,20 @@ class LogSegment implements Comparable<Long> { LogSegment.newOpenSegment(storage, start) : LogSegment.newCloseSegment(storage, start, end); - readSegmentFile(file, start, end, isOpen, entry -> { - segment.append(keepEntryInCache | isOpen, entry); + final int entryCount = readSegmentFile(file, start, end, isOpen, entry -> { + segment.append(keepEntryInCache || isOpen, entry); if (logConsumer != null) { logConsumer.accept(entry); } }); + LOG.info("Successfully read {} entries from segment file {}", entryCount, file); - // truncate padding if necessary - if (file.length() > segment.getTotalSize()) { + if (entryCount == 0) { + // The segment does not have any entries, delete the file. + FileUtils.deleteFile(file); + return null; + } else if (file.length() > segment.getTotalSize()) { + // The segment has extra padding, truncate it. FileUtils.truncateFile(file, segment.getTotalSize()); } @@ -217,7 +226,7 @@ class LogSegment implements Comparable<Long> { this.isOpen = isOpen; this.startIndex = start; this.endIndex = end; - totalSize = SegmentedRaftLog.HEADER_BYTES.length; + totalSize = SegmentedRaftLogFormat.getHeaderLength(); hasEntryCache = isOpen; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 25291c8..451e713 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -109,7 +109,9 @@ class RaftLogCache { Consumer<LogEntryProto> logConsumer) throws IOException { LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, logConsumer); - addSegment(logSegment); + if (logSegment != null) { + addSegment(logSegment); + } } long getCachedSegmentNum() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 0dc7cea..f2b5c1a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -68,9 +68,6 @@ import java.util.function.Consumer; * we may have hole when append further log. */ public class SegmentedRaftLog extends RaftLog { - static final String HEADER_STR = "RAFTLOG1"; - static final byte[] HEADER_BYTES = HEADER_STR.getBytes(StandardCharsets.UTF_8); - /** * I/O task definitions. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java new file mode 100644 index 0000000..1a00a84 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.util.CheckedFunction; +import org.apache.ratis.util.Preconditions; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +public interface SegmentedRaftLogFormat { + class Internal { + private static final byte[] HEADER_BYTES = "RaftLog1".getBytes(StandardCharsets.UTF_8); + private static final byte[] HEADER_BYTES_CLONE = HEADER_BYTES.clone(); + private static final byte TERMINATOR_BYTE = 0; + + private static void assertHeader() { + Preconditions.assertTrue(Arrays.equals(HEADER_BYTES, HEADER_BYTES_CLONE)); + } + } + + static int getHeaderLength() { + return Internal.HEADER_BYTES.length; + } + + static int matchHeader(byte[] bytes, int offset, int length) { + Preconditions.assertTrue(length <= getHeaderLength()); + for(int i = 0; i < length; i++) { + if (bytes[offset + i] != Internal.HEADER_BYTES[i]) { + return i; + } + } + return length; + } + + static <T> T applyHeaderTo(CheckedFunction<byte[], T, IOException> function) throws IOException { + final T t = function.apply(Internal.HEADER_BYTES); + Internal.assertHeader(); // assert that the header is unmodified by the function. + return t; + } + + static byte getTerminator() { + return Internal.TERMINATOR_BYTE; + } + + static boolean isTerminator(byte b) { + return b == Internal.TERMINATOR_BYTE; + } + + static boolean isTerminator(byte[] bytes, int offset, int length) { + return indexOfNonTerminator(bytes, offset, length) == -1; + } + + /** + * @return The index of the first non-terminator if it exists. + * Otherwise, return -1, i.e. all bytes are terminator. + */ + static int indexOfNonTerminator(byte[] bytes, int offset, int length) { + for(int i = 0; i < length; i++) { + if (!isTerminator(bytes[offset + i])) { + return i; + } + } + return -1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java index 95ac858..531d2e2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java @@ -33,17 +33,21 @@ import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.impl.ServerState; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; +import org.apache.ratis.server.storage.SegmentedRaftLogFormat; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.StringUtils; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import java.io.File; +import java.io.RandomAccessFile; import java.nio.file.Path; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -57,7 +61,6 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> extends BaseTest implements MiniRaftCluster.Factory.Get<CLUSTER> { { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); } @@ -154,12 +157,57 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> server.getProxy().close(); } - static File getOpenLogFile(RaftServerImpl server) throws Exception { - final List<Path> openLogs = server.getState().getStorage().getStorageDir().getLogSegmentFiles().stream() + static List<Path> getOpenLogFiles(RaftServerImpl server) throws Exception { + return server.getState().getStorage().getStorageDir().getLogSegmentFiles().stream() .filter(LogPathAndIndex::isOpen) .map(LogPathAndIndex::getPath) .collect(Collectors.toList()); + } + + static File getOpenLogFile(RaftServerImpl server) throws Exception { + final List<Path> openLogs = getOpenLogFiles(server); Assert.assertEquals(1, openLogs.size()); return openLogs.get(0).toFile(); } + + @Test + public void testRestartWithCorruptedLogHeader() throws Exception { + try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) { + runTestRestartWithCorruptedLogHeader(cluster, LOG); + } + } + + static void runTestRestartWithCorruptedLogHeader(MiniRaftCluster cluster, Logger LOG) throws Exception { + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + + // shutdown all servers + cluster.getServers().forEach(RaftServerProxy::close); + + for(RaftServerImpl impl : cluster.iterateServerImpls()) { + final File openLogFile = JavaUtils.attempt(() -> getOpenLogFile(impl), + 10, 100, impl.getId() + "-getOpenLogFile", LOG); + for(int i = 0; i < SegmentedRaftLogFormat.getHeaderLength(); i++) { + assertCorruptedLogHeader(impl.getId(), openLogFile, i, cluster, LOG); + Assert.assertTrue(getOpenLogFiles(impl).isEmpty()); + } + } + } + + static void assertCorruptedLogHeader(RaftPeerId id, File openLogFile, int partialLength, + MiniRaftCluster cluster, Logger LOG) throws Exception { + Preconditions.assertTrue(partialLength < SegmentedRaftLogFormat.getHeaderLength()); + try(final RandomAccessFile raf = new RandomAccessFile(openLogFile, "rw")) { + SegmentedRaftLogFormat.applyHeaderTo(header -> { + LOG.info("header = {}", StringUtils.bytes2HexString(header)); + final byte[] corrupted = new byte[header.length]; + System.arraycopy(header, 0, corrupted, 0, partialLength); + LOG.info("corrupted = {}", StringUtils.bytes2HexString(corrupted)); + raf.write(corrupted); + return null; + }); + } + final RaftServerImpl server = cluster.restartServer(id, false); + server.getProxy().close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java index 9cc60a6..58e319d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java @@ -26,11 +26,11 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.metrics.RatisMetricsRegistry; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.ratis.server.storage.RaftStorageTestUtils; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.Assert; import org.junit.Test; @@ -90,12 +90,21 @@ public class TestRaftLogMetrics extends BaseTest } } - for (RaftServerProxy rsp: cluster.getServers()) { - final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(rsp.getId()); + // For leader, flush must happen before client can get replies. + assertFlushCount(cluster.getLeader()); + + // For followers, flush can be lagged behind. Attempt multiple times. + for(RaftServerImpl f : cluster.getFollowers()) { + JavaUtils.attempt(() -> assertFlushCount(f), 10, 100, f.getId() + "-assertFlushCount", null); + } + } + + static void assertFlushCount(RaftServerImpl server) throws Exception { + final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(server.getId()); Timer tm = RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric); Assert.assertNotNull(tm); - final MetricsStateMachine stateMachine = MetricsStateMachine.get(rsp.getImpl(cluster.getGroupId())); + final MetricsStateMachine stateMachine = MetricsStateMachine.get(server); final int expectedFlush = stateMachine.getFlushCount(); Assert.assertEquals(expectedFlush, tm.getCount()); @@ -106,6 +115,5 @@ public class TestRaftLogMetrics extends BaseTest Assert.assertEquals(expectedFlush, ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count")) .intValue()); - } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java index f3b314f..7d9fdf5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -104,7 +104,7 @@ public class TestRaftLogReadWrite extends BaseTest { public void testReadWriteLog() throws IOException { final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); File openSegment = storage.getStorageDir().getOpenLogFile(0); - long size = SegmentedRaftLog.HEADER_BYTES.length; + long size = SegmentedRaftLogFormat.getHeaderLength(); final LogEntryProto[] entries = new LogEntryProto[100]; try (LogOutputStream out = @@ -162,7 +162,7 @@ public class TestRaftLogReadWrite extends BaseTest { public void testReadWithPadding() throws IOException { final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); File openSegment = storage.getStorageDir().getOpenLogFile(0); - long size = SegmentedRaftLog.HEADER_BYTES.length; + long size = SegmentedRaftLogFormat.getHeaderLength(); LogEntryProto[] entries = new LogEntryProto[100]; LogOutputStream out = new LogOutputStream(openSegment, false, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index bc2339d..270e279 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -125,7 +125,7 @@ public class TestRaftLogSegment extends BaseTest { Assert.assertEquals(isOpen, segment.isOpen()); Assert.assertEquals(totalSize, segment.getTotalSize()); - long offset = SegmentedRaftLog.HEADER_BYTES.length; + long offset = SegmentedRaftLogFormat.getHeaderLength(); for (long i = start; i <= end; i++) { LogSegment.LogRecord record = segment.getLogRecord(i); LogRecordWithEntry lre = segment.getEntryWithoutLoading(i); @@ -184,7 +184,7 @@ public class TestRaftLogSegment extends BaseTest { public void testAppendEntries() throws Exception { final long start = 1000; LogSegment segment = LogSegment.newOpenSegment(null, start); - long size = SegmentedRaftLog.HEADER_BYTES.length; + long size = SegmentedRaftLogFormat.getHeaderLength(); final long max = 8 * 1024 * 1024; checkLogSegment(segment, start, start - 1, true, size, 0); @@ -267,7 +267,7 @@ public class TestRaftLogSegment extends BaseTest { segment.truncate(start); Assert.assertEquals(0, segment.numOfEntries()); checkLogSegment(segment, start, start - 1, false, - SegmentedRaftLog.HEADER_BYTES.length, term); + SegmentedRaftLogFormat.getHeaderLength(), term); } @Test @@ -307,7 +307,7 @@ public class TestRaftLogSegment extends BaseTest { out.write(entry); } Assert.assertEquals(file.length(), - size + SegmentedRaftLog.HEADER_BYTES.length); + size + SegmentedRaftLogFormat.getHeaderLength()); try (LogInputStream in = new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) { LogEntryProto entry = in.nextEntry(); @@ -332,7 +332,7 @@ public class TestRaftLogSegment extends BaseTest { LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); final long entrySize = LogSegment.getEntrySize(entry); - long totalSize = SegmentedRaftLog.HEADER_BYTES.length; + long totalSize = SegmentedRaftLogFormat.getHeaderLength(); long preallocated = 16 * 1024; try (LogOutputStream out = new LogOutputStream(file, false, max.getSize(), 16 * 1024, 10 * 1024)) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 9971d30..5cb498a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -40,6 +40,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -398,4 +399,16 @@ public class TestSegmentedRaftLog extends BaseTest { Assert.assertEquals(5, cache.getNumOfSegments()); } } + + + @Test + public void testSegmentedRaftLogFormatInternalHeader() throws Exception { + testFailureCase("testSegmentedRaftLogFormatInternalHeader", + () -> SegmentedRaftLogFormat.applyHeaderTo(header -> { + LOG.info("header = " + new String(header, StandardCharsets.UTF_8)); + header[0] += 1; // try changing the internal header + LOG.info("header' = " + new String(header, StandardCharsets.UTF_8)); + return null; + }), IllegalStateException.class); + } }
