merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa55f3cf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa55f3cf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa55f3cf Branch: refs/heads/trunk Commit: aa55f3cf0d9e1e6e19178712d1ac725028f6d907 Parents: d0dc597 079ae68 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu May 30 23:28:05 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu May 30 23:28:05 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../apache/cassandra/io/sstable/SSTableLoader.java | 2 +- .../apache/cassandra/io/sstable/SSTableReader.java | 148 ++++++++++----- 4 files changed, 107 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index b83b88a,a746c09..e4da98d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,62 -1,5 +1,63 @@@ +2.0 + * Removed on-heap row cache (CASSANDRA-5348) + * use nanotime consistently for node-local timeouts (CASSANDRA-5581) + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577) + * Experimental triggers (CASSANDRA-1311) + * JEMalloc support for off-heap allocation (CASSANDRA-3997) + * Single-pass compaction (CASSANDRA-4180) + * Removed token range bisection (CASSANDRA-5518) + * Removed compatibility with pre-1.2.5 sstables and network messages + (CASSANDRA-5511) + * removed PBSPredictor (CASSANDRA-5455) + * CAS support (CASSANDRA-5062, 5441, 5442, 5443) + * Leveled compaction performs size-tiered compactions in L0 + (CASSANDRA-5371, 5439) + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339) + * Log when a node is down longer than the hint window (CASSANDRA-4554) + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917) + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407) + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430) + * Change Message IDs to ints (CASSANDRA-5307) + * Move sstable level information into the Stats component, removing the + need for a separate Manifest file (CASSANDRA-4872) + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199) + * make index_interval configurable per columnfamily (CASSANDRA-3961) + * add default_time_to_live (CASSANDRA-3974) + * add memtable_flush_period_in_ms (CASSANDRA-4237) + * replace supercolumns internally by composites (CASSANDRA-3237, 5123) + * upgrade thrift to 0.9.0 (CASSANDRA-3719) + * drop unnecessary keyspace parameter from user-defined compaction API + (CASSANDRA-5139) + * more robust solution to incomplete compactions + counters (CASSANDRA-5151) + * Change order of directory searching for c*.in.sh (CASSANDRA-3983) + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271) + * Allow custom configuration loader (CASSANDRA-5045) + * Remove memory emergency pressure valve logic (CASSANDRA-3534) + * Reduce request latency with eager retry (CASSANDRA-4705) + * cqlsh: Remove ASSUME command (CASSANDRA-5331) + * Rebuild BF when loading sstables if bloom_filter_fp_chance + has changed since compaction (CASSANDRA-5015) + * remove row-level bloom filters (CASSANDRA-4885) + * Change Kernel Page Cache skipping into row preheating (disabled by default) + (CASSANDRA-4937) + * Improve repair by deciding on a gcBefore before sending + out TreeRequests (CASSANDRA-4932) + * Add an official way to disable compactions (CASSANDRA-5074) + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919) + * Add binary protocol versioning (CASSANDRA-5436) + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530) + * Add alias support to SELECT statement (CASSANDRA-5075) + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541) + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579) + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585) + * Track max/min column names in sstables to be able to optimize slice + queries (CASSANDRA-5514, CASSANDRA-5595) + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693) + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450) + * Support native link w/o JNA in Java7 (CASSANDRA-3734) + 1.2.6 + * Reduce SSTableLoader memory usage (CASSANDRA-5555) * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272) * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536) * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index dcd7814,81ced05..4287df6 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -259,8 -232,34 +259,8 @@@ public class ColumnFamilyStore implemen if (loadSSTables) { Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); - Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner); + Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner); - if (metadata.getDefaultValidator().isCommutative()) - { - // Filter non-compacted sstables, remove compacted ones - Set<Integer> compactedSSTables = new HashSet<Integer>(); - for (SSTableReader sstable : sstables) - compactedSSTables.addAll(sstable.getAncestors()); - - Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>(); - for (SSTableReader sstable : sstables) - { - if (compactedSSTables.contains(sstable.descriptor.generation)) - { - logger.info("{} is already compacted and will be removed.", sstable); - sstable.markCompacted(); // we need to mark as compacted to be deleted - sstable.releaseReference(); // this amount to deleting the sstable - } - else - { - liveSSTables.add(sstable); - } - } - data.addInitialSSTables(liveSSTables); - } - else - { - data.addInitialSSTables(sstables); - } + data.addInitialSSTables(sstables); } if (caching == Caching.ALL || caching == Caching.KEYS_ONLY) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 2f37d0f,574465d..8119388 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@@ -161,39 -173,25 +175,17 @@@ public class SSTableReader extends SSTa IPartitioner partitioner, boolean validate) throws IOException { - assert partitioner != null; - // Minimum components without which we can't do anything - assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; - assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; - - long start = System.currentTimeMillis(); + long start = System.nanoTime(); - logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length()); - - SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor); - - // Check if sstable is created using same partitioner. - // Partitioner can be null, which indicates older version of sstable or no stats available. - // In that case, we skip the check. - String partitionerName = partitioner.getClass().getCanonicalName(); - if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner)) - { - logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", - descriptor, sstableMetadata.partitioner, partitionerName)); - System.exit(1); - } + SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner); SSTableReader sstable = new SSTableReader(descriptor, components, metadata, partitioner, - null, - null, - null, - null, System.currentTimeMillis(), sstableMetadata); - // versions before 'c' encoded keys as utf-16 before hashing to the filter - if (descriptor.version.hasStringsInBloomFilter) - { - sstable.load(true); - } - else - { - sstable.load(false); - sstable.loadBloomFilter(); - } + + sstable.load(); if (validate) sstable.validate(); @@@ -361,16 -375,56 +388,55 @@@ { SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); SegmentedFile.Builder dbuilder = compression - ? SegmentedFile.getCompressedBuilder() - : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + - boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder); - if (recreatebloom || !summaryLoaded) - buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded); ++ boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata); ++ if (recreateBloomFilter || !summaryLoaded) ++ buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded); + + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); - if (recreatebloom || !summaryLoaded) // save summary information to disk ++ if (recreateBloomFilter || !summaryLoaded) // save summary information to disk + saveSummary(this, ibuilder, dbuilder); + } + + /** + * A simplified load that creates a minimal partition index + */ + private void loadForBatch() throws IOException + { + // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments + SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder(); + SegmentedFile.Builder dbuilder = compression + ? SegmentedFile.getCompressedBuilder() + : new BufferedSegmentedFile.Builder(); + + // build a bare-bones IndexSummary - IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1); - RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); ++ IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1, 1); ++ RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + try + { - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - first = decodeKey(partitioner, descriptor, key); ++ first = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); + summaryBuilder.maybeAddEntry(first, 0); + indexSummary = summaryBuilder.build(partitioner); + } + finally + { + FileUtils.closeQuietly(in); + } + + last = null; // shouldn't need this for batch operations + + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + } + - private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException ++ private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException + { // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. - RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); + RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); - - // try to load summaries from the disk and check if we need - // to read primary index because we should re-create a BloomFilter or pre-load KeyCache - final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata); - final boolean readIndex = recreateBloomFilter || !summaryLoaded; try { long indexSize = primaryIndex.length(); @@@ -384,10 -437,10 +450,10 @@@ IndexSummaryBuilder summaryBuilder = null; if (!summaryLoaded) - summaryBuilder = new IndexSummaryBuilder(estimatedKeys); + summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval()); long indexPosition; - while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize) + while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) { ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version); @@@ -418,18 -471,12 +484,12 @@@ first = getMinimalKey(first); last = getMinimalKey(last); - // finalize the load. - // finalize the state of the reader - ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); - dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); - if (readIndex) // save summary information to disk - saveSummary(this, ibuilder, dbuilder); } - public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata) { File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); - if (!summariesFile.exists()) + if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists()) return false; DataInputStream iStream = null;