Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
    CHANGES.txt
    src/java/org/apache/cassandra/db/compaction/Scrubber.java
    src/java/org/apache/cassandra/io/sstable/SSTableReader.java
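In short: CASSANDRA-9591 (merged below) allows scrub to proceed even when the -Index.db component is missing, falling back to a zero key-count estimate rather than reading the index. A minimal, self-contained sketch of that guard follows; every name in it is an illustrative stand-in, not Cassandra's API:

    import java.io.File;

    // Hypothetical sketch of the guard the patch adds in Scrubber's constructor:
    // consult the index for the key-count estimate only when -Index.db exists.
    public class MissingIndexGuard
    {
        // minIndexInterval and approxKeysFromIndex stand in for values the real
        // code reads from table metadata and from the index, respectively.
        static long expectedBloomFilterSize(File indexFile, int minIndexInterval, long approxKeysFromIndex)
        {
            boolean hasIndexFile = indexFile.exists();
            if (!hasIndexFile)
                System.err.println("Missing component: " + indexFile.getPath());
            // without an index, fall back to the configured minimum
            return Math.max(minIndexInterval, hasIndexFile ? approxKeysFromIndex : 0);
        }

        public static void main(String[] args)
        {
            System.out.println(expectedBloomFilterSize(new File("ks-cf-ka-1-Index.db"), 128, 100_000));
        }
    }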
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c94ef20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c94ef20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c94ef20

Branch: refs/heads/trunk
Commit: 4c94ef20d3562ab8f0a922945d78464d6c475d98
Parents: 4de943f 452d6a4
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Tue Jul 7 16:27:58 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Tue Jul 7 16:27:58 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                      |  5 ++
 .../cassandra/db/compaction/Scrubber.java        | 37 +++++++++++---
 .../cassandra/io/sstable/SSTableReader.java      | 52 ++++++++++++++------
 .../cassandra/tools/StandaloneScrubber.java      |  2 +-
 test/unit/org/apache/cassandra/db/ScrubTest.java | 25 ++++++++++
 5 files changed, 97 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 95dc8be,bd1db92..2cbc7c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,8 +1,23 @@@
-2.0.18
-* Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
-
-
-2.0.17
++2.1.9
++Merged from 2.0:
++ * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
++
++
+2.1.8
+ * (cqlsh) Fix bad check for CQL compatibility when DESCRIBE'ing
+   COMPACT STORAGE tables with no clustering columns
+ * Warn when an extra-large partition is compacted (CASSANDRA-9643)
+ * Eliminate strong self-reference chains in sstable ref tidiers (CASSANDRA-9656)
+ * Ensure StreamSession uses canonical sstable reader instances (CASSANDRA-9700)
+ * Ensure memtable book keeping is not corrupted in the event we shrink usage (CASSANDRA-9681)
+ * Update internal python driver for cqlsh (CASSANDRA-9064)
+ * Fix IndexOutOfBoundsException when inserting tuple with too many
+   elements using the string literal notation (CASSANDRA-9559)
+ * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
+ * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
+ * Enable describe on indices (CASSANDRA-7814)
+ * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
+Merged from 2.0:
  * Avoid NPE in AuthSuccess#decode (CASSANDRA-9727)
  * Add listen_address to system.local (CASSANDRA-9603)
  * Bug fixes to resultset metadata construction (CASSANDRA-9636)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index ce98a13,dc60efa..b1c12e0
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -100,8 -95,17 +100,18 @@@ public class Scrubber implements Closea
          this.controller = isOffline
                              ? new ScrubController(cfs)
                              : new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs));
-         this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
+         this.isCommutative = cfs.metadata.isCounter();
-         this.expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub)));
+
+         boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
+         if (!hasIndexFile)
+         {
+             // If there's any corruption in the -Data.db then rows can't be skipped over, but it's worth a shot.
+             outputHandler.warn("Missing component: " + sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+         }
+
-         this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
-                                                 hasIndexFile ? (int)(SSTableReader.getApproximateKeyCount(toScrub,cfs.metadata)) : 0);
++        this.expectedBloomFilterSize = Math.max(
++                cfs.metadata.getMinIndexInterval(),
++                hasIndexFile ? (int)(SSTableReader.getApproximateKeyCount(toScrub)) : 0);

          // loop through each row, deserializing to check for damage.
          // we'll also loop through the index at the same time, using the position from the index to recover if the

@@@ -120,14 -128,13 +134,15 @@@
      public void scrub()
      {
          outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
+         Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+         SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
          try
          {
-             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+             nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
+             if (indexAvailable())
              {
                  // throw away variable so we don't have a side effect in the assert
-                 long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
+                 long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
                  assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
              }

@@@ -167,13 -179,25 +182,13 @@@
                      dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
                  }

-                 if (!sstable.descriptor.version.hasRowSizeAndColumnCount)
-                 {
-                     dataSize = dataSizeFromIndex;
-                     // avoid an NPE if key is null
-                     String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.key);
-                     outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
-                 }
-                 else
-                 {
-                     if (currentIndexKey != null)
-                         outputHandler.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
-                 }
+                 dataSize = dataSizeFromIndex;
+                 // avoid an NPE if key is null
+                 String keyName = key == null
"(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey()); + outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize)); - assert currentIndexKey != null || indexFile.isEOF(); + assert currentIndexKey != null || !indexAvailable(); - writer.mark(); try { if (key == null) @@@ -188,11 -212,11 +203,11 @@@ if (dataSize > dataFile.length()) throw new IOError(new IOException("Impossible row size (greater than file length): " + dataSize)); - if (dataStart != dataStartFromIndex) + if (indexFile != null && dataStart != dataStartFromIndex) outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex)); - if (dataSize != dataSizeFromIndex) + if (indexFile != null && dataSize != dataSizeFromIndex) - outputHandler.warn(String.format("Data file row size %d differs from index file row size %d", dataSize, dataSizeFromIndex)); + outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex)); SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, validateColumns); if (prevKey != null && prevKey.compareTo(key) > 0) @@@ -312,10 -331,11 +327,11 @@@ currentRowPositionFromIndex = nextRowPositionFromIndex; try { - nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); - nextRowPositionFromIndex = indexFile.isEOF() + nextIndexKey = !indexAvailable() ? null : ByteBufferUtil.readWithShortLength(indexFile); + + nextRowPositionFromIndex = !indexAvailable() ? dataFile.length() - : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position; + : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position; } catch (Throwable th) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 7551d46,8919a09..6879834 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@@ -448,27 -190,9 +448,27 @@@ public class SSTableReader extends SSTa IPartitioner partitioner, boolean validate) throws IOException { - long start = System.nanoTime(); - SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner, validate); + // Minimum components without which we can't do anything + assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; - assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; ++ assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; + + Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, - EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); ++ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); + ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); + StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); + + // Check if sstable is created using same partitioner. + // Partitioner can be null, which indicates older version of sstable or no stats available. 
+         // In that case, we skip the check.
+         String partitionerName = partitioner.getClass().getCanonicalName();
+         if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+         {
+             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                        descriptor, validationMetadata.partitioner, partitionerName));
+             System.exit(1);
+         }
+
          logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
          SSTableReader sstable = new SSTableReader(descriptor,
                                                    components,
                                                    metadata,

@@@ -603,48 -374,35 +603,43 @@@
          this.dfile = dfile;
          this.indexSummary = indexSummary;
          this.bf = bloomFilter;
+         this.setup(false);
      }

-     /**
-      * Clean up all opened resources.
-      *
-      * @throws IOException
-      */
-     public void close() throws IOException
+     public static long getTotalBytes(Iterable<SSTableReader> sstables)
      {
-         if (readMeterSyncFuture != null)
-             readMeterSyncFuture.cancel(false);
+         long sum = 0;
+         for (SSTableReader sstable : sstables)
+             sum += sstable.onDiskLength();
+         return sum;
+     }

-         // Force finalizing mmapping if necessary
+     public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables)
+     {
+         long sum = 0;
+         for (SSTableReader sstable : sstables)
+             sum += sstable.uncompressedLength();
-         if (null != ifile)
-             ifile.cleanup();
+         return sum;
+     }
+
+     public boolean equals(Object that)
+     {
+         return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+     }

-         dfile.cleanup();
-         // close the BF so it can be opened later.
-         if (null != bf)
-             bf.close();
+     public int hashCode()
+     {
+         return this.descriptor.hashCode();
+     }

-         if (null != indexSummary)
-             indexSummary.close();
+     public String getFilename()
+     {
+         return dfile.path;
      }

-     public String getIndexFilename()
-     {
-         return ifile.path;
-     }
-
-     public void setTrackedBy(DataTracker tracker)
+     public void setupKeyCache()
      {
-         deletingTask.setTracker(tracker);
          // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
          // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
          // here when we know we're being wired into the rest of the server infrastructure.

@@@ -659,7 -417,13 +654,13 @@@
              load(false, true);
              bf = FilterFactory.AlwaysPresent;
          }
+         else if (!components.contains(Component.PRIMARY_INDEX))
+         {
+             // avoid any reading of the missing primary index component.
-             // this should only happen during StandaloneScrubber
++            // this should only happen for standalone tools
+             load(false, false);
+         }
-         else if (!components.contains(Component.FILTER))
+         else if (!components.contains(Component.FILTER) || validation == null)
          {
              // bf is enabled, but filter component is missing.
              load(true, true);

@@@ -708,418 -467,26 +709,427 @@@
                     ? SegmentedFile.getCompressedBuilder()
                     : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-         boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
+         boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+         boolean builtSummary = false;
          if (recreateBloomFilter || !summaryLoaded)
-             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
+         {
+             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+             builtSummary = true;
+         }

-         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         if (components.contains(Component.PRIMARY_INDEX))
+             ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+
          dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-         if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
-             saveSummary(this, ibuilder, dbuilder);
+
+         // Check for an index summary that was downsampled even though the serialization format doesn't support
+         // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 for details.
-         if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel())
++        if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel() && ifile != null)
+         {
+             indexSummary.close();
+             ifile.close();
+             dfile.close();
+
+             logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling");
+             FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY)));
+             ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+             dbuilder = compression
+                        ? SegmentedFile.getCompressedBuilder()
+                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+             buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
+             ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+             dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+             saveSummary(ibuilder, dbuilder);
+         }
+         else if (saveSummaryIfCreated && builtSummary)
+         {
+             saveSummary(ibuilder, dbuilder);
+         }
      }

-     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
-     {
+     /**
+      * Build the index summary (and optionally the bloom filter) by reading through the Index.db file.
+      *
+      * @param recreateBloomFilter true to recreate the bloom filter
+      * @param ibuilder
+      * @param dbuilder
+      * @param summaryLoaded true if the index summary is already loaded and does not need to be built again
+      * @throws IOException
+      */
+     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
+     {
+         if (!components.contains(Component.PRIMARY_INDEX))
+             return;
+
          // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
          RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-
+
+         try
+         {
+             long indexSize = primaryIndex.length();
+             long histogramCount = sstableMetadata.estimatedRowSize.count();
+             long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
+                                ? histogramCount
+                                : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+
+             try (IndexSummaryBuilder summaryBuilder = summaryLoaded
+                  ? null
+                  : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
+             {
+
+                 if (recreateBloomFilter)
+                     bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
+
+                 long indexPosition;
+                 while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+                 {
+                     ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                     RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
+                     DecoratedKey decoratedKey = partitioner.decorateKey(key);
+                     if (first == null)
+                         first = decoratedKey;
+                     last = decoratedKey;
+
+                     if (recreateBloomFilter)
+                         bf.add(decoratedKey.getKey());
+
+                     // if summary was already read from disk we don't want to re-populate it using primary index
+                     if (!summaryLoaded)
+                     {
+                         summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+                         ibuilder.addPotentialBoundary(indexPosition);
+                         dbuilder.addPotentialBoundary(indexEntry.position);
+                     }
+                 }
+
+                 if (!summaryLoaded)
+                     indexSummary = summaryBuilder.build(partitioner);
+             }
+         }
+         finally
+         {
+             FileUtils.closeQuietly(primaryIndex);
+         }
+
+         first = getMinimalKey(first);
+         last = getMinimalKey(last);
+     }
+
+     /**
+      * Load the index summary from the Summary.db file if it exists.
+      *
+      * If the loaded index summary has a different index interval from the current value stored in the schema,
+      * the Summary.db file will be deleted and this returns false, so that the summary will be rebuilt.
+      *
+      * @param ibuilder
+      * @param dbuilder
+      * @return true if the index summary is loaded successfully from the Summary.db file.
+      */
+     public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+         if (!summariesFile.exists())
+             return false;
+
+         DataInputStream iStream = null;
+         try
+         {
+             iStream = new DataInputStream(new FileInputStream(summariesFile));
+             indexSummary = IndexSummary.serializer.deserialize(
+                     iStream, partitioner, descriptor.version.hasSamplingLevel,
+                     metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
+             first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+             last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+             ibuilder.deserializeBounds(iStream);
+             dbuilder.deserializeBounds(iStream);
+         }
+         catch (IOException e)
+         {
+             if (indexSummary != null)
+                 indexSummary.close();
+             logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+             // corrupted; delete it and fall back to creating a new summary
+             FileUtils.closeQuietly(iStream);
+             // delete it and fall back to creating a new summary
+             FileUtils.deleteWithConfirm(summariesFile);
+             return false;
+         }
+         finally
+         {
+             FileUtils.closeQuietly(iStream);
+         }
+
+         return true;
+     }
+
+     /**
+      * Validates that an index summary has full sampling, as expected when the serialization format does not support
+      * persisting the sampling level.
+      * @return true if the summary has full sampling, false otherwise
+      */
+     private boolean validateSummarySamplingLevel()
+     {
+         // We need to check index summary entries against the index to verify that none of them were dropped due to
+         // downsampling.  Downsampling can drop any of the first BASE_SAMPLING_LEVEL entries (repeating that drop pattern
+         // for the remainder of the summary).  Unfortunately, the first entry to be dropped is the entry at
+         // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of BASE_SAMPLING_LEVEL entries.
++        if (ifile == null)
++            return false;
++
+         Iterator<FileDataInput> segments = ifile.iterator(0);
+         int i = 0;
+         int summaryEntriesChecked = 0;
+         int expectedIndexInterval = getMinIndexInterval();
+         while (segments.hasNext())
+         {
+             FileDataInput in = segments.next();
+             try
+             {
+                 while (!in.isEOF())
+                 {
+                     ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+                     if (i % expectedIndexInterval == 0)
+                     {
+                         ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
+                         if (!summaryKey.equals(indexKey))
+                             return false;
+                         summaryEntriesChecked++;
+
+                         if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
+                             return true;
+                     }
+                     RowIndexEntry.Serializer.skip(in);
+                     i++;
+                 }
+             }
+             catch (IOException e)
+             {
+                 markSuspect();
+                 throw new CorruptSSTableException(e, in.getPath());
+             }
+             finally
+             {
+                 FileUtils.closeQuietly(in);
+             }
+         }
+
+         return true;
+     }
+
+     /**
+      * Save the index summary to the Summary.db file.
+      *
+      * @param ibuilder
+      * @param dbuilder
+      */
+     public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         saveSummary(ibuilder, dbuilder, indexSummary);
+     }
+
+     private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+     {
+         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+         if (summariesFile.exists())
+             FileUtils.deleteWithConfirm(summariesFile);
+
+         DataOutputStreamAndChannel oStream = null;
+         try
+         {
+             oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+             IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel);
+             ByteBufferUtil.writeWithLength(first.getKey(), oStream);
+             ByteBufferUtil.writeWithLength(last.getKey(), oStream);
+             ibuilder.serializeBounds(oStream);
+             dbuilder.serializeBounds(oStream);
+         }
+         catch (IOException e)
+         {
+             logger.debug("Cannot save SSTable Summary: ", e);
+
+             // corrupted; delete it and let it be recreated on the next load
+             if (summariesFile.exists())
+                 FileUtils.deleteWithConfirm(summariesFile);
+         }
+         finally
+         {
+             FileUtils.closeQuietly(oStream);
+         }
+     }
+
+     public void setReplacedBy(SSTableReader replacement)
+     {
+         synchronized (tidy.global)
+         {
+             assert replacement != null;
+             assert !tidy.isReplaced;
+             tidy.isReplaced = true;
+         }
+     }
+
+     public boolean isReplaced()
+     {
+         return tidy.isReplaced;
+     }
+
+     /**
+      * Clone this reader with the provided start and open reason, and set the clone as replacement.
+      *
+      * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
+      * opening of compaction results).
+      * @param reason the {@code OpenReason} for the replacement.
+      *
+      * @return the cloned reader. That reader is set as a replacement by the method.
+      */
+     private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason)
+     {
+         return cloneAndReplace(newFirst, reason, indexSummary.sharedCopy());
+     }
+
+     /**
+      * Clone this reader with the new values and set the clone as replacement.
+      *
+      * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
+      * opening of compaction results).
+      * @param reason the {@code OpenReason} for the replacement.
+      * @param newSummary the index summary for the replacement.
+      *
+      * @return the cloned reader. That reader is set as a replacement by the method.
+      */
+     private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason, IndexSummary newSummary)
+     {
+         SSTableReader replacement = internalOpen(descriptor,
+                                                  components,
+                                                  metadata,
+                                                  partitioner,
-                                                  ifile.sharedCopy(),
++                                                 ifile != null ? ifile.sharedCopy() : null,
+                                                  dfile.sharedCopy(),
+                                                  newSummary,
+                                                  bf.sharedCopy(),
+                                                  maxDataAge,
+                                                  sstableMetadata,
+                                                  reason);
+         replacement.first = newFirst;
+         replacement.last = last;
+         replacement.isSuspect.set(isSuspect.get());
+         setReplacedBy(replacement);
+         return replacement;
+     }
+
+     // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain a reference chain to this reader
+     public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+     {
+         synchronized (tidy.global)
+         {
+             assert openReason != OpenReason.EARLY;
+             // TODO: merge with caller's firstKeyBeyond() work, to save time
+             if (newStart.compareTo(first) > 0)
+             {
+                 final long dataStart = getPosition(newStart, Operator.EQ).position;
+                 final long indexStart = getIndexScanPosition(newStart);
+                 this.tidy.runOnClose = new DropPageCache(dfile, dataStart, ifile, indexStart, runOnClose);
+             }
+
+             return cloneAndReplace(newStart, OpenReason.MOVED_START);
+         }
+     }
+
+     // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain a reference chain to this reader
+     public SSTableReader cloneAsShadowed(final Runnable runOnClose)
+     {
+         synchronized (tidy.global)
+         {
+             assert openReason != OpenReason.EARLY;
+             this.tidy.runOnClose = new DropPageCache(dfile, 0, ifile, 0, runOnClose);
+             return cloneAndReplace(first, OpenReason.SHADOWED);
+         }
+     }
+
+     private static class DropPageCache implements Runnable
+     {
+         final SegmentedFile dfile;
+         final long dfilePosition;
+         final SegmentedFile ifile;
+         final long ifilePosition;
+         final Runnable andThen;
+
+         private DropPageCache(SegmentedFile dfile, long dfilePosition, SegmentedFile ifile, long ifilePosition, Runnable andThen)
+         {
+             this.dfile = dfile;
+             this.dfilePosition = dfilePosition;
+             this.ifile = ifile;
+             this.ifilePosition = ifilePosition;
+             this.andThen = andThen;
+         }
+
+         public void run()
+         {
+             dfile.dropPageCache(dfilePosition);
-             ifile.dropPageCache(ifilePosition);
++            if (ifile != null)
++                ifile.dropPageCache(ifilePosition);
+             andThen.run();
+         }
+     }
+
+     /**
+      * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
+      * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
+      * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
+      * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
+      * @return a new SSTableReader
+      * @throws IOException
+      */
+     public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
+     {
+         assert descriptor.version.hasSamplingLevel;
+
+         synchronized (tidy.global)
+         {
+             assert openReason != OpenReason.EARLY;
+
+             int minIndexInterval = metadata.getMinIndexInterval();
+             int maxIndexInterval = metadata.getMaxIndexInterval();
+             double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+
+             IndexSummary newSummary;
+             long oldSize = bytesOnDisk();
+
+             // We have to rebuild the summary from the on-disk primary index in three cases:
+             // 1. The sampling level went up, so we need to read more entries off disk
+             // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+             //    at full sampling (and consequently at any other sampling level)
+             // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+             if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+             {
+                 newSummary = buildSummaryAtLevel(samplingLevel);
+             }
+             else if (samplingLevel < indexSummary.getSamplingLevel())
+             {
+                 // we can use the existing index summary to make a smaller one
+                 newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+
+                 SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+                 SegmentedFile.Builder dbuilder = compression
+                                                  ? SegmentedFile.getCompressedBuilder()
+                                                  : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                 saveSummary(ibuilder, dbuilder, newSummary);
+             }
+             else
+             {
+                 throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+                                          "no adjustments to min/max_index_interval");
+             }
+
+             long newSize = bytesOnDisk();
+             StorageMetrics.load.inc(newSize - oldSize);
+             parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
+
+             return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
+         }
+     }
+
+     private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
+     {
+         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
          try
          {
              long indexSize = primaryIndex.length();

@@@ -1566,16 -974,22 +1576,19 @@@
          {
              if (op == Operator.EQ && updateCacheAndStats)
                  bloomFilterTracker.addFalsePositive();
-             // we matched the -1th position: if the operator might match forward, we'll start at the first
-             // position. We however need to return the correct index entry for that first position.
-             if (op.apply(1) >= 0)
-             {
-                 sampledPosition = 0;
-             }
-             else
-             {
-                 Tracing.trace("Partition summary allows skipping sstable {}", descriptor.generation);
-                 return null;
-             }
+             Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
+             return null;
          }

+         int binarySearchResult = indexSummary.binarySearch(key);
+         long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
+         int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
+
+         int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
+
+         if (ifile == null)
+             return null;
+
          // scan the on-disk index, starting at the nearest sampled position.
          // The check against IndexInterval is to exit the loop in the EQ case when the key looked for is not present
          // (bloom filter false positive).
          // But note that for non-EQ cases, we might need to check the first key of the

@@@ -1670,11 -1084,13 +1683,14 @@@
       */
      public DecoratedKey firstKeyBeyond(RowPosition token)
      {
+         if (token.compareTo(first) < 0)
+             return first;
+
          long sampledPosition = getIndexScanPosition(token);
-         if (sampledPosition == -1)
-             sampledPosition = 0;

+         if (ifile == null)
+             return null;
+
          Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
          while (segments.hasNext())
          {

@@@ -1948,8 -1391,7 +1964,10 @@@
      {
          try
          {
-             return SSTableMetadata.serializer.deserialize(descriptor).right;
+             CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
-             return compactionMetadata.ancestors;
++            if (compactionMetadata != null)
++                return compactionMetadata.ancestors;
++            return Collections.emptySet();
          }
          catch (IOException e)
          {

@@@ -1995,7 -1441,7 +2013,9 @@@
      public RandomAccessReader openIndexReader()
      {
-         return ifile.createReader();
-         return RandomAccessReader.open(new File(getIndexFilename()));
++        if (ifile != null)
++            return ifile.createReader();
++        return null;
      }

      /**

@@@ -2024,152 -1470,73 +2044,154 @@@
      }

      /**
-      * @param sstables
-      * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false.
+      * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
+      * slice queries, row cache hits, or non-query reads, like compaction.
       */
-     public static boolean acquireReferences(Iterable<SSTableReader> sstables)
+     public void incrementReadCount()
      {
-         SSTableReader failed = null;
-         for (SSTableReader sstable : sstables)
+         if (readMeter != null)
+             readMeter.mark();
+     }
+
+     public static class SizeComparator implements Comparator<SSTableReader>
+     {
+         public int compare(SSTableReader o1, SSTableReader o2)
          {
-             if (!sstable.acquireReference())
-             {
-                 failed = sstable;
-                 break;
-             }
+             return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
          }
+     }

-         if (failed == null)
-             return true;
+     public Ref<SSTableReader> tryRef()
+     {
+         return selfRef.tryRef();
+     }

-         for (SSTableReader sstable : sstables)
-         {
-             if (sstable == failed)
-                 break;
-             sstable.releaseReference();
-         }
-         return false;
+     public Ref<SSTableReader> selfRef()
+     {
+         return selfRef;
      }

-     public static void releaseReferences(Iterable<SSTableReader> sstables)
+     public Ref<SSTableReader> ref()
      {
-         for (SSTableReader sstable : sstables)
-         {
-             sstable.releaseReference();
-         }
+         return selfRef.ref();
      }

-     private void dropPageCache()
+     void setup(boolean isOffline)
      {
-         dropPageCache(dfile.path);
-         if (null != ifile)
-             dropPageCache(ifile.path);
+         tidy.setup(this, isOffline);
+         this.readMeter = tidy.global.readMeter;
      }

-     private void dropPageCache(String filePath)
+     @VisibleForTesting
+     public void overrideReadMeter(RestorableMeter readMeter)
      {
-         RandomAccessFile file = null;
+         this.readMeter = tidy.global.readMeter = readMeter;
+     }

-         try
+     /**
+      * One instance per SSTableReader we create.  This references the type-shared tidy, which in turn references
+      * the globally shared tidy, i.e.
+      *
+      * InstanceTidier => DescriptorTypeTidy => GlobalTidy
+      *
+      * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be
+      * two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for one single logical sstable.
+      *
+      * When the InstanceTidier cleans up, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
+      * for that type have run, the DescriptorTypeTidy cleans up.  DescriptorTypeTidy behaves in the same way towards GlobalTidy.
+      *
+      * For ease, we stash a direct reference to both our type-shared and global tidier
+      */
+     private static final class InstanceTidier implements Tidy
+     {
+         private final Descriptor descriptor;
+         private final CFMetaData metadata;
+         private IFilter bf;
+         private IndexSummary summary;
+
+         private SegmentedFile dfile;
+         private SegmentedFile ifile;
+         private Runnable runOnClose;
+         private boolean isReplaced = false;
+
+         // a reference to our shared per-Descriptor.Type tidy instance, that
+         // we will release when we are ourselves released
+         private Ref<DescriptorTypeTidy> typeRef;
+
+         // a convenience stashing of the shared per-descriptor-type tidy instance itself
+         // and the per-logical-sstable globally shared state that it is linked to
+         private DescriptorTypeTidy type;
+         private GlobalTidy global;
+
+         private boolean setup;
+
+         void setup(SSTableReader reader, boolean isOffline)
+         {
+             this.setup = true;
+             this.bf = reader.bf;
+             this.summary = reader.indexSummary;
+             this.dfile = reader.dfile;
+             this.ifile = reader.ifile;
+             // get a new reference to the shared descriptor-type tidy
+             this.typeRef = DescriptorTypeTidy.get(reader);
+             this.type = typeRef.get();
+             this.global = type.globalRef.get();
+             if (!isOffline)
+                 global.ensureReadMeter();
+         }
+
+         InstanceTidier(Descriptor descriptor, CFMetaData metadata)
          {
-             file = new RandomAccessFile(filePath, "r");
+             this.descriptor = descriptor;
+             this.metadata = metadata;
+         }

-             int fd = CLibrary.getfd(file.getFD());
+         public void tidy()
+         {
+             // don't try to clean up if the SSTableReader was never fully constructed
+             if (!setup)
+                 return;

-             if (fd > 0)
+             final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+             final OpOrder.Barrier barrier;
+             if (cfs != null)
              {
-                 if (logger.isDebugEnabled())
-                     logger.debug(String.format("Dropping page cache of file %s.", filePath));
-
-                 CLibrary.trySkipCache(fd, 0, 0);
+                 barrier = cfs.readOrdering.newBarrier();
+                 barrier.issue();
              }
+             else
+                 barrier = null;
+
+             ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
+             {
+                 public void run()
+                 {
+                     if (barrier != null)
+                         barrier.await();
-                     bf.close();
++                    if (bf != null)
++                        bf.close();
+                     if (summary != null)
+                         summary.close();
+                     if (runOnClose != null)
+                         runOnClose.run();
+                     dfile.close();
-                     ifile.close();
++                    if (ifile != null)
++                        ifile.close();
+                     typeRef.release();
+                 }
+             });
          }
-         catch (IOException e)
+
+         public String name()
          {
-             // we don't care if cache cleanup fails
+             return descriptor.toString();
          }
-         finally
+
+         void releaseSummary()
          {
-             FileUtils.closeQuietly(file);
+             summary.close();
+             assert summary.isCleanedUp();
+             summary = null;
          }
      }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
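The Scrubber changes above amount to treating the index as optional: read ahead in -Index.db only while one is available, and otherwise recover row boundaries from the data file alone. A self-contained toy version of that control flow, using length-prefixed records and hypothetical names rather than real sstable structures:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Toy scrub: records are length-prefixed byte arrays; the optional "index"
    // is a stream of longs giving the offset after each record.
    public class ToyScrub
    {
        static List<byte[]> scrub(DataInputStream data, DataInputStream index) throws IOException
        {
            List<byte[]> recovered = new ArrayList<>();
            long position = 0;
            while (true)
            {
                int len;
                try { len = data.readInt(); }
                catch (EOFException e) { break; }          // clean end of the data file
                byte[] row = new byte[len];
                data.readFully(row);
                position += 4 + len;
                Long offsetFromIndex = nextOffset(index);  // null when no index is available
                if (offsetFromIndex != null && offsetFromIndex != position)
                    System.err.println("data/index position mismatch at " + position);
                recovered.add(row);
            }
            return recovered;
        }

        // Mirrors the role of indexAvailable(): once the index is missing or exhausted, return null.
        static Long nextOffset(DataInputStream index) throws IOException
        {
            if (index == null)
                return null;
            try { return index.readLong(); }
            catch (EOFException e) { return null; }
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            for (String s : new String[]{ "alpha", "beta" })
            {
                out.writeInt(s.length());
                out.writeBytes(s);
            }
            // scrubbing succeeds with no index at all, which is the behaviour the patch allows
            List<byte[]> rows = scrub(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), null);
            System.out.println(rows.size() + " rows recovered");
        }
    }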
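The relaxed precondition in SSTableReader.open() reads as a logical implication: the data component is always required, while the primary index is required only when validation is requested, encoded as !validate || hasIndex. A tiny hypothetical sketch of that check:

    // Demonstrates the relaxed open() precondition: the data component is always
    // mandatory, while the primary index is required only when validate == true
    // ("!validate || hasIndex" encodes the implication validate => hasIndex).
    public class OpenPrecondition
    {
        static boolean canOpen(boolean validate, boolean hasData, boolean hasIndex)
        {
            return hasData && (!validate || hasIndex);
        }

        public static void main(String[] args)
        {
            System.out.println(canOpen(false, true, false)); // true: standalone tools may open without -Index.db
            System.out.println(canOpen(true,  true, false)); // false: validation still needs the index
        }
    }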
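The InstanceTidier comment describes a chain of reference-counted tidiers, where shared state is released only after the last per-instance tidy has run. A miniature, hypothetical model of that pattern (not Cassandra's Ref/Tidy classes):

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical miniature of the tidier chain: each per-instance tidy holds one
    // reference to a shared tidy; the shared state is released only when the last
    // instance has run. All names here are illustrative.
    public class TidyChain
    {
        static final class Shared
        {
            final AtomicInteger refs = new AtomicInteger();
            void ref() { refs.incrementAndGet(); }
            void release()
            {
                if (refs.decrementAndGet() == 0)
                    System.out.println("global tidy: releasing shared resources");
            }
        }

        static final class Instance
        {
            final Shared shared;
            Instance(Shared shared) { this.shared = shared; shared.ref(); }
            void tidy()
            {
                System.out.println("instance tidy: closing per-reader files");
                shared.release(); // the last instance to run triggers the shared cleanup
            }
        }

        public static void main(String[] args)
        {
            Shared global = new Shared();
            Instance a = new Instance(global), b = new Instance(global);
            a.tidy(); // shared state survives
            b.tidy(); // last release runs the global tidy
        }
    }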