Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
    CHANGES.txt
    src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c0f96e1d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c0f96e1d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c0f96e1d

Branch: refs/heads/cassandra-2.2
Commit: c0f96e1d46d664a9d4ca7f982d21cdfcaed1a24a
Parents: 14a3324 9b10928
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Tue Jun 2 14:51:20 2015 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Jun 2 14:51:20 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/sstable/SSTableIdentityIterator.java     | 10 +++
 .../cassandra/io/sstable/SSTableReader.java     | 68 ++++++++++++++------
 .../compaction/BlacklistingCompactionsTest.java | 17 +++--
 4 files changed, 69 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0f96e1d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 71ce442,1aad965..4a7e174
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,5 +1,38 @@@
-2.0.16:
+2.1.6
+ * Consistent error message when a table mixes counter and non-counter
+   columns (CASSANDRA-9492)
+ * Avoid getting unreadable keys during anticompaction (CASSANDRA-9508)
+ * (cqlsh) Better float precision by default (CASSANDRA-9224)
+ * Improve estimated row count (CASSANDRA-9107)
+ * Optimize range tombstone memory footprint (CASSANDRA-8603)
+ * Use configured gcgs in anticompaction (CASSANDRA-9397)
+ * Warn on misuse of unlogged batches (CASSANDRA-9282)
+ * Failure detector detects and ignores local pauses (CASSANDRA-9183)
+ * Add utility class to support rate limiting a given log statement (CASSANDRA-9029)
+ * Add missing consistency levels to cassandra-stress (CASSANDRA-9361)
+ * Fix commitlog getCompletedTasks to not increment (CASSANDRA-9339)
+ * Fix for harmless exceptions logged as ERROR (CASSANDRA-8564)
+ * Delete processed sstables in sstablesplit/sstableupgrade (CASSANDRA-8606)
+ * Improve sstable exclusion from partition tombstones (CASSANDRA-9298)
+ * Validate the indexed column rather than the cell's contents for 2i (CASSANDRA-9057)
+ * Add support for top-k custom 2i queries (CASSANDRA-8717)
+ * Fix error when dropping table during compaction (CASSANDRA-9251)
+ * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773)
+ * Add support for rate limiting log messages (CASSANDRA-9029)
+ * Log the partition key with tombstone warnings (CASSANDRA-8561)
+ * Reduce runWithCompactionsDisabled poll interval to 1ms (CASSANDRA-9271)
+ * Fix PITR commitlog replay (CASSANDRA-9195)
+ * GCInspector logs very different times (CASSANDRA-9124)
+ * Fix deleting from an empty list (CASSANDRA-9198)
+ * Update tuple and collection types that use a user-defined type when that UDT
+   is modified (CASSANDRA-9148, CASSANDRA-9192)
+ * Use higher timeout for prepare and snapshot in repair (CASSANDRA-9261)
+ * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151)
+ * Repair waits for anticompaction to finish (CASSANDRA-9097)
+ * Fix streaming not holding ref when stream error (CASSANDRA-9295)
+ * Fix canonical view returning early opened SSTables (CASSANDRA-9396)
+Merged from 2.0:
+ * Always mark sstable suspect when corrupted (CASSANDRA-9478)
  * Add database users and permissions to CQL3 documentation (CASSANDRA-7558)
  * Allow JVM_OPTS to be passed to standalone tools (CASSANDRA-5969)
  * Fix bad condition in RangeTombstoneList (CASSANDRA-9485)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0f96e1d/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index b784a7e,8b45005..498ad26
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@@ -79,17 -95,18 +82,18 @@@ public class SSTableIdentityIterator im
          this.filename = filename;
          this.key = key;
          this.dataSize = dataSize;
-         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
          this.flag = flag;
          this.validateColumns = checkData;
-         this.dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version;
+         this.sstable = sstable;
+         Descriptor.Version dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version;
+         int expireBefore = (int) (System.currentTimeMillis() / 1000);
+         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+
          try
          {
-             columnFamily = EmptyColumns.factory.create(metadata);
              columnFamily.delete(DeletionTime.serializer.deserialize(in));
-             columnCount = dataVersion.hasRowSizeAndColumnCount ? in.readInt() : Integer.MAX_VALUE;
-             atomIterator = columnFamily.metadata().getOnDiskIterator(in, columnCount, flag, expireBefore, dataVersion);
+             atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
          }
          catch (IOException e)
          {
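
A note on the SSTableIdentityIterator hunk above: the constructor now keeps a reference to the originating sstable, and the deletion-info deserialization happens inside the try block, so an IOException raised during construction can be attributed to the owning reader (the catch body is elided in this hunk; per the merged CASSANDRA-9478 entry the intent is to mark the sstable suspect before rethrowing). A rough, self-contained sketch of that pattern, using hypothetical stand-in types rather than Cassandra's:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Stand-ins for SSTableReader / CorruptSSTableException; illustrative only.
    class Source {
        final AtomicBoolean suspect = new AtomicBoolean(false);
        void markSuspect() { suspect.set(true); }
    }

    class CorruptSourceException extends RuntimeException {
        CorruptSourceException(Throwable cause, String path) { super(path, cause); }
    }

    class RowIterator {
        private final Source source;    // kept so failures can be attributed, as with this.sstable
        private final long deletionTime;

        RowIterator(Source source, DataInputStream in, String filename) {
            this.source = source;
            try {
                // any corruption surfacing here is charged to the owning source
                this.deletionTime = in.readLong();
            } catch (IOException e) {
                if (source != null)
                    source.markSuspect();
                throw new CorruptSourceException(e, filename);
            }
        }
    }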

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0f96e1d/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 43873a0,15808e8..0475665
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -706,381 -450,21 +706,407 @@@ public class SSTableReader extends SSTa
                     ? SegmentedFile.getCompressedBuilder()
                     : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
-         boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
-         if (recreateBloomFilter || !summaryLoaded)
-             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
+         boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+         boolean builtSummary = false;
+         if (recreateBloomFilter || !summaryLoaded)
+         {
+             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+             builtSummary = true;
+         }
+
+         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+
+         // Check for an index summary that was downsampled even though the serialization format doesn't support
+         // that. If it was downsampled, rebuild it. See CASSANDRA-8993 for details.
+         if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel())
+         {
+             indexSummary.close();
+             ifile.close();
+             dfile.close();
+
+             logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling");
+             FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY)));
+             ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+             dbuilder = compression
+                        ? SegmentedFile.getCompressedBuilder()
+                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+             buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
+             ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+             dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+             saveSummary(ibuilder, dbuilder);
+         }
+         else if (saveSummaryIfCreated && builtSummary)
+         {
+             saveSummary(ibuilder, dbuilder);
+         }
+     }
+
+     /**
+      * Build the index summary (and optionally the bloom filter) by reading through the Index.db file.
+      *
+      * @param recreateBloomFilter true if the bloom filter should be recreated
+      * @param ibuilder segmented-file builder for the primary index
+      * @param dbuilder segmented-file builder for the data file
+      * @param summaryLoaded true if the index summary is already loaded and does not need to be rebuilt
+      * @param samplingLevel sampling level at which to build the summary
+      * @throws IOException
+      */
+     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
+     {
+         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+
+         try
+         {
+             long indexSize = primaryIndex.length();
+             long histogramCount = sstableMetadata.estimatedRowSize.count();
+             long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
+                                ? histogramCount
+                                : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+
+             try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
+             {
+                 if (recreateBloomFilter)
+                     bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
+
+                 long indexPosition;
+                 while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+                 {
+                     ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                     RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
+                     DecoratedKey decoratedKey = partitioner.decorateKey(key);
+                     if (first == null)
+                         first = decoratedKey;
+                     last = decoratedKey;
+
+                     if (recreateBloomFilter)
+                         bf.add(decoratedKey.getKey());
+
+                     // if summary was already read from disk we don't want to re-populate it using primary index
+                     if (!summaryLoaded)
+                     {
+                         summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+                         ibuilder.addPotentialBoundary(indexPosition);
+                         dbuilder.addPotentialBoundary(indexEntry.position);
+                     }
+                 }
+
+                 if (!summaryLoaded)
+                     indexSummary = summaryBuilder.build(partitioner);
+             }
+         }
+         finally
+         {
+             FileUtils.closeQuietly(primaryIndex);
+         }
+
+         first = getMinimalKey(first);
+         last = getMinimalKey(last);
+     }
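
One Java detail the new buildSummary() relies on: try-with-resources silently skips close() for a resource bound to null, which is what makes the conditional "summaryLoaded ? null : new IndexSummaryBuilder(...)" safe. A minimal, self-contained illustration:

    import java.io.Closeable;

    public class NullResourceDemo {
        static final class Tracker implements Closeable {
            public void close() { System.out.println("closed"); }
        }

        public static void main(String[] args) {
            boolean alreadyLoaded = true;
            // try-with-resources does not invoke close() on a null resource, so
            // conditionally creating it (as buildSummary does) needs no null check.
            try (Tracker t = alreadyLoaded ? null : new Tracker()) {
                System.out.println("body runs either way; t = " + t);
            }
        }
    }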
+
+     /**
+      * Load the index summary from the Summary.db file if it exists.
+      *
+      * If the loaded index summary has a different index interval from the current value stored in the schema,
+      * the Summary.db file is deleted and this returns false so that the summary will be rebuilt.
+      *
+      * @param ibuilder segmented-file builder for the primary index
+      * @param dbuilder segmented-file builder for the data file
+      * @return true if the index summary was loaded successfully from the Summary.db file
+      */
+     public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+         if (!summariesFile.exists())
+             return false;
+
+         DataInputStream iStream = null;
+         try
+         {
+             iStream = new DataInputStream(new FileInputStream(summariesFile));
+             indexSummary = IndexSummary.serializer.deserialize(
+                     iStream, partitioner, descriptor.version.hasSamplingLevel,
+                     metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
+             first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+             last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+             ibuilder.deserializeBounds(iStream);
+             dbuilder.deserializeBounds(iStream);
+         }
+         catch (IOException e)
+         {
+             if (indexSummary != null)
+                 indexSummary.close();
+             logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+             // corrupted; delete it and fall back to creating a new summary
+             FileUtils.closeQuietly(iStream);
+             FileUtils.deleteWithConfirm(summariesFile);
+             return false;
+         }
+         finally
+         {
+             FileUtils.closeQuietly(iStream);
+         }
+
+         return true;
+     }
+
+     /**
+      * Validates that an index summary has full sampling, as expected when the serialization format does not support
+      * persisting the sampling level.
+      *
+      * @return true if the summary has full sampling, false otherwise
+      */
+     private boolean validateSummarySamplingLevel()
+     {
+         // We need to check index summary entries against the index to verify that none of them were dropped due to
+         // downsampling. Downsampling can drop any of the first BASE_SAMPLING_LEVEL entries (repeating that drop pattern
+         // for the remainder of the summary). Unfortunately, the first entry to be dropped is the entry at
+         // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of BASE_SAMPLING_LEVEL entries.
+         Iterator<FileDataInput> segments = ifile.iterator(0);
+         int i = 0;
+         int summaryEntriesChecked = 0;
+         int expectedIndexInterval = getMinIndexInterval();
+         while (segments.hasNext())
+         {
+             FileDataInput in = segments.next();
+             try
+             {
+                 while (!in.isEOF())
+                 {
+                     ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+                     if (i % expectedIndexInterval == 0)
+                     {
+                         ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
+                         if (!summaryKey.equals(indexKey))
+                             return false;
+                         summaryEntriesChecked++;
+
+                         if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
+                             return true;
+                     }
+                     RowIndexEntry.Serializer.skip(in);
+                     i++;
+                 }
+             }
+             catch (IOException e)
+             {
+                 markSuspect();
+                 throw new CorruptSSTableException(e, in.getPath());
+             }
+             finally
+             {
+                 FileUtils.closeQuietly(in);
+             }
+         }
+
+         return true;
+     }
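
The core invariant validateSummarySamplingLevel() checks is simple: at full sampling, every min_index_interval-th index entry must appear, in order, in the summary. A condensed analogue over plain lists (names illustrative, not Cassandra's):

    import java.util.List;

    final class SamplingCheck {
        static boolean hasFullSampling(List<String> indexKeys, List<String> summaryKeys, int minIndexInterval) {
            for (int i = 0; i < indexKeys.size(); i++) {
                if (i % minIndexInterval != 0)
                    continue; // only sampled positions appear in the summary
                int summaryPos = i / minIndexInterval;
                if (summaryPos >= summaryKeys.size() || !summaryKeys.get(summaryPos).equals(indexKeys.get(i)))
                    return false; // a sampled entry was dropped: the summary was downsampled
            }
            return true;
        }

        public static void main(String[] args) {
            List<String> index = List.of("a", "b", "c", "d", "e", "f");
            System.out.println(hasFullSampling(index, List.of("a", "c", "e"), 2)); // true
            System.out.println(hasFullSampling(index, List.of("a", "e"), 2));      // false: "c" was dropped
        }
    }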
+
+     /**
+      * Save the index summary to the Summary.db file.
+      *
+      * @param ibuilder segmented-file builder for the primary index
+      * @param dbuilder segmented-file builder for the data file
+      */
+     public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         saveSummary(ibuilder, dbuilder, indexSummary);
+     }
+
+     private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+     {
+         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+         if (summariesFile.exists())
+             FileUtils.deleteWithConfirm(summariesFile);
+
+         DataOutputStreamAndChannel oStream = null;
+         try
+         {
+             oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+             IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel);
+             ByteBufferUtil.writeWithLength(first.getKey(), oStream);
+             ByteBufferUtil.writeWithLength(last.getKey(), oStream);
+             ibuilder.serializeBounds(oStream);
+             dbuilder.serializeBounds(oStream);
+         }
+         catch (IOException e)
+         {
+             logger.debug("Cannot save SSTable Summary: ", e);
+
+             // corrupted, hence delete it and let it be rebuilt on the next load
+             if (summariesFile.exists())
+                 FileUtils.deleteWithConfirm(summariesFile);
+         }
+         finally
+         {
+             FileUtils.closeQuietly(oStream);
+         }
+     }
+
+     public void setReplacedBy(SSTableReader replacement)
+     {
+         synchronized (tidy.global)
+         {
+             assert replacement != null;
+             assert !tidy.isReplaced;
+             assert tidy.global.live == this;
+             tidy.isReplaced = true;
+             tidy.global.live = replacement;
+         }
+     }
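
The Summary.db layout written above is purely sequential: the serialized summary, then length-prefixed first/last keys, then the two builders' bounds; loadSummary() must read the same fields back in the same order or deserialization fails and the summary is rebuilt. A simplified sketch of that length-prefixed framing (the helpers here are stand-ins for ByteBufferUtil's, not its actual code):

    import java.io.*;
    import java.nio.ByteBuffer;

    public class LengthPrefixedRoundTrip {
        static void writeWithLength(ByteBuffer b, DataOutputStream out) throws IOException {
            out.writeInt(b.remaining());
            out.write(b.array(), b.arrayOffset() + b.position(), b.remaining());
        }

        static ByteBuffer readWithLength(DataInputStream in) throws IOException {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            return ByteBuffer.wrap(bytes);
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buf)) {
                writeWithLength(ByteBuffer.wrap("first-key".getBytes("UTF-8")), out);
                writeWithLength(ByteBuffer.wrap("last-key".getBytes("UTF-8")), out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
                System.out.println(new String(readWithLength(in).array(), "UTF-8")); // first-key
                System.out.println(new String(readWithLength(in).array(), "UTF-8")); // last-key
            }
        }
    }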
+
++    /**
++     * Clone this reader with the provided start and open reason, and set the clone as replacement.
++     *
++     * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
++     * opening of compaction results).
++     * @param reason the {@code OpenReason} for the replacement.
++     *
++     * @return the cloned reader. That reader is set as a replacement by the method.
++     */
++    private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason)
++    {
++        return cloneAndReplace(newFirst, reason, indexSummary.sharedCopy());
++    }
++
++    /**
++     * Clone this reader with the new values and set the clone as replacement.
++     *
++     * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
++     * opening of compaction results).
++     * @param reason the {@code OpenReason} for the replacement.
++     * @param newSummary the index summary for the replacement.
++     *
++     * @return the cloned reader. That reader is set as a replacement by the method.
++     */
++    private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason, IndexSummary newSummary)
++    {
++        SSTableReader replacement = internalOpen(descriptor,
++                                                 components,
++                                                 metadata,
++                                                 partitioner,
++                                                 ifile.sharedCopy(),
++                                                 dfile.sharedCopy(),
++                                                 newSummary,
++                                                 bf.sharedCopy(),
++                                                 maxDataAge,
++                                                 sstableMetadata,
++                                                 reason);
++        replacement.first = newFirst;
++        replacement.last = last;
++        replacement.isSuspect.set(isSuspect.get());
++        setReplacedBy(replacement);
++        return replacement;
++    }
++
+     public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+     {
+         synchronized (tidy.global)
+         {
+             assert openReason != OpenReason.EARLY;
-             SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
-                                                           dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
-                                                           maxDataAge, sstableMetadata, OpenReason.MOVED_START);
+             // TODO: make data/index start accurate for compressed files
+             // TODO: merge with caller's firstKeyBeyond() work, to save time
+             if (newStart.compareTo(first) > 0)
+             {
+                 final long dataStart = getPosition(newStart, Operator.EQ).position;
+                 final long indexStart = getIndexScanPosition(newStart);
+                 this.tidy.runOnClose = new Runnable()
+                 {
+                     public void run()
+                     {
+                         dfile.dropPageCache(dataStart);
+                         ifile.dropPageCache(indexStart);
+                         if (runOnClose != null)
+                             runOnClose.run();
+                     }
+                 };
+             }
+
-             replacement.first = newStart;
-             replacement.last = this.last;
-             setReplacedBy(replacement);
-             return replacement;
++            return cloneAndReplace(newStart, OpenReason.MOVED_START);
+         }
+     }
+
+     public SSTableReader cloneAsShadowed(final Runnable runOnClose)
+     {
+         synchronized (tidy.global)
+         {
+             assert openReason != OpenReason.EARLY;
+             this.tidy.runOnClose = new Runnable()
+             {
+                 public void run()
+                 {
+                     dfile.dropPageCache(0);
+                     ifile.dropPageCache(0);
+                     runOnClose.run();
+                 }
+             };
+
-             SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
-                                                           dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
-                                                           maxDataAge, sstableMetadata, OpenReason.SHADOWED);
-             replacement.first = first;
-             replacement.last = last;
-             setReplacedBy(replacement);
-             return replacement;
++            return cloneAndReplace(first, OpenReason.SHADOWED);
+         }
+     }
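
The net effect of the cloneAndReplace() refactor is that all three clone paths now funnel through one place that copies shared resources, fixes up first/last, and atomically installs the clone as the live reader via setReplacedBy(). A toy model of that handoff (all names illustrative, not Cassandra's):

    // A shared "global" object tracks the single live instance; a clone takes over atomically.
    final class Global {
        Reader live;
    }

    final class Reader {
        final Global global;
        boolean replaced;
        String firstKey;

        Reader(Global global, String firstKey) {
            this.global = global;
            this.firstKey = firstKey;
        }

        Reader cloneWithNewStart(String newFirst) {
            synchronized (global) {
                Reader replacement = new Reader(global, newFirst); // shares underlying resources in the real code
                assert !replaced && global.live == this;
                replaced = true;
                global.live = replacement;                          // the clone becomes the live instance
                return replacement;
            }
        }
    }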
+
+     /**
+      * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
+      * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
+      * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
+      *
+      * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
+      * @return a new SSTableReader
+      * @throws IOException
+      */
+     public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
+     {
+         assert descriptor.version.hasSamplingLevel;
+
+         synchronized (tidy.global)
+         {
+             assert openReason != OpenReason.EARLY;
+
+             int minIndexInterval = metadata.getMinIndexInterval();
+             int maxIndexInterval = metadata.getMaxIndexInterval();
+             double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+
+             IndexSummary newSummary;
+             long oldSize = bytesOnDisk();
+
+             // We have to rebuild the summary from the on-disk primary index in three cases:
+             // 1. The sampling level went up, so we need to read more entries off disk
+             // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+             //    at full sampling (and consequently at any other sampling level)
+             // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+             if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+             {
+                 newSummary = buildSummaryAtLevel(samplingLevel);
+             }
+             else if (samplingLevel < indexSummary.getSamplingLevel())
+             {
+                 // we can use the existing index summary to make a smaller one
+                 newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+
+                 SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+                 SegmentedFile.Builder dbuilder = compression
+                                                ? SegmentedFile.getCompressedBuilder()
+                                                : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                 saveSummary(ibuilder, dbuilder, newSummary);
+             }
+             else
+             {
+                 throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+                                          "no adjustments to min/max_index_interval");
+             }
-         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-         if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
-             saveSummary(this, ibuilder, dbuilder);
+
+             long newSize = bytesOnDisk();
+             StorageMetrics.load.inc(newSize - oldSize);
+             parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
+
-             SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
-                                                           dfile.sharedCopy(), newSummary, bf.sharedCopy(), maxDataAge,
-                                                           sstableMetadata, OpenReason.METADATA_CHANGE);
-             replacement.first = this.first;
-             replacement.last = this.last;
-             setReplacedBy(replacement);
-             return replacement;
++            return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
+         }
+     }
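
The three-case comment above boils down to a small pure decision: upsampling or interval changes force a re-read of the primary index, while downsampling can reuse the in-memory summary. A condensed model of that logic (the enum and parameters are illustrative stand-ins for the reader's actual state):

    enum SummaryAction { REBUILD_FROM_INDEX, DOWNSAMPLE_IN_MEMORY }

    final class ResampleDecision {
        static SummaryAction decide(int newLevel, int currentLevel,
                                    int summaryMinInterval, int schemaMinInterval,
                                    double effectiveInterval, int maxInterval) {
            // Upsampling, a min_index_interval change, or an effective interval above
            // max_index_interval all require re-reading the primary index.
            if (newLevel > currentLevel || summaryMinInterval != schemaMinInterval || effectiveInterval > maxInterval)
                return SummaryAction.REBUILD_FROM_INDEX;
            // Downsampling can reuse the in-memory summary, which is much cheaper.
            if (newLevel < currentLevel)
                return SummaryAction.DOWNSAMPLE_IN_MEMORY;
            throw new AssertionError("no change in sampling level or index intervals");
        }
    }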
+
-     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
-     {
+     private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
+     {
          // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
          RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-
          try
          {
              long indexSize = primaryIndex.length();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0f96e1d/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index 08e3fb3,08d1d66..572ad36
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@@ -22,12 -22,8 +22,10 @@@ package org.apache.cassandra.db.compact
 
  import java.io.RandomAccessFile;
- import java.util.Collection;
- import java.util.HashSet;
- import java.util.Set;
+ import java.util.*;
 
 +import org.junit.After;
 +import org.junit.AfterClass;
  import org.junit.BeforeClass;
  import org.junit.Test;
 
@@@ -40,7 -39,7 +38,8 @@@ import org.apache.cassandra.utils.ByteB
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertTrue;
 +import static org.apache.cassandra.Util.cellname;
 
  public class BlacklistingCompactionsTest extends SchemaLoader
  {
@@@ -160,8 -159,8 +166,7 @@@
              break;
          }
 
--        cfs.truncateBlocking();
-         assertEquals(failures, sstablesToCorrupt);
+         assertEquals(sstablesToCorrupt, failures);
      }
  }
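
The one-line test change above is easy to miss: JUnit's assertEquals takes (expected, actual), so swapping the arguments changes only the failure message, but a reversed message points debugging in the wrong direction. A small illustration:

    import static org.junit.Assert.assertEquals;
    import org.junit.Test;

    public class AssertOrderExample {
        @Test
        public void reportsInExpectedActualOrder() {
            int sstablesToCorrupt = 8; // the expected value
            int failures = 8;          // the observed value
            // Had the arguments been reversed and the values differed, JUnit would
            // report "expected:<observed> but was:<expected>", inverting the diagnosis.
            assertEquals(sstablesToCorrupt, failures);
        }
    }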