Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73952075
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73952075
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73952075

Branch: refs/heads/cassandra-2.2
Commit: 73952075253c535b35a42269edc86133a5dd9f6d
Parents: 94c826e 878d616
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jul 28 16:33:04 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 28 16:33:04 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  3 ++-
 .../cassandra/io/sstable/SSTableReader.java     | 28 ++++++++++++--------
 3 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73952075/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1ce95d6,5ce2cc7..c4bb21c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,25 -4,6 +15,26 @@@ Merged from 2.0
  * Complete CASSANDRA-8448 fix (CASSANDRA-9519)
  * Don't include auth credentials in debug log (CASSANDRA-9682)
  * Can't transition from write survey to normal mode (CASSANDRA-9740)
+ * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
+ * Fix growing pending background compaction (CASSANDRA-9662)
++ * Don't track hotness when opening from snapshot for validation (CASSANDRA-9382)
+
+
+2.1.8
+ * (cqlsh) Fix bad check for CQL compatibility when DESCRIBE'ing
+   COMPACT STORAGE tables with no clustering columns
+ * Warn when an extra-large partition is compacted (CASSANDRA-9643)
+ * Eliminate strong self-reference chains in sstable ref tidiers (CASSANDRA-9656)
+ * Ensure StreamSession uses canonical sstable reader instances (CASSANDRA-9700)
+ * Ensure memtable book keeping is not corrupted in the event we shrink usage (CASSANDRA-9681)
+ * Update internal python driver for cqlsh (CASSANDRA-9064)
+ * Fix IndexOutOfBoundsException when inserting tuple with too many
+   elements using the string literal notation (CASSANDRA-9559)
+ * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
+ * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
+ * Enable describe on indices (CASSANDRA-7814)
+ * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
+Merged from 2.0:
  * Avoid NPE in AuthSuccess#decode (CASSANDRA-9727)
  * Add listen_address to system.local (CASSANDRA-9603)
  * Bug fixes to resultset metadata construction (CASSANDRA-9636)
@@@ -929,112 -480,10 +930,113 @@@ Merged from 1.2
  * Fix bug with some IN queries missig results (CASSANDRA-7105)
  * Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
  * Hint streaming can cause decommission to fail (CASSANDRA-7219)
- * RepairTask didn't send a correct message on IllegalArgumentException (CASSANDRA-7336)
-2.0.7
+2.1.0-beta2
+ * Increase default CL space to 8GB (CASSANDRA-7031)
+ * Add range tombstones to read repair digests (CASSANDRA-6863)
+ * Fix BTree.clear for large updates (CASSANDRA-6943)
+ * Fail write instead of logging a warning when unable to append to CL
+   (CASSANDRA-6764)
+ * Eliminate possibility of CL segment appearing twice in active list
+   (CASSANDRA-6557)
+ * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
+ * Switch CRC component to Adler and include it for compressed sstables
+   (CASSANDRA-4165)
+ * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
+ * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
+ * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
+ * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
+ * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
+ * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
+ * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
+ * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
+ * Scrub should not always clear out repaired status (CASSANDRA-5351)
+ * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
+ * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
+ * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
+ * Change caching option syntax (CASSANDRA-6745)
+ * Fix stress to do proper counter reads (CASSANDRA-6835)
+ * Fix help message for stress counter_write (CASSANDRA-6824)
+ * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
+ * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
+ * Fix race condition in Batch CLE (CASSANDRA-6860)
+ * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
+ * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
+ * Proper compare function for CollectionType (CASSANDRA-6783)
+ * Update native server to Netty 4 (CASSANDRA-6236)
+ * Fix off-by-one error in stress (CASSANDRA-6883)
+ * Make OpOrder AutoCloseable (CASSANDRA-6901)
+ * Remove sync repair JMX interface (CASSANDRA-6900)
+ * Add multiple memory allocation options for memtables (CASSANDRA-6689, 6694)
+ * Remove adjusted op rate from stress output (CASSANDRA-6921)
+ * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
+ * Serialize batchlog mutations with the version of the target node
+   (CASSANDRA-6931)
+ * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
+ * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
+ * Lock counter cells, not partitions (CASSANDRA-6880)
+ * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
+ * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
+ * Add failure handler to async callback (CASSANDRA-6747)
+ * Fix AE when closing SSTable without releasing reference (CASSANDRA-7000)
+ * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924)
+ * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024)
+ * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
+ * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
+ * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
+ * Preemptive opening of compaction result (CASSANDRA-6916)
+ * Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
+ * Optimize cellname comparison (CASSANDRA-6934)
+ * Native protocol v3 (CASSANDRA-6855)
+ * Optimize Cell liveness checks and clean up Cell (CASSANDRA-7119)
+ * Support consistent range movements (CASSANDRA-2434)
++ * Display min timestamp in sstablemetadata viewer (CASSANDRA-6767)
+Merged from 2.0:
+ * Avoid race-prone second "scrub" of system keyspace (CASSANDRA-6797)
+ * Pool CqlRecordWriter clients by inetaddress rather than Range
+   (CASSANDRA-6665)
+ * Fix compaction_history timestamps (CASSANDRA-6784)
+ * Compare scores of full replica ordering in DES (CASSANDRA-6683)
+ * fix CME in SessionInfo updateProgress affecting netstats
+   (CASSANDRA-6577)
+ * Allow repairing between specific replicas (CASSANDRA-6440)
+ * Allow per-dc enabling of hints (CASSANDRA-6157)
+ * Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
+ * Fix EstimatedHistogram races (CASSANDRA-6682)
+ * Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
+ * Add nodetool taketoken to relocate vnodes (CASSANDRA-4445)
+ * Expose bulk loading progress over JMX (CASSANDRA-4757)
+ * Correctly handle null with IF conditions and TTL (CASSANDRA-6623)
+ * Account for range/row tombstones in tombstone drop
+   time histogram (CASSANDRA-6522)
+ * Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652)
+ * Make commitlog failure handling configurable (CASSANDRA-6364)
+ * Avoid overlaps in LCS (CASSANDRA-6688)
+ * Improve support for paginating over composites (CASSANDRA-4851)
+ * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
+ * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
+ * Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
+ * Add static columns to CQL3 (CASSANDRA-6561)
+ * Optimize single partition batch statements (CASSANDRA-6737)
+ * Disallow post-query re-ordering when paging (CASSANDRA-6722)
+ * Fix potential paging bug with deleted columns (CASSANDRA-6748)
+ * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
+ * Fix truncating compression metadata (CASSANDRA-6791)
+ * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
+ * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
+ * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
+ * Fix UPDATE updating PRIMARY KEY columns implicitly (CASSANDRA-6782)
+ * Fix IllegalArgumentException when updating from 1.2 with SuperColumns
+   (CASSANDRA-6733)
+ * FBUtilities.singleton() should use the CF comparator (CASSANDRA-6778)
+ * Fix CQLSStableWriter.addRow(Map<String, Object>) (CASSANDRA-6526)
+ * Fix HSHA server introducing corrupt data (CASSANDRA-6285)
+ * Fix CAS conditions for COMPACT STORAGE tables (CASSANDRA-6813)
+ * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177)
+ * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
+ * Set JMX RMI port to 7199 (CASSANDRA-7087)
+ * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
+ * Log a warning for large batches (CASSANDRA-6487)
 * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
 * Avoid early loading of non-system keyspaces before compaction-leftovers cleanup at startup (CASSANDRA-6913)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73952075/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 20e74dc,c125cf0..ad66f8e
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2353,9 -1890,10 +2353,10 @@@ public class ColumnFamilyStore implemen
              {
                  if (logger.isDebugEnabled())
                      logger.debug("using snapshot sstable {}", entries.getKey());
-                 sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner);
+                 // open without tracking hotness
+                 sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner, true, false);
                  // This is technically not necessary since it's a snapshot but makes things easier
-                 sstable.acquireReference();
+                 refs.tryRef(sstable);
              }
              else if (logger.isDebugEnabled())
              {
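
For readers outside the patch context, the sketch below illustrates the idea behind CASSANDRA-9382 as merged here: a reader opened from a snapshot purely for validation is constructed with hotness (read-meter) tracking disabled, so it neither keeps read-rate state nor needs a meter-sync task. The names ReaderSketch, ReadMeter and openFromSnapshotForValidation are hypothetical stand-ins, not Cassandra's actual API; this is a minimal sketch assuming a much-simplified reader type.

// Minimal, self-contained sketch (not Cassandra's real SSTableReader) of the
// CASSANDRA-9382 idea: snapshot readers opened only for validation skip
// read-meter ("hotness") tracking.
import java.util.concurrent.atomic.AtomicLong;

final class ReaderSketch
{
    // Stand-in for Cassandra's RestorableMeter; only counts reads here.
    static final class ReadMeter
    {
        private final AtomicLong count = new AtomicLong();
        void mark() { count.incrementAndGet(); }
        long count() { return count.get(); }
    }

    private final String path;
    private final ReadMeter readMeter; // null when hotness tracking is disabled

    private ReaderSketch(String path, boolean trackHotness)
    {
        this.path = path;
        // Only "live" readers get a meter; snapshot/offline readers do not,
        // so validation never pollutes hotness stats.
        this.readMeter = trackHotness ? new ReadMeter() : null;
    }

    static ReaderSketch open(String path) { return new ReaderSketch(path, true); }
    static ReaderSketch openFromSnapshotForValidation(String path) { return new ReaderSketch(path, false); }

    void incrementReadCount()
    {
        if (readMeter != null)   // same null-guard pattern the patch relies on
            readMeter.mark();
    }

    public static void main(String[] args)
    {
        ReaderSketch live = open("ks/cf/ma-1-big-Data.db");
        ReaderSketch snap = openFromSnapshotForValidation("ks/cf/snapshots/repair/ma-1-big-Data.db");
        live.incrementReadCount();
        snap.incrementReadCount(); // no-op: hotness is not tracked for the snapshot reader
        System.out.println("live reads = " + live.readMeter.count()); // prints 1
    }
}
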
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73952075/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 92c9b55,39d46e9..32eb1b9 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@@ -164,30 -66,15 +164,35 @@@ public class SSTableReader extends SSTa private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class); private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1); + static + { + // Immediately remove readMeter sync task when cancelled. + syncExecutor.setRemoveOnCancelPolicy(true); + } private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0); + public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + long ts1 = o1.getMaxTimestamp(); + long ts2 = o2.getMaxTimestamp(); + return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1)); + } + }; + + public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return o1.first.compareTo(o2.first); + } + }; + + public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator); + /** - * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound + * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created * later than maxDataAge. * @@@ -377,50 -155,14 +382,50 @@@ return open(desc, componentsFor(desc), metadata, p); } + public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException + { - return open(descriptor, components, metadata, partitioner, true); ++ return open(descriptor, components, metadata, partitioner, true, true); + } + + // use only for offline or "Standalone" operations public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException { - return open(descriptor, components, metadata, StorageService.getPartitioner(), false); + return open(descriptor, components, metadata, StorageService.getPartitioner(), false, false); // do not track hotness } + /** + * Open SSTable reader to be used in batch mode(such as sstableloader). 
+ * + * @param descriptor + * @param components + * @param metadata + * @param partitioner + * @return opened SSTableReader + * @throws IOException + */ public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException { - SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner, true); + // Minimum components without which we can't do anything + assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; + assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; + + Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, + EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); + ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); + StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); + + // Check if sstable is created using same partitioner. + // Partitioner can be null, which indicates older version of sstable or no stats available. + // In that case, we skip the check. + String partitionerName = partitioner.getClass().getCanonicalName(); + if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) + { + logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", + descriptor, validationMetadata.partitioner, partitionerName)); + System.exit(1); + } + + logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length()); SSTableReader sstable = new SSTableReader(descriptor, components, metadata, @@@ -432,73 -174,83 +437,74 @@@ // special implementation of load to use non-pooled SegmentedFile builders SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder(); SegmentedFile.Builder dbuilder = sstable.compression - ? new CompressedSegmentedFile.Builder() + ? 
new CompressedSegmentedFile.Builder(null) : new BufferedSegmentedFile.Builder(); - if (!loadSummary(sstable, ibuilder, dbuilder, sstable.metadata)) - sstable.buildSummary(false, ibuilder, dbuilder, false); + if (!sstable.loadSummary(ibuilder, dbuilder)) + sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL); sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)); sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA)); - sstable.bf = FilterFactory.AlwaysPresent; - sstable.setup(true); ++ sstable.setup(false); return sstable; } - private static SSTableReader open(Descriptor descriptor, - public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException - { - // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist - // the read meter when in client mode - boolean trackHotness = !(Keyspace.SYSTEM_KS.equals(descriptor.ksname) || Config.isClientMode()); - return open(descriptor, components, metadata, partitioner, true, trackHotness); - } - + public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, - boolean validate) throws IOException + boolean validate, + boolean trackHotness) throws IOException { - long start = System.nanoTime(); - SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner, validate); - - SSTableReader sstable = new SSTableReader(descriptor, - components, - metadata, - partitioner, - System.currentTimeMillis(), - sstableMetadata, - trackHotness); - - sstable.load(); - - if (validate) - sstable.validate(); - - logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - - if (sstable.getKeyCache() != null) - logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity()); - - return sstable; - } - - private static SSTableMetadata openMetadata(Descriptor descriptor, - Set<Component> components, - IPartitioner partitioner, - boolean primaryIndexRequired) throws IOException - { - assert partitioner != null; // Minimum components without which we can't do anything assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; + assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; - assert !primaryIndexRequired || components.contains(Component.PRIMARY_INDEX) - : "Primary index component is missing for sstable " + descriptor; - - logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length()); - - SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left; + Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, + EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); + ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); + StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. // In that case, we skip the check. 
String partitionerName = partitioner.getClass().getCanonicalName(); - if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner)) + if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) { logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", - descriptor, sstableMetadata.partitioner, partitionerName)); + descriptor, validationMetadata.partitioner, partitionerName)); System.exit(1); } - return sstableMetadata; + + logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length()); + SSTableReader sstable = new SSTableReader(descriptor, + components, + metadata, + partitioner, + System.currentTimeMillis(), + statsMetadata, + OpenReason.NORMAL); + + try + { + // load index and filter + long start = System.nanoTime(); + sstable.load(validationMetadata); + logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + - sstable.setup(!validate); ++ sstable.setup(trackHotness); + if (validate) + sstable.validate(); + + if (sstable.getKeyCache() != null) + logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity()); + + return sstable; + } + catch (Throwable t) + { + sstable.selfRef().release(); + throw t; + } } public static void logOpenException(Descriptor descriptor, IOException e) @@@ -624,43 -388,35 +630,43 @@@ this.dfile = dfile; this.indexSummary = indexSummary; this.bf = bloomFilter; - this.setup(false); ++ this.setup(true); } - /** - * Clean up all opened resources. - * - * @throws IOException - */ - public void close() throws IOException + public static long getTotalBytes(Iterable<SSTableReader> sstables) { - if (readMeterSyncFuture != null) - readMeterSyncFuture.cancel(true); + long sum = 0; + for (SSTableReader sstable : sstables) + sum += sstable.onDiskLength(); + return sum; + } - // Force finalizing mmapping if necessary + public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables) + { + long sum = 0; + for (SSTableReader sstable : sstables) + sum += sstable.uncompressedLength(); - if (null != ifile) - ifile.cleanup(); + return sum; + } - dfile.cleanup(); - // close the BF so it can be opened later. - if (null != bf) - bf.close(); + public boolean equals(Object that) + { + return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor); + } - if (null != indexSummary) - indexSummary.close(); + public int hashCode() + { + return this.descriptor.hashCode(); } - public void setTrackedBy(DataTracker tracker) + public String getFilename() + { + return dfile.path; + } + + public void setupKeyCache() { - deletingTask.setTracker(tracker); // under normal operation we can do this at any time, but SSTR is also used outside C* proper, // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache // here when we know we're being wired into the rest of the server infrastructure. @@@ -2090,155 -1484,73 +2096,155 @@@ } /** - * @param sstables - * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false. + * Increment the total row read count and read rate for this SSTable. 
This should not be incremented for range + * slice queries, row cache hits, or non-query reads, like compaction. */ - public static boolean acquireReferences(Iterable<SSTableReader> sstables) + public void incrementReadCount() { - SSTableReader failed = null; - for (SSTableReader sstable : sstables) + if (readMeter != null) + readMeter.mark(); + } + + public static class SizeComparator implements Comparator<SSTableReader> + { + public int compare(SSTableReader o1, SSTableReader o2) { - if (!sstable.acquireReference()) - { - failed = sstable; - break; - } + return Longs.compare(o1.onDiskLength(), o2.onDiskLength()); } + } - if (failed == null) - return true; + public Ref<SSTableReader> tryRef() + { + return selfRef.tryRef(); + } - for (SSTableReader sstable : sstables) - { - if (sstable == failed) - break; - sstable.releaseReference(); - } - return false; + public Ref<SSTableReader> selfRef() + { + return selfRef; } - public static void releaseReferences(Iterable<SSTableReader> sstables) + public Ref<SSTableReader> ref() { - for (SSTableReader sstable : sstables) - { - sstable.releaseReference(); - } + return selfRef.ref(); } - void setup(boolean isOffline) - private void dropPageCache() ++ void setup(boolean trackHotness) { - tidy.setup(this, isOffline); - dropPageCache(dfile.path); - if (null != ifile) - dropPageCache(ifile.path); ++ tidy.setup(this, trackHotness); + this.readMeter = tidy.global.readMeter; } - private void dropPageCache(String filePath) + @VisibleForTesting + public void overrideReadMeter(RestorableMeter readMeter) { - RandomAccessFile file = null; + this.readMeter = tidy.global.readMeter = readMeter; + } - try + /** + * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references + * the globally shared tidy, i.e. + * + * InstanceTidier => DescriptorTypeTitdy => GlobalTidy + * + * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be + * two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for one single logical sstable. + * + * When the InstanceTidier cleansup, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers + * for that type have run, the DescriptorTypeTidy cleansup. DescriptorTypeTidy behaves in the same way towards GlobalTidy. 
+ * + * For ease, we stash a direct reference to both our type-shared and global tidier + */ + private static final class InstanceTidier implements Tidy + { + private final Descriptor descriptor; + private final CFMetaData metadata; + private IFilter bf; + private IndexSummary summary; + + private SegmentedFile dfile; + private SegmentedFile ifile; + private Runnable runOnClose; + private boolean isReplaced = false; + + // a reference to our shared per-Descriptor.Type tidy instance, that + // we will release when we are ourselves released + private Ref<DescriptorTypeTidy> typeRef; + + // a convenience stashing of the shared per-descriptor-type tidy instance itself + // and the per-logical-sstable globally shared state that it is linked to + private DescriptorTypeTidy type; + private GlobalTidy global; + + private boolean setup; + - void setup(SSTableReader reader, boolean isOffline) ++ void setup(SSTableReader reader, boolean trackHotness) { - file = new RandomAccessFile(filePath, "r"); + this.setup = true; + this.bf = reader.bf; + this.summary = reader.indexSummary; + this.dfile = reader.dfile; + this.ifile = reader.ifile; + // get a new reference to the shared descriptor-type tidy + this.typeRef = DescriptorTypeTidy.get(reader); + this.type = typeRef.get(); + this.global = type.globalRef.get(); - if (!isOffline) ++ if (trackHotness) + global.ensureReadMeter(); + } - int fd = CLibrary.getfd(file.getFD()); + InstanceTidier(Descriptor descriptor, CFMetaData metadata) + { + this.descriptor = descriptor; + this.metadata = metadata; + } - if (fd > 0) - { - if (logger.isDebugEnabled()) - logger.debug(String.format("Dropping page cache of file %s.", filePath)); + public void tidy() + { + // don't try to cleanup if the sstablereader was never fully constructed + if (!setup) + return; - CLibrary.trySkipCache(fd, 0, 0); + final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId); + final OpOrder.Barrier barrier; + if (cfs != null) + { + barrier = cfs.readOrdering.newBarrier(); + barrier.issue(); } + else + barrier = null; + + ScheduledExecutors.nonPeriodicTasks.execute(new Runnable() + { + public void run() + { + if (barrier != null) + barrier.await(); + if (bf != null) + bf.close(); + if (summary != null) + summary.close(); + if (runOnClose != null) + runOnClose.run(); + if (dfile != null) + dfile.close(); + if (ifile != null) + ifile.close(); + typeRef.release(); + } + }); } - catch (IOException e) + + public String name() { - // we don't care if cache cleanup fails + return descriptor.toString(); } - finally + + void releaseSummary() { - FileUtils.closeQuietly(file); + summary.close(); + assert summary.isCleanedUp(); + summary = null; } }
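
The SSTableReader changes above replace acquireReference/releaseReference with the Ref/Tidy machinery (selfRef, tryRef, and the InstanceTidier chained to DescriptorTypeTidy and GlobalTidy). As a rough, hypothetical illustration of that pattern only, not Cassandra's real Ref, Tidy or tidier classes, the sketch below shows the two properties the code above relies on: tryRef() refuses to resurrect a resource whose count has already reached zero, and the tidy cleanup runs exactly once, when the last reference is released.

// Minimal, self-contained sketch of reference counting with a one-shot Tidy.
// All names here are illustrative stand-ins, not the Cassandra utilities.
import java.util.concurrent.atomic.AtomicInteger;

final class RefSketch
{
    interface Tidy { void tidy(); }

    static final class Ref
    {
        private final AtomicInteger counts;
        private final Tidy tidy;

        Ref(Tidy tidy) { this.counts = new AtomicInteger(1); this.tidy = tidy; }
        private Ref(AtomicInteger counts, Tidy tidy) { this.counts = counts; this.tidy = tidy; }

        // Returns a new reference, or null if the resource is already fully released.
        Ref tryRef()
        {
            while (true)
            {
                int c = counts.get();
                if (c <= 0)
                    return null;                      // too late, resource is gone
                if (counts.compareAndSet(c, c + 1))
                    return new Ref(counts, tidy);
            }
        }

        void release()
        {
            if (counts.decrementAndGet() == 0)
                tidy.tidy();                          // last reference cleans up, exactly once
        }
    }

    public static void main(String[] args)
    {
        Ref self = new Ref(() -> System.out.println("tidied"));
        Ref extra = self.tryRef();                    // e.g. a streaming session taking a ref
        self.release();                               // reader "closed", but extra still holds it
        extra.release();                              // prints "tidied" exactly once
        System.out.println(self.tryRef());            // null: cannot resurrect a released resource
    }
}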