Extend Transactional API to sstable lifecycle management patch by benedict; reviewed by marcus for CASSANDRA-8568
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5a76bdb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5a76bdb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5a76bdb Branch: refs/heads/trunk Commit: e5a76bdb5fc04ffa16b8becaa7877186226c3b32 Parents: 33d71b8 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Thu Mar 12 10:23:35 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Fri May 22 09:44:36 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 249 ++++-- .../org/apache/cassandra/db/DataTracker.java | 793 ------------------- .../cassandra/db/HintedHandOffManager.java | 2 +- src/java/org/apache/cassandra/db/Keyspace.java | 15 +- src/java/org/apache/cassandra/db/Memtable.java | 22 +- .../compaction/AbstractCompactionStrategy.java | 7 +- .../db/compaction/AbstractCompactionTask.java | 19 +- .../db/compaction/CompactionController.java | 4 +- .../db/compaction/CompactionManager.java | 182 +++-- .../cassandra/db/compaction/CompactionTask.java | 54 +- .../DateTieredCompactionStrategy.java | 17 +- .../compaction/LeveledCompactionStrategy.java | 23 +- .../db/compaction/LeveledCompactionTask.java | 11 +- .../db/compaction/LeveledManifest.java | 11 +- .../db/compaction/SSTableSplitter.java | 13 +- .../cassandra/db/compaction/Scrubber.java | 21 +- .../SizeTieredCompactionStrategy.java | 30 +- .../cassandra/db/compaction/Upgrader.java | 15 +- .../compaction/WrappingCompactionStrategy.java | 2 +- .../writers/CompactionAwareWriter.java | 11 +- .../writers/DefaultCompactionWriter.java | 11 +- .../writers/MajorLeveledCompactionWriter.java | 11 +- .../writers/MaxSSTableSizeWriter.java | 10 +- .../SplittingSizeTieredCompactionWriter.java | 14 +- .../AbstractSimplePerColumnSecondaryIndex.java | 4 +- .../db/index/SecondaryIndexManager.java | 2 
+- .../apache/cassandra/db/lifecycle/Helpers.java | 241 ++++++ .../db/lifecycle/LifecycleTransaction.java | 511 ++++++++++++ .../db/lifecycle/SSTableIntervalTree.java | 40 + .../apache/cassandra/db/lifecycle/Tracker.java | 468 +++++++++++ .../org/apache/cassandra/db/lifecycle/View.java | 252 ++++++ .../io/compress/CompressionMetadata.java | 2 +- .../io/sstable/IndexSummaryManager.java | 106 ++- .../io/sstable/SSTableDeletingTask.java | 27 +- .../cassandra/io/sstable/SSTableRewriter.java | 295 ++----- .../io/sstable/format/SSTableReader.java | 100 ++- .../io/sstable/format/big/BigTableWriter.java | 6 +- .../cassandra/io/util/SequentialWriter.java | 2 +- .../cassandra/metrics/ColumnFamilyMetrics.java | 18 +- .../cassandra/streaming/StreamSession.java | 7 +- .../cassandra/tools/StandaloneScrubber.java | 12 +- .../cassandra/tools/StandaloneSplitter.java | 7 +- .../cassandra/tools/StandaloneUpgrader.java | 6 +- .../cassandra/utils/concurrent/Blocker.java | 63 ++ .../utils/concurrent/Transactional.java | 31 +- .../db/compaction/LongCompactionsTest.java | 10 +- test/unit/org/apache/cassandra/MockSchema.java | 167 ++++ test/unit/org/apache/cassandra/Util.java | 27 +- .../org/apache/cassandra/db/KeyCacheTest.java | 3 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 58 +- .../db/compaction/AntiCompactionTest.java | 51 +- .../compaction/CompactionAwareWriterTest.java | 45 +- .../DateTieredCompactionStrategyTest.java | 6 +- .../cassandra/db/lifecycle/HelpersTest.java | 158 ++++ .../db/lifecycle/LifecycleTransactionTest.java | 412 ++++++++++ .../cassandra/db/lifecycle/TrackerTest.java | 342 ++++++++ .../apache/cassandra/db/lifecycle/ViewTest.java | 202 +++++ .../io/sstable/IndexSummaryManagerTest.java | 123 ++- .../cassandra/io/sstable/SSTableReaderTest.java | 11 +- .../io/sstable/SSTableRewriterTest.java | 250 +++--- 61 files changed, 3902 insertions(+), 1711 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8b59309..ca87385 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2 + * Extend Transactional API to sstable lifecycle management (CASSANDRA-8568) * (cqlsh) Add support for native protocol 4 (CASSANDRA-9399) * Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409) * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 4452db2..cc9b26a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -34,6 +34,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.*; import com.google.common.util.concurrent.*; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.lifecycle.Tracker; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.FSWriteError; import org.json.simple.*; import org.slf4j.Logger; @@ -80,6 +84,8 @@ import org.apache.cassandra.utils.memory.MemtableAllocator; import com.clearspring.analytics.stream.Counter; +import static org.apache.cassandra.utils.Throwables.maybeFail; + public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); @@ -149,12 +155,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /** * Memtables and SSTables on disk for this column family. 
* - * We synchronize on the DataTracker to ensure isolation when we want to make sure + * We synchronize on the Tracker to ensure isolation when we want to make sure * that the memtable we're acting on doesn't change out from under us. I.e., flush * syncronizes on it to make sure it can submit on both executors atomically, * so anyone else who wants to make sure flush doesn't interfere should as well. */ - private final DataTracker data; + private final Tracker data; /* The read order, used to track accesses to off-heap memtable storage */ public final OpOrder readOrdering = new OpOrder(); @@ -288,13 +294,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - private ColumnFamilyStore(Keyspace keyspace, + public ColumnFamilyStore(Keyspace keyspace, + String columnFamilyName, + IPartitioner partitioner, + int generation, + CFMetaData metadata, + Directories directories, + boolean loadSSTables) + { + this(keyspace, columnFamilyName, partitioner, generation, metadata, directories, loadSSTables, true); + } + + + @VisibleForTesting + public ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, IPartitioner partitioner, int generation, CFMetaData metadata, Directories directories, - boolean loadSSTables) + boolean loadSSTables, + boolean registerBookkeeping) { assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName; @@ -315,7 +335,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean logger.info("Initializing {}.{}", keyspace.getName(), name); // scan for sstables corresponding to this cf and load them - data = new DataTracker(this, loadSSTables); + data = new Tracker(this, loadSSTables); if (data.loadsstables) { @@ -343,46 +363,59 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean indexManager.addIndexedColumn(info); } - // register the mbean - String type = this.partitioner instanceof LocalPartitioner ? 
"IndexColumnFamilies" : "ColumnFamilies"; - mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name; - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - ObjectName nameObj = new ObjectName(mbeanName); - mbs.registerMBean(this, nameObj); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry()); - latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable() + if (registerBookkeeping) { - public void run() + // register the mbean + String type = this.partitioner instanceof LocalPartitioner ? "IndexColumnFamilies" : "ColumnFamilies"; + mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name; + try + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName nameObj = new ObjectName(mbeanName); + mbs.registerMBean(this, nameObj); + } + catch (Exception e) { - SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry(); - switch (retryPolicy.type) + throw new RuntimeException(e); + } + logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry()); + latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable() + { + public void run() { - case PERCENTILE: - // get percentile in nanos - sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d); - break; - case CUSTOM: - // convert to nanos, since configuration is in millisecond - sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d); - break; - default: - sampleLatencyNanos = Long.MAX_VALUE; - break; + SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry(); + switch (retryPolicy.type) + { + case PERCENTILE: + // get percentile in nanos + sampleLatencyNanos = (long) 
(metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d); + break; + case CUSTOM: + // convert to nanos, since configuration is in millisecond + sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d); + break; + default: + sampleLatencyNanos = Long.MAX_VALUE; + break; + } } - } - }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS); + }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS); + } + else + { + latencyCalculator = ScheduledExecutors.optionalTasks.schedule(Runnables.doNothing(), 0, TimeUnit.NANOSECONDS); + mbeanName = null; + } } /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */ public void invalidate() { + invalidate(true); + } + + public void invalidate(boolean expectMBean) + { // disable and cancel in-progress compactions before invalidating valid = false; @@ -392,21 +425,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } catch (Exception e) { - JVMStabilityInspector.inspectThrowable(e); - // this shouldn't block anything. - logger.warn("Failed unregistering mbean: {}", mbeanName, e); + if (expectMBean) + { + JVMStabilityInspector.inspectThrowable(e); + // this shouldn't block anything. + logger.warn("Failed unregistering mbean: {}", mbeanName, e); + } } latencyCalculator.cancel(false); SystemKeyspace.removeTruncationRecord(metadata.cfId); - data.unreferenceSSTables(); + data.dropSSTables(); indexManager.invalidate(); invalidateCaches(); } /** - * Removes every SSTable in the directory from the DataTracker's view. + * Removes every SSTable in the directory from the Tracker's view. * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. 
*/ void maybeRemoveUnreadableSSTables(File directory) @@ -542,7 +578,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } /** - * Replacing compacted sstables is atomic as far as observers of DataTracker are concerned, but not on the + * Replacing compacted sstables is atomic as far as observers of Tracker are concerned, but not on the * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then * their ancestors are removed. * @@ -826,7 +862,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /* * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete, * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads. - * This method does not block except for synchronizing on DataTracker, but the Future it returns will + * This method does not block except for synchronizing on Tracker, but the Future it returns will * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL * marked clean up to the position owned by the Memtable. 
*/ @@ -849,7 +885,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // reclaiming includes that which we are GC-ing; float onHeapRatio = 0, offHeapRatio = 0; long onHeapTotal = 0, offHeapTotal = 0; - Memtable memtable = getDataTracker().getView().getCurrentMemtable(); + Memtable memtable = getTracker().getView().getCurrentMemtable(); onHeapRatio += memtable.getAllocator().onHeap().ownershipRatio(); offHeapRatio += memtable.getAllocator().offHeap().ownershipRatio(); onHeapTotal += memtable.getAllocator().onHeap().owns(); @@ -859,7 +895,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { if (index.getIndexCfs() != null) { - MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator(); + MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator(); onHeapRatio += allocator.onHeap().ownershipRatio(); offHeapRatio += allocator.offHeap().ownershipRatio(); onHeapTotal += allocator.onHeap().owns(); @@ -984,7 +1020,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } /** - * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the DataTracker monitor. + * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the Tracker monitor. * In the constructor the current memtable(s) are swapped, and a barrier on outstanding writes is issued; * when run by the flushWriter the barrier is waited on to ensure all outstanding writes have completed * before all memtables are immediately written, and the CL is either immediately marked clean or, if @@ -1117,7 +1153,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. 
we will only // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them - Memtable current = cfs.getDataTracker().getView().getCurrentMemtable(); + Memtable current = cfs.getTracker().getView().getCurrentMemtable(); // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, // both on- and off-heap, and select the largest of the two ratios to weight this CF @@ -1129,7 +1165,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { if (index.getIndexCfs() != null) { - MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator(); + MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator(); onHeap += allocator.onHeap().ownershipRatio(); offHeap += allocator.offHeap().ownershipRatio(); } @@ -1278,7 +1314,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (!sstables.iterator().hasNext()) return ImmutableSet.of(); - DataTracker.SSTableIntervalTree tree = data.getView().intervalTree; + SSTableIntervalTree tree = data.getView().intervalTree; Set<SSTableReader> results = null; for (SSTableReader sstable : sstables) @@ -1454,7 +1490,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) { assert !sstables.isEmpty(); - data.markObsolete(sstables, compactionType); + maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null)); } void replaceFlushed(Memtable memtable, SSTableReader sstable) @@ -1473,7 +1509,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /** * Package protected for access from the CompactionManager. 
*/ - public DataTracker getDataTracker() + public Tracker getTracker() { return data; } @@ -1485,7 +1521,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public Set<SSTableReader> getUncompactingSSTables() { - return data.getUncompactingSSTables(); + return data.getUncompacting(); } public ColumnFamily getColumnFamily(DecoratedKey key, @@ -1759,7 +1795,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return repairedSSTables; } - public RefViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter) + public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter) { while (true) { @@ -1770,9 +1806,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - public ViewFragment select(Function<DataTracker.View, List<SSTableReader>> filter) + public ViewFragment select(Function<View, List<SSTableReader>> filter) { - DataTracker.View view = data.getView(); + View view = data.getView(); List<SSTableReader> sstables = view.intervalTree.isEmpty() ? 
Collections.<SSTableReader>emptyList() : filter.apply(view); @@ -1784,12 +1820,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a ViewFragment containing the sstables and memtables that may need to be merged * for the given @param key, according to the interval tree */ - public Function<DataTracker.View, List<SSTableReader>> viewFilter(final DecoratedKey key) + public Function<View, List<SSTableReader>> viewFilter(final DecoratedKey key) { assert !key.isMinimum(); - return new Function<DataTracker.View, List<SSTableReader>>() + return new Function<View, List<SSTableReader>>() { - public List<SSTableReader> apply(DataTracker.View view) + public List<SSTableReader> apply(View view) { return compactionStrategyWrapper.filterSSTablesForReads(view.intervalTree.search(key)); } @@ -1800,17 +1836,43 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a ViewFragment containing the sstables and memtables that may need to be merged * for rows within @param rowBounds, inclusive, according to the interval tree. */ - public Function<DataTracker.View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds) + public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds) { - return new Function<DataTracker.View, List<SSTableReader>>() + return new Function<View, List<SSTableReader>>() { - public List<SSTableReader> apply(DataTracker.View view) + public List<SSTableReader> apply(View view) { return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds)); } }; } + /** + * @return a ViewFragment containing the sstables and memtables that may need to be merged + * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree. 
+ */ + public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired) + { + return new Function<View, List<SSTableReader>>() + { + public List<SSTableReader> apply(View view) + { + Set<SSTableReader> sstables = Sets.newHashSet(); + for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection) + { + for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) + { + if (includeRepaired || !sstable.isRepaired()) + sstables.add(sstable); + } + } + + logger.debug("ViewFilter for {}/{} sstables", sstables.size(), getSSTables().size()); + return ImmutableList.copyOf(sstables); + } + }; + } + public List<String> getSSTablesForKey(String key) { DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key)); @@ -2388,6 +2450,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * thread safety. All we do is wipe the sstable containers clean, while leaving the actual * data files present on disk. (This allows tests to easily call loadNewSSTables on them.) */ + @VisibleForTesting public void clearUnsafe() { for (final ColumnFamilyStore cfs : concatWithIndexes()) @@ -2396,7 +2459,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { public Void call() { - cfs.data.init(); + cfs.data.reset(); return null; } }, true); @@ -2489,7 +2552,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // doublecheck that we finished, instead of timing out for (ColumnFamilyStore cfs : selfWithIndexes) { - if (!cfs.getDataTracker().getCompacting().isEmpty()) + if (!cfs.getTracker().getCompacting().isEmpty()) { logger.warn("Unable to cancel in-progress compactions for {}. 
Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.cfName); return null; @@ -2515,19 +2578,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - public Iterable<SSTableReader> markAllCompacting() + public LifecycleTransaction markAllCompacting(final OperationType operationType) { - Callable<Iterable<SSTableReader>> callable = new Callable<Iterable<SSTableReader>>() + Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>() { - public Iterable<SSTableReader> call() throws Exception + public LifecycleTransaction call() throws Exception { assert data.getCompacting().isEmpty() : data.getCompacting(); Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables())); if (Iterables.isEmpty(sstables)) - return Collections.emptyList(); - boolean success = data.markCompacting(sstables); - assert success : "something marked things compacting while compactions are disabled"; - return sstables; + return null; + LifecycleTransaction modifier = data.tryModify(sstables, operationType); + assert modifier != null: "something marked things compacting while compactions are disabled"; + return modifier; } }; @@ -2634,12 +2697,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public int getMeanColumns() { - return data.getMeanColumns(); + long sum = 0; + long count = 0; + for (SSTableReader sstable : getSSTables()) + { + long n = sstable.getEstimatedColumnCount().count(); + sum += sstable.getEstimatedColumnCount().mean() * n; + count += n; + } + return count > 0 ? 
(int) (sum / count) : 0; } public long estimateKeys() { - return data.estimatedKeys(); + long n = 0; + for (SSTableReader sstable : getSSTables()) + n += sstable.estimatedKeys(); + return n; } /** true if this CFS contains secondary index data */ @@ -2703,18 +2777,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - /** - * Returns the creation time of the oldest memtable not fully flushed yet. - */ - public long oldestUnflushedMemtable() - { - return data.getView().getOldestMemtable().creationTime(); - } - public boolean isEmpty() { - DataTracker.View view = data.getView(); - return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable(); + View view = data.getView(); + return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0; } private boolean isRowCacheEnabled() @@ -2753,7 +2819,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public double getDroppableTombstoneRatio() { - return getDataTracker().getDroppableTombstoneRatio(); + double allDroppable = 0; + long allColumns = 0; + int localTime = (int)(System.currentTimeMillis()/1000); + + for (SSTableReader sstable : getSSTables()) + { + allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds()); + allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count(); + } + return allColumns > 0 ? allDroppable / allColumns : 0; } public long trueSnapshotsSize() @@ -2770,9 +2845,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // returns the "canonical" version of any current sstable, i.e. 
if an sstable is being replaced and is only partially // visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned // (even if it completely replaces it) - public static final Function<DataTracker.View, List<SSTableReader>> CANONICAL_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>() + public static final Function<View, List<SSTableReader>> CANONICAL_SSTABLES = new Function<View, List<SSTableReader>>() { - public List<SSTableReader> apply(DataTracker.View view) + public List<SSTableReader> apply(View view) { List<SSTableReader> sstables = new ArrayList<>(); for (SSTableReader sstable : view.compacting) @@ -2785,9 +2860,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } }; - public static final Function<DataTracker.View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>() + public static final Function<View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<View, List<SSTableReader>>() { - public List<SSTableReader> apply(DataTracker.View view) + public List<SSTableReader> apply(View view) { List<SSTableReader> sstables = new ArrayList<>(); for (SSTableReader sstable : CANONICAL_SSTABLES.apply(view)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java deleted file mode 100644 index 36f22c5..0000000 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ /dev/null @@ -1,793 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.File; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.*; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.notifications.*; -import org.apache.cassandra.utils.Interval; -import org.apache.cassandra.utils.IntervalTree; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.concurrent.Refs; - -public class DataTracker -{ - private static final Logger logger = LoggerFactory.getLogger(DataTracker.class); - - public final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>(); - public final ColumnFamilyStore cfstore; - private final AtomicReference<View> view; - - // Indicates if it is safe to load the initial sstables (may not be true when running in - //standalone 
processes meant to repair or upgrade sstables (e.g. standalone scrubber) - public final boolean loadsstables; - - public DataTracker(ColumnFamilyStore cfstore, boolean loadsstables) - { - this.cfstore = cfstore; - this.view = new AtomicReference<>(); - this.loadsstables = loadsstables; - this.init(); - } - - // get the Memtable that the ordered writeOp should be directed to - public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition) - { - // since any new memtables appended to the list after we fetch it will be for operations started - // after us, we can safely assume that we will always find the memtable that 'accepts' us; - // if the barrier for any memtable is set whilst we are reading the list, it must accept us. - - // there may be multiple memtables in the list that would 'accept' us, however we only ever choose - // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't - // assign operations to a memtable that was retired/queued before we started) - for (Memtable memtable : view.get().liveMemtables) - { - if (memtable.accepts(opGroup, replayPosition)) - return memtable; - } - throw new AssertionError(view.get().liveMemtables.toString()); - } - - public Set<SSTableReader> getSSTables() - { - return view.get().sstables; - } - - public Set<SSTableReader> getUncompactingSSTables() - { - return view.get().nonCompactingSStables(); - } - - public Iterable<SSTableReader> getUncompactingSSTables(Iterable<SSTableReader> candidates) - { - final View v = view.get(); - return Iterables.filter(candidates, new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader sstable) - { - return !v.compacting.contains(sstable); - } - }); - } - - public View getView() - { - return view.get(); - } - - /** - * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables, - * returning the previously last memtable. 
It leaves the previous Memtable in the list of live memtables until - * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable - * must be followed by discarding(m), they cannot be interleaved. - * - * @return the previously active memtable - */ - public Memtable switchMemtable(boolean truncating) - { - Memtable newMemtable = new Memtable(cfstore); - Memtable toFlushMemtable; - View currentView, newView; - do - { - currentView = view.get(); - toFlushMemtable = currentView.getCurrentMemtable(); - newView = currentView.switchMemtable(newMemtable); - } - while (!view.compareAndSet(currentView, newView)); - - if (truncating) - notifyRenewed(newMemtable); - - return toFlushMemtable; - } - - public void markFlushing(Memtable memtable) - { - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.markFlushing(memtable); - } - while (!view.compareAndSet(currentView, newView)); - } - - public void replaceFlushed(Memtable memtable, SSTableReader sstable) - { - // sstable may be null if we flushed batchlog and nothing needed to be retained - - if (!cfstore.isValid()) - { - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.replaceFlushed(memtable, sstable); - if (sstable != null) - newView = newView.replace(Arrays.asList(sstable), Collections.<SSTableReader>emptyList()); - } - while (!view.compareAndSet(currentView, newView)); - return; - } - - // back up before creating a new View (which makes the new one eligible for compaction) - if (sstable != null) - maybeIncrementallyBackup(sstable); - - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.replaceFlushed(memtable, sstable); - } - while (!view.compareAndSet(currentView, newView)); - - if (sstable != null) - { - addNewSSTablesSize(Arrays.asList(sstable)); - notifyAdded(sstable); - } - } - - public void maybeIncrementallyBackup(final SSTableReader sstable) - { - if 
(!DatabaseDescriptor.isIncrementalBackupsEnabled()) - return; - - File backupsDir = Directories.getBackupsDirectory(sstable.descriptor); - sstable.createLinks(FileUtils.getCanonicalPath(backupsDir)); - } - - /** - * @return true if we are able to mark the given @param sstables as compacted, before anyone else - * - * Note that we could acquire references on the marked sstables and release them in - * unmarkCompacting, but since we will never call markObsolete on a sstable marked - * as compacting (unless there is a serious bug), we can skip this. - */ - public boolean markCompacting(Iterable<SSTableReader> sstables) - { - return markCompacting(sstables, false, false); - } - - public boolean markCompacting(Iterable<SSTableReader> sstables, boolean newTables, boolean offline) - { - assert sstables != null && !Iterables.isEmpty(sstables); - while (true) - { - final View currentView = view.get(); - if (Iterables.any(sstables, Predicates.in(currentView.compacting))) - return false; - - Predicate<SSTableReader> live = new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader sstable) - { - return currentView.sstablesMap.get(sstable) == sstable && !sstable.isMarkedCompacted(); - } - }; - if (newTables) - assert !Iterables.any(sstables, Predicates.in(currentView.sstables)); - else if (!offline && !Iterables.all(sstables, live)) - return false; - - View newView = currentView.markCompacting(sstables); - if (view.compareAndSet(currentView, newView)) - return true; - } - } - - /** - * Removes files from compacting status: this is different from 'markObsolete' - * because it should be run regardless of whether a compaction succeeded. 
- */ - public void unmarkCompacting(Iterable<SSTableReader> unmark) - { - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.unmarkCompacting(unmark); - } - while (!view.compareAndSet(currentView, newView)); - - if (!cfstore.isValid()) - { - // when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals - // with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are - // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here. - unreferenceSSTables(); - } - } - - public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) - { - removeSSTablesFromTracker(sstables); - releaseReferences(sstables, false); - notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType); - } - - /** - * - * @param oldSSTables - * @param allReplacements - * @param compactionType - */ - // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners - // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call - public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType) - { - removeSSTablesFromTracker(oldSSTables); - releaseReferences(oldSSTables, false); - notifySSTablesChanged(oldSSTables, allReplacements, compactionType); - addNewSSTablesSize(allReplacements); - } - - public void addInitialSSTables(Collection<SSTableReader> sstables) - { - addSSTablesToTracker(sstables); - // no notifications or backup necessary - } - - public void addSSTables(Collection<SSTableReader> sstables) - { - addSSTablesToTracker(sstables); - for (SSTableReader sstable : sstables) - { - maybeIncrementallyBackup(sstable); - notifyAdded(sstable); - } - } - - /** - * 
Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance - * - * @param toReplace - * @param replaceWith - */ - public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith) - { - replaceReaders(toReplace, replaceWith, true); - } - - /** - * Adds the early opened files to the data tracker, but does not tell compaction strategies about it - * - * note that we dont track the live size of these sstables - * @param toReplace - * @param replaceWith - */ - public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith) - { - for (SSTableReader s : toReplace) - assert s.openReason == SSTableReader.OpenReason.EARLY; - // note that we can replace an early opened file with a real one - replaceReaders(toReplace, replaceWith, false); - } - - /** - * removes all sstables that are not busy compacting. - */ - public void unreferenceSSTables() - { - Set<SSTableReader> notCompacting; - - View currentView, newView; - do - { - currentView = view.get(); - if (!currentView.compacting.isEmpty()) - logger.error("Set of compacting sstables is non-empty when invalidating sstables {}", currentView.compacting); - notCompacting = currentView.nonCompactingSStables(); - newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet()); - } - while (!view.compareAndSet(currentView, newView)); - - if (notCompacting.isEmpty()) - { - // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion" - return; - } - notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN); - removeOldSSTablesSize(notCompacting); - releaseReferences(notCompacting, true); - } - - /** - * Removes every SSTable in the directory from the DataTracker's view. - * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. 
- */ - void removeUnreadableSSTables(File directory) - { - View currentView, newView; - Set<SSTableReader> remaining = new HashSet<>(); - do - { - currentView = view.get(); - for (SSTableReader r : currentView.nonCompactingSStables()) - if (!r.descriptor.directory.equals(directory)) - remaining.add(r); - - if (remaining.size() == currentView.nonCompactingSStables().size()) - return; - - newView = currentView.replace(currentView.sstables, remaining); - } - while (!view.compareAndSet(currentView, newView)); - for (SSTableReader sstable : currentView.sstables) - if (!remaining.contains(sstable)) - sstable.selfRef().release(); - notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN); - } - - /** (Re)initializes the tracker, purging all references. */ - void init() - { - view.set(new View( - ImmutableList.of(new Memtable(cfstore)), - ImmutableList.<Memtable>of(), - Collections.<SSTableReader, SSTableReader>emptyMap(), - Collections.<SSTableReader>emptySet(), - Collections.<SSTableReader>emptySet(), - SSTableIntervalTree.empty())); - } - - /** - * A special kind of replacement for SSTableReaders that were cloned with a new index summary sampling level (see - * SSTableReader.cloneWithNewSummarySamplingLevel and CASSANDRA-5519). This does not mark the old reader - * as compacted. 
- * @param oldSSTables replaced readers - * @param newSSTables replacement readers - */ - private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify) - { - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.replace(oldSSTables, newSSTables); - } - while (!view.compareAndSet(currentView, newView)); - - if (!oldSSTables.isEmpty() && notify) - notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN); - - for (SSTableReader sstable : newSSTables) - sstable.setTrackedBy(this); - - Refs.release(Refs.selfRefs(oldSSTables)); - } - - private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables) - { - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList()); - } - while (!view.compareAndSet(currentView, newView)); - removeOldSSTablesSize(oldSSTables); - } - - private void addSSTablesToTracker(Collection<SSTableReader> sstables) - { - View currentView, newView; - do - { - currentView = view.get(); - newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables); - } - while (!view.compareAndSet(currentView, newView)); - addNewSSTablesSize(sstables); - } - - private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables) - { - for (SSTableReader sstable : newSSTables) - { - if (logger.isDebugEnabled()) - logger.debug(String.format("adding %s to list of files tracked for %s.%s", - sstable.descriptor, cfstore.keyspace.getName(), cfstore.name)); - long size = sstable.bytesOnDisk(); - StorageMetrics.load.inc(size); - cfstore.metric.liveDiskSpaceUsed.inc(size); - cfstore.metric.totalDiskSpaceUsed.inc(size); - sstable.setTrackedBy(this); - } - } - - private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables) - { - for (SSTableReader sstable : oldSSTables) - { - if (logger.isDebugEnabled()) - logger.debug(String.format("removing %s 
from list of files tracked for %s.%s", - sstable.descriptor, cfstore.keyspace.getName(), cfstore.name)); - long size = sstable.bytesOnDisk(); - StorageMetrics.load.dec(size); - cfstore.metric.liveDiskSpaceUsed.dec(size); - } - } - - private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted) - { - for (SSTableReader sstable : oldSSTables) - { - boolean firstToCompact = sstable.markObsolete(); - assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted"; - sstable.selfRef().release(); - } - } - - public void spaceReclaimed(long size) - { - cfstore.metric.totalDiskSpaceUsed.dec(size); - } - - public long estimatedKeys() - { - long n = 0; - for (SSTableReader sstable : getSSTables()) - n += sstable.estimatedKeys(); - return n; - } - - public int getMeanColumns() - { - long sum = 0; - long count = 0; - for (SSTableReader sstable : getSSTables()) - { - long n = sstable.getEstimatedColumnCount().count(); - sum += sstable.getEstimatedColumnCount().mean() * n; - count += n; - } - return count > 0 ? (int) (sum / count) : 0; - } - - public double getDroppableTombstoneRatio() - { - double allDroppable = 0; - long allColumns = 0; - int localTime = (int)(System.currentTimeMillis()/1000); - - for (SSTableReader sstable : getSSTables()) - { - allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds()); - allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count(); - } - return allColumns > 0 ? 
allDroppable / allColumns : 0; - } - - public void notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType) - { - INotification notification = new SSTableListChangedNotification(added, removed, compactionType); - for (INotificationConsumer subscriber : subscribers) - subscriber.handleNotification(notification, this); - } - - public void notifyAdded(SSTableReader added) - { - INotification notification = new SSTableAddedNotification(added); - for (INotificationConsumer subscriber : subscribers) - subscriber.handleNotification(notification, this); - } - - public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged) - { - INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged); - for (INotificationConsumer subscriber : subscribers) - subscriber.handleNotification(notification, this); - - } - - public void notifyDeleting(SSTableReader deleting) - { - INotification notification = new SSTableDeletingNotification(deleting); - for (INotificationConsumer subscriber : subscribers) - subscriber.handleNotification(notification, this); - } - - public void notifyRenewed(Memtable renewed) - { - INotification notification = new MemtableRenewedNotification(renewed); - for (INotificationConsumer subscriber : subscribers) - subscriber.handleNotification(notification, this); - } - - public void notifyTruncated(long truncatedAt) - { - INotification notification = new TruncationNotification(truncatedAt); - for (INotificationConsumer subscriber : subscribers) - subscriber.handleNotification(notification, this); - } - - public void subscribe(INotificationConsumer consumer) - { - subscribers.add(consumer); - } - - public void unsubscribe(INotificationConsumer consumer) - { - subscribers.remove(consumer); - } - - public static SSTableIntervalTree buildIntervalTree(Iterable<SSTableReader> sstables) - { - return new SSTableIntervalTree(buildIntervals(sstables)); - } - - 
public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables) - { - List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables)); - for (SSTableReader sstable : sstables) - intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable)); - return intervals; - } - - public Set<SSTableReader> getCompacting() - { - return getView().compacting; - } - - public static class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>> - { - private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null); - - private SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals) - { - super(intervals); - } - - public static SSTableIntervalTree empty() - { - return EMPTY; - } - } - - /** - * An immutable structure holding the current memtable, the memtables pending - * flush, the sstables for a column family, and the sstables that are active - * in compaction (a subset of the sstables). - */ - public static class View - { - /** - * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush - * and the new replacement memtable, until all outstanding write operations on the old table complete. - * The last item in the list is always the "current" memtable. - */ - private final List<Memtable> liveMemtables; - /** - * contains all memtables that are no longer referenced for writing and are queued for / in the process of being - * flushed. In chronologically ascending order. - */ - private final List<Memtable> flushingMemtables; - public final Set<SSTableReader> compacting; - public final Set<SSTableReader> sstables; - // we use a Map here so that we can easily perform identity checks as well as equality checks. 
- // When marking compacting, we now indicate if we expect the sstables to be present (by default we do), - // and we then check that not only are they all present in the live set, but that the exact instance present is - // the one we made our decision to compact against. - public final Map<SSTableReader, SSTableReader> sstablesMap; - - // all sstables that are still in the live set, but have been completely shadowed by a replacement sstable - public final Set<SSTableReader> shadowed; - public final SSTableIntervalTree intervalTree; - - View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree) - { - this.shadowed = shadowed; - assert liveMemtables != null; - assert flushingMemtables != null; - assert sstables != null; - assert compacting != null; - assert intervalTree != null; - - this.liveMemtables = liveMemtables; - this.flushingMemtables = flushingMemtables; - - this.sstablesMap = sstables; - this.sstables = sstablesMap.keySet(); - this.compacting = compacting; - this.intervalTree = intervalTree; - } - - public Memtable getOldestMemtable() - { - if (!flushingMemtables.isEmpty()) - return flushingMemtables.get(0); - return liveMemtables.get(0); - } - - public Memtable getCurrentMemtable() - { - return liveMemtables.get(liveMemtables.size() - 1); - } - - public Iterable<Memtable> getMemtablesPendingFlush() - { - if (liveMemtables.size() == 1) - return flushingMemtables; - return Iterables.concat(liveMemtables.subList(0, 1), flushingMemtables); - } - - /** - * @return the active memtable and all the memtables that are pending flush. 
- */ - public Iterable<Memtable> getAllMemtables() - { - return Iterables.concat(flushingMemtables, liveMemtables); - } - - public Sets.SetView<SSTableReader> nonCompactingSStables() - { - return Sets.difference(ImmutableSet.copyOf(sstables), compacting); - } - - View switchMemtable(Memtable newMemtable) - { - List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build(); - return new View(newLiveMemtables, flushingMemtables, sstablesMap, compacting, shadowed, intervalTree); - } - - View markFlushing(Memtable toFlushMemtable) - { - List<Memtable> live = liveMemtables, flushing = flushingMemtables; - - // since we can have multiple flushes queued, we may occasionally race and start a flush out of order, - // so must locate it in the list to remove, rather than just removing from the beginning - int i = live.indexOf(toFlushMemtable); - assert i < live.size() - 1; - List<Memtable> newLive = ImmutableList.<Memtable>builder() - .addAll(live.subList(0, i)) - .addAll(live.subList(i + 1, live.size())) - .build(); - - // similarly, if we out-of-order markFlushing once, we may afterwards need to insert a memtable into the - // flushing list in a position other than the end, though this will be rare - i = flushing.size(); - while (i > 0 && flushing.get(i - 1).creationTime() > toFlushMemtable.creationTime()) - i--; - List<Memtable> newFlushing = ImmutableList.<Memtable>builder() - .addAll(flushing.subList(0, i)) - .add(toFlushMemtable) - .addAll(flushing.subList(i, flushing.size())) - .build(); - - return new View(newLive, newFlushing, sstablesMap, compacting, shadowed, intervalTree); - } - - View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable) - { - int index = flushingMemtables.indexOf(flushedMemtable); - List<Memtable> newQueuedMemtables = ImmutableList.<Memtable>builder() - .addAll(flushingMemtables.subList(0, index)) - .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size())) - .build(); 
- Map<SSTableReader, SSTableReader> newSSTables = sstablesMap; - SSTableIntervalTree intervalTree = this.intervalTree; - if (newSSTable != null) - { - assert !sstables.contains(newSSTable); - assert !shadowed.contains(newSSTable); - newSSTables = ImmutableMap.<SSTableReader, SSTableReader>builder() - .putAll(sstablesMap).put(newSSTable, newSSTable).build(); - intervalTree = buildIntervalTree(newSSTables.keySet()); - } - return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, shadowed, intervalTree); - } - - View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) - { - ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables); - int newSSTablesSize = shadowed.size() + sstables.size() - oldSSTables.size() + Iterables.size(replacements); - assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this); - Map<SSTableReader, SSTableReader> newSSTables = new HashMap<>(newSSTablesSize); - Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size()); - - for (SSTableReader sstable : sstables) - if (!oldSet.contains(sstable)) - newSSTables.put(sstable, sstable); - - for (SSTableReader sstable : shadowed) - if (!oldSet.contains(sstable)) - newShadowed.add(sstable); - - for (SSTableReader replacement : replacements) - { - if (replacement.openReason == SSTableReader.OpenReason.SHADOWED) - newShadowed.add(replacement); - else - newSSTables.put(replacement, replacement); - } - - assert newSSTables.size() + newShadowed.size() == newSSTablesSize : - String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", - newSSTablesSize, newSSTables.size() + newShadowed.size(), oldSSTables, replacements, this); - newShadowed = ImmutableSet.copyOf(newShadowed); - newSSTables = ImmutableMap.copyOf(newSSTables); - SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables.keySet()); - return 
new View(liveMemtables, flushingMemtables, newSSTables, compacting, newShadowed, intervalTree); - } - - View markCompacting(Iterable<SSTableReader> tomark) - { - Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build(); - return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree); - } - - View unmarkCompacting(Iterable<SSTableReader> tounmark) - { - Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark))); - return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree); - } - - @Override - public String toString() - { - return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting); - } - - public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds) - { - if (intervalTree.isEmpty()) - return Collections.emptyList(); - RowPosition stopInTree = rowBounds.right.isMinimum() ? 
intervalTree.max() : rowBounds.right; - return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree)); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 589958e..df8820b 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -250,7 +250,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean protected synchronized void compact() { ArrayList<Descriptor> descriptors = new ArrayList<>(); - for (SSTable sstable : hintStore.getDataTracker().getUncompactingSSTables()) + for (SSTable sstable : hintStore.getTracker().getUncompacting()) descriptors.add(sstable.descriptor); if (descriptors.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index f30bdaa..1d86784 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,7 +146,7 @@ public class Keyspace } /** - * Removes every SSTable in the directory from the appropriate DataTracker's view. + * Removes every SSTable in the directory from the appropriate Tracker's view. 
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. */ public static void removeUnreadableSSTables(File directory) @@ -276,6 +275,18 @@ public class Keyspace } } + private Keyspace(KSMetaData metadata) + { + this.metadata = metadata; + createReplicationStrategy(metadata); + this.metric = new KeyspaceMetrics(this); + } + + public static Keyspace mockKS(KSMetaData metadata) + { + return new Keyspace(metadata); + } + public void createReplicationStrategy(KSMetaData ksm) { replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index eab64ae..55b0bfe 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -27,6 +27,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -46,7 +49,7 @@ import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.memory.*; -public class Memtable +public class Memtable implements Comparable<Memtable> { private static final Logger logger = LoggerFactory.getLogger(Memtable.class); @@ -64,6 +67,11 @@ public class Memtable // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly private final ReplayPosition 
minReplayPosition = CommitLog.instance.getContext(); + public int compareTo(Memtable that) + { + return this.minReplayPosition.compareTo(that.minReplayPosition); + } + public static final class LastReplayPosition extends ReplayPosition { public LastReplayPosition(ReplayPosition copy) { @@ -92,6 +100,15 @@ public class Memtable this.cfs.scheduleFlush(); } + // ONLY to be used for testing, to create a mock Memtable + @VisibleForTesting + public Memtable(CFMetaData metadata) + { + this.initialComparator = metadata.comparator; + this.cfs = null; + this.allocator = null; + } + public MemtableAllocator getAllocator() { return allocator; @@ -107,7 +124,8 @@ public class Memtable return currentOperations.get(); } - void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition) + @VisibleForTesting + public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition) { assert this.writeBarrier == null; this.lastReplayPosition = lastReplayPosition; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 29826b8..38107c0 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -175,9 +176,9 @@ public abstract class 
AbstractCompactionStrategy */ public abstract AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore); - public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, final int gcBefore, long maxSSTableBytes) + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final int gcBefore, long maxSSTableBytes) { - return new CompactionTask(cfs, sstables, gcBefore, false); + return new CompactionTask(cfs, txn, gcBefore, false); } /** @@ -231,7 +232,7 @@ public abstract class AbstractCompactionStrategy */ public void replaceFlushed(Memtable memtable, SSTableReader sstable) { - cfs.getDataTracker().replaceFlushed(memtable, sstable); + cfs.getTracker().replaceFlushed(memtable, sstable); if (sstable != null) CompactionManager.instance.submitBackground(cfs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index ac646ef..3bf224e 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -24,27 +24,28 @@ import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorSt import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; public abstract class AbstractCompactionTask extends WrappedRunnable { protected final ColumnFamilyStore cfs; - protected Set<SSTableReader> sstables; + protected LifecycleTransaction transaction; protected boolean isUserDefined; protected OperationType 
compactionType; /** * @param cfs - * @param sstables must be marked compacting + * @param transaction the transaction managing the status of the sstables we're replacing */ - public AbstractCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables) + public AbstractCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction) { this.cfs = cfs; - this.sstables = sstables; + this.transaction = transaction; this.isUserDefined = false; this.compactionType = OperationType.COMPACTION; // enforce contract that caller should mark sstables compacting - Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting(); - for (SSTableReader sstable : sstables) + Set<SSTableReader> compacting = transaction.tracker.getCompacting(); + for (SSTableReader sstable : transaction.originals()) assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting"; } @@ -59,10 +60,10 @@ public abstract class AbstractCompactionTask extends WrappedRunnable } finally { - cfs.getDataTracker().unmarkCompacting(sstables); + transaction.close(); } } - public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables); + public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables); protected abstract int executeInternal(CompactionExecutorStatsCollector collector); @@ -80,6 +81,6 @@ public abstract class AbstractCompactionTask extends WrappedRunnable } public String toString() { - return "CompactionTask(" + sstables + ")"; + return "CompactionTask(" + transaction + ")"; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index a49a3ea..2292e01 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -24,6 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; +import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.utils.AlwaysPresentFilter; @@ -31,7 +33,7 @@ import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.OverlapIterator; import org.apache.cassandra.utils.concurrent.Refs; -import static org.apache.cassandra.db.DataTracker.buildIntervals; +import static org.apache.cassandra.db.lifecycle.SSTableIntervalTree.buildIntervals; /** * Manage compaction options.