http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java new file mode 100644 index 0000000..6ed70fa --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_KEY; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.zip.Checksum; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.PureJavaCrc32C; +import org.apache.ratis.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LogOutputStream implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(LogOutputStream.class); + + private static final ByteBuffer fill; + private static final int BUFFER_SIZE = 1024 * 1024; // 1 MB + static { + fill = ByteBuffer.allocateDirect(BUFFER_SIZE); + fill.position(0); + for (int i = 0; i < fill.capacity(); i++) { + fill.put(RaftServerConstants.LOG_TERMINATE_BYTE); + } + } + + private File file; + private FileChannel fc; // channel of the file stream for sync + private BufferedWriteChannel out; // buffered FileChannel for writing + private final Checksum checksum; + + private final long segmentMaxSize; + private final long preallocatedSize; + private long preallocatedPos; + + public LogOutputStream(File file, 
boolean append, RaftProperties properties) + throws IOException { + this.file = file; + this.checksum = new PureJavaCrc32C(); + this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, + RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT); + this.preallocatedSize = properties.getLong( + RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, + RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT); + RandomAccessFile rp = new RandomAccessFile(file, "rw"); + fc = rp.getChannel(); + fc.position(fc.size()); + preallocatedPos = fc.size(); + + int bufferSize = properties.getInt(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, + RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT); + out = new BufferedWriteChannel(fc, bufferSize); + + if (!append) { + create(); + } + } + + /** + * Format: + * LogEntryProto's protobuf + * 4-byte checksum of the above protobuf + */ + public void write(LogEntryProto entry) throws IOException { + final int serialized = entry.getSerializedSize(); + final int bufferSize = CodedOutputStream.computeUInt32SizeNoTag(serialized) + + serialized; + + preallocateIfNecessary(bufferSize + 4); + + byte[] buf = new byte[bufferSize]; + CodedOutputStream cout = CodedOutputStream.newInstance(buf); + cout.writeUInt32NoTag(serialized); + entry.writeTo(cout); + + checksum.reset(); + checksum.update(buf, 0, buf.length); + final int sum = (int) checksum.getValue(); + + out.write(buf); + writeInt(sum); + } + + private void writeInt(int v) throws IOException { + out.write((v >>> 24) & 0xFF); + out.write((v >>> 16) & 0xFF); + out.write((v >>> 8) & 0xFF); + out.write((v) & 0xFF); + } + + private void create() throws IOException { + fc.truncate(0); + fc.position(0); + preallocatedPos = 0; + preallocate(); // preallocate file + + out.write(SegmentedRaftLog.HEADER_BYTES); + flush(); + } + + @Override + public void close() throws IOException { + try { + out.flush(false); + if (fc != null && fc.isOpen()) { + fc.truncate(fc.position()); + } + } finally { + RaftUtils.cleanup(LOG, fc, out); + fc = null; + out = null; + } + } + + /** + * 
Flush data to persistent store. + * Collect sync metrics. + */ + public void flush() throws IOException { + if (out == null) { + throw new IOException("Trying to use aborted output stream"); + } + out.flush(true); + } + + private void preallocate() throws IOException { + fill.position(0); + long targetSize = Math.min(segmentMaxSize - fc.size(), preallocatedSize); + int allocated = 0; + while (allocated < targetSize) { + int size = (int) Math.min(BUFFER_SIZE, targetSize - allocated); + ByteBuffer buffer = fill.slice(); + buffer.limit(size); + RaftUtils.writeFully(fc, buffer, preallocatedPos); + preallocatedPos += size; + allocated += size; + } + LOG.debug("Pre-allocated {} bytes for the log segment", allocated); + } + + private void preallocateIfNecessary(int size) throws IOException { + if (out.position() + size > preallocatedPos) { + preallocate(); + } + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "(" + file + ")"; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java new file mode 100644 index 0000000..0e5a168 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import java.io.BufferedInputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.Checksum; + +import org.apache.commons.io.Charsets; +import org.apache.ratis.protocol.ChecksumException; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.shaded.com.google.protobuf.CodedInputStream; +import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.PureJavaCrc32C; +import org.apache.ratis.util.RaftUtils; + +import com.google.common.base.Preconditions; + +public class LogReader implements Closeable { + /** + * InputStream wrapper that keeps track of the current stream position. + * + * This stream also allows us to set a limit on how many bytes we can read + * without getting an exception. 
+ */ + public static class LimitedInputStream extends FilterInputStream { + private long curPos = 0; + private long markPos = -1; + private long limitPos = Long.MAX_VALUE; + + public LimitedInputStream(InputStream is) { + super(is); + } + + private void checkLimit(long amt) throws IOException { + long extra = (curPos + amt) - limitPos; + if (extra > 0) { + throw new IOException("Tried to read " + amt + " byte(s) past " + + "the limit at offset " + limitPos); + } + } + + @Override + public int read() throws IOException { + checkLimit(1); + int ret = super.read(); + if (ret != -1) curPos++; + return ret; + } + + @Override + public int read(byte[] data) throws IOException { + checkLimit(data.length); + int ret = super.read(data); + if (ret > 0) curPos += ret; + return ret; + } + + @Override + public int read(byte[] data, int offset, int length) throws IOException { + checkLimit(length); + int ret = super.read(data, offset, length); + if (ret > 0) curPos += ret; + return ret; + } + + public void setLimit(long limit) { + limitPos = curPos + limit; + } + + public void clearLimit() { + limitPos = Long.MAX_VALUE; + } + + @Override + public void mark(int limit) { + super.mark(limit); + markPos = curPos; + } + + @Override + public void reset() throws IOException { + if (markPos == -1) { + throw new IOException("Not marked!"); + } + super.reset(); + curPos = markPos; + markPos = -1; + } + + public long getPos() { + return curPos; + } + + @Override + public long skip(long amt) throws IOException { + long extra = (curPos + amt) - limitPos; + if (extra > 0) { + throw new IOException("Tried to skip " + extra + " bytes past " + + "the limit at offset " + limitPos); + } + long ret = super.skip(amt); + curPos += ret; + return ret; + } + } + + private static final int maxOpSize = 32 * 1024 * 1024; + + private final LimitedInputStream limiter; + private final DataInputStream in; + private byte[] temp = new byte[4096]; + private final Checksum checksum; + + LogReader(File file) throws 
FileNotFoundException { + this.limiter = new LimitedInputStream( + new BufferedInputStream(new FileInputStream(file))); + in = new DataInputStream(limiter); + checksum = new PureJavaCrc32C(); + } + + String readLogHeader() throws IOException { + byte[] header = new byte[SegmentedRaftLog.HEADER_BYTES.length]; + int num = in.read(header); + if (num < header.length) { + throw new EOFException("EOF before reading a complete log header"); + } + return new String(header, Charsets.UTF_8); + } + + /** + * Read a log entry from the input stream. + * + * @return the operation read from the stream, or null at the end of the + * file + * @throws IOException on error. This function should only throw an + * exception when skipBrokenEdits is false. + */ + LogEntryProto readEntry() throws IOException { + try { + return decodeEntry(); + } catch (IOException e) { + in.reset(); + + throw e; + } catch (Throwable e) { + // raft log requires no gap between any two entries. thus if an entry is + // broken, throw the exception instead of skipping broken entries + in.reset(); + throw new IOException("got unexpected exception " + e.getMessage(), e); + } + } + + /** + * Scan and validate a log entry. + * @return the index of the log entry + */ + long scanEntry() throws IOException { + LogEntryProto entry = decodeEntry(); + return entry != null ? entry.getIndex() : RaftServerConstants.INVALID_LOG_INDEX; + } + + void verifyTerminator() throws IOException { + // The end of the log should contain 0x00 bytes. + // If it contains other bytes, the log itself may be corrupt. 
+ limiter.clearLimit(); + int numRead = -1, idx = 0; + while (true) { + try { + numRead = -1; + numRead = in.read(temp); + if (numRead == -1) { + return; + } + for (idx = 0; idx < numRead; idx++) { + if (temp[idx] != RaftServerConstants.LOG_TERMINATE_BYTE) { + throw new IOException("Read extra bytes after the terminator!"); + } + } + } finally { + // After reading each group of bytes, we reposition the mark one + // byte before the next group. Similarly, if there is an error, we + // want to reposition the mark one byte before the error + if (numRead != -1) { + in.reset(); + RaftUtils.skipFully(in, idx); + in.mark(temp.length + 1); + RaftUtils.skipFully(in, 1); + } + } + } + } + + /** + * Decode the log entry "frame". This includes reading the log entry, and + * validating the checksum. + * + * The input stream will be advanced to the end of the op at the end of this + * function. + * + * @return The log entry, or null if we hit EOF. + */ + private LogEntryProto decodeEntry() throws IOException { + limiter.setLimit(maxOpSize); + in.mark(maxOpSize); + + byte nextByte; + try { + nextByte = in.readByte(); + } catch (EOFException eof) { + // EOF at an opcode boundary is expected. + return null; + } + // Each log entry starts with a var-int. Thus a valid entry's first byte + // should not be 0. So if the terminate byte is 0, we should hit the end + // of the segment. + if (nextByte == RaftServerConstants.LOG_TERMINATE_BYTE) { + verifyTerminator(); + return null; + } + + // Here, we verify that the Op size makes sense and that the + // data matches its checksum before attempting to construct an Op. 
+ int entryLength = CodedInputStream.readRawVarint32(nextByte, in); + if (entryLength > maxOpSize) { + throw new IOException("Entry has size " + entryLength + + ", but maxOpSize = " + maxOpSize); + } + + final int varintLength = CodedOutputStream.computeUInt32SizeNoTag( + entryLength); + final int totalLength = varintLength + entryLength; + checkBufferSize(totalLength); + in.reset(); + in.mark(maxOpSize); + RaftUtils.readFully(in, temp, 0, totalLength); + + // verify checksum + checksum.reset(); + checksum.update(temp, 0, totalLength); + int expectedChecksum = in.readInt(); + int calculatedChecksum = (int) checksum.getValue(); + if (expectedChecksum != calculatedChecksum) { + throw new ChecksumException("LogEntry is corrupt. Calculated checksum is " + + calculatedChecksum + " but read checksum " + expectedChecksum, + limiter.markPos); + } + + // parse the buffer + return LogEntryProto.parseFrom( + CodedInputStream.newInstance(temp, varintLength, entryLength)); + } + + private void checkBufferSize(int entryLength) { + Preconditions.checkArgument(entryLength <= maxOpSize); + int length = temp.length; + if (length < entryLength) { + while (length < entryLength) { + length = Math.min(length * 2, maxOpSize); + } + temp = new byte[length]; + } + } + + long getPos() { + return limiter.getPos(); + } + + void skipFully(long length) throws IOException { + limiter.clearLimit(); + RaftUtils.skipFully(limiter, length); + } + + @Override + public void close() throws IOException { + RaftUtils.cleanup(null, in); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java new file mode 100644 index 0000000..af9ee66 --- /dev/null +++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -0,0 +1,233 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ratis.server.storage;

import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.ratis.server.impl.ConfigurationManager;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.FileUtils;

import com.google.common.base.Preconditions;

/**
 * In-memory cache for a log segment file. All the updates will be first written
 * into LogSegment then into corresponding files in the same order.
 *
 * This class will be protected by the RaftServer's lock.
 */
class LogSegment implements Comparable<Long> {
  static class LogRecord {
    /** starting offset in the file */
    final long offset;
    final LogEntryProto entry;

    LogRecord(long offset, LogEntryProto entry) {
      this.offset = offset;
      this.entry = entry;
    }
  }

  static class SegmentFileInfo {
    final long startIndex; // start index of the segment
    final long endIndex; // original end index
    final boolean isOpen;
    final long targetLength; // position for truncation
    final long newEndIndex; // new end index after the truncation

    SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
        long newEndIndex) {
      this.startIndex = start;
      this.endIndex = end;
      this.isOpen = isOpen;
      this.targetLength = targetLength;
      this.newEndIndex = newEndIndex;
    }
  }

  /**
   * On-disk size of an entry: varint length prefix + serialized proto +
   * 4-byte checksum.
   */
  static long getEntrySize(LogEntryProto entry) {
    final int serialized = entry.getSerializedSize();
    return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4;
  }

  private boolean isOpen;
  private final List<LogRecord> records = new ArrayList<>();
  /** File size implied by the header plus all cached entries. */
  private long totalSize;
  private final long startIndex;
  private long endIndex;

  private LogSegment(boolean isOpen, long start, long end) {
    this.isOpen = isOpen;
    this.startIndex = start;
    this.endIndex = end;
    totalSize = SegmentedRaftLog.HEADER_BYTES.length;
  }

  static LogSegment newOpenSegment(long start) {
    Preconditions.checkArgument(start >= 0);
    return new LogSegment(true, start, start - 1);
  }

  private static LogSegment newCloseSegment(long start, long end) {
    Preconditions.checkArgument(start >= 0 && end >= start);
    return new LogSegment(false, start, end);
  }

  /**
   * Loads all entries of a segment file into memory.
   *
   * The segment is validated before the file is modified. The entries must
   * start at {@code start}, and a closed segment must end at {@code end} and
   * be non-empty. An open segment may contain no entries, e.g. one created
   * just before a crash. Any padding after the last entry is then truncated.
   */
  static LogSegment loadSegment(File file, long start, long end, boolean isOpen,
      ConfigurationManager confManager) throws IOException {
    final LogSegment segment;
    try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
      segment = isOpen ?
          LogSegment.newOpenSegment(start) :
          LogSegment.newCloseSegment(start, end);
      LogEntryProto next;
      LogEntryProto prev = null;
      while ((next = in.nextEntry()) != null) {
        if (prev != null) {
          Preconditions.checkState(next.getIndex() == prev.getIndex() + 1,
              "gap between entry %s and entry %s", prev, next);
        }
        segment.append(next);
        if (confManager != null &&
            next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
          confManager.addConfiguration(next.getIndex(),
              ServerProtoUtils.toRaftConfiguration(next.getIndex(),
                  next.getConfigurationEntry()));
        }
        prev = next;
      }
    }

    // validate before modifying the file
    if (segment.records.isEmpty()) {
      Preconditions.checkState(isOpen,
          "closed segment %s does not contain any log entry", file);
    } else {
      Preconditions.checkState(start == segment.records.get(0).entry.getIndex());
    }
    if (!isOpen) {
      Preconditions.checkState(segment.getEndIndex() == end);
    }

    // truncate padding if necessary
    if (file.length() > segment.getTotalSize()) {
      FileUtils.truncateFile(file, segment.getTotalSize());
    }
    return segment;
  }

  long getStartIndex() {
    return startIndex;
  }

  long getEndIndex() {
    return endIndex;
  }

  boolean isOpen() {
    return isOpen;
  }

  int numOfEntries() {
    return (int) (endIndex - startIndex + 1);
  }

  void appendToOpenSegment(LogEntryProto... entries) {
    Preconditions.checkState(isOpen(),
        "The log segment %s is not open for append", this.toString());
    append(entries);
  }

  /**
   * Appends entries that must all share one term and continue this
   * segment's index sequence without gaps.
   */
  private void append(LogEntryProto... entries) {
    Preconditions.checkArgument(entries != null && entries.length > 0);
    final long term = entries[0].getTerm();
    if (records.isEmpty()) {
      Preconditions.checkArgument(entries[0].getIndex() == startIndex,
          "gap between start index %s and first entry to append %s",
          startIndex, entries[0].getIndex());
    }
    for (LogEntryProto entry : entries) {
      // all these entries should be of the same term
      Preconditions.checkArgument(entry.getTerm() == term,
          "expected term:%s, term of the entry:%s", term, entry.getTerm());
      final LogRecord currentLast = getLastRecord();
      if (currentLast != null) {
        Preconditions.checkArgument(
            entry.getIndex() == currentLast.entry.getIndex() + 1,
            "gap between entries %s and %s", entry.getIndex(),
            currentLast.entry.getIndex());
      }

      final LogRecord record = new LogRecord(totalSize, entry);
      records.add(record);
      totalSize += getEntrySize(entry);
      endIndex = entry.getIndex();
    }
  }

  LogRecord getLogRecord(long index) {
    if (index >= startIndex && index <= endIndex) {
      return records.get((int) (index - startIndex));
    }
    return null;
  }

  LogRecord getLastRecord() {
    return records.isEmpty() ? null : records.get(records.size() - 1);
  }

  long getTotalSize() {
    return totalSize;
  }

  /**
   * Remove records from the given index (inclusive). The segment becomes
   * closed and its total size shrinks to the removed record's file offset.
   */
  void truncate(long fromIndex) {
    Preconditions.checkArgument(fromIndex >= startIndex && fromIndex <= endIndex);
    final int fromPos = (int) (fromIndex - startIndex);
    final LogRecord record = records.get(fromPos);
    records.subList(fromPos, records.size()).clear();
    totalSize = record.offset;
    isOpen = false;
    this.endIndex = fromIndex - 1;
  }

  void close() {
    Preconditions.checkState(isOpen());
    isOpen = false;
  }

  @Override
  public String toString() {
    return isOpen() ?
        "log-" + startIndex + "-inprogress" :
        "log-" + startIndex + "-" + endIndex;
  }

  /** 0 if l falls inside this segment, -1 if the segment ends before l, 1 otherwise. */
  @Override
  public int compareTo(Long l) {
    return (l >= getStartIndex() && l <= getEndIndex()) ? 0 :
        (this.getEndIndex() < l ? -1 : 1);
  }

  /** Drops all cached records; the size is reset to just the header. */
  void clear() {
    records.clear();
    endIndex = startIndex - 1;
    // keep totalSize consistent with the (now empty) record list so that a
    // later append computes correct offsets
    totalSize = SegmentedRaftLog.HEADER_BYTES.length;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java new file mode 100644 index 0000000..8a275ec --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.ratis.server.storage; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.CodeInjectionForTesting; + +import com.google.common.base.Preconditions; + +/** + * A simple RaftLog implementation in memory. Used only for testing. + */ +public class MemoryRaftLog extends RaftLog { + private final List<LogEntryProto> entries = new ArrayList<>(); + + public MemoryRaftLog(String selfId) { + super(selfId); + } + + @Override + public LogEntryProto get(long index) { + checkLogState(); + try(AutoCloseableLock readLock = readLock()) { + final int i = (int) index; + return i >= 0 && i < entries.size() ? entries.get(i) : null; + } + } + + @Override + public LogEntryProto[] getEntries(long startIndex, long endIndex) { + checkLogState(); + try(AutoCloseableLock readLock = readLock()) { + final int i = (int) startIndex; + if (startIndex >= entries.size()) { + return null; + } + final int toIndex = (int) Math.min(entries.size(), endIndex); + return entries.subList(i, toIndex).toArray(EMPTY_LOGENTRY_ARRAY); + } + } + + @Override + void truncate(long index) { + checkLogState(); + try(AutoCloseableLock writeLock = writeLock()) { + Preconditions.checkArgument(index >= 0); + final int truncateIndex = (int) index; + for (int i = entries.size() - 1; i >= truncateIndex; i--) { + entries.remove(i); + } + } + } + + @Override + public LogEntryProto getLastEntry() { + checkLogState(); + try(AutoCloseableLock readLock = readLock()) { + final int size = entries.size(); + return size == 0 ? 
null : entries.get(size - 1); + } + } + + @Override + void appendEntry(LogEntryProto entry) { + checkLogState(); + try(AutoCloseableLock writeLock = writeLock()) { + entries.add(entry); + } + } + + @Override + public long append(long term, RaftConfiguration newConf) { + checkLogState(); + try(AutoCloseableLock writeLock = writeLock()) { + final long nextIndex = getNextIndex(); + final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, + nextIndex); + entries.add(e); + return nextIndex; + } + } + + @Override + public long getStartIndex() { + return entries.isEmpty() ? RaftServerConstants.INVALID_LOG_INDEX : + entries.get(0).getIndex(); + } + + @Override + public void append(LogEntryProto... entries) { + checkLogState(); + try(AutoCloseableLock writeLock = writeLock()) { + if (entries == null || entries.length == 0) { + return; + } + // Before truncating the entries, we first need to check if some + // entries are duplicated. If the leader sends entry 6, entry 7, then + // entry 6 again, without this check the follower may truncate entry 7 + // when receiving entry 6 again. Then before the leader detects this + // truncation in the next appendEntries RPC, leader may think entry 7 has + // been committed but in the system the entry has not been committed to + // the quorum of peers' disks. 
+ // TODO add a unit test for this + boolean toTruncate = false; + int truncateIndex = (int) entries[0].getIndex(); + int index = 0; + for (; truncateIndex < getNextIndex() && index < entries.length; + index++, truncateIndex++) { + if (this.entries.get(truncateIndex).getTerm() != + entries[index].getTerm()) { + toTruncate = true; + break; + } + } + if (toTruncate) { + truncate(truncateIndex); + } + // Collections.addAll(this.entries, entries); + for (int i = index; i < entries.length; i++) { + this.entries.add(entries[i]); + } + } + } + + @Override + public String toString() { + return "last=" + ServerProtoUtils.toString(getLastEntry()) + + ", committed=" + + ServerProtoUtils.toString(get(getLastCommittedIndex())); + } + + public String getEntryString() { + return "entries=" + entries; + } + + @Override + public void logSync() { + CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null); + // do nothing + } + + @Override + public long getLatestFlushedIndex() { + return getNextIndex() - 1; + } + + @Override + public void writeMetadata(long term, String votedFor) { + // do nothing + } + + @Override + public Metadata loadMetadata() { + return new Metadata(null, 0); + } + + @Override + public void syncWithSnapshot(long lastSnapshotIndex) { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java new file mode 100644 index 0000000..8deb7e3 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ratis.server.storage;

import com.google.common.base.Charsets;

import org.apache.ratis.util.AtomicFileOutputStream;
import org.apache.ratis.util.RaftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.Properties;

/**
 * Class that represents a file on disk which persistently stores the current
 * term and the votedFor id as a {@link Properties} file. Writes go through
 * {@link AtomicFileOutputStream}, which is expected to make the update atomic
 * and durable (i.e. fsynced).
 */
class MetaFile {
  private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class);
  private static final String TERM_KEY = "term";
  private static final String VOTEDFOR_KEY = "votedFor";
  static final long DEFAULT_TERM = 0;
  static final String EMPTY_VOTEFOR = "";

  private final File file;
  private boolean loaded = false;
  private long term;
  private String votedFor;

  MetaFile(File file) {
    this.file = file;
    term = DEFAULT_TERM;
    votedFor = EMPTY_VOTEFOR;
  }

  boolean exists() {
    return this.file.exists();
  }

  long getTerm() throws IOException {
    loadIfNecessary();
    return term;
  }

  String getVotedFor() throws IOException {
    loadIfNecessary();
    return votedFor;
  }

  /** Reads the file on first access; later calls use the cached values. */
  private void loadIfNecessary() throws IOException {
    if (!loaded) {
      readFile();
      loaded = true;
    }
  }

  /**
   * Updates the cached values and persists them. The file is written only if
   * nothing has been loaded yet or a value changed. A null votedFor is stored
   * as {@link #EMPTY_VOTEFOR}.
   */
  void set(long newTerm, String newVotedFor) throws IOException {
    newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor;
    if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) {
      writeFile(newTerm, newVotedFor);
    }
    term = newTerm;
    votedFor = newVotedFor;
    loaded = true;
  }

  /**
   * Atomically write the given term and votedFor information to the given file,
   * including fsyncing.
   *
   * @throws IOException if the file cannot be written
   */
  void writeFile(long term, String votedFor) throws IOException {
    AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
    Properties properties = new Properties();
    properties.setProperty(TERM_KEY, Long.toString(term));
    properties.setProperty(VOTEDFOR_KEY, votedFor);
    try {
      properties.store(
          new BufferedWriter(new OutputStreamWriter(fos, Charsets.UTF_8)), "");
      fos.close();
      fos = null;
    } finally {
      if (fos != null) {
        fos.abort();
      }
    }
  }

  /**
   * Loads term and votedFor from the file, falling back to the defaults when
   * the file does not exist.
   *
   * @throws IOException if the file cannot be read, lacks either key, or
   *         holds a term that is not a valid long
   */
  void readFile() throws IOException {
    term = DEFAULT_TERM;
    votedFor = EMPTY_VOTEFOR;
    if (file.exists()) {
      BufferedReader br = new BufferedReader(
          new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
      try {
        Properties properties = new Properties();
        properties.load(br);
        if (properties.containsKey(TERM_KEY) &&
            properties.containsKey(VOTEDFOR_KEY)) {
          try {
            term = Long.parseLong((String) properties.get(TERM_KEY));
          } catch (NumberFormatException nfe) {
            // report a malformed term as corruption, like a missing key
            throw new IOException("Corrupted term/votedFor properties: "
                + properties, nfe);
          }
          votedFor = (String) properties.get(VOTEDFOR_KEY);
        } else {
          throw new IOException("Corrupted term/votedFor properties: "
              + properties);
        }
      } catch(IOException e) {
        LOG.warn("Cannot load term/votedFor properties from {}", file, e);
        throw e;
      } finally {
        RaftUtils.cleanup(LOG, br);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java new file mode 100644 index 0000000..05307f2 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.ratis.server.impl.ConfigurationManager; +import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.ProtoUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Base class of RaftLog. Currently we provide two types of RaftLog + * implementation: + * 1. MemoryRaftLog: all the log entries are stored in memory. This is only used + * for testing. + * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored + * in segments. 
+ */ +public abstract class RaftLog implements Closeable { + public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class); + public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0]; + public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync"; + + /** + * The largest committed index. Note the last committed log may be included + * in the latest snapshot file. + */ + protected final AtomicLong lastCommitted = + new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX); + private final String selfId; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private volatile boolean isOpen = false; + + public RaftLog(String selfId) { + this.selfId = selfId; + } + + public long getLastCommittedIndex() { + return lastCommitted.get(); + } + + public void checkLogState() { + Preconditions.checkState(isOpen, + "The RaftLog has not been opened or has been closed"); + } + + /** + * Update the last committed index. + * @param majorityIndex the index that has achieved majority. + * @param currentTerm the current term. + */ + public void updateLastCommitted(long majorityIndex, long currentTerm) { + try(AutoCloseableLock writeLock = writeLock()) { + if (lastCommitted.get() < majorityIndex) { + // Only update last committed index for current term. See §5.4.2 in + // paper for details. + final LogEntryProto entry = get(majorityIndex); + if (entry != null && entry.getTerm() == currentTerm) { + LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex); + lastCommitted.set(majorityIndex); + } + } + } + } + + /** + * Does the log contains the given term and index? Used to check the + * consistency between the local log of a follower and the log entries sent + * by the leader. 
+ */ + public boolean contains(TermIndex ti) { + if (ti == null) { + return false; + } + LogEntryProto entry = get(ti.getIndex()); + TermIndex local = ServerProtoUtils.toTermIndex(entry); + return ti.equals(local); + } + + /** + * @return the index of the next log entry to append. + */ + public long getNextIndex() { + final LogEntryProto last = getLastEntry(); + if (last == null) { + // if the log is empty, the last committed index should be consistent with + // the last index included in the latest snapshot. + return getLastCommittedIndex() + 1; + } + return last.getIndex() + 1; + } + + /** + * Generate a log entry for the given term and message, and append the entry. + * Used by the leader. + * @return the index of the new log entry. + */ + public long append(long term, TransactionContext operation) throws IOException { + checkLogState(); + try(AutoCloseableLock writeLock = writeLock()) { + final long nextIndex = getNextIndex(); + + // This is called here to guarantee strict serialization of callback executions in case + // the SM wants to attach a logic depending on ordered execution in the log commit order. + operation = operation.preAppendTransaction(); + + // build the log entry after calling the StateMachine + final LogEntryProto e = ProtoUtils.toLogEntryProto( + operation.getSMLogEntry().get(), term, nextIndex); + + appendEntry(e); + operation.setLogEntry(e); + return nextIndex; + } + } + + /** + * Generate a log entry for the given term and configurations, + * and append the entry. Used by the leader. + * @return the index of the new log entry. 
+ */ + public long append(long term, RaftConfiguration newConf) { + checkLogState(); + try(AutoCloseableLock writeLock = writeLock()) { + final long nextIndex = getNextIndex(); + final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, + nextIndex); + appendEntry(e); + return nextIndex; + } + } + + public void open(ConfigurationManager confManager, long lastIndexInSnapshot) + throws IOException { + isOpen = true; + } + + public abstract long getStartIndex(); + + /** + * Get the log entry of the given index. + * + * @param index The given index. + * @return The log entry associated with the given index. + * Null if there is no log entry with the index. + */ + public abstract LogEntryProto get(long index); + + /** + * @param startIndex the starting log index (inclusive) + * @param endIndex the ending log index (exclusive) + * @return all log entries within the given index range. Null if startIndex + * is greater than the smallest available index. + */ + public abstract LogEntryProto[] getEntries(long startIndex, long endIndex); + + /** + * @return the last log entry. + */ + public abstract LogEntryProto getLastEntry(); + + /** + * Truncate the log entries till the given index. The log with the given index + * will also be truncated (i.e., inclusive). + */ + abstract void truncate(long index); + + /** + * Used by the leader when appending a new entry based on client's request + * or configuration change. + */ + abstract void appendEntry(LogEntryProto entry); + + /** + * Append all the given log entries. Used by the followers. + * + * If an existing entry conflicts with a new one (same index but different + * terms), delete the existing entry and all entries that follow it (§5.3). + * + * This method, {@link #append(long, TransactionContext)}, + * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)}, + * do not guarantee the changes are persisted. + * Need to call {@link #logSync()} to persist the changes. 
+ */ + public abstract void append(LogEntryProto... entries); + + /** + * Flush and sync the log. + * It is triggered by AppendEntries RPC request from the leader. + */ + public abstract void logSync() throws InterruptedException; + + /** + * @return the index of the latest entry that has been flushed to the local + * storage. + */ + public abstract long getLatestFlushedIndex(); + + /** + * Write and flush the metadata (votedFor and term) into the meta file. + * + * We need to guarantee that the order of writeMetadata calls is the same with + * that when we change the in-memory term/votedFor. Otherwise we may persist + * stale term/votedFor in file. + * + * Since the leader change is not frequent, currently we simply put this call + * in the RaftPeer's lock. Later we can use an IO task queue to enforce the + * order. + */ + public abstract void writeMetadata(long term, String votedFor) + throws IOException; + + public abstract Metadata loadMetadata() throws IOException; + + public abstract void syncWithSnapshot(long lastSnapshotIndex); + + @Override + public String toString() { + return ServerProtoUtils.toString(getLastEntry()); + } + + public static class Metadata { + private final String votedFor; + private final long term; + + public Metadata(String votedFor, long term) { + this.votedFor = votedFor; + this.term = term; + } + + public String getVotedFor() { + return votedFor; + } + + public long getTerm() { + return term; + } + } + + public AutoCloseableLock readLock() { + return AutoCloseableLock.acquire(lock.readLock()); + } + + public AutoCloseableLock writeLock() { + return AutoCloseableLock.acquire(lock.writeLock()); + } + + public boolean hasWriteLock() { + return this.lock.isWriteLockedByCurrentThread(); + } + + public boolean hasReadLock() { + return this.lock.getReadHoldCount() > 0 || hasWriteLock(); + } + + @Override + public void close() throws IOException { + isOpen = false; + } + + public String getSelfId() { + return selfId; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java new file mode 100644 index 0000000..90dd7fd --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.storage.LogSegment.LogRecord; +import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * In-memory RaftLog Cache. Currently we provide a simple implementation that + * caches all the segments in the memory. The cache is not thread-safe and + * requires external lock protection. + */ +class RaftLogCache { + private LogSegment openSegment; + private final List<LogSegment> closedSegments; + + RaftLogCache() { + closedSegments = new ArrayList<>(); + } + + private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) { + return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex(); + } + + private LogSegment getLastClosedSegment() { + return closedSegments.isEmpty() ? 
+ null : closedSegments.get(closedSegments.size() - 1); + } + + private void validateAdding(LogSegment segment) { + final LogSegment lastClosed = getLastClosedSegment(); + if (!segment.isOpen()) { + Preconditions.checkState(lastClosed == null || + areConsecutiveSegments(lastClosed, segment)); + } else { + Preconditions.checkState(openSegment == null && + (lastClosed == null || areConsecutiveSegments(lastClosed, segment))); + } + } + + void addSegment(LogSegment segment) { + validateAdding(segment); + if (segment.isOpen()) { + openSegment = segment; + } else { + closedSegments.add(segment); + } + } + + LogEntryProto getEntry(long index) { + if (openSegment != null && index >= openSegment.getStartIndex()) { + final LogRecord record = openSegment.getLogRecord(index); + return record == null ? null : record.entry; + } else { + int segmentIndex = Collections.binarySearch(closedSegments, index); + if (segmentIndex < 0) { + return null; + } else { + return closedSegments.get(segmentIndex).getLogRecord(index).entry; + } + } + } + + /** + * @param startIndex inclusive + * @param endIndex exclusive + */ + LogEntryProto[] getEntries(final long startIndex, final long endIndex) { + if (startIndex < 0 || startIndex < getStartIndex()) { + throw new IndexOutOfBoundsException("startIndex = " + startIndex + + ", log cache starts from index " + getStartIndex()); + } + if (startIndex > endIndex) { + throw new IndexOutOfBoundsException("startIndex(" + startIndex + + ") > endIndex(" + endIndex + ")"); + } + final long realEnd = Math.min(getEndIndex() + 1, endIndex); + if (startIndex >= realEnd) { + return RaftLog.EMPTY_LOGENTRY_ARRAY; + } + + LogEntryProto[] entries = new LogEntryProto[(int) (realEnd - startIndex)]; + int segmentIndex = Collections.binarySearch(closedSegments, startIndex); + if (segmentIndex < 0) { + getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length); + } else { + long index = startIndex; + for (int i = segmentIndex; i < closedSegments.size() && 
index < realEnd; i++) { + LogSegment s = closedSegments.get(i); + int numberFromSegment = (int) Math.min(realEnd - index, + s.getEndIndex() - index + 1); + getEntriesFromSegment(s, index, entries, (int) (index - startIndex), + numberFromSegment); + index += numberFromSegment; + } + if (index < realEnd) { + getEntriesFromSegment(openSegment, index, entries, + (int) (index - startIndex), (int) (realEnd - index)); + } + } + return entries; + } + + private void getEntriesFromSegment(LogSegment segment, long startIndex, + LogEntryProto[] entries, int offset, int size) { + long endIndex = segment.getEndIndex(); + endIndex = Math.min(endIndex, startIndex + size - 1); + int index = offset; + for (long i = startIndex; i <= endIndex; i++) { + entries[index++] = segment.getLogRecord(i).entry; + } + } + + long getStartIndex() { + if (closedSegments.isEmpty()) { + return openSegment != null ? openSegment.getStartIndex() : + RaftServerConstants.INVALID_LOG_INDEX; + } else { + return closedSegments.get(0).getStartIndex(); + } + } + + @VisibleForTesting + long getEndIndex() { + return openSegment != null ? openSegment.getEndIndex() : + (closedSegments.isEmpty() ? + INVALID_LOG_INDEX : + closedSegments.get(closedSegments.size() - 1).getEndIndex()); + } + + LogEntryProto getLastEntry() { + return (openSegment != null && openSegment.numOfEntries() > 0) ? + openSegment.getLastRecord().entry : + (closedSegments.isEmpty() ? null : + closedSegments.get(closedSegments.size() - 1).getLastRecord().entry); + } + + LogSegment getOpenSegment() { + return openSegment; + } + + void appendEntry(LogEntryProto entry) { + // SegmentedRaftLog does the segment creation/rolling work. Here we just + // simply append the entry into the open segment. 
+ Preconditions.checkState(openSegment != null); + openSegment.appendToOpenSegment(entry); + } + + /** + * finalize the current open segment, and start a new open segment + */ + void rollOpenSegment(boolean createNewOpen) { + Preconditions.checkState(openSegment != null + && openSegment.numOfEntries() > 0); + final long nextIndex = openSegment.getEndIndex() + 1; + openSegment.close(); + closedSegments.add(openSegment); + if (createNewOpen) { + openSegment = LogSegment.newOpenSegment(nextIndex); + } else { + openSegment = null; + } + } + + private SegmentFileInfo deleteOpenSegment() { + final long oldEnd = openSegment.getEndIndex(); + openSegment.clear(); + SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), + oldEnd, true, 0, openSegment.getEndIndex()); + openSegment = null; + return info; + } + + /** + * truncate log entries starting from the given index (inclusive) + */ + TruncationSegments truncate(long index) { + int segmentIndex = Collections.binarySearch(closedSegments, index); + if (segmentIndex == -closedSegments.size() - 1) { + if (openSegment != null && openSegment.getEndIndex() >= index) { + final long oldEnd = openSegment.getEndIndex(); + if (index == openSegment.getStartIndex()) { + // the open segment should be deleted + return new TruncationSegments(null, + Collections.singletonList(deleteOpenSegment())); + } else { + openSegment.truncate(index); + Preconditions.checkState(!openSegment.isOpen()); + SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), + oldEnd, true, openSegment.getTotalSize(), + openSegment.getEndIndex()); + closedSegments.add(openSegment); + openSegment = null; + return new TruncationSegments(info, Collections.emptyList()); + } + } + } else if (segmentIndex >= 0) { + LogSegment ts = closedSegments.get(segmentIndex); + final long oldEnd = ts.getEndIndex(); + List<SegmentFileInfo> list = new ArrayList<>(); + ts.truncate(index); + final int size = closedSegments.size(); + for (int i = size - 1; + 
i >= (ts.numOfEntries() == 0 ? segmentIndex : segmentIndex + 1); + i-- ) { + LogSegment s = closedSegments.remove(i); + final long endOfS = i == segmentIndex ? oldEnd : s.getEndIndex(); + s.clear(); + list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, + s.getEndIndex())); + } + if (openSegment != null) { + list.add(deleteOpenSegment()); + } + SegmentFileInfo t = ts.numOfEntries() == 0 ? null : + new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, + ts.getTotalSize(), ts.getEndIndex()); + return new TruncationSegments(t, list); + } + return null; + } + + static class TruncationSegments { + final SegmentFileInfo toTruncate; // name of the file to be truncated + final SegmentFileInfo[] toDelete; // names of the files to be deleted + + TruncationSegments(SegmentFileInfo toTruncate, + List<SegmentFileInfo> toDelete) { + this.toDelete = toDelete == null ? null : + toDelete.toArray(new SegmentFileInfo[toDelete.size()]); + this.toTruncate = toTruncate; + } + } + + Iterator<LogEntryProto> iterator(long startIndex) { + return new EntryIterator(startIndex); + } + + private class EntryIterator implements Iterator<LogEntryProto> { + private long nextIndex; + private LogSegment currentSegment; + private int segmentIndex; + + EntryIterator(long start) { + this.nextIndex = start; + segmentIndex = Collections.binarySearch(closedSegments, nextIndex); + if (segmentIndex >= 0) { + currentSegment = closedSegments.get(segmentIndex); + } else { + segmentIndex = -segmentIndex - 1; + if (segmentIndex == closedSegments.size()) { + currentSegment = openSegment; + } else { + // the start index is smaller than the first closed segment's start + // index. We no longer keep the log entry (because of the snapshot) or + // the start index is invalid. 
+ Preconditions.checkState(segmentIndex == 0); + throw new IndexOutOfBoundsException(); + } + } + } + + @Override + public boolean hasNext() { + return currentSegment != null && + currentSegment.getLogRecord(nextIndex) != null; + } + + @Override + public LogEntryProto next() { + LogRecord record; + if (currentSegment == null || + (record = currentSegment.getLogRecord(nextIndex)) == null) { + throw new NoSuchElementException(); + } + if (++nextIndex > currentSegment.getEndIndex()) { + if (currentSegment != openSegment) { + segmentIndex++; + currentSegment = segmentIndex == closedSegments.size() ? + openSegment : closedSegments.get(segmentIndex); + } + } + return record.entry; + } + } + + @VisibleForTesting + int getNumOfSegments() { + return closedSegments.size() + (openSegment == null ? 0 : 1); + } + + boolean isEmpty() { + return closedSegments.isEmpty() && openSegment == null; + } + + void clear() { + openSegment = null; + closedSegments.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java new file mode 100644 index 0000000..e057a51 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_KEY; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.io.nativeio.NativeIO; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo; +import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; +import org.apache.ratis.server.storage.SegmentedRaftLog.Task; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * This class takes the responsibility of all the raft log related I/O ops for a + * raft peer. + */ +class RaftLogWorker implements Runnable { + static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class); + /** + * The task queue accessed by rpc handler threads and the io worker thread. 
+ */ + private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(4096); + private volatile boolean running = true; + private final Thread workerThread; + + private final RaftStorage storage; + private LogOutputStream out; + private final RaftServerImpl raftServer; + + /** + * The number of entries that have been written into the LogOutputStream but + * has not been flushed. + */ + private int pendingFlushNum = 0; + /** the index of the last entry that has been written */ + private long lastWrittenIndex; + /** the largest index of the entry that has been flushed */ + private volatile long flushedIndex; + + private final int forceSyncNum; + + private final RaftProperties properties; + + RaftLogWorker(RaftServerImpl raftServer, RaftStorage storage, + RaftProperties properties) { + this.raftServer = raftServer; + this.storage = storage; + this.properties = properties; + this.forceSyncNum = properties.getInt(RAFT_LOG_FORCE_SYNC_NUM_KEY, + RAFT_LOG_FORCE_SYNC_NUM_DEFAULT); + workerThread = new Thread(this, + getClass().getSimpleName() + " for " + storage); + } + + void start(long latestIndex, File openSegmentFile) throws IOException { + lastWrittenIndex = latestIndex; + flushedIndex = latestIndex; + if (openSegmentFile != null) { + Preconditions.checkArgument(openSegmentFile.exists()); + out = new LogOutputStream(openSegmentFile, true, properties); + } + workerThread.start(); + } + + void close() { + this.running = false; + workerThread.interrupt(); + try { + workerThread.join(); + } catch (InterruptedException ignored) { + } + } + + /** + * A snapshot has just been installed on the follower. Need to update the IO + * worker's state accordingly. + */ + void syncWithSnapshot(long lastSnapshotIndex) { + queue.clear(); + lastWrittenIndex = lastSnapshotIndex; + flushedIndex = lastSnapshotIndex; + pendingFlushNum = 0; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "-" + + (raftServer != null ? 
raftServer.getId() : ""); + } + + /** + * This is protected by the RaftServer and RaftLog's lock. + */ + private Task addIOTask(Task task) { + LOG.debug("add task {}", task); + try { + if (!queue.offer(task, 1, TimeUnit.SECONDS)) { + Preconditions.checkState(isAlive(), + "the worker thread is not alive"); + queue.put(task); + } + } catch (Throwable t) { + if (t instanceof InterruptedException && !running) { + LOG.info("Got InterruptedException when adding task " + task + + ". The RaftLogWorker already stopped."); + } else { + ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG); + } + } + return task; + } + + boolean isAlive() { + return running && workerThread.isAlive(); + } + + @Override + public void run() { + while (running) { + try { + Task task = queue.poll(1, TimeUnit.SECONDS); + if (task != null) { + try { + task.execute(); + } catch (IOException e) { + if (task.getEndIndex() < lastWrittenIndex) { + LOG.info("Ignore IOException when handling task " + task + + " which is smaller than the lastWrittenIndex." + + " There should be a snapshot installed.", e); + } else { + throw e; + } + } + task.done(); + } + } catch (InterruptedException e) { + LOG.info(Thread.currentThread().getName() + + " was interrupted, exiting. 
There are " + queue.size() + + " tasks remaining in the queue."); + } catch (Throwable t) { + // TODO avoid terminating the jvm by supporting multiple log directories + ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.", t, LOG); + } + } + } + + private boolean shouldFlush() { + return pendingFlushNum >= forceSyncNum || + (pendingFlushNum > 0 && queue.isEmpty()); + } + + private void flushWrites() throws IOException { + if (out != null) { + LOG.debug("flush data to " + out + ", reset pending_sync_number to 0"); + out.flush(); + updateFlushedIndex(); + } + } + + private void updateFlushedIndex() { + flushedIndex = lastWrittenIndex; + pendingFlushNum = 0; + if (raftServer != null) { + raftServer.submitLocalSyncEvent(); + } + } + + /** + * The following several methods (startLogSegment, rollLogSegment, + * writeLogEntry, and truncate) are only called by SegmentedRaftLog which is + * protected by RaftServer's lock. + * + * Thus all the tasks are created and added sequentially. 
+ */ + Task startLogSegment(long startIndex) { + return addIOTask(new StartLogSegment(startIndex)); + } + + Task rollLogSegment(LogSegment segmentToClose) { + addIOTask(new FinalizeLogSegment(segmentToClose)); + return addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1)); + } + + Task writeLogEntry(LogEntryProto entry) { + return addIOTask(new WriteLog(entry)); + } + + Task truncate(TruncationSegments ts) { + return addIOTask(new TruncateLog(ts)); + } + + // TODO we can add another level of buffer for writing here + private class WriteLog extends Task { + private final LogEntryProto entry; + + WriteLog(LogEntryProto entry) { + this.entry = entry; + } + + @Override + public void execute() throws IOException { + Preconditions.checkState(out != null); + Preconditions.checkState(lastWrittenIndex + 1 == entry.getIndex(), + "lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry); + out.write(entry); + lastWrittenIndex = entry.getIndex(); + pendingFlushNum++; + if (shouldFlush()) { + flushWrites(); + } + } + + @Override + long getEndIndex() { + return entry.getIndex(); + } + } + + private class FinalizeLogSegment extends Task { + private final LogSegment segmentToClose; + + FinalizeLogSegment(LogSegment segmentToClose) { + this.segmentToClose = segmentToClose; + } + + @Override + public void execute() throws IOException { + RaftUtils.cleanup(null, out); + out = null; + Preconditions.checkState(segmentToClose != null); + + File openFile = storage.getStorageDir() + .getOpenLogFile(segmentToClose.getStartIndex()); + Preconditions.checkState(openFile.exists(), + "File %s does not exist.", openFile); + if (segmentToClose.numOfEntries() > 0) { + // finalize the current open segment + File dstFile = storage.getStorageDir().getClosedLogFile( + segmentToClose.getStartIndex(), segmentToClose.getEndIndex()); + Preconditions.checkState(!dstFile.exists()); + + NativeIO.renameTo(openFile, dstFile); + } else { // delete the file of the empty segment + 
FileUtils.deleteFile(openFile); + } + updateFlushedIndex(); + } + + @Override + long getEndIndex() { + return segmentToClose.getEndIndex(); + } + } + + private class StartLogSegment extends Task { + private final long newStartIndex; + + StartLogSegment(long newStartIndex) { + this.newStartIndex = newStartIndex; + } + + @Override + void execute() throws IOException { + File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex); + Preconditions.checkState(!openFile.exists(), "open file %s exists for %s", + openFile.getAbsolutePath(), RaftLogWorker.this.toString()); + Preconditions.checkState(out == null && pendingFlushNum == 0); + out = new LogOutputStream(openFile, false, properties); + } + + @Override + long getEndIndex() { + return newStartIndex; + } + } + + private class TruncateLog extends Task { + private final TruncationSegments segments; + + TruncateLog(TruncationSegments ts) { + this.segments = ts; + } + + @Override + void execute() throws IOException { + RaftUtils.cleanup(null, out); + out = null; + if (segments.toTruncate != null) { + File fileToTruncate = segments.toTruncate.isOpen ? 
+ storage.getStorageDir().getOpenLogFile( + segments.toTruncate.startIndex) : + storage.getStorageDir().getClosedLogFile( + segments.toTruncate.startIndex, + segments.toTruncate.endIndex); + FileUtils.truncateFile(fileToTruncate, segments.toTruncate.targetLength); + + // rename the file + File dstFile = storage.getStorageDir().getClosedLogFile( + segments.toTruncate.startIndex, segments.toTruncate.newEndIndex); + Preconditions.checkState(!dstFile.exists()); + NativeIO.renameTo(fileToTruncate, dstFile); + + // update lastWrittenIndex + lastWrittenIndex = segments.toTruncate.newEndIndex; + } + if (segments.toDelete != null && segments.toDelete.length > 0) { + long minStart = segments.toDelete[0].startIndex; + for (SegmentFileInfo del : segments.toDelete) { + final File delFile; + if (del.isOpen) { + delFile = storage.getStorageDir().getOpenLogFile(del.startIndex); + } else { + delFile = storage.getStorageDir() + .getClosedLogFile(del.startIndex, del.endIndex); + } + FileUtils.deleteFile(delFile); + minStart = Math.min(minStart, del.startIndex); + } + if (segments.toTruncate == null) { + lastWrittenIndex = minStart - 1; + } + } + updateFlushedIndex(); + } + + @Override + long getEndIndex() { + if (segments.toTruncate != null) { + return segments.toTruncate.newEndIndex; + } else if (segments.toDelete.length > 0) { + return segments.toDelete[segments.toDelete.length - 1].endIndex; + } + return RaftServerConstants.INVALID_LOG_INDEX; + } + } + + long getFlushedIndex() { + return flushedIndex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java new file mode 100644 index 0000000..09ea55c --- /dev/null +++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import com.google.common.base.Preconditions; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +public class RaftStorage implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class); + + // TODO support multiple storage directories + private final RaftStorageDirectory storageDir; + private final StorageState state; + private volatile MetaFile metaFile; + 
private StateMachineStorage stateMachineStorage; + + public RaftStorage(RaftProperties prop, RaftServerConstants.StartupOption option) + throws IOException { + final String dir = prop.get(RAFT_SERVER_STORAGE_DIR_KEY, + RAFT_SERVER_STORAGE_DIR_DEFAULT); + storageDir = new RaftStorageDirectory( + new File(FileUtils.stringAsURI(dir).getPath())); + if (option == RaftServerConstants.StartupOption.FORMAT) { + if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) { + throw new IOException("Cannot format " + storageDir); + } + storageDir.lock(); + format(); + state = storageDir.analyzeStorage(false); + Preconditions.checkState(state == StorageState.NORMAL); + } else { + state = analyzeAndRecoverStorage(true); // metaFile is initialized here + if (state != StorageState.NORMAL) { + storageDir.unlock(); + throw new IOException("Cannot load " + storageDir + + ". Its state: " + state); + } + } + } + + StorageState getState() { + return state; + } + + private void format() throws IOException { + storageDir.clearDirectory(); + metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR); + LOG.info("Storage directory " + storageDir.getRoot() + + " has been successfully formatted."); + } + + private MetaFile writeMetaFile(long term, String votedFor) throws IOException { + MetaFile metaFile = new MetaFile(storageDir.getMetaFile()); + metaFile.set(term, votedFor); + return metaFile; + } + + private void cleanMetaTmpFile() throws IOException { + Files.deleteIfExists(storageDir.getMetaTmpFile().toPath()); + } + + private StorageState analyzeAndRecoverStorage(boolean toLock) + throws IOException { + StorageState storageState = storageDir.analyzeStorage(toLock); + if (storageState == StorageState.NORMAL) { + metaFile = new MetaFile(storageDir.getMetaFile()); + assert metaFile.exists(); + metaFile.readFile(); + // Existence of raft-meta.tmp means the change of votedFor/term has not + // been committed. Thus we should delete the tmp file. 
+ cleanMetaTmpFile(); + return StorageState.NORMAL; + } else if (storageState == StorageState.NOT_FORMATTED && + storageDir.isCurrentEmpty()) { + format(); + return StorageState.NORMAL; + } else { + return storageState; + } + } + + public RaftStorageDirectory getStorageDir() { + return storageDir; + } + + @Override + public void close() throws IOException { + storageDir.unlock(); + } + + MetaFile getMetaFile() { + return metaFile; + } + + public SnapshotInfo getLastestSnapshot() throws IOException { + return getStateMachineStorage().getLatestSnapshot(); + } + + /** + * Called by the state machine after it has initialized the StateMachineStorage. + */ + public void setStateMachineStorage(StateMachineStorage smStorage) { + this.stateMachineStorage = smStorage; + } + + public StateMachineStorage getStateMachineStorage() { + return stateMachineStorage; + } + + @Override + public String toString() { + return getStorageDir() + ""; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java new file mode 100644 index 0000000..bfa691d --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.ratis.util.AtomicFileOutputStream; +import org.apache.ratis.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.management.ManagementFactory; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.nio.file.Files.newDirectoryStream; +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; + +public class RaftStorageDirectory { + static final Logger LOG = LoggerFactory.getLogger(RaftStorageDirectory.class); + + static final String STORAGE_DIR_CURRENT = "current"; + static final String STORAGE_FILE_LOCK = "in_use.lock"; + static final String META_FILE_NAME = "raft-meta"; + static final String LOG_FILE_INPROGRESS = "inprogress"; + static final String LOG_FILE_PREFIX = "log"; + static final String STATE_MACHINE = "sm"; // directory containing state machine snapshots + static final String TEMP = "tmp"; + static final Pattern CLOSED_SEGMENT_REGEX = Pattern.compile("log_(\\d+)-(\\d+)"); + static final Pattern 
OPEN_SEGMENT_REGEX = Pattern.compile("log_inprogress_(\\d+)(?:\\..*)?"); + + private static final List<Pattern> LOGSEGMENTS_REGEXES = + ImmutableList.of(CLOSED_SEGMENT_REGEX, OPEN_SEGMENT_REGEX); + + enum StorageState { + NON_EXISTENT, + NOT_FORMATTED, + NORMAL + } + + public static class LogPathAndIndex { + public final Path path; + public final long startIndex; + public final long endIndex; + + LogPathAndIndex(Path path, long startIndex, long endIndex) { + this.path = path; + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + @Override + public String toString() { + return path + "-" + startIndex + "-" + endIndex; + } + } + + private final File root; // root directory + private FileLock lock; // storage lock + + /** + * Constructor + * @param dir directory corresponding to the storage + */ + RaftStorageDirectory(File dir) { + this.root = dir; + this.lock = null; + } + + /** + * Get root directory of this storage + */ + //TODO + public File getRoot() { + return root; + } + + /** + * Clear and re-create storage directory. + * <p> + * Removes contents of the current directory and creates an empty directory. + * + * This does not fully format storage directory. + * It cannot write the version file since it should be written last after + * all other storage type dependent files are written. + * Derived storage is responsible for setting specific storage values and + * writing the version file to disk. 
+ */ + void clearDirectory() throws IOException { + File curDir = this.getCurrentDir(); + clearDirectory(curDir); + clearDirectory(getStateMachineDir()); + } + + void clearDirectory(File dir) throws IOException { + if (dir.exists()) { + File[] files = FileUtils.listFiles(dir); + LOG.info("Will remove files: " + Arrays.toString(files)); + if (!(FileUtils.fullyDelete(dir))) + throw new IOException("Cannot remove directory: " + dir); + } + if (!dir.mkdirs()) + throw new IOException("Cannot create directory " + dir); + } + + /** + * Directory {@code current} contains latest files defining + * the file system meta-data. + * + * @return the directory path + */ + File getCurrentDir() { + return new File(root, STORAGE_DIR_CURRENT); + } + + File getMetaFile() { + return new File(getCurrentDir(), META_FILE_NAME); + } + + File getMetaTmpFile() { + return new File(getCurrentDir(), META_FILE_NAME + + AtomicFileOutputStream.TMP_EXTENSION); + } + + File getOpenLogFile(long startIndex) { + return new File(getCurrentDir(), getOpenLogFileName(startIndex)); + } + + static String getOpenLogFileName(long startIndex) { + return LOG_FILE_PREFIX + "_" + LOG_FILE_INPROGRESS + "_" + startIndex; + } + + File getClosedLogFile(long startIndex, long endIndex) { + return new File(getCurrentDir(), getClosedLogFileName(startIndex, endIndex)); + } + + static String getClosedLogFileName(long startIndex, long endIndex) { + return LOG_FILE_PREFIX + "_" + startIndex + "-" + endIndex; + } + + public File getStateMachineDir() { + return new File(getRoot(), STATE_MACHINE); + } + + /** Returns a uniquely named temporary directory under $rootdir/tmp/ */ + public File getNewTempDir() { + return new File(new File(getRoot(), TEMP), UUID.randomUUID().toString()); + } + + public Path relativizeToRoot(Path p) { + if (p.isAbsolute()) { + return getRoot().toPath().relativize(p); + } + return p; + } + + /** + * @return log segment files sorted based on their index. 
+ */ + @VisibleForTesting + public List<LogPathAndIndex> getLogSegmentFiles() throws IOException { + List<LogPathAndIndex> list = new ArrayList<>(); + try (DirectoryStream<Path> stream = + Files.newDirectoryStream(getCurrentDir().toPath())) { + for (Path path : stream) { + for (Pattern pattern : LOGSEGMENTS_REGEXES) { + Matcher matcher = pattern.matcher(path.getFileName().toString()); + if (matcher.matches()) { + final long startIndex = Long.parseLong(matcher.group(1)); + final long endIndex = matcher.groupCount() == 2 ? + Long.parseLong(matcher.group(2)) : INVALID_LOG_INDEX; + list.add(new LogPathAndIndex(path, startIndex, endIndex)); + } + } + } + } + Collections.sort(list, + (o1, o2) -> o1.startIndex == o2.startIndex ? + 0 : (o1.startIndex < o2.startIndex ? -1 : 1)); + return list; + } + + /** + * Check to see if current/ directory is empty. + */ + boolean isCurrentEmpty() throws IOException { + File currentDir = getCurrentDir(); + if(!currentDir.exists()) { + // if current/ does not exist, it's safe to format it. + return true; + } + try(DirectoryStream<Path> dirStream = + newDirectoryStream(currentDir.toPath())) { + if (dirStream.iterator().hasNext()) { + return false; + } + } + return true; + } + + /** + * Check consistency of the storage directory. + * + * @return state {@link StorageState} of the storage directory + */ + StorageState analyzeStorage(boolean toLock) throws IOException { + Preconditions.checkState(root != null, "root directory is null"); + + String rootPath = root.getCanonicalPath(); + try { // check that storage exists + if (!root.exists()) { + LOG.info(rootPath + " does not exist. 
Creating ..."); + if (!root.mkdirs()) { + throw new IOException("Cannot create directory " + rootPath); + } + } + // or is inaccessible + if (!root.isDirectory()) { + LOG.warn(rootPath + "is not a directory"); + return StorageState.NON_EXISTENT; + } + if (!FileUtils.canWrite(root)) { + LOG.warn("Cannot access storage directory " + rootPath); + return StorageState.NON_EXISTENT; + } + } catch(SecurityException ex) { + LOG.warn("Cannot access storage directory " + rootPath, ex); + return StorageState.NON_EXISTENT; + } + + if (toLock) { + this.lock(); // lock storage if it exists + } + + // check whether current directory is valid + if (hasMetaFile()) { + return StorageState.NORMAL; + } else { + return StorageState.NOT_FORMATTED; + } + } + + boolean hasMetaFile() throws IOException { + return getMetaFile().exists(); + } + + /** + * Lock storage to provide exclusive access. + * + * <p> Locking is not supported by all file systems. + * E.g., NFS does not consistently support exclusive locks. + * + * <p> If locking is supported we guarantee exclusive access to the + * storage directory. Otherwise, no guarantee is given. + * + * @throws IOException if locking fails + */ + public void lock() throws IOException { + FileLock newLock = tryLock(); + if (newLock == null) { + String msg = "Cannot lock storage " + this.root + + ". The directory is already locked"; + LOG.info(msg); + throw new IOException(msg); + } + // Don't overwrite lock until success - this way if we accidentally + // call lock twice, the internal state won't be cleared by the second + // (failed) lock attempt + lock = newLock; + } + + /** + * Attempts to acquire an exclusive lock on the storage. + * + * @return A lock object representing the newly-acquired lock or + * <code>null</code> if storage is already locked. + * @throws IOException if locking fails. 
+ */ + private FileLock tryLock() throws IOException { + boolean deletionHookAdded = false; + File lockF = new File(root, STORAGE_FILE_LOCK); + if (!lockF.exists()) { + lockF.deleteOnExit(); + deletionHookAdded = true; + } + RandomAccessFile file = new RandomAccessFile(lockF, "rws"); + String jvmName = ManagementFactory.getRuntimeMXBean().getName(); + FileLock res; + try { + res = file.getChannel().tryLock(); + if (null == res) { + LOG.error("Unable to acquire file lock on path " + lockF.toString()); + throw new OverlappingFileLockException(); + } + file.write(jvmName.getBytes(Charsets.UTF_8)); + LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName); + } catch (OverlappingFileLockException oe) { + // Cannot read from the locked file on Windows. + LOG.error("It appears that another process " + + "has already locked the storage directory: " + root, oe); + file.close(); + return null; + } catch(IOException e) { + LOG.error("Failed to acquire lock on " + lockF + + ". If this storage directory is mounted via NFS, " + + "ensure that the appropriate nfs lock services are running.", e); + file.close(); + throw e; + } + if (!deletionHookAdded) { + // If the file existed prior to our startup, we didn't + // call deleteOnExit above. But since we successfully locked + // the dir, we can take care of cleaning it up. + lockF.deleteOnExit(); + } + return res; + } + + /** + * Unlock storage. + */ + public void unlock() throws IOException { + if (this.lock == null) + return; + this.lock.release(); + lock.channel().close(); + lock = null; + } + + @Override + public String toString() { + return "Storage Directory " + this.root; + } +}
