Make SSTableWriter.openEarly more robust and obvious. Patch by benedict; reviewed by marcus for CASSANDRA-8747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4eb9fa78 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4eb9fa78 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4eb9fa78 Branch: refs/heads/trunk Commit: 4eb9fa78bd233f5f9b901dd677636842b351330b Parents: f57ec8c Author: Benedict Elliott Smith <bened...@apache.org> Authored: Thu Feb 12 13:35:23 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Feb 12 13:35:23 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/compress/CompressedSequentialWriter.java | 2 + .../io/sstable/IndexSummaryBuilder.java | 162 ++++++++++++++----- .../cassandra/io/sstable/SSTableWriter.java | 76 +++++---- .../cassandra/io/util/SequentialWriter.java | 10 ++ 5 files changed, 168 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2466b19..cbb4334 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.4 + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747) * Enforce SSTableReader.first/last (CASSANDRA-8744) * Cleanup SegmentedFile API (CASSANDRA-8749) * Avoid overlap with early compaction replacement (CASSANDRA-8683) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 81bb3e9..87eb2fb 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ 
b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -138,6 +138,8 @@ public class CompressedSequentialWriter extends SequentialWriter // next chunk should be written right after current + length of the checksum (int) chunkOffset += compressedLength + 4; + if (runPostFlush != null) + runPostFlush.run(); } public CompressionMetadata open(SSTableWriter.FinishType finishType) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java index df326d7..3b93b31 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java @@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +47,41 @@ public class IndexSummaryBuilder private long keysWritten = 0; private long indexIntervalMatches = 0; private long offheapSize = 0; + private long nextSamplePosition; + + // for each ReadableBoundary, we map its dataLength property to itself, permitting us to lookup the + // last readable boundary from the perspective of the data file + // [data file position limit] => [ReadableBoundary] + private TreeMap<Long, ReadableBoundary> lastReadableByData = new TreeMap<>(); + // for each ReadableBoundary, we map its indexLength property to itself, permitting us to lookup the + // last readable boundary from the perspective of the index file + // [index file position limit] => [ReadableBoundary] + private TreeMap<Long, ReadableBoundary> lastReadableByIndex = new TreeMap<>(); + // the last synced data 
file position + private long dataSyncPosition; + // the last synced index file position + private long indexSyncPosition; + + // the last summary interval boundary that is fully readable in both data and index files + private ReadableBoundary lastReadableBoundary; + + /** + * Represents a boundary that is guaranteed fully readable in the summary, index file and data file. + * The key contained is the last key readable if the index and data files have been flushed to the + * stored lengths. + */ + public static class ReadableBoundary + { + final DecoratedKey lastKey; + final long indexLength; + final long dataLength; + public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength) + { + this.lastKey = lastKey; + this.indexLength = indexLength; + this.dataLength = dataLength; + } + } public IndexSummaryBuilder(long expectedKeys, int minIndexInterval, int samplingLevel) { @@ -71,74 +108,113 @@ public class IndexSummaryBuilder maxExpectedEntries = (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL; positions = new ArrayList<>((int)maxExpectedEntries); keys = new ArrayList<>((int)maxExpectedEntries); + // if we're downsampling we may not use index 0 + setNextSamplePosition(-minIndexInterval); } - // finds the last (-offset) decorated key that can be guaranteed to occur fully in the index file before the provided file position - public DecoratedKey getMaxReadableKey(long position, int offset) + // the index file has been flushed to the provided position; stash it and use that to recalculate our max readable boundary + public void markIndexSynced(long upToPosition) { - int i = Collections.binarySearch(positions, position); - if (i < 0) - { - i = -1 - i; - if (i == positions.size()) - i -= 2; - else - i -= 1; - } - else - i -= 1; - i -= offset; - // we don't want to return any key if there's only 1 item in the summary, to make sure the sstable range is non-empty - if (i <= 0) - return null; - return keys.get(i); + indexSyncPosition = upToPosition; 
+ refreshReadableBoundary(); } - public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition) + // the data file has been flushed to the provided position; stash it and use that to recalculate our max readable boundary + public void markDataSynced(long upToPosition) { - if (keysWritten % minIndexInterval == 0) - { - // see if we should skip this key based on our sampling level - boolean shouldSkip = false; - for (int start : startPoints) - { - if ((indexIntervalMatches - start) % BASE_SAMPLING_LEVEL == 0) - { - shouldSkip = true; - break; - } - } + dataSyncPosition = upToPosition; + refreshReadableBoundary(); + } - if (!shouldSkip) - { - keys.add(getMinimalKey(decoratedKey)); - offheapSize += decoratedKey.getKey().remaining(); - positions.add(indexPosition); - offheapSize += TypeSizes.NATIVE.sizeof(indexPosition); - } + private void refreshReadableBoundary() + { + // grab the readable boundary prior to the given position in either the data or index file + Map.Entry<?, ReadableBoundary> byData = lastReadableByData.floorEntry(dataSyncPosition); + Map.Entry<?, ReadableBoundary> byIndex = lastReadableByIndex.floorEntry(indexSyncPosition); + if (byData == null || byIndex == null) + return; + + // take the lowest of the two, and stash it + lastReadableBoundary = byIndex.getValue().indexLength < byData.getValue().indexLength + ? 
byIndex.getValue() : byData.getValue(); + + // clear our data prior to this, since we no longer need it + lastReadableByData.headMap(lastReadableBoundary.dataLength, false).clear(); + lastReadableByIndex.headMap(lastReadableBoundary.indexLength, false).clear(); + } - indexIntervalMatches++; + public ReadableBoundary getLastReadableBoundary() + { + return lastReadableBoundary; + } + + public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart) + { + return maybeAddEntry(decoratedKey, indexStart, 0, 0); + } + + /** + * + * @param decoratedKey the key for this record + * @param indexStart the position in the index file this record begins + * @param indexEnd the position in the index file we need to be able to read to (exclusive) to read this record + * @param dataEnd the position in the data file we need to be able to read to (exclusive) to read this record + * a value of 0 indicates we are not tracking readable boundaries + */ + public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd) + { + if (keysWritten == nextSamplePosition) + { + keys.add(getMinimalKey(decoratedKey)); + offheapSize += decoratedKey.getKey().remaining(); + positions.add(indexStart); + offheapSize += TypeSizes.NATIVE.sizeof(indexStart); + setNextSamplePosition(keysWritten); + } + else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition) + { + // this is the last key in this summary interval, so stash it + ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd); + lastReadableByData.put(dataEnd, boundary); + lastReadableByIndex.put(indexEnd, boundary); } keysWritten++; return this; } + // calculate the next key we will store to our summary + private void setNextSamplePosition(long position) + { + tryAgain: while (true) + { + position += minIndexInterval; + long test = indexIntervalMatches++; + for (int start : startPoints) + if ((test - start) % BASE_SAMPLING_LEVEL == 0) + continue 
tryAgain; + + nextSamplePosition = position; + return; + } + } + public IndexSummary build(IPartitioner partitioner) { return build(partitioner, null); } - public IndexSummary build(IPartitioner partitioner, DecoratedKey exclusiveUpperBound) + // lastIntervalKey should come from getLastReadableBoundary().lastKey + public IndexSummary build(IPartitioner partitioner, DecoratedKey lastIntervalKey) { assert keys.size() > 0; assert keys.size() == positions.size(); int length; - if (exclusiveUpperBound == null) + if (lastIntervalKey == null) length = keys.size(); - else - length = Collections.binarySearch(keys, exclusiveUpperBound); + else // since it's an inclusive upper bound, this should never match exactly + length = -1 -Collections.binarySearch(keys, lastIntervalKey); assert length > 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index d430314..2c1cf0e 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -131,7 +131,6 @@ public class SSTableWriter extends SSTable metadata, partitioner); this.repairedAt = repairedAt; - iwriter = new IndexWriter(keyCount); if (compression) { @@ -146,6 +145,7 @@ public class SSTableWriter extends SSTable dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC))); dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); } + iwriter = new IndexWriter(keyCount, dataFile); this.sstableMetadataCollector = sstableMetadataCollector; } @@ -183,7 +183,7 @@ public class SSTableWriter extends SSTable if (logger.isTraceEnabled()) logger.trace("wrote " + decoratedKey + " at " + dataPosition); - 
iwriter.append(decoratedKey, index); + iwriter.append(decoratedKey, index, dataPosition); dbuilder.addPotentialBoundary(dataPosition); } @@ -193,11 +193,11 @@ public class SSTableWriter extends SSTable */ public RowIndexEntry append(AbstractCompactedRow row) { - long currentPosition = beforeAppend(row.key); + long startPosition = beforeAppend(row.key); RowIndexEntry entry; try { - entry = row.write(currentPosition, dataFile.stream); + entry = row.write(startPosition, dataFile.stream); if (entry == null) return null; } @@ -205,8 +205,9 @@ public class SSTableWriter extends SSTable { throw new FSWriteError(e, dataFile.getPath()); } - sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); - afterAppend(row.key, currentPosition, entry); + long endPosition = dataFile.getFilePointer(); + sstableMetadataCollector.update(endPosition - startPosition, row.columnStats()); + afterAppend(row.key, endPosition, entry); return entry; } @@ -390,10 +391,11 @@ public class SSTableWriter extends SSTable repairedAt).get(MetadataType.STATS); // find the max (exclusive) readable key - DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0); - if (exclusiveUpperBoundOfReadableIndex == null) + IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable(); + if (boundary == null) return null; + assert boundary.indexLength > 0 && boundary.dataLength > 0; Descriptor link = makeTmpLinks(); // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY); @@ -401,33 +403,12 @@ public class SSTableWriter extends SSTable SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL), components, metadata, partitioner, ifile, - dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex), + dfile, 
iwriter.summary.build(partitioner, boundary.lastKey), iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY); // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed) sstable.first = getMinimalKey(first); - sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex); - DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1); - if (inclusiveUpperBoundOfReadableData == null) - { - // Prevent leaving tmplink files on disk - sstable.selfRef().release(); - return null; - } - int offset = 2; - while (true) - { - RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT); - if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset()) - break; - inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++); - if (inclusiveUpperBoundOfReadableData == null) - { - sstable.selfRef().release(); - return null; - } - } - sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData); + sstable.last = getMinimalKey(boundary.lastKey); return sstable; } @@ -593,25 +574,39 @@ public class SSTableWriter extends SSTable public final IFilter bf; private FileMark mark; - IndexWriter(long keyCount) + IndexWriter(long keyCount, final SequentialWriter dataFile) { indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL); bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true); + // register listeners to be alerted when the data files are flushed + indexFile.setPostFlushListener(new Runnable() + { + public void run() + { + summary.markIndexSynced(indexFile.getLastFlushOffset()); + } + }); + dataFile.setPostFlushListener(new Runnable() + { + public void run() + { + 
summary.markDataSynced(dataFile.getLastFlushOffset()); + } + }); } // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file - DecoratedKey getMaxReadableKey(int offset) + IndexSummaryBuilder.ReadableBoundary getMaxReadable() { - long maxIndexLength = indexFile.getLastFlushOffset(); - return summary.getMaxReadableKey(maxIndexLength, offset); + return summary.getLastReadableBoundary(); } - public void append(DecoratedKey key, RowIndexEntry indexEntry) + public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) { bf.add(key.getKey()); - long indexPosition = indexFile.getFilePointer(); + long indexStart = indexFile.getFilePointer(); try { ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream); @@ -621,12 +616,13 @@ public class SSTableWriter extends SSTable { throw new FSWriteError(e, indexFile.getPath()); } + long indexEnd = indexFile.getFilePointer(); if (logger.isTraceEnabled()) - logger.trace("wrote index entry: " + indexEntry + " at " + indexPosition); + logger.trace("wrote index entry: " + indexEntry + " at " + indexStart); - summary.maybeAddEntry(key, indexPosition); - builder.addPotentialBoundary(indexPosition); + summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd); + builder.addPotentialBoundary(indexStart); } public void abort() http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 227c79d..40f3e9d 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -69,6 +69,8 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne public final DataOutputPlus stream; protected long 
lastFlushOffset; + protected Runnable runPostFlush; + public SequentialWriter(File file, int bufferSize) { try @@ -304,6 +306,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne } } + public void setPostFlushListener(Runnable runPostFlush) + { + assert this.runPostFlush == null; + this.runPostFlush = runPostFlush; + } + /** * Override this method instead of overriding flush() * @throws FSWriteError on any I/O error. @@ -319,6 +327,8 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne { throw new FSWriteError(e, getPath()); } + if (runPostFlush != null) + runPostFlush.run(); } public long getFilePointer()