http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java deleted file mode 100644 index 0dc8029..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.storage; - -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.PureJavaCrc32C; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.zip.Checksum; - -import static org.apache.raft.server.RaftServerConfigKeys.*; - -public class LogOutputStream implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(LogOutputStream.class); - - private static final ByteBuffer fill; - private static final int BUFFER_SIZE = 1024 * 1024; // 1 MB - static { - fill = ByteBuffer.allocateDirect(BUFFER_SIZE); - fill.position(0); - for (int i = 0; i < fill.capacity(); i++) { - fill.put(RaftServerConstants.LOG_TERMINATE_BYTE); - } - } - - private File file; - private FileChannel fc; // channel of the file stream for sync - private BufferedWriteChannel out; // buffered FileChannel for writing - private final Checksum checksum; - - private final long segmentMaxSize; - private final long preallocatedSize; - private long preallocatedPos; - - public LogOutputStream(File file, boolean append, RaftProperties properties) - throws IOException { - this.file = file; - this.checksum = new PureJavaCrc32C(); - this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, - RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT); - this.preallocatedSize = properties.getLong( - RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, - RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT); - RandomAccessFile rp = new RandomAccessFile(file, "rw"); - fc = rp.getChannel(); - fc.position(fc.size()); - preallocatedPos = fc.size(); - - int 
bufferSize = properties.getInt(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, - RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT); - out = new BufferedWriteChannel(fc, bufferSize); - - if (!append) { - create(); - } - } - - /** - * Format: - * LogEntryProto's protobuf - * 4-byte checksum of the above protobuf - */ - public void write(LogEntryProto entry) throws IOException { - final int serialized = entry.getSerializedSize(); - final int bufferSize = CodedOutputStream.computeUInt32SizeNoTag(serialized) - + serialized; - - preallocateIfNecessary(bufferSize + 4); - - byte[] buf = new byte[bufferSize]; - CodedOutputStream cout = CodedOutputStream.newInstance(buf); - cout.writeUInt32NoTag(serialized); - entry.writeTo(cout); - - checksum.reset(); - checksum.update(buf, 0, buf.length); - final int sum = (int) checksum.getValue(); - - out.write(buf); - writeInt(sum); - } - - private void writeInt(int v) throws IOException { - out.write((v >>> 24) & 0xFF); - out.write((v >>> 16) & 0xFF); - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - } - - private void create() throws IOException { - fc.truncate(0); - fc.position(0); - preallocatedPos = 0; - preallocate(); // preallocate file - - out.write(SegmentedRaftLog.HEADER_BYTES); - flush(); - } - - @Override - public void close() throws IOException { - try { - out.flush(false); - if (fc != null && fc.isOpen()) { - fc.truncate(fc.position()); - } - } finally { - RaftUtils.cleanup(LOG, fc, out); - fc = null; - out = null; - } - } - - /** - * Flush data to persistent store. - * Collect sync metrics. 
- */ - public void flush() throws IOException { - if (out == null) { - throw new IOException("Trying to use aborted output stream"); - } - out.flush(true); - } - - private void preallocate() throws IOException { - fill.position(0); - long targetSize = Math.min(segmentMaxSize - fc.size(), preallocatedSize); - int allocated = 0; - while (allocated < targetSize) { - int size = (int) Math.min(BUFFER_SIZE, targetSize - allocated); - ByteBuffer buffer = fill.slice(); - buffer.limit(size); - RaftUtils.writeFully(fc, buffer, preallocatedPos); - preallocatedPos += size; - allocated += size; - } - LOG.debug("Pre-allocated {} bytes for the log segment", allocated); - } - - private void preallocateIfNecessary(int size) throws IOException { - if (out.position() + size > preallocatedPos) { - preallocate(); - } - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "(" + file + ")"; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java deleted file mode 100644 index 9523cac..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java +++ /dev/null @@ -1,292 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import org.apache.commons.io.Charsets; -import org.apache.raft.protocol.ChecksumException; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.shaded.com.google.protobuf.CodedInputStream; -import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.PureJavaCrc32C; -import org.apache.raft.util.RaftUtils; - -import java.io.*; -import java.util.zip.Checksum; - -public class LogReader implements Closeable { - /** - * InputStream wrapper that keeps track of the current stream position. - * - * This stream also allows us to set a limit on how many bytes we can read - * without getting an exception. - */ - public static class LimitedInputStream extends FilterInputStream { - private long curPos = 0; - private long markPos = -1; - private long limitPos = Long.MAX_VALUE; - - public LimitedInputStream(InputStream is) { - super(is); - } - - private void checkLimit(long amt) throws IOException { - long extra = (curPos + amt) - limitPos; - if (extra > 0) { - throw new IOException("Tried to read " + amt + " byte(s) past " + - "the limit at offset " + limitPos); - } - } - - @Override - public int read() throws IOException { - checkLimit(1); - int ret = super.read(); - if (ret != -1) curPos++; - return ret; - } - - @Override - public int read(byte[] data) throws IOException { - checkLimit(data.length); - int ret = super.read(data); - if (ret > 0) curPos += ret; - return ret; - } - - @Override - public int read(byte[] data, int offset, int length) throws IOException { - checkLimit(length); - int ret = super.read(data, offset, length); - if (ret > 0) curPos += ret; - return ret; - } - - public void setLimit(long limit) { - limitPos = curPos + limit; - } - - public void clearLimit() { - limitPos = Long.MAX_VALUE; - } - - @Override - public void mark(int limit) { - 
super.mark(limit); - markPos = curPos; - } - - @Override - public void reset() throws IOException { - if (markPos == -1) { - throw new IOException("Not marked!"); - } - super.reset(); - curPos = markPos; - markPos = -1; - } - - public long getPos() { - return curPos; - } - - @Override - public long skip(long amt) throws IOException { - long extra = (curPos + amt) - limitPos; - if (extra > 0) { - throw new IOException("Tried to skip " + extra + " bytes past " + - "the limit at offset " + limitPos); - } - long ret = super.skip(amt); - curPos += ret; - return ret; - } - } - - private static final int maxOpSize = 32 * 1024 * 1024; - - private final LimitedInputStream limiter; - private final DataInputStream in; - private byte[] temp = new byte[4096]; - private final Checksum checksum; - - LogReader(File file) throws FileNotFoundException { - this.limiter = new LimitedInputStream( - new BufferedInputStream(new FileInputStream(file))); - in = new DataInputStream(limiter); - checksum = new PureJavaCrc32C(); - } - - String readLogHeader() throws IOException { - byte[] header = new byte[SegmentedRaftLog.HEADER_BYTES.length]; - int num = in.read(header); - if (num < header.length) { - throw new EOFException("EOF before reading a complete log header"); - } - return new String(header, Charsets.UTF_8); - } - - /** - * Read a log entry from the input stream. - * - * @return the operation read from the stream, or null at the end of the - * file - * @throws IOException on error. This function should only throw an - * exception when skipBrokenEdits is false. - */ - LogEntryProto readEntry() throws IOException { - try { - return decodeEntry(); - } catch (IOException e) { - in.reset(); - - throw e; - } catch (Throwable e) { - // raft log requires no gap between any two entries. 
thus if an entry is - // broken, throw the exception instead of skipping broken entries - in.reset(); - throw new IOException("got unexpected exception " + e.getMessage(), e); - } - } - - /** - * Scan and validate a log entry. - * @return the index of the log entry - */ - long scanEntry() throws IOException { - LogEntryProto entry = decodeEntry(); - return entry != null ? entry.getIndex() : RaftServerConstants.INVALID_LOG_INDEX; - } - - void verifyTerminator() throws IOException { - // The end of the log should contain 0x00 bytes. - // If it contains other bytes, the log itself may be corrupt. - limiter.clearLimit(); - int numRead = -1, idx = 0; - while (true) { - try { - numRead = -1; - numRead = in.read(temp); - if (numRead == -1) { - return; - } - for (idx = 0; idx < numRead; idx++) { - if (temp[idx] != RaftServerConstants.LOG_TERMINATE_BYTE) { - throw new IOException("Read extra bytes after the terminator!"); - } - } - } finally { - // After reading each group of bytes, we reposition the mark one - // byte before the next group. Similarly, if there is an error, we - // want to reposition the mark one byte before the error - if (numRead != -1) { - in.reset(); - RaftUtils.skipFully(in, idx); - in.mark(temp.length + 1); - RaftUtils.skipFully(in, 1); - } - } - } - } - - /** - * Decode the log entry "frame". This includes reading the log entry, and - * validating the checksum. - * - * The input stream will be advanced to the end of the op at the end of this - * function. - * - * @return The log entry, or null if we hit EOF. - */ - private LogEntryProto decodeEntry() throws IOException { - limiter.setLimit(maxOpSize); - in.mark(maxOpSize); - - byte nextByte; - try { - nextByte = in.readByte(); - } catch (EOFException eof) { - // EOF at an opcode boundary is expected. - return null; - } - // Each log entry starts with a var-int. Thus a valid entry's first byte - // should not be 0. So if the terminate byte is 0, we should hit the end - // of the segment. 
- if (nextByte == RaftServerConstants.LOG_TERMINATE_BYTE) { - verifyTerminator(); - return null; - } - - // Here, we verify that the Op size makes sense and that the - // data matches its checksum before attempting to construct an Op. - int entryLength = CodedInputStream.readRawVarint32(nextByte, in); - if (entryLength > maxOpSize) { - throw new IOException("Entry has size " + entryLength - + ", but maxOpSize = " + maxOpSize); - } - - final int varintLength = CodedOutputStream.computeUInt32SizeNoTag( - entryLength); - final int totalLength = varintLength + entryLength; - checkBufferSize(totalLength); - in.reset(); - in.mark(maxOpSize); - RaftUtils.readFully(in, temp, 0, totalLength); - - // verify checksum - checksum.reset(); - checksum.update(temp, 0, totalLength); - int expectedChecksum = in.readInt(); - int calculatedChecksum = (int) checksum.getValue(); - if (expectedChecksum != calculatedChecksum) { - throw new ChecksumException("LogEntry is corrupt. Calculated checksum is " - + calculatedChecksum + " but read checksum " + expectedChecksum, - limiter.markPos); - } - - // parse the buffer - return LogEntryProto.parseFrom( - CodedInputStream.newInstance(temp, varintLength, entryLength)); - } - - private void checkBufferSize(int entryLength) { - Preconditions.checkArgument(entryLength <= maxOpSize); - int length = temp.length; - if (length < entryLength) { - while (length < entryLength) { - length = Math.min(length * 2, maxOpSize); - } - temp = new byte[length]; - } - } - - long getPos() { - return limiter.getPos(); - } - - void skipFully(long length) throws IOException { - limiter.clearLimit(); - RaftUtils.skipFully(limiter, length); - } - - @Override - public void close() throws IOException { - RaftUtils.cleanup(null, in); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java deleted file mode 100644 index 987cc6c..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import org.apache.raft.server.impl.ConfigurationManager; -import org.apache.raft.server.impl.ServerProtoUtils; -import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.FileUtils; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; - -/** - * In-memory cache for a log segment file. All the updates will be first written - * into LogSegment then into corresponding files in the same order. - * - * This class will be protected by the RaftServer's lock. 
- */ -class LogSegment implements Comparable<Long> { - static class LogRecord { - /** starting offset in the file */ - final long offset; - final LogEntryProto entry; - - LogRecord(long offset, LogEntryProto entry) { - this.offset = offset; - this.entry = entry; - } - } - - static class SegmentFileInfo { - final long startIndex; // start index of the - final long endIndex; // original end index - final boolean isOpen; - final long targetLength; // position for truncation - final long newEndIndex; // new end index after the truncation - - SegmentFileInfo(long start, long end, boolean isOpen, long targetLength, - long newEndIndex) { - this.startIndex = start; - this.endIndex = end; - this.isOpen = isOpen; - this.targetLength = targetLength; - this.newEndIndex = newEndIndex; - } - } - - static long getEntrySize(LogEntryProto entry) { - final int serialized = entry.getSerializedSize(); - return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4; - } - - private boolean isOpen; - private final List<LogRecord> records = new ArrayList<>(); - private long totalSize; - private final long startIndex; - private long endIndex; - - private LogSegment(boolean isOpen, long start, long end) { - this.isOpen = isOpen; - this.startIndex = start; - this.endIndex = end; - totalSize = SegmentedRaftLog.HEADER_BYTES.length; - } - - static LogSegment newOpenSegment(long start) { - Preconditions.checkArgument(start >= 0); - return new LogSegment(true, start, start - 1); - } - - private static LogSegment newCloseSegment(long start, long end) { - Preconditions.checkArgument(start >= 0 && end >= start); - return new LogSegment(false, start, end); - } - - static LogSegment loadSegment(File file, long start, long end, boolean isOpen, - ConfigurationManager confManager) throws IOException { - final LogSegment segment; - try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) { - segment = isOpen ? 
LogSegment.newOpenSegment(start) : - LogSegment.newCloseSegment(start, end); - LogEntryProto next; - LogEntryProto prev = null; - while ((next = in.nextEntry()) != null) { - if (prev != null) { - Preconditions.checkState(next.getIndex() == prev.getIndex() + 1, - "gap between entry %s and entry %s", prev, next); - } - segment.append(next); - if (confManager != null && - next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { - confManager.addConfiguration(next.getIndex(), - ServerProtoUtils.toRaftConfiguration(next.getIndex(), - next.getConfigurationEntry())); - } - prev = next; - } - } - - // truncate padding if necessary - if (file.length() > segment.getTotalSize()) { - FileUtils.truncateFile(file, segment.getTotalSize()); - } - - Preconditions.checkState(start == segment.records.get(0).entry.getIndex()); - if (!isOpen) { - Preconditions.checkState(segment.getEndIndex() == end); - } - return segment; - } - - long getStartIndex() { - return startIndex; - } - - long getEndIndex() { - return endIndex; - } - - boolean isOpen() { - return isOpen; - } - - int numOfEntries() { - return (int) (endIndex - startIndex + 1); - } - - void appendToOpenSegment(LogEntryProto... entries) { - Preconditions.checkState(isOpen(), - "The log segment %s is not open for append", this.toString()); - append(entries); - } - - private void append(LogEntryProto... 
entries) { - Preconditions.checkArgument(entries != null && entries.length > 0); - final long term = entries[0].getTerm(); - if (records.isEmpty()) { - Preconditions.checkArgument(entries[0].getIndex() == startIndex, - "gap between start index %s and first entry to append %s", - startIndex, entries[0].getIndex()); - } - for (LogEntryProto entry : entries) { - // all these entries should be of the same term - Preconditions.checkArgument(entry.getTerm() == term, - "expected term:%s, term of the entry:%s", term, entry.getTerm()); - final LogRecord currentLast = getLastRecord(); - if (currentLast != null) { - Preconditions.checkArgument( - entry.getIndex() == currentLast.entry.getIndex() + 1, - "gap between entries %s and %s", entry.getIndex(), - currentLast.entry.getIndex()); - } - - final LogRecord record = new LogRecord(totalSize, entry); - records.add(record); - totalSize += getEntrySize(entry); - endIndex = entry.getIndex(); - } - } - - LogRecord getLogRecord(long index) { - if (index >= startIndex && index <= endIndex) { - return records.get((int) (index - startIndex)); - } - return null; - } - - LogRecord getLastRecord() { - return records.isEmpty() ? null : records.get(records.size() - 1); - } - - long getTotalSize() { - return totalSize; - } - - /** - * Remove records from the given index (inclusive) - */ - void truncate(long fromIndex) { - Preconditions.checkArgument(fromIndex >= startIndex && fromIndex <= endIndex); - LogRecord record = records.get((int) (fromIndex - startIndex)); - for (long index = endIndex; index >= fromIndex; index--) { - records.remove((int)(index - startIndex)); - } - totalSize = record.offset; - isOpen = false; - this.endIndex = fromIndex - 1; - } - - void close() { - Preconditions.checkState(isOpen()); - isOpen = false; - } - - @Override - public String toString() { - return isOpen() ? 
"log-" + startIndex + "-inprogress" : - "log-" + startIndex + "-" + endIndex; - } - - @Override - public int compareTo(Long l) { - return (l >= getStartIndex() && l <= getEndIndex()) ? 0 : - (this.getEndIndex() < l ? -1 : 1); - } - - void clear() { - records.clear(); - endIndex = startIndex - 1; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java deleted file mode 100644 index c12e1aa..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import org.apache.raft.server.impl.RaftConfiguration; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.impl.ServerProtoUtils; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.AutoCloseableLock; -import org.apache.raft.util.CodeInjectionForTesting; - -import java.util.ArrayList; -import java.util.List; - -/** - * A simple RaftLog implementation in memory. Used only for testing. - */ -public class MemoryRaftLog extends RaftLog { - private final List<LogEntryProto> entries = new ArrayList<>(); - - public MemoryRaftLog(String selfId) { - super(selfId); - } - - @Override - public LogEntryProto get(long index) { - checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - final int i = (int) index; - return i >= 0 && i < entries.size() ? entries.get(i) : null; - } - } - - @Override - public LogEntryProto[] getEntries(long startIndex, long endIndex) { - checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - final int i = (int) startIndex; - if (startIndex >= entries.size()) { - return null; - } - final int toIndex = (int) Math.min(entries.size(), endIndex); - return entries.subList(i, toIndex).toArray(EMPTY_LOGENTRY_ARRAY); - } - } - - @Override - void truncate(long index) { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - Preconditions.checkArgument(index >= 0); - final int truncateIndex = (int) index; - for (int i = entries.size() - 1; i >= truncateIndex; i--) { - entries.remove(i); - } - } - } - - @Override - public LogEntryProto getLastEntry() { - checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - final int size = entries.size(); - return size == 0 ? 
null : entries.get(size - 1); - } - } - - @Override - void appendEntry(LogEntryProto entry) { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - entries.add(entry); - } - } - - @Override - public long append(long term, RaftConfiguration newConf) { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - final long nextIndex = getNextIndex(); - final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, - nextIndex); - entries.add(e); - return nextIndex; - } - } - - @Override - public long getStartIndex() { - return entries.isEmpty() ? RaftServerConstants.INVALID_LOG_INDEX : - entries.get(0).getIndex(); - } - - @Override - public void append(LogEntryProto... entries) { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - if (entries == null || entries.length == 0) { - return; - } - // Before truncating the entries, we first need to check if some - // entries are duplicated. If the leader sends entry 6, entry 7, then - // entry 6 again, without this check the follower may truncate entry 7 - // when receiving entry 6 again. Then before the leader detects this - // truncation in the next appendEntries RPC, leader may think entry 7 has - // been committed but in the system the entry has not been committed to - // the quorum of peers' disks. 
- // TODO add a unit test for this - boolean toTruncate = false; - int truncateIndex = (int) entries[0].getIndex(); - int index = 0; - for (; truncateIndex < getNextIndex() && index < entries.length; - index++, truncateIndex++) { - if (this.entries.get(truncateIndex).getTerm() != - entries[index].getTerm()) { - toTruncate = true; - break; - } - } - if (toTruncate) { - truncate(truncateIndex); - } - // Collections.addAll(this.entries, entries); - for (int i = index; i < entries.length; i++) { - this.entries.add(entries[i]); - } - } - } - - @Override - public String toString() { - return "last=" + ServerProtoUtils.toString(getLastEntry()) - + ", committed=" - + ServerProtoUtils.toString(get(getLastCommittedIndex())); - } - - public String getEntryString() { - return "entries=" + entries; - } - - @Override - public void logSync() { - CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null); - // do nothing - } - - @Override - public long getLatestFlushedIndex() { - return getNextIndex() - 1; - } - - @Override - public void writeMetadata(long term, String votedFor) { - // do nothing - } - - @Override - public Metadata loadMetadata() { - return new Metadata(null, 0); - } - - @Override - public void syncWithSnapshot(long lastSnapshotIndex) { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java b/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java deleted file mode 100644 index b2b6f04..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import com.google.common.base.Charsets; -import org.apache.raft.util.AtomicFileOutputStream; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.Properties; - -/** - * Class that represents a file on disk which persistently stores - * the raft term and the votedFor value. The file is updated atomically - * and durably (i.e., fsynced). 
- */ -class MetaFile { - private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class); - private static final String TERM_KEY = "term"; - private static final String VOTEDFOR_KEY = "votedFor"; - static final long DEFAULT_TERM = 0; - static final String EMPTY_VOTEFOR = ""; - - private final File file; - private boolean loaded = false; - private long term; - private String votedFor; - - MetaFile(File file) { - this.file = file; - term = DEFAULT_TERM; - votedFor = EMPTY_VOTEFOR; - } - - boolean exists() { - return this.file.exists(); - } - - long getTerm() throws IOException { - if (!loaded) { - readFile(); - loaded = true; - } - return term; - } - - String getVotedFor() throws IOException { - if (!loaded) { - readFile(); - loaded = true; - } - return votedFor; - } - - void set(long newTerm, String newVotedFor) throws IOException { - newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor; - if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) { - writeFile(newTerm, newVotedFor); - } - term = newTerm; - votedFor = newVotedFor; - loaded = true; - } - - /** - * Atomically write the given term and votedFor information to the given file, - * including fsyncing. 
- * - * @throws IOException if the file cannot be written - */ - void writeFile(long term, String votedFor) throws IOException { - AtomicFileOutputStream fos = new AtomicFileOutputStream(file); - Properties properties = new Properties(); - properties.setProperty(TERM_KEY, Long.toString(term)); - properties.setProperty(VOTEDFOR_KEY, votedFor); - try { - properties.store( - new BufferedWriter(new OutputStreamWriter(fos, Charsets.UTF_8)), ""); - fos.close(); - fos = null; - } finally { - if (fos != null) { - fos.abort(); - } - } - } - - void readFile() throws IOException { - term = DEFAULT_TERM; - votedFor = EMPTY_VOTEFOR; - if (file.exists()) { - BufferedReader br = new BufferedReader( - new InputStreamReader(new FileInputStream(file), Charsets.UTF_8)); - try { - Properties properties = new Properties(); - properties.load(br); - if (properties.containsKey(TERM_KEY) && - properties.containsKey(VOTEDFOR_KEY)) { - term = Long.parseLong((String) properties.get(TERM_KEY)); - votedFor = (String) properties.get(VOTEDFOR_KEY); - } else { - throw new IOException("Corrupted term/votedFor properties: " - + properties); - } - } catch(IOException e) { - LOG.warn("Cannot load term/votedFor properties from {}", file, e); - throw e; - } finally { - RaftUtils.cleanup(LOG, br); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java deleted file mode 100644 index de79911..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java +++ /dev/null @@ -1,292 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import org.apache.raft.server.impl.ConfigurationManager; -import org.apache.raft.server.impl.RaftConfiguration; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.impl.ServerProtoUtils; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.AutoCloseableLock; -import org.apache.raft.util.ProtoUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Base class of RaftLog. Currently we provide two types of RaftLog - * implementation: - * 1. MemoryRaftLog: all the log entries are stored in memory. This is only used - * for testing. - * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored - * in segments. 
- */ -public abstract class RaftLog implements Closeable { - public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class); - public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0]; - public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync"; - - /** - * The largest committed index. Note the last committed log may be included - * in the latest snapshot file. - */ - protected final AtomicLong lastCommitted = - new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX); - private final String selfId; - - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private volatile boolean isOpen = false; - - public RaftLog(String selfId) { - this.selfId = selfId; - } - - public long getLastCommittedIndex() { - return lastCommitted.get(); - } - - public void checkLogState() { - Preconditions.checkState(isOpen, - "The RaftLog has not been opened or has been closed"); - } - - /** - * Update the last committed index. - * @param majorityIndex the index that has achieved majority. - * @param currentTerm the current term. - */ - public void updateLastCommitted(long majorityIndex, long currentTerm) { - try(AutoCloseableLock writeLock = writeLock()) { - if (lastCommitted.get() < majorityIndex) { - // Only update last committed index for current term. See §5.4.2 in - // paper for details. - final LogEntryProto entry = get(majorityIndex); - if (entry != null && entry.getTerm() == currentTerm) { - LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex); - lastCommitted.set(majorityIndex); - } - } - } - } - - /** - * Does the log contains the given term and index? Used to check the - * consistency between the local log of a follower and the log entries sent - * by the leader. 
- */ - public boolean contains(TermIndex ti) { - if (ti == null) { - return false; - } - LogEntryProto entry = get(ti.getIndex()); - TermIndex local = ServerProtoUtils.toTermIndex(entry); - return ti.equals(local); - } - - /** - * @return the index of the next log entry to append. - */ - public long getNextIndex() { - final LogEntryProto last = getLastEntry(); - if (last == null) { - // if the log is empty, the last committed index should be consistent with - // the last index included in the latest snapshot. - return getLastCommittedIndex() + 1; - } - return last.getIndex() + 1; - } - - /** - * Generate a log entry for the given term and message, and append the entry. - * Used by the leader. - * @return the index of the new log entry. - */ - public long append(long term, TransactionContext operation) throws IOException { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - final long nextIndex = getNextIndex(); - - // This is called here to guarantee strict serialization of callback executions in case - // the SM wants to attach a logic depending on ordered execution in the log commit order. - operation = operation.preAppendTransaction(); - - // build the log entry after calling the StateMachine - final LogEntryProto e = ProtoUtils.toLogEntryProto( - operation.getSMLogEntry().get(), term, nextIndex); - - appendEntry(e); - operation.setLogEntry(e); - return nextIndex; - } - } - - /** - * Generate a log entry for the given term and configurations, - * and append the entry. Used by the leader. - * @return the index of the new log entry. 
- */ - public long append(long term, RaftConfiguration newConf) { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - final long nextIndex = getNextIndex(); - final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, - nextIndex); - appendEntry(e); - return nextIndex; - } - } - - public void open(ConfigurationManager confManager, long lastIndexInSnapshot) - throws IOException { - isOpen = true; - } - - public abstract long getStartIndex(); - - /** - * Get the log entry of the given index. - * - * @param index The given index. - * @return The log entry associated with the given index. - * Null if there is no log entry with the index. - */ - public abstract LogEntryProto get(long index); - - /** - * @param startIndex the starting log index (inclusive) - * @param endIndex the ending log index (exclusive) - * @return all log entries within the given index range. Null if startIndex - * is greater than the smallest available index. - */ - public abstract LogEntryProto[] getEntries(long startIndex, long endIndex); - - /** - * @return the last log entry. - */ - public abstract LogEntryProto getLastEntry(); - - /** - * Truncate the log entries till the given index. The log with the given index - * will also be truncated (i.e., inclusive). - */ - abstract void truncate(long index); - - /** - * Used by the leader when appending a new entry based on client's request - * or configuration change. - */ - abstract void appendEntry(LogEntryProto entry); - - /** - * Append all the given log entries. Used by the followers. - * - * If an existing entry conflicts with a new one (same index but different - * terms), delete the existing entry and all entries that follow it (§5.3). - * - * This method, {@link #append(long, TransactionContext)}, - * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)}, - * do not guarantee the changes are persisted. - * Need to call {@link #logSync()} to persist the changes. 
- */ - public abstract void append(LogEntryProto... entries); - - /** - * Flush and sync the log. - * It is triggered by AppendEntries RPC request from the leader. - */ - public abstract void logSync() throws InterruptedException; - - /** - * @return the index of the latest entry that has been flushed to the local - * storage. - */ - public abstract long getLatestFlushedIndex(); - - /** - * Write and flush the metadata (votedFor and term) into the meta file. - * - * We need to guarantee that the order of writeMetadata calls is the same with - * that when we change the in-memory term/votedFor. Otherwise we may persist - * stale term/votedFor in file. - * - * Since the leader change is not frequent, currently we simply put this call - * in the RaftPeer's lock. Later we can use an IO task queue to enforce the - * order. - */ - public abstract void writeMetadata(long term, String votedFor) - throws IOException; - - public abstract Metadata loadMetadata() throws IOException; - - public abstract void syncWithSnapshot(long lastSnapshotIndex); - - @Override - public String toString() { - return ServerProtoUtils.toString(getLastEntry()); - } - - public static class Metadata { - private final String votedFor; - private final long term; - - public Metadata(String votedFor, long term) { - this.votedFor = votedFor; - this.term = term; - } - - public String getVotedFor() { - return votedFor; - } - - public long getTerm() { - return term; - } - } - - public AutoCloseableLock readLock() { - return AutoCloseableLock.acquire(lock.readLock()); - } - - public AutoCloseableLock writeLock() { - return AutoCloseableLock.acquire(lock.writeLock()); - } - - public boolean hasWriteLock() { - return this.lock.isWriteLockedByCurrentThread(); - } - - public boolean hasReadLock() { - return this.lock.getReadHoldCount() > 0 || hasWriteLock(); - } - - @Override - public void close() throws IOException { - isOpen = false; - } - - public String getSelfId() { - return selfId; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java deleted file mode 100644 index d022a91..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java +++ /dev/null @@ -1,323 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.storage.LogSegment.LogRecord; -import org.apache.raft.server.storage.LogSegment.SegmentFileInfo; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; - -import java.util.*; - -import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -/** - * In-memory RaftLog Cache. Currently we provide a simple implementation that - * caches all the segments in the memory. 
The cache is not thread-safe and - * requires external lock protection. - */ -class RaftLogCache { - private LogSegment openSegment; - private final List<LogSegment> closedSegments; - - RaftLogCache() { - closedSegments = new ArrayList<>(); - } - - private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) { - return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex(); - } - - private LogSegment getLastClosedSegment() { - return closedSegments.isEmpty() ? - null : closedSegments.get(closedSegments.size() - 1); - } - - private void validateAdding(LogSegment segment) { - final LogSegment lastClosed = getLastClosedSegment(); - if (!segment.isOpen()) { - Preconditions.checkState(lastClosed == null || - areConsecutiveSegments(lastClosed, segment)); - } else { - Preconditions.checkState(openSegment == null && - (lastClosed == null || areConsecutiveSegments(lastClosed, segment))); - } - } - - void addSegment(LogSegment segment) { - validateAdding(segment); - if (segment.isOpen()) { - openSegment = segment; - } else { - closedSegments.add(segment); - } - } - - LogEntryProto getEntry(long index) { - if (openSegment != null && index >= openSegment.getStartIndex()) { - final LogRecord record = openSegment.getLogRecord(index); - return record == null ? 
null : record.entry; - } else { - int segmentIndex = Collections.binarySearch(closedSegments, index); - if (segmentIndex < 0) { - return null; - } else { - return closedSegments.get(segmentIndex).getLogRecord(index).entry; - } - } - } - - /** - * @param startIndex inclusive - * @param endIndex exclusive - */ - LogEntryProto[] getEntries(final long startIndex, final long endIndex) { - if (startIndex < 0 || startIndex < getStartIndex()) { - throw new IndexOutOfBoundsException("startIndex = " + startIndex - + ", log cache starts from index " + getStartIndex()); - } - if (startIndex > endIndex) { - throw new IndexOutOfBoundsException("startIndex(" + startIndex - + ") > endIndex(" + endIndex + ")"); - } - final long realEnd = Math.min(getEndIndex() + 1, endIndex); - if (startIndex >= realEnd) { - return RaftLog.EMPTY_LOGENTRY_ARRAY; - } - - LogEntryProto[] entries = new LogEntryProto[(int) (realEnd - startIndex)]; - int segmentIndex = Collections.binarySearch(closedSegments, startIndex); - if (segmentIndex < 0) { - getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length); - } else { - long index = startIndex; - for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) { - LogSegment s = closedSegments.get(i); - int numberFromSegment = (int) Math.min(realEnd - index, - s.getEndIndex() - index + 1); - getEntriesFromSegment(s, index, entries, (int) (index - startIndex), - numberFromSegment); - index += numberFromSegment; - } - if (index < realEnd) { - getEntriesFromSegment(openSegment, index, entries, - (int) (index - startIndex), (int) (realEnd - index)); - } - } - return entries; - } - - private void getEntriesFromSegment(LogSegment segment, long startIndex, - LogEntryProto[] entries, int offset, int size) { - long endIndex = segment.getEndIndex(); - endIndex = Math.min(endIndex, startIndex + size - 1); - int index = offset; - for (long i = startIndex; i <= endIndex; i++) { - entries[index++] = segment.getLogRecord(i).entry; - } - } 
- - long getStartIndex() { - if (closedSegments.isEmpty()) { - return openSegment != null ? openSegment.getStartIndex() : - RaftServerConstants.INVALID_LOG_INDEX; - } else { - return closedSegments.get(0).getStartIndex(); - } - } - - @VisibleForTesting - long getEndIndex() { - return openSegment != null ? openSegment.getEndIndex() : - (closedSegments.isEmpty() ? - INVALID_LOG_INDEX : - closedSegments.get(closedSegments.size() - 1).getEndIndex()); - } - - LogEntryProto getLastEntry() { - return (openSegment != null && openSegment.numOfEntries() > 0) ? - openSegment.getLastRecord().entry : - (closedSegments.isEmpty() ? null : - closedSegments.get(closedSegments.size() - 1).getLastRecord().entry); - } - - LogSegment getOpenSegment() { - return openSegment; - } - - void appendEntry(LogEntryProto entry) { - // SegmentedRaftLog does the segment creation/rolling work. Here we just - // simply append the entry into the open segment. - Preconditions.checkState(openSegment != null); - openSegment.appendToOpenSegment(entry); - } - - /** - * finalize the current open segment, and start a new open segment - */ - void rollOpenSegment(boolean createNewOpen) { - Preconditions.checkState(openSegment != null - && openSegment.numOfEntries() > 0); - final long nextIndex = openSegment.getEndIndex() + 1; - openSegment.close(); - closedSegments.add(openSegment); - if (createNewOpen) { - openSegment = LogSegment.newOpenSegment(nextIndex); - } else { - openSegment = null; - } - } - - private SegmentFileInfo deleteOpenSegment() { - final long oldEnd = openSegment.getEndIndex(); - openSegment.clear(); - SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), - oldEnd, true, 0, openSegment.getEndIndex()); - openSegment = null; - return info; - } - - /** - * truncate log entries starting from the given index (inclusive) - */ - TruncationSegments truncate(long index) { - int segmentIndex = Collections.binarySearch(closedSegments, index); - if (segmentIndex == 
-closedSegments.size() - 1) { - if (openSegment != null && openSegment.getEndIndex() >= index) { - final long oldEnd = openSegment.getEndIndex(); - if (index == openSegment.getStartIndex()) { - // the open segment should be deleted - return new TruncationSegments(null, - Collections.singletonList(deleteOpenSegment())); - } else { - openSegment.truncate(index); - Preconditions.checkState(!openSegment.isOpen()); - SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(), - oldEnd, true, openSegment.getTotalSize(), - openSegment.getEndIndex()); - closedSegments.add(openSegment); - openSegment = null; - return new TruncationSegments(info, Collections.emptyList()); - } - } - } else if (segmentIndex >= 0) { - LogSegment ts = closedSegments.get(segmentIndex); - final long oldEnd = ts.getEndIndex(); - List<SegmentFileInfo> list = new ArrayList<>(); - ts.truncate(index); - final int size = closedSegments.size(); - for (int i = size - 1; - i >= (ts.numOfEntries() == 0 ? segmentIndex : segmentIndex + 1); - i-- ) { - LogSegment s = closedSegments.remove(i); - final long endOfS = i == segmentIndex ? oldEnd : s.getEndIndex(); - s.clear(); - list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, - s.getEndIndex())); - } - if (openSegment != null) { - list.add(deleteOpenSegment()); - } - SegmentFileInfo t = ts.numOfEntries() == 0 ? null : - new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, - ts.getTotalSize(), ts.getEndIndex()); - return new TruncationSegments(t, list); - } - return null; - } - - static class TruncationSegments { - final SegmentFileInfo toTruncate; // name of the file to be truncated - final SegmentFileInfo[] toDelete; // names of the files to be deleted - - TruncationSegments(SegmentFileInfo toTruncate, - List<SegmentFileInfo> toDelete) { - this.toDelete = toDelete == null ? 
null : - toDelete.toArray(new SegmentFileInfo[toDelete.size()]); - this.toTruncate = toTruncate; - } - } - - Iterator<LogEntryProto> iterator(long startIndex) { - return new EntryIterator(startIndex); - } - - private class EntryIterator implements Iterator<LogEntryProto> { - private long nextIndex; - private LogSegment currentSegment; - private int segmentIndex; - - EntryIterator(long start) { - this.nextIndex = start; - segmentIndex = Collections.binarySearch(closedSegments, nextIndex); - if (segmentIndex >= 0) { - currentSegment = closedSegments.get(segmentIndex); - } else { - segmentIndex = -segmentIndex - 1; - if (segmentIndex == closedSegments.size()) { - currentSegment = openSegment; - } else { - // the start index is smaller than the first closed segment's start - // index. We no longer keep the log entry (because of the snapshot) or - // the start index is invalid. - Preconditions.checkState(segmentIndex == 0); - throw new IndexOutOfBoundsException(); - } - } - } - - @Override - public boolean hasNext() { - return currentSegment != null && - currentSegment.getLogRecord(nextIndex) != null; - } - - @Override - public LogEntryProto next() { - LogRecord record; - if (currentSegment == null || - (record = currentSegment.getLogRecord(nextIndex)) == null) { - throw new NoSuchElementException(); - } - if (++nextIndex > currentSegment.getEndIndex()) { - if (currentSegment != openSegment) { - segmentIndex++; - currentSegment = segmentIndex == closedSegments.size() ? - openSegment : closedSegments.get(segmentIndex); - } - } - return record.entry; - } - } - - @VisibleForTesting - int getNumOfSegments() { - return closedSegments.size() + (openSegment == null ? 
0 : 1); - } - - boolean isEmpty() { - return closedSegments.isEmpty() && openSegment == null; - } - - void clear() { - openSegment = null; - closedSegments.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java deleted file mode 100644 index 6cef212..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java +++ /dev/null @@ -1,371 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.io.nativeio.NativeIO; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.storage.LogSegment.SegmentFileInfo; -import org.apache.raft.server.storage.RaftLogCache.TruncationSegments; -import org.apache.raft.server.storage.SegmentedRaftLog.Task; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.ExitUtils; -import org.apache.raft.util.FileUtils; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_DEFAULT; -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_KEY; - -/** - * This class takes the responsibility of all the raft log related I/O ops for a - * raft peer. - */ -class RaftLogWorker implements Runnable { - static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class); - /** - * The task queue accessed by rpc handler threads and the io worker thread. - */ - private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(4096); - private volatile boolean running = true; - private final Thread workerThread; - - private final RaftStorage storage; - private LogOutputStream out; - private final RaftServerImpl raftServer; - - /** - * The number of entries that have been written into the LogOutputStream but - * has not been flushed. 
- */ - private int pendingFlushNum = 0; - /** the index of the last entry that has been written */ - private long lastWrittenIndex; - /** the largest index of the entry that has been flushed */ - private volatile long flushedIndex; - - private final int forceSyncNum; - - private final RaftProperties properties; - - RaftLogWorker(RaftServerImpl raftServer, RaftStorage storage, - RaftProperties properties) { - this.raftServer = raftServer; - this.storage = storage; - this.properties = properties; - this.forceSyncNum = properties.getInt(RAFT_LOG_FORCE_SYNC_NUM_KEY, - RAFT_LOG_FORCE_SYNC_NUM_DEFAULT); - workerThread = new Thread(this, - getClass().getSimpleName() + " for " + storage); - } - - void start(long latestIndex, File openSegmentFile) throws IOException { - lastWrittenIndex = latestIndex; - flushedIndex = latestIndex; - if (openSegmentFile != null) { - Preconditions.checkArgument(openSegmentFile.exists()); - out = new LogOutputStream(openSegmentFile, true, properties); - } - workerThread.start(); - } - - void close() { - this.running = false; - workerThread.interrupt(); - try { - workerThread.join(); - } catch (InterruptedException ignored) { - } - } - - /** - * A snapshot has just been installed on the follower. Need to update the IO - * worker's state accordingly. - */ - void syncWithSnapshot(long lastSnapshotIndex) { - queue.clear(); - lastWrittenIndex = lastSnapshotIndex; - flushedIndex = lastSnapshotIndex; - pendingFlushNum = 0; - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "-" - + (raftServer != null ? raftServer.getId() : ""); - } - - /** - * This is protected by the RaftServer and RaftLog's lock. 
- */ - private Task addIOTask(Task task) { - LOG.debug("add task {}", task); - try { - if (!queue.offer(task, 1, TimeUnit.SECONDS)) { - Preconditions.checkState(isAlive(), - "the worker thread is not alive"); - queue.put(task); - } - } catch (Throwable t) { - if (t instanceof InterruptedException && !running) { - LOG.info("Got InterruptedException when adding task " + task - + ". The RaftLogWorker already stopped."); - } else { - ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG); - } - } - return task; - } - - boolean isAlive() { - return running && workerThread.isAlive(); - } - - @Override - public void run() { - while (running) { - try { - Task task = queue.poll(1, TimeUnit.SECONDS); - if (task != null) { - try { - task.execute(); - } catch (IOException e) { - if (task.getEndIndex() < lastWrittenIndex) { - LOG.info("Ignore IOException when handling task " + task - + " which is smaller than the lastWrittenIndex." - + " There should be a snapshot installed.", e); - } else { - throw e; - } - } - task.done(); - } - } catch (InterruptedException e) { - LOG.info(Thread.currentThread().getName() - + " was interrupted, exiting. 
There are " + queue.size() - + " tasks remaining in the queue."); - } catch (Throwable t) { - // TODO avoid terminating the jvm by supporting multiple log directories - ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.", t, LOG); - } - } - } - - private boolean shouldFlush() { - return pendingFlushNum >= forceSyncNum || - (pendingFlushNum > 0 && queue.isEmpty()); - } - - private void flushWrites() throws IOException { - if (out != null) { - LOG.debug("flush data to " + out + ", reset pending_sync_number to 0"); - out.flush(); - updateFlushedIndex(); - } - } - - private void updateFlushedIndex() { - flushedIndex = lastWrittenIndex; - pendingFlushNum = 0; - if (raftServer != null) { - raftServer.submitLocalSyncEvent(); - } - } - - /** - * The following several methods (startLogSegment, rollLogSegment, - * writeLogEntry, and truncate) are only called by SegmentedRaftLog which is - * protected by RaftServer's lock. - * - * Thus all the tasks are created and added sequentially. 
- */ - Task startLogSegment(long startIndex) { - return addIOTask(new StartLogSegment(startIndex)); - } - - Task rollLogSegment(LogSegment segmentToClose) { - addIOTask(new FinalizeLogSegment(segmentToClose)); - return addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1)); - } - - Task writeLogEntry(LogEntryProto entry) { - return addIOTask(new WriteLog(entry)); - } - - Task truncate(TruncationSegments ts) { - return addIOTask(new TruncateLog(ts)); - } - - // TODO we can add another level of buffer for writing here - private class WriteLog extends Task { - private final LogEntryProto entry; - - WriteLog(LogEntryProto entry) { - this.entry = entry; - } - - @Override - public void execute() throws IOException { - Preconditions.checkState(out != null); - Preconditions.checkState(lastWrittenIndex + 1 == entry.getIndex(), - "lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry); - out.write(entry); - lastWrittenIndex = entry.getIndex(); - pendingFlushNum++; - if (shouldFlush()) { - flushWrites(); - } - } - - @Override - long getEndIndex() { - return entry.getIndex(); - } - } - - private class FinalizeLogSegment extends Task { - private final LogSegment segmentToClose; - - FinalizeLogSegment(LogSegment segmentToClose) { - this.segmentToClose = segmentToClose; - } - - @Override - public void execute() throws IOException { - RaftUtils.cleanup(null, out); - out = null; - Preconditions.checkState(segmentToClose != null); - - File openFile = storage.getStorageDir() - .getOpenLogFile(segmentToClose.getStartIndex()); - Preconditions.checkState(openFile.exists(), - "File %s does not exist.", openFile); - if (segmentToClose.numOfEntries() > 0) { - // finalize the current open segment - File dstFile = storage.getStorageDir().getClosedLogFile( - segmentToClose.getStartIndex(), segmentToClose.getEndIndex()); - Preconditions.checkState(!dstFile.exists()); - - NativeIO.renameTo(openFile, dstFile); - } else { // delete the file of the empty segment - 
FileUtils.deleteFile(openFile); - } - updateFlushedIndex(); - } - - @Override - long getEndIndex() { - return segmentToClose.getEndIndex(); - } - } - - private class StartLogSegment extends Task { - private final long newStartIndex; - - StartLogSegment(long newStartIndex) { - this.newStartIndex = newStartIndex; - } - - @Override - void execute() throws IOException { - File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex); - Preconditions.checkState(!openFile.exists(), "open file %s exists for %s", - openFile.getAbsolutePath(), RaftLogWorker.this.toString()); - Preconditions.checkState(out == null && pendingFlushNum == 0); - out = new LogOutputStream(openFile, false, properties); - } - - @Override - long getEndIndex() { - return newStartIndex; - } - } - - private class TruncateLog extends Task { - private final TruncationSegments segments; - - TruncateLog(TruncationSegments ts) { - this.segments = ts; - } - - @Override - void execute() throws IOException { - RaftUtils.cleanup(null, out); - out = null; - if (segments.toTruncate != null) { - File fileToTruncate = segments.toTruncate.isOpen ? 
- storage.getStorageDir().getOpenLogFile( - segments.toTruncate.startIndex) : - storage.getStorageDir().getClosedLogFile( - segments.toTruncate.startIndex, - segments.toTruncate.endIndex); - FileUtils.truncateFile(fileToTruncate, segments.toTruncate.targetLength); - - // rename the file - File dstFile = storage.getStorageDir().getClosedLogFile( - segments.toTruncate.startIndex, segments.toTruncate.newEndIndex); - Preconditions.checkState(!dstFile.exists()); - NativeIO.renameTo(fileToTruncate, dstFile); - - // update lastWrittenIndex - lastWrittenIndex = segments.toTruncate.newEndIndex; - } - if (segments.toDelete != null && segments.toDelete.length > 0) { - long minStart = segments.toDelete[0].startIndex; - for (SegmentFileInfo del : segments.toDelete) { - final File delFile; - if (del.isOpen) { - delFile = storage.getStorageDir().getOpenLogFile(del.startIndex); - } else { - delFile = storage.getStorageDir() - .getClosedLogFile(del.startIndex, del.endIndex); - } - FileUtils.deleteFile(delFile); - minStart = Math.min(minStart, del.startIndex); - } - if (segments.toTruncate == null) { - lastWrittenIndex = minStart - 1; - } - } - updateFlushedIndex(); - } - - @Override - long getEndIndex() { - if (segments.toTruncate != null) { - return segments.toTruncate.newEndIndex; - } else if (segments.toDelete.length > 0) { - return segments.toDelete[segments.toDelete.length - 1].endIndex; - } - return RaftServerConstants.INVALID_LOG_INDEX; - } - } - - long getFlushedIndex() { - return flushedIndex; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java deleted file mode 100644 index 434f505..0000000 --- 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.storage.RaftStorageDirectory.StorageState; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.statemachine.StateMachineStorage; -import org.apache.raft.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_DEFAULT; -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY; - -public class RaftStorage implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class); - - // TODO support multiple storage directories - private final RaftStorageDirectory storageDir; - private final StorageState state; - private volatile MetaFile metaFile; - 
private StateMachineStorage stateMachineStorage; - - public RaftStorage(RaftProperties prop, RaftServerConstants.StartupOption option) - throws IOException { - final String dir = prop.get(RAFT_SERVER_STORAGE_DIR_KEY, - RAFT_SERVER_STORAGE_DIR_DEFAULT); - storageDir = new RaftStorageDirectory( - new File(FileUtils.stringAsURI(dir).getPath())); - if (option == RaftServerConstants.StartupOption.FORMAT) { - if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) { - throw new IOException("Cannot format " + storageDir); - } - storageDir.lock(); - format(); - state = storageDir.analyzeStorage(false); - Preconditions.checkState(state == StorageState.NORMAL); - } else { - state = analyzeAndRecoverStorage(true); // metaFile is initialized here - if (state != StorageState.NORMAL) { - storageDir.unlock(); - throw new IOException("Cannot load " + storageDir - + ". Its state: " + state); - } - } - } - - StorageState getState() { - return state; - } - - private void format() throws IOException { - storageDir.clearDirectory(); - metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR); - LOG.info("Storage directory " + storageDir.getRoot() - + " has been successfully formatted."); - } - - private MetaFile writeMetaFile(long term, String votedFor) throws IOException { - MetaFile metaFile = new MetaFile(storageDir.getMetaFile()); - metaFile.set(term, votedFor); - return metaFile; - } - - private void cleanMetaTmpFile() throws IOException { - Files.deleteIfExists(storageDir.getMetaTmpFile().toPath()); - } - - private StorageState analyzeAndRecoverStorage(boolean toLock) - throws IOException { - StorageState storageState = storageDir.analyzeStorage(toLock); - if (storageState == StorageState.NORMAL) { - metaFile = new MetaFile(storageDir.getMetaFile()); - assert metaFile.exists(); - metaFile.readFile(); - // Existence of raft-meta.tmp means the change of votedFor/term has not - // been committed. Thus we should delete the tmp file. 
- cleanMetaTmpFile(); - return StorageState.NORMAL; - } else if (storageState == StorageState.NOT_FORMATTED && - storageDir.isCurrentEmpty()) { - format(); - return StorageState.NORMAL; - } else { - return storageState; - } - } - - public RaftStorageDirectory getStorageDir() { - return storageDir; - } - - @Override - public void close() throws IOException { - storageDir.unlock(); - } - - MetaFile getMetaFile() { - return metaFile; - } - - public SnapshotInfo getLastestSnapshot() throws IOException { - return getStateMachineStorage().getLatestSnapshot(); - } - - /** - * Called by the state machine after it has initialized the StateMachineStorage. - */ - public void setStateMachineStorage(StateMachineStorage smStorage) { - this.stateMachineStorage = smStorage; - } - - public StateMachineStorage getStateMachineStorage() { - return stateMachineStorage; - } - - @Override - public String toString() { - return getStorageDir() + ""; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java deleted file mode 100644 index 662e4ec..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java +++ /dev/null @@ -1,360 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import org.apache.raft.util.AtomicFileOutputStream; -import org.apache.raft.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.lang.management.ManagementFactory; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static java.nio.file.Files.newDirectoryStream; -import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -public class RaftStorageDirectory { - static final Logger LOG = LoggerFactory.getLogger(RaftStorageDirectory.class); - - static final String STORAGE_DIR_CURRENT = "current"; - static final String STORAGE_FILE_LOCK = "in_use.lock"; - static final String META_FILE_NAME = "raft-meta"; - static final String LOG_FILE_INPROGRESS = "inprogress"; - static final String LOG_FILE_PREFIX = "log"; - static final String STATE_MACHINE = "sm"; // directory containing state machine snapshots - static final String TEMP = "tmp"; - static final Pattern CLOSED_SEGMENT_REGEX = Pattern.compile("log_(\\d+)-(\\d+)"); - static final Pattern 
OPEN_SEGMENT_REGEX = Pattern.compile("log_inprogress_(\\d+)(?:\\..*)?"); - - private static final List<Pattern> LOGSEGMENTS_REGEXES = - ImmutableList.of(CLOSED_SEGMENT_REGEX, OPEN_SEGMENT_REGEX); - - enum StorageState { - NON_EXISTENT, - NOT_FORMATTED, - NORMAL - } - - public static class LogPathAndIndex { - public final Path path; - public final long startIndex; - public final long endIndex; - - LogPathAndIndex(Path path, long startIndex, long endIndex) { - this.path = path; - this.startIndex = startIndex; - this.endIndex = endIndex; - } - - @Override - public String toString() { - return path + "-" + startIndex + "-" + endIndex; - } - } - - private final File root; // root directory - private FileLock lock; // storage lock - - /** - * Constructor - * @param dir directory corresponding to the storage - */ - RaftStorageDirectory(File dir) { - this.root = dir; - this.lock = null; - } - - /** - * Get root directory of this storage - */ - //TODO - public File getRoot() { - return root; - } - - /** - * Clear and re-create storage directory. - * <p> - * Removes contents of the current directory and creates an empty directory. - * - * This does not fully format storage directory. - * It cannot write the version file since it should be written last after - * all other storage type dependent files are written. - * Derived storage is responsible for setting specific storage values and - * writing the version file to disk. 
- */ - void clearDirectory() throws IOException { - File curDir = this.getCurrentDir(); - clearDirectory(curDir); - clearDirectory(getStateMachineDir()); - } - - void clearDirectory(File dir) throws IOException { - if (dir.exists()) { - File[] files = FileUtils.listFiles(dir); - LOG.info("Will remove files: " + Arrays.toString(files)); - if (!(FileUtils.fullyDelete(dir))) - throw new IOException("Cannot remove directory: " + dir); - } - if (!dir.mkdirs()) - throw new IOException("Cannot create directory " + dir); - } - - /** - * Directory {@code current} contains latest files defining - * the file system meta-data. - * - * @return the directory path - */ - File getCurrentDir() { - return new File(root, STORAGE_DIR_CURRENT); - } - - File getMetaFile() { - return new File(getCurrentDir(), META_FILE_NAME); - } - - File getMetaTmpFile() { - return new File(getCurrentDir(), META_FILE_NAME - + AtomicFileOutputStream.TMP_EXTENSION); - } - - File getOpenLogFile(long startIndex) { - return new File(getCurrentDir(), getOpenLogFileName(startIndex)); - } - - static String getOpenLogFileName(long startIndex) { - return LOG_FILE_PREFIX + "_" + LOG_FILE_INPROGRESS + "_" + startIndex; - } - - File getClosedLogFile(long startIndex, long endIndex) { - return new File(getCurrentDir(), getClosedLogFileName(startIndex, endIndex)); - } - - static String getClosedLogFileName(long startIndex, long endIndex) { - return LOG_FILE_PREFIX + "_" + startIndex + "-" + endIndex; - } - - public File getStateMachineDir() { - return new File(getRoot(), STATE_MACHINE); - } - - /** Returns a uniquely named temporary directory under $rootdir/tmp/ */ - public File getNewTempDir() { - return new File(new File(getRoot(), TEMP), UUID.randomUUID().toString()); - } - - public Path relativizeToRoot(Path p) { - if (p.isAbsolute()) { - return getRoot().toPath().relativize(p); - } - return p; - } - - /** - * @return log segment files sorted based on their index. 
- */ - @VisibleForTesting - public List<LogPathAndIndex> getLogSegmentFiles() throws IOException { - List<LogPathAndIndex> list = new ArrayList<>(); - try (DirectoryStream<Path> stream = - Files.newDirectoryStream(getCurrentDir().toPath())) { - for (Path path : stream) { - for (Pattern pattern : LOGSEGMENTS_REGEXES) { - Matcher matcher = pattern.matcher(path.getFileName().toString()); - if (matcher.matches()) { - final long startIndex = Long.parseLong(matcher.group(1)); - final long endIndex = matcher.groupCount() == 2 ? - Long.parseLong(matcher.group(2)) : INVALID_LOG_INDEX; - list.add(new LogPathAndIndex(path, startIndex, endIndex)); - } - } - } - } - Collections.sort(list, - (o1, o2) -> o1.startIndex == o2.startIndex ? - 0 : (o1.startIndex < o2.startIndex ? -1 : 1)); - return list; - } - - /** - * Check to see if current/ directory is empty. - */ - boolean isCurrentEmpty() throws IOException { - File currentDir = getCurrentDir(); - if(!currentDir.exists()) { - // if current/ does not exist, it's safe to format it. - return true; - } - try(DirectoryStream<Path> dirStream = - newDirectoryStream(currentDir.toPath())) { - if (dirStream.iterator().hasNext()) { - return false; - } - } - return true; - } - - /** - * Check consistency of the storage directory. - * - * @return state {@link StorageState} of the storage directory - */ - StorageState analyzeStorage(boolean toLock) throws IOException { - Preconditions.checkState(root != null, "root directory is null"); - - String rootPath = root.getCanonicalPath(); - try { // check that storage exists - if (!root.exists()) { - LOG.info(rootPath + " does not exist. 
Creating ..."); - if (!root.mkdirs()) { - throw new IOException("Cannot create directory " + rootPath); - } - } - // or is inaccessible - if (!root.isDirectory()) { - LOG.warn(rootPath + "is not a directory"); - return StorageState.NON_EXISTENT; - } - if (!FileUtils.canWrite(root)) { - LOG.warn("Cannot access storage directory " + rootPath); - return StorageState.NON_EXISTENT; - } - } catch(SecurityException ex) { - LOG.warn("Cannot access storage directory " + rootPath, ex); - return StorageState.NON_EXISTENT; - } - - if (toLock) { - this.lock(); // lock storage if it exists - } - - // check whether current directory is valid - if (hasMetaFile()) { - return StorageState.NORMAL; - } else { - return StorageState.NOT_FORMATTED; - } - } - - boolean hasMetaFile() throws IOException { - return getMetaFile().exists(); - } - - /** - * Lock storage to provide exclusive access. - * - * <p> Locking is not supported by all file systems. - * E.g., NFS does not consistently support exclusive locks. - * - * <p> If locking is supported we guarantee exclusive access to the - * storage directory. Otherwise, no guarantee is given. - * - * @throws IOException if locking fails - */ - public void lock() throws IOException { - FileLock newLock = tryLock(); - if (newLock == null) { - String msg = "Cannot lock storage " + this.root - + ". The directory is already locked"; - LOG.info(msg); - throw new IOException(msg); - } - // Don't overwrite lock until success - this way if we accidentally - // call lock twice, the internal state won't be cleared by the second - // (failed) lock attempt - lock = newLock; - } - - /** - * Attempts to acquire an exclusive lock on the storage. - * - * @return A lock object representing the newly-acquired lock or - * <code>null</code> if storage is already locked. - * @throws IOException if locking fails. 
- */ - private FileLock tryLock() throws IOException { - boolean deletionHookAdded = false; - File lockF = new File(root, STORAGE_FILE_LOCK); - if (!lockF.exists()) { - lockF.deleteOnExit(); - deletionHookAdded = true; - } - RandomAccessFile file = new RandomAccessFile(lockF, "rws"); - String jvmName = ManagementFactory.getRuntimeMXBean().getName(); - FileLock res; - try { - res = file.getChannel().tryLock(); - if (null == res) { - LOG.error("Unable to acquire file lock on path " + lockF.toString()); - throw new OverlappingFileLockException(); - } - file.write(jvmName.getBytes(Charsets.UTF_8)); - LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName); - } catch (OverlappingFileLockException oe) { - // Cannot read from the locked file on Windows. - LOG.error("It appears that another process " - + "has already locked the storage directory: " + root, oe); - file.close(); - return null; - } catch(IOException e) { - LOG.error("Failed to acquire lock on " + lockF - + ". If this storage directory is mounted via NFS, " - + "ensure that the appropriate nfs lock services are running.", e); - file.close(); - throw e; - } - if (!deletionHookAdded) { - // If the file existed prior to our startup, we didn't - // call deleteOnExit above. But since we successfully locked - // the dir, we can take care of cleaning it up. - lockF.deleteOnExit(); - } - return res; - } - - /** - * Unlock storage. - */ - public void unlock() throws IOException { - if (this.lock == null) - return; - this.lock.release(); - lock.channel().close(); - lock = null; - } - - @Override - public String toString() { - return "Storage Directory " + this.root; - } -}
