Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 bedd97f7a -> 871f0039c
Fix regression in SSTableRewriter causing some rows to become unreadable during compaction patch by marcus and benedict for CASSANDRA-8429 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/871f0039 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/871f0039 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/871f0039 Branch: refs/heads/cassandra-2.1 Commit: 871f0039c5bf89be343039478c64ce835b04b5cf Parents: bedd97f Author: Benedict Elliott Smith <bened...@apache.org> Authored: Fri Dec 19 14:04:38 2014 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Fri Dec 19 14:24:47 2014 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../db/compaction/CompactionManager.java | 3 - .../io/compress/CompressedSequentialWriter.java | 12 +- .../io/compress/CompressionMetadata.java | 31 +-- .../cassandra/io/sstable/SSTableReader.java | 26 +-- .../cassandra/io/sstable/SSTableRewriter.java | 189 +++++++++---------- .../cassandra/io/sstable/SSTableWriter.java | 118 +++++++----- .../io/util/BufferedPoolingSegmentedFile.java | 9 +- .../io/util/BufferedSegmentedFile.java | 9 +- .../io/util/CompressedPoolingSegmentedFile.java | 11 +- .../io/util/CompressedSegmentedFile.java | 16 +- .../cassandra/io/util/MmappedSegmentedFile.java | 8 +- .../apache/cassandra/io/util/SegmentedFile.java | 12 +- .../io/sstable/SSTableRewriterTest.java | 71 ++++++- 14 files changed, 278 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e5a8f05..ac28d78 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.1.3 + * Fix regression in SSTableRewriter causing some rows to become unreadable + during compaction (CASSANDRA-8429) * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510) * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression is disabled (CASSANDRA-8288) http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 9f5951c..872ebed 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1035,9 +1035,6 @@ public class CompactionManager implements CompactionManagerMBean unrepairedKeyCount++; } } - // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them - // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness - // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt)); anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE)); cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index e533b1e..d3c41fa 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.DataIntegrityMetadata; import org.apache.cassandra.io.util.FileMark; @@ -139,15 +140,10 @@ public class CompressedSequentialWriter extends SequentialWriter chunkOffset += compressedLength + 4; } - public CompressionMetadata openEarly() + public CompressionMetadata open(SSTableWriter.FinishType finishType) { - return metadataWriter.openEarly(originalSize, chunkOffset); - } - - public CompressionMetadata openAfterClose() - { - assert current == originalSize; - return metadataWriter.openAfterClose(current, chunkOffset); + assert finishType != SSTableWriter.FinishType.NORMAL || current == originalSize; + return metadataWriter.open(originalSize, chunkOffset, finishType); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index c922963..173722f 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -47,10 +47,10 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.Memory; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@ -217,7 +217,7 @@ public class CompressionMetadata throw new CorruptSSTableException(new EOFException(), indexFilePath); long chunkOffset = chunkOffsets.getLong(idx); - long nextChunkOffset = (idx + 8 == chunkOffsets.size()) + long nextChunkOffset = (idx + 8 == chunkOffsetsSize) ? compressedFileLength : chunkOffsets.getLong(idx + 8); @@ -319,18 +319,25 @@ public class CompressionMetadata } } - public CompressionMetadata openEarly(long dataLength, long compressedLength) + public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType) { + RefCountedMemory offsets; + switch (finishType) + { + case EARLY: + offsets = this.offsets; + break; + case NORMAL: + case FINISH_EARLY: + offsets = this.offsets.copy(count * 8L); + this.offsets.unreference(); + break; + default: + throw new AssertionError(); + } return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums); } - public CompressionMetadata openAfterClose(long dataLength, long compressedLength) - { - RefCountedMemory newOffsets = offsets.copy(count * 8L); - offsets.unreference(); - return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums); - } - /** * Get a chunk offset by it's index. * @@ -360,8 +367,8 @@ public class CompressionMetadata out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath))); assert chunks == count; writeHeader(out, dataLength, chunks); - for (int i = 0 ; i < count ; i++) - out.writeLong(offsets.getLong(i * 8)); + for (int i = 0 ; i < count ; i++) + out.writeLong(offsets.getLong(i * 8)); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index bd20226..217a109 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -201,7 +201,6 @@ public class SSTableReader extends SSTable private Object replaceLock = new Object(); private SSTableReader replacedBy; private SSTableReader replaces; - private SSTableReader sharesBfWith; private SSTableDeletingTask deletingTask; private Runnable runOnClose; @@ -575,7 +574,7 @@ public class SSTableReader extends SSTable synchronized (replaceLock) { - boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false; + boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get(); if (replacedBy != null) { @@ -594,19 +593,11 @@ public class SSTableReader extends SSTable deleteFiles &= !dfile.path.equals(replaces.dfile.path); } - if (sharesBfWith != null) - { - closeBf &= sharesBfWith.bf != bf; - closeSummary &= sharesBfWith.indexSummary != indexSummary; - closeFiles &= sharesBfWith.dfile != dfile; - deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path); - } - boolean deleteAll = false; if (release && isCompacted.get()) { assert replacedBy == null; - if (replaces != null) + if (replaces != null && !deleteFiles) { replaces.replacedBy = null; replaces.deletingTask = deletingTask; @@ -936,19 +927,6 @@ public class SSTableReader extends SSTable } } - /** - * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter - * - * note that the reason we don't use replacedBy is that we are not yet actually replaced - * - * @param newReader - */ - public void sharesBfWith(SSTableReader newReader) - { - assert openReason.equals(OpenReason.EARLY); - this.sharesBfWith = newReader; - } - public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose) { synchronized (replaceLock) http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index cd9435d..43ac4b6 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -17,18 +17,11 @@ */ package org.apache.cassandra.io.sstable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; -import com.google.common.collect.Lists; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; @@ -38,7 +31,6 @@ import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; /** * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb @@ -84,8 +76,10 @@ public class SSTableRewriter private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at - private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables - private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>(); + private final Queue<Finished> finishedEarly = new ArrayDeque<>(); + // as writers are closed from finishedEarly, their last readers are moved + // into discard, so that abort can cleanup after us safely + private final List<SSTableReader> discard = new ArrayList<>(); private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker) private SSTableWriter writer; @@ -183,42 +177,30 @@ public class SSTableRewriter public void abort() { switchWriter(null); - moveStarts(null, Functions.forMap(originalStarts), true); - List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly); - - for (Pair<SSTableWriter, SSTableReader> w : finishedWriters) - { - // we should close the bloom filter if we have not opened an sstable reader from this - // writer (it will get closed when we release the sstable reference below): - w.left.abort(w.right == null); - if (isOffline && w.right != null) - { - // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here: - w.right.markObsolete(); - w.right.releaseReference(); - } - } - - // also remove already completed SSTables - for (SSTableReader sstable : close) - sstable.markObsolete(); - + // remove already completed SSTables for (SSTableReader sstable : finished) { sstable.markObsolete(); sstable.releaseReference(); } - // releases reference in replaceReaders - if (!isOffline) + // abort the writers + for (Finished finished : finishedEarly) { - dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList()); - dataTracker.unmarkCompacting(close); + boolean opened = finished.reader != null; + finished.writer.abort(!opened); + if (opened) + { + // if we've already been opened, add ourselves to the discard pile + discard.add(finished.reader); + finished.reader.markObsolete(); + } } - } + replaceWithFinishedReaders(Collections.<SSTableReader>emptyList()); + } /** * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer @@ -274,8 +256,6 @@ public class SSTableRewriter rewriting.addAll(replaceWith); } - - private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith) { if (isOffline) @@ -301,15 +281,19 @@ public class SSTableRewriter writer = newWriter; return; } - // we leave it as a tmp file, but we open it early and add it to the dataTracker - SSTableReader reader = writer.openEarly(maxAge); - if (reader != null) + + // we leave it as a tmp file, but we open it and add it to the dataTracker + if (writer.getFilePointer() != 0) { - finishedOpenedEarly.add(reader); + SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1); replaceEarlyOpenedFile(currentlyOpenedEarly, reader); moveStarts(reader, Functions.constant(reader.last), false); + finishedEarly.add(new Finished(writer, reader)); + } + else + { + writer.abort(); } - finishedWriters.add(Pair.create(writer, reader)); currentlyOpenedEarly = null; currentlyOpenedEarlyAt = 0; writer = newWriter; @@ -337,85 +321,82 @@ public class SSTableRewriter */ public List<SSTableReader> finish(long repairedAt) { - List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>(); + return finishAndMaybeThrow(repairedAt, false, false); + } + + @VisibleForTesting + void finishAndThrow(boolean throwEarly) + { + finishAndMaybeThrow(-1, throwEarly, !throwEarly); + } + + private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate) + { + List<SSTableReader> newReaders = new ArrayList<>(); switchWriter(null); - // make real sstables of the written ones: - Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator(); - while(it.hasNext()) + + if (throwEarly) + throw new RuntimeException("exception thrown early in finish, for testing"); + + while (!finishedEarly.isEmpty()) { - Pair<SSTableWriter, SSTableReader> w = it.next(); - if (w.left.getFilePointer() > 0) + Finished f = finishedEarly.poll(); + if (f.writer.getFilePointer() > 0) { - SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt); - finished.add(newReader); + if (f.reader != null) + discard.add(f.reader); - if (w.right != null) - { - w.right.sharesBfWith(newReader); - if (isOffline) - { - // remove the tmplink files if we are offline - no one is using them - w.right.markObsolete(); - w.right.releaseReference(); - } - } - // w.right is the tmplink-reader we added when switching writer, replace with the real sstable. - toReplace.add(Pair.create(w.right, newReader)); + SSTableReader newReader = f.writer.finish(SSTableWriter.FinishType.FINISH_EARLY, maxAge, repairedAt); + + if (f.reader != null) + f.reader.setReplacedBy(newReader); + + finished.add(newReader); + newReaders.add(newReader); } else { - assert w.right == null; - w.left.abort(true); + f.writer.abort(true); + assert f.reader == null; } - it.remove(); } - if (!isOffline) - { - for (Pair<SSTableReader, SSTableReader> replace : toReplace) - replaceEarlyOpenedFile(replace.left, replace.right); - dataTracker.unmarkCompacting(finished); - } + if (throwLate) + throw new RuntimeException("exception thrown after all sstables finished, for testing"); + + replaceWithFinishedReaders(newReaders); return finished; } - @VisibleForTesting - void finishAndThrow(boolean early) + // cleanup all our temporary readers and swap in our new ones + private void replaceWithFinishedReaders(List<SSTableReader> finished) { - List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>(); - switchWriter(null); - if (early) - throw new RuntimeException("exception thrown early in finish"); - // make real sstables of the written ones: - Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator(); - while(it.hasNext()) + if (isOffline) { - Pair<SSTableWriter, SSTableReader> w = it.next(); - if (w.left.getFilePointer() > 0) - { - SSTableReader newReader = w.left.closeAndOpenReader(maxAge); - finished.add(newReader); - - if (w.right != null) - { - w.right.sharesBfWith(newReader); - if (isOffline) - { - w.right.markObsolete(); - w.right.releaseReference(); - } - } - // w.right is the tmplink-reader we added when switching writer, replace with the real sstable. - toReplace.add(Pair.create(w.right, newReader)); - } - else + for (SSTableReader reader : discard) { - assert w.right == null; - w.left.abort(true); + if (reader.getCurrentReplacement() == null) + reader.markObsolete(); + reader.releaseReference(); } - it.remove(); } + else + { + dataTracker.replaceEarlyOpenedFiles(discard, finished); + dataTracker.unmarkCompacting(discard); + } + discard.clear(); + } - throw new RuntimeException("exception thrown after all sstables finished"); + private static final class Finished + { + final SSTableWriter writer; + final SSTableReader reader; + + private Finished(SSTableWriter writer, SSTableReader reader) + { + this.writer = writer; + this.reader = reader; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index ec64561..595012d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -380,6 +380,18 @@ public class SSTableWriter extends SSTable last = lastWrittenKey = getMinimalKey(last); } + private Descriptor makeTmpLinks() + { + // create temp links if they don't already exist + Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK); + if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists()) + { + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX))); + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA))); + } + return link; + } + public SSTableReader openEarly(long maxDataAge) { StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), @@ -391,17 +403,10 @@ public class SSTableWriter extends SSTable if (exclusiveUpperBoundOfReadableIndex == null) return null; - // create temp links if they don't already exist - Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK); - if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists()) - { - FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX))); - FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA))); - } - + Descriptor link = makeTmpLinks(); // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers - SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX)); - SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA)); + SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY); + SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY); SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL), components, metadata, partitioner, ifile, @@ -435,6 +440,19 @@ public class SSTableWriter extends SSTable return sstable; } + public static enum FinishType + { + NORMAL(SSTableReader.OpenReason.NORMAL), + EARLY(SSTableReader.OpenReason.EARLY), // no renaming + FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish + final SSTableReader.OpenReason openReason; + + FinishType(SSTableReader.OpenReason openReason) + { + this.openReason = openReason; + } + } + public SSTableReader closeAndOpenReader() { return closeAndOpenReader(System.currentTimeMillis()); @@ -442,68 +460,84 @@ public class SSTableWriter extends SSTable public SSTableReader closeAndOpenReader(long maxDataAge) { - return closeAndOpenReader(maxDataAge, this.repairedAt); + return finish(FinishType.NORMAL, maxDataAge, this.repairedAt); } - public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt) + public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt) { - Pair<Descriptor, StatsMetadata> p = close(repairedAt); - Descriptor newdesc = p.left; - StatsMetadata sstableMetadata = p.right; + Pair<Descriptor, StatsMetadata> p; + + p = close(finishType, repairedAt); + Descriptor desc = p.left; + StatsMetadata metadata = p.right; + + if (finishType == FinishType.EARLY) + desc = makeTmpLinks(); // finalize in-memory state for the reader - SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX)); - SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA)); - SSTableReader sstable = SSTableReader.internalOpen(newdesc, + SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType); + SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType); + SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL), components, - metadata, + this.metadata, partitioner, ifile, dfile, iwriter.summary.build(partitioner), iwriter.bf, maxDataAge, - sstableMetadata, - SSTableReader.OpenReason.NORMAL); + metadata, + finishType.openReason); sstable.first = getMinimalKey(first); sstable.last = getMinimalKey(last); - // try to save the summaries to disk - sstable.saveSummary(iwriter.builder, dbuilder); - iwriter = null; - dbuilder = null; + + switch (finishType) + { + case NORMAL: case FINISH_EARLY: + // try to save the summaries to disk + sstable.saveSummary(iwriter.builder, dbuilder); + iwriter = null; + dbuilder = null; + } return sstable; } // Close the writer and return the descriptor to the new sstable and it's metadata public Pair<Descriptor, StatsMetadata> close() { - return close(this.repairedAt); + return close(FinishType.NORMAL, this.repairedAt); } - private Pair<Descriptor, StatsMetadata> close(long repairedAt) + private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt) { + switch (type) + { + case EARLY: case NORMAL: + iwriter.close(); + dataFile.close(); + } - // index and filter - iwriter.close(); - // main data, close will truncate if necessary - dataFile.close(); - dataFile.writeFullChecksum(descriptor); // write sstable statistics - Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata( - partitioner.getClass().getCanonicalName(), - metadata.getBloomFilterFpChance(), - repairedAt); - writeMetadata(descriptor, metadataComponents); - - // save the table of components - SSTable.appendTOC(descriptor, components); + Map<MetadataType, MetadataComponent> metadataComponents ; + metadataComponents = sstableMetadataCollector + .finalizeMetadata(partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance(),repairedAt); // remove the 'tmp' marker from all components - return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS)); + Descriptor descriptor = this.descriptor; + switch (type) + { + case NORMAL: case FINISH_EARLY: + dataFile.writeFullChecksum(descriptor); + writeMetadata(descriptor, metadataComponents); + // save the table of components + SSTable.appendTOC(descriptor, components); + descriptor = rename(descriptor, components); + } + return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS)); } - private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components) { SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java index b284f61..57f465f 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java @@ -19,6 +19,8 @@ package org.apache.cassandra.io.util; import java.io.File; +import org.apache.cassandra.io.sstable.SSTableWriter; + public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile { public BufferedPoolingSegmentedFile(String path, long length) @@ -33,16 +35,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile // only one segment in a standard-io file } - public SegmentedFile complete(String path) + public SegmentedFile complete(String path, SSTableWriter.FinishType finishType) { long length = new File(path).length(); return new BufferedPoolingSegmentedFile(path, length); } - - public SegmentedFile openEarly(String path) - { - return complete(path); - } } protected RandomAccessReader createReader(String path) http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java index aa031e3..2f715da 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java @@ -19,6 +19,8 @@ package org.apache.cassandra.io.util; import java.io.File; +import org.apache.cassandra.io.sstable.SSTableWriter; + public class BufferedSegmentedFile extends SegmentedFile { public BufferedSegmentedFile(String path, long length) @@ -33,16 +35,11 @@ public class BufferedSegmentedFile extends SegmentedFile // only one segment in a standard-io file } - public SegmentedFile complete(String path) + public SegmentedFile complete(String path, SSTableWriter.FinishType finishType) { long length = new File(path).length(); return new BufferedSegmentedFile(path, length); } - - public SegmentedFile openEarly(String path) - { - return complete(path); - } } public FileDataInput getSegment(long position) http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java index 1803e69..11d091a 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.util; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.SSTableWriter; public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile { @@ -43,17 +44,11 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme // only one segment in a standard-io file } - public SegmentedFile complete(String path) + public SegmentedFile complete(String path, SSTableWriter.FinishType finishType) { - return new CompressedPoolingSegmentedFile(path, metadata(path, false)); - } - - public SegmentedFile openEarly(String path) - { - return new CompressedPoolingSegmentedFile(path, metadata(path, true)); + return new CompressedPoolingSegmentedFile(path, metadata(path, finishType)); } } - protected RandomAccessReader createReader(String path) { return CompressedRandomAccessReader.open(path, metadata, this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java index 4afe0a0..b788715 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.util; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.SSTableWriter; public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile { @@ -44,24 +45,17 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse // only one segment in a standard-io file } - protected CompressionMetadata metadata(String path, boolean early) + protected CompressionMetadata metadata(String path, SSTableWriter.FinishType finishType) { if (writer == null) return CompressionMetadata.create(path); - else if (early) - return writer.openEarly(); - else - return writer.openAfterClose(); - } - public SegmentedFile complete(String path) - { - return new CompressedSegmentedFile(path, metadata(path, false)); + return writer.open(finishType); } - public SegmentedFile openEarly(String path) + public SegmentedFile complete(String path, SSTableWriter.FinishType finishType) { - return new CompressedSegmentedFile(path, metadata(path, true)); + return new CompressedSegmentedFile(path, metadata(path, finishType)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java index ccc03fc..3b2cc98 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.utils.JVMStabilityInspector; public class MmappedSegmentedFile extends SegmentedFile @@ -160,18 +161,13 @@ public class MmappedSegmentedFile extends SegmentedFile } } - public SegmentedFile complete(String path) + public SegmentedFile complete(String path, SSTableWriter.FinishType finishType) { long length = new File(path).length(); // create the segments return new MmappedSegmentedFile(path, length, createSegments(path)); } - public SegmentedFile openEarly(String path) - { - return complete(path); - } - private Segment[] createSegments(String path) { RandomAccessFile raf; http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index be549a6..badae56 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -29,6 +29,7 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.utils.Pair; /** @@ -115,13 +116,12 @@ public abstract class SegmentedFile * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk. * @param path The file on disk. */ - public abstract SegmentedFile complete(String path); + public abstract SegmentedFile complete(String path, SSTableWriter.FinishType openType); - /** - * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk. - * @param path The file on disk. - */ - public abstract SegmentedFile openEarly(String path); + public SegmentedFile complete(String path) + { + return complete(path, SSTableWriter.FinishType.NORMAL); + } public void serializeBounds(DataOutput out) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 6f9acea..392936d 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -483,7 +483,6 @@ public class SSTableRewriterTest extends SchemaLoader cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 400); - DecoratedKey origFirst = s.first; cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(1000000); @@ -499,8 +498,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000) { - assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ... - assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same + assertEquals(files, cfs.getSSTables().size()); // all files are now opened early rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); files++; } @@ -616,6 +614,73 @@ public class SSTableRewriterTest extends SchemaLoader } + @Test + public void testAllKeysReadable() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.truncateBlocking(); + for (int i = 0; i < 100; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); + Mutation rm = new Mutation(KEYSPACE, key.getKey()); + for (int j = 0; j < 10; j++) + rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100); + rm.apply(); + } + cfs.forceBlockingFlush(); + cfs.forceMajorCompaction(); + validateKeys(keyspace); + + assertEquals(1, cfs.getSSTables().size()); + SSTableReader s = cfs.getSSTables().iterator().next(); + Set<SSTableReader> compacting = new HashSet<>(); + compacting.add(s); + cfs.getDataTracker().markCompacting(compacting); + + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); + SSTableRewriter.overrideOpenInterval(1); + SSTableWriter w = getWriter(cfs, s.descriptor.directory); + rewriter.switchWriter(w); + int keyCount = 0; + try (ICompactionScanner scanner = compacting.iterator().next().getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) + { + while (scanner.hasNext()) + { + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (keyCount % 10 == 0) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + } + keyCount++; + validateKeys(keyspace); + } + try + { + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION); + cfs.getDataTracker().unmarkCompacting(compacting); + } + catch (Throwable t) + { + rewriter.abort(); + } + } + validateKeys(keyspace); + Thread.sleep(1000); + validateCFS(cfs); + } + + private void validateKeys(Keyspace ks) + { + for (int i = 0; i < 100; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); + ColumnFamily cf = Util.getColumnFamily(ks, key, CF); + assertTrue(cf != null); + } + } + private SSTableReader writeFile(ColumnFamilyStore cfs, int count) { ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);