http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index a526ec9..8029075 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -20,33 +20,28 @@ package org.apache.cassandra.io.sstable; import java.util.*; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.utils.CLibrary; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.utils.concurrent.Transactional; -import static org.apache.cassandra.utils.Throwables.merge; - /** * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb * we look in the summary we're collecting for the latest writer for the penultimate key that we know to have been fully * flushed to the index file, and then double check that the key is fully present in the flushed data file. - * Then we move the starts of each reader forwards to that point, replace them in the datatracker, and attach a runnable + * Then we move the starts of each reader forwards to that point, replace them in the Tracker, and attach a runnable * for on-close (i.e. when all references expire) that drops the page cache prior to that key position * * hard-links are created for each partially written sstable so that readers opened against them continue to work past * the rename of the temporary file, which is deleted once all readers against the hard-link have been closed. - * If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the DataTracker. + * If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the Tracker. * * On abort we restore the original lower bounds to the existing readers and delete any temporary files we had in progress, * but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished @@ -74,26 +69,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme return preemptiveOpenInterval; } - private final DataTracker dataTracker; private final ColumnFamilyStore cfs; private final long maxAge; private long repairedAt = -1; // the set of final readers we will expose on commit + private final LifecycleTransaction transaction; // the readers we are rewriting (updated as they are replaced) private final List<SSTableReader> preparedForCommit = new ArrayList<>(); - private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced) - private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting - private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at - private final List<Finished> finishedWriters = new ArrayList<>(); - // as writers are closed from finishedWriters, their last readers are moved into discard, so that abort can cleanup - // after us safely; we use a set so we can add in both prepareToCommit and abort - private final Set<SSTableReader> discard = new HashSet<>(); - // true for operations that are performed without Cassandra running (prevents updates of DataTracker) - private final boolean isOffline; + private final List<SSTableWriter> writers = new ArrayList<>(); + private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker) private SSTableWriter writer; private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>(); @@ -101,15 +89,11 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme // for testing (TODO: remove when have byteman setup) private boolean throwEarly, throwLate; - public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline) + public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline) { - this.rewriting = rewriting; - for (SSTableReader sstable : rewriting) - { - originalStarts.put(sstable.descriptor, sstable.first); + this.transaction = transaction; + for (SSTableReader sstable : this.transaction.originals()) fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename())); - } - this.dataTracker = cfs.getDataTracker(); this.cfs = cfs; this.maxAge = maxAge; this.isOffline = isOffline; @@ -134,7 +118,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme else { boolean save = false; - for (SSTableReader reader : rewriting) + for (SSTableReader reader : transaction.originals()) { if (reader.getCachedPosition(row.key, false) != null) { @@ -170,7 +154,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme { if (isOffline) { - for (SSTableReader reader : rewriting) + for (SSTableReader reader : transaction.originals()) { RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE); CLibrary.trySkipCache(fileDescriptors.get(reader.descriptor), 0, index == null ? 0 : index.position); @@ -181,10 +165,10 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme SSTableReader reader = writer.setMaxDataAge(maxAge).openEarly(); if (reader != null) { - replaceEarlyOpenedFile(currentlyOpenedEarly, reader); - currentlyOpenedEarly = reader; + transaction.update(reader, false); currentlyOpenedEarlyAt = writer.getFilePointer(); - moveStarts(reader, reader.last, false); + moveStarts(reader, reader.last); + transaction.checkpoint(); } } } @@ -192,59 +176,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme protected Throwable doAbort(Throwable accumulate) { - try - { - moveStarts(null, null, true); - } - catch (Throwable t) - { - accumulate = merge(accumulate, t); - } - - // cleanup any sstables we prepared for commit - for (SSTableReader sstable : preparedForCommit) - { - try - { - sstable.markObsolete(); - sstable.selfRef().release(); - } - catch (Throwable t) - { - accumulate = merge(accumulate , t); - } - } - - // abort the writers, and add the early opened readers to our discard pile - - if (writer != null) - finishedWriters.add(new Finished(writer, currentlyOpenedEarly)); - - for (Finished finished : finishedWriters) - { - accumulate = finished.writer.abort(accumulate); - - // if we've already been opened, add ourselves to the discard pile - if (finished.reader != null) - discard.add(finished.reader); - } - - accumulate = replaceWithFinishedReaders(Collections.<SSTableReader>emptyList(), accumulate); + // abort the writers + for (SSTableWriter writer : writers) + accumulate = writer.abort(accumulate); + // abort the lifecycle transaction + accumulate = transaction.abort(accumulate); return accumulate; } protected Throwable doCommit(Throwable accumulate) { - for (Finished f : finishedWriters) - accumulate = f.writer.commit(accumulate); - accumulate = replaceWithFinishedReaders(preparedForCommit, accumulate); - - return accumulate; - } - - protected Throwable doCleanup(Throwable accumulate) - { - // we have no state of our own to cleanup; Transactional objects cleanup their own state in abort or commit + for (SSTableWriter writer : writers) + accumulate = writer.commit(accumulate); + accumulate = transaction.commit(accumulate); return accumulate; } @@ -260,100 +204,70 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme * * @param newReader the rewritten reader that replaces them for this region * @param lowerbound if !reset, must be non-null, and marks the exclusive lowerbound of the start for each sstable - * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid) */ - private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound, boolean reset) + private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound) { if (isOffline) return; if (preemptiveOpenInterval == Long.MAX_VALUE) return; - List<SSTableReader> toReplace = new ArrayList<>(); - List<SSTableReader> replaceWith = new ArrayList<>(); final List<DecoratedKey> invalidateKeys = new ArrayList<>(); - if (!reset) - { - invalidateKeys.addAll(cachedKeys.keySet()); - for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet()) - newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue()); - } + invalidateKeys.addAll(cachedKeys.keySet()); + for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet()) + newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue()); cachedKeys = new HashMap<>(); - for (SSTableReader sstable : ImmutableList.copyOf(rewriting)) + for (SSTableReader sstable : transaction.originals()) { // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once. // note: only one such writer should be written to at any moment - final SSTableReader latest = sstable.getCurrentReplacement(); - SSTableReader replacement; - if (reset) - { - DecoratedKey newStart = originalStarts.get(sstable.descriptor); - replacement = latest.cloneWithNewStart(newStart, null); - } - else - { - // skip any sstables that we know to already be shadowed - if (latest.openReason == SSTableReader.OpenReason.SHADOWED) - continue; - if (latest.first.compareTo(lowerbound) > 0) - continue; + final SSTableReader latest = transaction.current(sstable); - final Runnable runOnClose = new Runnable() - { - public void run() - { - // this is somewhat racey, in that we could theoretically be closing this old reader - // when an even older reader is still in use, but it's not likely to have any major impact - for (DecoratedKey key : invalidateKeys) - latest.invalidateCacheKey(key); - } - }; + // skip any sstables that we know to already be shadowed + if (latest.first.compareTo(lowerbound) > 0) + continue; - if (lowerbound.compareTo(latest.last) >= 0) + final Runnable runOnClose = new Runnable() + { + public void run() { - replacement = latest.cloneAsShadowed(runOnClose); + // this is somewhat racey, in that we could theoretically be closing this old reader + // when an even older reader is still in use, but it's not likely to have any major impact + for (DecoratedKey key : invalidateKeys) + latest.invalidateCacheKey(key); } - else + }; + + if (lowerbound.compareTo(latest.last) >= 0) + { + if (!transaction.isObsolete(latest)) { - DecoratedKey newStart = latest.firstKeyBeyond(lowerbound); - assert newStart != null; - replacement = latest.cloneWithNewStart(newStart, runOnClose); + latest.runOnClose(runOnClose); + transaction.obsolete(latest); } + continue; } - toReplace.add(latest); - replaceWith.add(replacement); - rewriting.remove(sstable); - rewriting.add(replacement); + DecoratedKey newStart = latest.firstKeyBeyond(lowerbound); + assert newStart != null; + SSTableReader replacement = latest.cloneWithNewStart(newStart, runOnClose); + transaction.update(replacement, true); } - cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith); - } - - private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith) - { - if (isOffline) - return; - Set<SSTableReader> toReplaceSet; - if (toReplace != null) - { - toReplace.setReplacedBy(replaceWith); - toReplaceSet = Collections.singleton(toReplace); - } - else - { - dataTracker.markCompacting(Collections.singleton(replaceWith), true, isOffline); - toReplaceSet = Collections.emptySet(); - } - dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith)); } public void switchWriter(SSTableWriter newWriter) { + if (newWriter != null) + writers.add(newWriter.setMaxDataAge(maxAge)); + if (writer == null || writer.getFilePointer() == 0) { if (writer != null) + { writer.abort(); + writers.remove(writer); + } writer = newWriter; return; } @@ -361,14 +275,13 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme SSTableReader reader = null; if (preemptiveOpenInterval != Long.MAX_VALUE) { - // we leave it as a tmp file, but we open it and add it to the dataTracker + // we leave it as a tmp file, but we open it and add it to the Tracker reader = writer.setMaxDataAge(maxAge).openFinalEarly(); - replaceEarlyOpenedFile(currentlyOpenedEarly, reader); - moveStarts(reader, reader.last, false); + transaction.update(reader, false); + moveStarts(reader, reader.last); + transaction.checkpoint(); } - finishedWriters.add(new Finished(writer, reader)); - currentlyOpenedEarly = null; currentlyOpenedEarlyAt = 0; writer = newWriter; } @@ -387,12 +300,12 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme /** * Finishes the new file(s) * - * Creates final files, adds the new files to the dataTracker (via replaceReader). + * Creates final files, adds the new files to the Tracker (via replaceReader). * * We add them to the tracker to be able to get rid of the tmpfiles * * It is up to the caller to do the compacted sstables replacement - * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..)) + * gymnastics (ie, call Tracker#markCompactedSSTablesReplaced(..)) * * */ @@ -402,6 +315,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme return finished(); } + // returns, in list form, the public List<SSTableReader> finished() { assert state() == State.COMMITTED || state() == State.READY_TO_COMMIT; @@ -416,82 +330,31 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme throw new RuntimeException("exception thrown early in finish, for testing"); // No early open to finalize and replace - for (Finished f : finishedWriters) + for (SSTableWriter writer : writers) { - if (f.reader != null) - discard.add(f.reader); - - f.writer.setRepairedAt(repairedAt).setMaxDataAge(maxAge).setOpenResult(true).prepareToCommit(); - SSTableReader newReader = f.writer.finished(); - - if (f.reader != null) - f.reader.setReplacedBy(newReader); - - preparedForCommit.add(newReader); + assert writer.getFilePointer() > 0; + writer.setRepairedAt(repairedAt).setOpenResult(true).prepareToCommit(); + SSTableReader reader = writer.finished(); + transaction.update(reader, false); + preparedForCommit.add(reader); } + transaction.checkpoint(); if (throwLate) throw new RuntimeException("exception thrown after all sstables finished, for testing"); - } - @VisibleForTesting - void throwDuringPrepare(boolean throwEarly) - { - this.throwEarly = throwEarly; - this.throwLate = !throwEarly; - } + // TODO: do we always want to avoid obsoleting if offline? + if (!isOffline) + transaction.obsoleteOriginals(); - // cleanup all our temporary readers and swap in our new ones - private Throwable replaceWithFinishedReaders(List<SSTableReader> finished, Throwable accumulate) - { - if (isOffline) - { - for (SSTableReader reader : discard) - { - try - { - if (reader.getCurrentReplacement() == reader) - reader.markObsolete(); - } - catch (Throwable t) - { - accumulate = merge(accumulate, t); - } - } - accumulate = Refs.release(Refs.selfRefs(discard), accumulate); - } - else - { - try - { - dataTracker.replaceEarlyOpenedFiles(discard, finished); - } - catch (Throwable t) - { - accumulate = merge(accumulate, t); - } - try - { - dataTracker.unmarkCompacting(discard); - } - catch (Throwable t) - { - accumulate = merge(accumulate, t); - } - } - discard.clear(); - return accumulate; + transaction.prepareToCommit(); } - private static final class Finished + public void throwDuringPrepare(boolean earlyException) { - final SSTableWriter writer; - final SSTableReader reader; - - private Finished(SSTableWriter writer, SSTableReader reader) - { - this.writer = writer; - this.reader = reader; - } + if (earlyException) + throwEarly = true; + else + throwLate = true; } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 23c27b0..8e701b3 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -46,6 +46,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.dht.*; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.*; @@ -122,7 +123,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR * managed as another resource each instance tracks its own Ref instance to, to ensure all of these resources are * cleaned up safely and can be debugged otherwise. * - * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies + * TODO: fill in details about Tracker and lifecycle interactions for tools, and for compaction strategies */ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader> { @@ -141,6 +142,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } }; + // it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition + public static final class UniqueIdentifier {} + public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>() { public int compare(SSTableReader o1, SSTableReader o2) @@ -170,11 +174,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS NORMAL, EARLY, METADATA_CHANGE, - MOVED_START, - SHADOWED // => MOVED_START past end + MOVED_START } public final OpenReason openReason; + public final UniqueIdentifier instanceId = new UniqueIdentifier(); // indexfile and datafile: might be null before a call to load() protected SegmentedFile ifile; @@ -594,9 +598,22 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return ifile.path(); } - public void setTrackedBy(DataTracker tracker) + // this is only used for restoring tracker state at delete (and wiring up the keycache) and so + // should only be called once it is actually added to the tracker + public void setupDeleteNotification(Tracker tracker) { tidy.type.deletingTask.setTracker(tracker); + setupKeyCache(); + } + + @VisibleForTesting + public boolean isDeleteNotificationSetup() + { + return tidy.type.deletingTask.getTracker() != null; + } + + public void setupKeyCache() + { // under normal operation we can do this at any time, but SSTR is also used outside C* proper, // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache // here when we know we're being wired into the rest of the server infrastructure. @@ -908,15 +925,38 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } } - public void setReplacedBy(SSTableReader replacement) + public void setReplaced() { synchronized (tidy.global) { - assert replacement != null; assert !tidy.isReplaced; - assert tidy.global.live == this; tidy.isReplaced = true; - tidy.global.live = replacement; + } + } + + public boolean isReplaced() + { + synchronized (tidy.global) + { + return tidy.isReplaced; + } + } + + public void runOnClose(final Runnable runOnClose) + { + synchronized (tidy.global) + { + final Runnable existing = tidy.runOnClose; + tidy.runOnClose = existing == null + ? runOnClose + : new Runnable() + { + public void run() + { + existing.run(); + runOnClose.run(); + } + }; } } @@ -948,32 +988,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS replacement.first = newStart; replacement.last = this.last; - setReplacedBy(replacement); - return replacement; - } - } - - public SSTableReader cloneAsShadowed(final Runnable runOnClose) - { - synchronized (tidy.global) - { - assert openReason != OpenReason.EARLY; - this.tidy.runOnClose = new Runnable() - { - public void run() - { - dfile.dropPageCache(0); - ifile.dropPageCache(0); - runOnClose.run(); - } - }; - - SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(), - dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(), - maxDataAge, sstableMetadata, OpenReason.SHADOWED); - replacement.first = first; - replacement.last = last; - setReplacedBy(replacement); return replacement; } } @@ -1036,7 +1050,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS sstableMetadata, OpenReason.METADATA_CHANGE); replacement.first = this.first; replacement.last = this.last; - setReplacedBy(replacement); return replacement; } } @@ -1520,7 +1533,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * except for threads holding a reference. * * @return true if the this is the first time the file was marked obsolete. Calling this - * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize). + * multiple times is usually buggy (see exceptions in Tracker.unmarkCompacting and removeOldSSTablesSize). */ public boolean markObsolete() { @@ -1638,11 +1651,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE; } - public SSTableReader getCurrentReplacement() - { - return tidy.global.live; - } - /** * TODO: Move someplace reusable */ @@ -2048,8 +2056,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>(); private final Descriptor desc; - // a single convenience property for getting the most recent version of an sstable, not related to tidying - private SSTableReader live; // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them // at once also, for testing purposes private RestorableMeter readMeter; @@ -2064,7 +2070,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { this.desc = reader.descriptor; this.isCompacted = new AtomicBoolean(); - this.live = reader; } void ensureReadMeter() @@ -2128,6 +2133,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } } + @VisibleForTesting + public static void resetTidying() + { + GlobalTidy.lookup.clear(); + DescriptorTypeTidy.lookup.clear(); + } + public static abstract class Factory { public abstract SSTableReader open(final Descriptor descriptor, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index fa17c20..a7a7fcc 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -377,7 +377,8 @@ public class BigTableWriter extends SSTableWriter return accumulate; } - protected Throwable doCleanup(Throwable accumulate) + @Override + protected Throwable doPreCleanup(Throwable accumulate) { accumulate = dbuilder.close(accumulate); return accumulate; @@ -562,7 +563,8 @@ public class BigTableWriter extends SSTableWriter return indexFile.abort(accumulate); } - protected Throwable doCleanup(Throwable accumulate) + @Override + protected Throwable doPreCleanup(Throwable accumulate) { accumulate = summary.close(accumulate); accumulate = bf.close(accumulate); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index d63be31..3c35a34 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -77,7 +77,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne protected class TransactionalProxy extends AbstractTransactional { @Override - protected Throwable doCleanup(Throwable accumulate) + protected Throwable doPreCleanup(Throwable accumulate) { if (directoryFD >= 0) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index 842d06d..4ab4446 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -215,28 +215,28 @@ public class ColumnFamilyMetrics { public Long getValue() { - return cfs.getDataTracker().getView().getCurrentMemtable().getOperations(); + return cfs.getTracker().getView().getCurrentMemtable().getOperations(); } }); memtableOnHeapSize = createColumnFamilyGauge("MemtableOnHeapSize", new Gauge<Long>() { public Long getValue() { - return cfs.getDataTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns(); + return cfs.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns(); } }); memtableOffHeapSize = createColumnFamilyGauge("MemtableOffHeapSize", new Gauge<Long>() { public Long getValue() { - return cfs.getDataTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns(); + return cfs.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns(); } }); memtableLiveDataSize = createColumnFamilyGauge("MemtableLiveDataSize", new Gauge<Long>() { public Long getValue() { - return cfs.getDataTracker().getView().getCurrentMemtable().getLiveDataSize(); + return cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize(); } }); allMemtablesOnHeapSize = createColumnFamilyGauge("AllMemtablesHeapSize", new Gauge<Long>() @@ -245,7 +245,7 @@ public class ColumnFamilyMetrics { long size = 0; for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes()) - size += cfs2.getDataTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns(); + size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns(); return size; } }); @@ -255,7 +255,7 @@ public class ColumnFamilyMetrics { long size = 0; for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes()) - size += cfs2.getDataTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns(); + size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns(); return size; } }); @@ -265,7 +265,7 @@ public class ColumnFamilyMetrics { long size = 0; for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes()) - size += cfs2.getDataTracker().getView().getCurrentMemtable().getLiveDataSize(); + size += cfs2.getTracker().getView().getCurrentMemtable().getLiveDataSize(); return size; } }); @@ -288,7 +288,7 @@ public class ColumnFamilyMetrics public Long getValue() { long memtablePartitions = 0; - for (Memtable memtable : cfs.getDataTracker().getView().getAllMemtables()) + for (Memtable memtable : cfs.getTracker().getView().getAllMemtables()) memtablePartitions += memtable.partitionCount(); return SSTableReader.getApproximateKeyCount(cfs.getSSTables()) + memtablePartitions; } @@ -358,7 +358,7 @@ public class ColumnFamilyMetrics { public Integer getValue() { - return cfs.getDataTracker().getSSTables().size(); + return cfs.getTracker().getSSTables().size(); } }); liveDiskSpaceUsed = createColumnFamilyCounter("LiveDiskSpaceUsed"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 6a70692..44522db 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -28,13 +28,14 @@ import javax.annotation.Nullable; import com.google.common.base.Function; import com.google.common.collect.*; + +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.AbstractBounds; @@ -318,9 +319,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) rowBoundsList.add(Range.makeRowRange(range)); - refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>() + refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>() { - public List<SSTableReader> apply(DataTracker.View view) + public List<SSTableReader> apply(View view) { List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view); Set<SSTableReader> sstables = Sets.newHashSet(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 9f26637..d32ef88 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -32,12 +32,8 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; -import org.apache.cassandra.db.compaction.LeveledManifest; -import org.apache.cassandra.db.compaction.Scrubber; -import org.apache.cassandra.db.compaction.WrappingCompactionStrategy; +import org.apache.cassandra.db.compaction.*; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -121,9 +117,9 @@ public class StandaloneScrubber { for (SSTableReader sstable : sstables) { - try + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable)) { - Scrubber scrubber = new Scrubber(cfs, sstable, options.skipCorrupted, handler, true, !options.noValidate); + Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, true, !options.noValidate); try { scrubber.scrub(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java index 2541d6e..e881133 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java +++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java @@ -27,10 +27,12 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.SSTableSplitter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; @@ -145,12 +147,11 @@ public class StandaloneSplitter if (options.snapshot) System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName)); - cfs.getDataTracker().markCompacting(sstables, false, true); for (SSTableReader sstable : sstables) { - try + try (LifecycleTransaction transaction = LifecycleTransaction.offline(OperationType.UNKNOWN, sstable)) { - new SSTableSplitter(cfs, sstable, options.sizeInMB).split(); + new SSTableSplitter(cfs, transaction, options.sizeInMB).split(); // Remove the sstable (it's been copied by split and snapshotted) sstable.markObsolete(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java index 409a5f0..626d429 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java +++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java @@ -29,7 +29,9 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compaction.Upgrader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.OutputHandler; @@ -98,9 +100,9 @@ public class StandaloneUpgrader for (SSTableReader sstable : readers) { - try + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UPGRADE_SSTABLES, sstable)) { - Upgrader upgrader = new Upgrader(cfs, sstable, handler); + Upgrader upgrader = new Upgrader(cfs, txn, handler); upgrader.upgrade(); if (!options.keepSource) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/utils/concurrent/Blocker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Blocker.java b/src/java/org/apache/cassandra/utils/concurrent/Blocker.java new file mode 100644 index 0000000..5192e98 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/concurrent/Blocker.java @@ -0,0 +1,63 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class Blocker +{ + private final ReentrantLock lock = new ReentrantLock(); + private final Condition unblocked = lock.newCondition(); + private volatile boolean block = false; + + public void block(boolean block) + { + this.block = block; + if (!block) + { + lock.lock(); + try + { + unblocked.signalAll(); + } + finally + { + lock.unlock(); + } + } + } + + public void ask() + { + if (block) + { + lock.lock(); + try + { + while (block) + unblocked.awaitUninterruptibly(); + } + finally + { + lock.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/utils/concurrent/Transactional.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java index bcf5095..5b0eb8e 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java @@ -70,6 +70,7 @@ public interface Transactional extends AutoCloseable ABORTED; } + private boolean permitRedundantTransitions; private State state = State.IN_PROGRESS; // the methods for actually performing the necessary behaviours, that are themselves protected against @@ -79,9 +80,18 @@ public interface Transactional extends AutoCloseable protected abstract Throwable doCommit(Throwable accumulate); protected abstract Throwable doAbort(Throwable accumulate); - // this only needs to perform cleanup of state unique to this instance; any internal + // these only needs to perform cleanup of state unique to this instance; any internal // Transactional objects will perform cleanup in the commit() or abort() calls - protected abstract Throwable doCleanup(Throwable accumulate); + + /** + * perform an exception-safe pre-abort cleanup; this will still be run *after* commit + */ + protected Throwable doPreCleanup(Throwable accumulate){ return accumulate; } + + /** + * perform an exception-safe post-abort cleanup + */ + protected Throwable doPostCleanup(Throwable accumulate){ return accumulate; } /** * Do any preparatory work prior to commit. This method should throw any exceptions that can be encountered @@ -94,10 +104,13 @@ public interface Transactional extends AutoCloseable */ public final Throwable commit(Throwable accumulate) { + if (permitRedundantTransitions && state == State.COMMITTED) + return accumulate; if (state != State.READY_TO_COMMIT) - throw new IllegalStateException("Commit attempted before prepared to commit"); + throw new IllegalStateException("Cannot commit unless READY_TO_COMMIT; state is " + state); accumulate = doCommit(accumulate); - accumulate = doCleanup(accumulate); + accumulate = doPreCleanup(accumulate); + accumulate = doPostCleanup(accumulate); state = State.COMMITTED; return accumulate; } @@ -123,8 +136,9 @@ public interface Transactional extends AutoCloseable } state = State.ABORTED; // we cleanup first so that, e.g., file handles can be released prior to deletion - accumulate = doCleanup(accumulate); + accumulate = doPreCleanup(accumulate); accumulate = doAbort(accumulate); + accumulate = doPostCleanup(accumulate); return accumulate; } @@ -147,6 +161,8 @@ public interface Transactional extends AutoCloseable */ public final void prepareToCommit() { + if (permitRedundantTransitions && state == State.READY_TO_COMMIT) + return; if (state != State.IN_PROGRESS) throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state); @@ -183,6 +199,11 @@ public interface Transactional extends AutoCloseable { return state; } + + protected void permitRedundantTransitions() + { + permitRedundantTransitions = true; + } } // commit should generally never throw an exception, and preferably never generate one, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java index bf71639..e6c8f56 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java @@ -18,9 +18,7 @@ */ package org.apache.cassandra.db.compaction; -import java.io.IOException; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -35,6 +33,7 @@ import org.apache.cassandra.Util; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.*; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.SSTableUtils; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; @@ -126,8 +125,11 @@ public class LongCompactionsTest long start = System.nanoTime(); final int gcBefore = (int) (System.currentTimeMillis() / 1000) - Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds(); - assert store.getDataTracker().markCompacting(sstables): "Cannot markCompacting all sstables"; - new CompactionTask(store, sstables, gcBefore, false).execute(null); + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.COMPACTION)) + { + assert txn != null : "Cannot markCompacting all sstables"; + new CompactionTask(store, txn, gcBefore, false).execute(null); + } System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms", this.getClass().getName(), sstableCount, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/MockSchema.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java new file mode 100644 index 0000000..bc236e1 --- /dev/null +++ b/test/unit/org/apache/cassandra/MockSchema.java @@ -0,0 +1,167 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.cache.CachingOptions; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.composites.SimpleSparseCellNameType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexSummary; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.BufferedSegmentedFile; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.utils.AlwaysPresentFilter; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class MockSchema +{ + static + { + Memory offsets = Memory.allocate(4); + offsets.setInt(0, 0); + indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4), 0, 0, 0, 1); + } + private static final AtomicInteger id = new AtomicInteger(); + public static final Keyspace ks = Keyspace.mockKS(new KSMetaData("mockks", SimpleStrategy.class, ImmutableMap.of("replication_factor", "1"), false)); + public static final ColumnFamilyStore cfs = newCFS(); + + private static final IndexSummary indexSummary; + private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0); + + public static Memtable memtable() + { + return new Memtable(cfs.metadata); + } + + public static SSTableReader sstable(int generation) + { + return sstable(generation, false); + } + + public static SSTableReader sstable(int generation, boolean keepRef) + { + return sstable(generation, 0, keepRef); + } + + public static SSTableReader sstable(int generation, int size) + { + return sstable(generation, size, false); + } + + public static SSTableReader sstable(int generation, int size, boolean keepRef) + { + return sstable(generation, size, keepRef, cfs); + } + public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs) + { + Descriptor descriptor = new Descriptor(temp("mockcfdir").getParentFile(), "mockks", "mockcf", generation, Descriptor.Type.FINAL); + Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); + for (Component component : components) + { + File file = new File(descriptor.filenameFor(component)); + try + { + file.createNewFile(); + } + catch (IOException e) + { + } + file.deleteOnExit(); + } + if (size > 0) + { + try + { + File file = new File(descriptor.filenameFor(Component.DATA)); + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + { + raf.setLength(size); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) + .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1) + .get(MetadataType.STATS); + SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, Murmur3Partitioner.instance, + segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(), + new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL); + reader.first = reader.last = readerBounds(generation); + if (!keepRef) + reader.selfRef().release(); + return reader; + } + + public static ColumnFamilyStore newCFS() + { + String cfname = "mockcf" + (id.incrementAndGet()); + CFMetaData metadata = newCFMetaData(cfname); + return new ColumnFamilyStore(ks, cfname, Murmur3Partitioner.instance, 0, metadata, new Directories(metadata), false, false); + } + + private static CFMetaData newCFMetaData(String cfname) + { + CFMetaData metadata = new CFMetaData("mockks", cfname, ColumnFamilyType.Standard, new SimpleSparseCellNameType(UTF8Type.instance)); + metadata.caching(CachingOptions.NONE); + return metadata; + } + + public static BufferDecoratedKey readerBounds(int generation) + { + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER); + } + + private static File temp(String id) + { + try + { + File file = File.createTempFile(id, "tmp"); + file.deleteOnExit(); + return file; + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 9b8e5df..c2205c4 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -24,16 +24,17 @@ import java.io.*; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; +import java.nio.channels.FileChannel; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.cache.CachingOptions; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.compaction.AbstractCompactionTask; @@ -44,17 +45,27 @@ import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.sstable.format.big.BigTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.*; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CounterId; +import org.apache.hadoop.fs.FileUtil; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index e5fd470..27e7a2b 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -43,7 +43,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; @@ -171,7 +170,7 @@ public class KeyCacheTest assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1); - Set<SSTableReader> readers = cfs.getDataTracker().getSSTables(); + Set<SSTableReader> readers = cfs.getTracker().getSSTables(); Refs<SSTableReader> refs = Refs.tryRef(readers); if (refs == null) throw new IllegalStateException(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index a5af823..dbbce9e 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -20,57 +20,63 @@ package org.apache.cassandra.db; * */ -import java.io.*; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ExecutionException; +import java.io.File; +import java.io.IOError; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.CounterColumnType; +import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.marshal.LongType; -import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.Scrubber; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableRewriter; -import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.Util; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.UUIDGen; import static org.junit.Assert.*; import static org.junit.Assume.assumeTrue; import static org.apache.cassandra.Util.cellname; import static org.apache.cassandra.Util.column; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; @RunWith(OrderedJUnit4ClassRunner.class) public class ScrubTest @@ -155,7 +161,8 @@ public class ScrubTest overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); // with skipCorrupted == false, the scrub is expected to fail - try(Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true)) + try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); + Scrubber scrubber = new Scrubber(cfs, txn, false, false, true);) { scrubber.scrub(); fail("Expected a CorruptSSTableException to be thrown"); @@ -164,7 +171,8 @@ public class ScrubTest // with skipCorrupted == true, the corrupt rows will be skipped Scrubber.ScrubResult scrubResult; - try(Scrubber scrubber = new Scrubber(cfs, sstable, true, false, true)) + try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); + Scrubber scrubber = new Scrubber(cfs, txn, true, false, true);) { scrubResult = scrubber.scrubWithResult(); } @@ -213,20 +221,24 @@ public class ScrubTest overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); // with skipCorrupted == false, the scrub is expected to fail - Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true); - try + try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); + Scrubber scrubber = new Scrubber(cfs, txn, false, false, true)) { + // with skipCorrupted == true, the corrupt row will be skipped scrubber.scrub(); fail("Expected a CorruptSSTableException to be thrown"); } catch (IOError err) {} - // with skipCorrupted == true, the corrupt row will be skipped - scrubber = new Scrubber(cfs, sstable, true, false, true); - scrubber.scrub(); - scrubber.close(); - assertEquals(1, cfs.getSSTables().size()); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); + Scrubber scrubber = new Scrubber(cfs, txn, true, false, true)) + { + // with skipCorrupted == true, the corrupt row will be skipped + scrubber.scrub(); + scrubber.close(); + } + assertEquals(1, cfs.getSSTables().size()); // verify that we can read all of the rows, and there is now one less row rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); assertEquals(1, rows.size()); @@ -369,9 +381,13 @@ public class ScrubTest components.add(Component.STATS); components.add(Component.SUMMARY); components.add(Component.TOC); + SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs); + if (sstable.last.compareTo(sstable.first) < 0) + sstable.last = sstable.first; - try(Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true)) + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable); + Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);) { scrubber.scrub(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 1dc72ae..235462b 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.db.compaction; -import junit.framework.Assert; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.utils.concurrent.Refs; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; @@ -30,7 +30,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.ExecutionException; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.exceptions.ConfigurationException; @@ -91,13 +90,18 @@ public class AntiCompactionTest Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); - Refs<SSTableReader> refs = Refs.ref(sstables); - long repairedAt = 1000; - CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt); - - assertEquals(2, store.getSSTables().size()); int repairedKeys = 0; int nonRepairedKeys = 0; + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + if (txn == null) + throw new IllegalStateException(); + long repairedAt = 1000; + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt); + } + + assertEquals(2, store.getSSTables().size()); for (SSTableReader sstable : store.getSSTables()) { try (ISSTableScanner scanner = sstable.getScanner()) @@ -123,7 +127,7 @@ public class AntiCompactionTest assertFalse(sstable.isMarkedCompacted()); assertEquals(1, sstable.selfRef().globalCount()); } - assertEquals(0, store.getDataTracker().getCompacting().size()); + assertEquals(0, store.getTracker().getCompacting().size()); assertEquals(repairedKeys, 4); assertEquals(nonRepairedKeys, 6); } @@ -139,13 +143,16 @@ public class AntiCompactionTest long origSize = s.bytesOnDisk(); Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500))); Collection<SSTableReader> sstables = cfs.getSSTables(); - CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), Refs.tryRef(sstables), 12345); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345); + } long sum = 0; for (SSTableReader x : cfs.getSSTables()) sum += x.bytesOnDisk(); assertEquals(sum, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.getCount(), 100000); - } private SSTableReader writeFile(ColumnFamilyStore cfs, int count) @@ -210,10 +217,12 @@ public class AntiCompactionTest Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); - Refs<SSTableReader> refs = Refs.tryRef(sstables); - Assert.assertNotNull(refs); long repairedAt = 1000; - CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt); + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt); + } /* Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time so there will be no net change in the number of sstables @@ -256,12 +265,16 @@ public class AntiCompactionTest Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); - CompactionManager.instance.performAnticompaction(store, ranges, Refs.tryRef(sstables), 1); + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1); + } assertThat(store.getSSTables().size(), is(1)); assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true)); assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1)); - assertThat(store.getDataTracker().getCompacting().size(), is(0)); + assertThat(store.getTracker().getCompacting().size(), is(0)); } @@ -282,8 +295,12 @@ public class AntiCompactionTest Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); - Refs<SSTableReader> refs = Refs.ref(sstables); - CompactionManager.instance.performAnticompaction(store, ranges, refs, 0); + + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 0); + } assertThat(store.getSSTables().size(), is(10)); assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java index 88074af..235fd49 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ExecutionException; import com.google.common.primitives.Longs; import org.junit.Before; @@ -41,11 +40,12 @@ import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter; import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter; import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; -import org.apache.cassandra.utils.ByteBufferUtil; + import static org.junit.Assert.assertEquals; public class CompactionAwareWriterTest @@ -81,10 +81,10 @@ public class CompactionAwareWriterTest int rowCount = 1000; cfs.disableAutoCompaction(); populate(cfs, rowCount); - Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); - long beforeSize = sstables.iterator().next().onDiskLength(); - CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, sstables, sstables, false, OperationType.COMPACTION); - int rows = compact(cfs, sstables, writer); + LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION); + long beforeSize = txn.originals().iterator().next().onDiskLength(); + CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false, OperationType.COMPACTION); + int rows = compact(cfs, txn, writer); assertEquals(1, cfs.getSSTables().size()); assertEquals(rowCount, rows); assertEquals(beforeSize, cfs.getSSTables().iterator().next().onDiskLength()); @@ -100,11 +100,11 @@ public class CompactionAwareWriterTest cfs.disableAutoCompaction(); int rowCount = 1000; populate(cfs, rowCount); - Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); - long beforeSize = sstables.iterator().next().onDiskLength(); + LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION); + long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/10; - CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, sstables, sstables, sstableSize, 0, false, OperationType.COMPACTION); - int rows = compact(cfs, sstables, writer); + CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false, OperationType.COMPACTION); + int rows = compact(cfs, txn, writer); assertEquals(10, cfs.getSSTables().size()); assertEquals(rowCount, rows); validateData(cfs, rowCount); @@ -118,10 +118,10 @@ public class CompactionAwareWriterTest cfs.disableAutoCompaction(); int rowCount = 10000; populate(cfs, rowCount); - Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); - long beforeSize = sstables.iterator().next().onDiskLength(); - CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, sstables, sstables, OperationType.COMPACTION, 0); - int rows = compact(cfs, sstables, writer); + LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION); + long beforeSize = txn.originals().iterator().next().onDiskLength(); + CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), OperationType.COMPACTION, 0); + int rows = compact(cfs, txn, writer); long expectedSize = beforeSize / 2; List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getSSTables()); @@ -154,11 +154,11 @@ public class CompactionAwareWriterTest int rowCount = 20000; int targetSSTableCount = 50; populate(cfs, rowCount); - Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); - long beforeSize = sstables.iterator().next().onDiskLength(); + LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION); + long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/targetSSTableCount; - CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, sstables, sstables, sstableSize, false, OperationType.COMPACTION); - int rows = compact(cfs, sstables, writer); + CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false, OperationType.COMPACTION); + int rows = compact(cfs, txn, writer); assertEquals(targetSSTableCount, cfs.getSSTables().size()); int [] levelCounts = new int[5]; assertEquals(rowCount, rows); @@ -175,13 +175,13 @@ public class CompactionAwareWriterTest cfs.truncateBlocking(); } - private int compact(ColumnFamilyStore cfs, Set<SSTableReader> sstables, CompactionAwareWriter writer) + private int compact(ColumnFamilyStore cfs, LifecycleTransaction txn, CompactionAwareWriter writer) { - assert sstables.size() == 1; + assert txn.originals().size() == 1; int rowsWritten = 0; - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(txn.originals())) { - CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); + CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(System.currentTimeMillis())); ISSTableScanner scanner = scanners.scanners.get(0); while(scanner.hasNext()) { @@ -191,7 +191,6 @@ public class CompactionAwareWriterTest } } Collection<SSTableReader> newSSTables = writer.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newSSTables, OperationType.COMPACTION); return rowsWritten; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java index f1d016b..64e4465 100644 --- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java @@ -324,10 +324,10 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader Thread.sleep(2000); AbstractCompactionTask t = dtcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000)); assertNotNull(t); - assertEquals(1, Iterables.size(t.sstables)); - SSTableReader sstable = t.sstables.iterator().next(); + assertEquals(1, Iterables.size(t.transaction.originals())); + SSTableReader sstable = t.transaction.originals().iterator().next(); assertEquals(sstable, expiredSSTable); - cfs.getDataTracker().unmarkCompacting(cfs.getSSTables()); + t.transaction.abort(); } }