Updated Branches: refs/heads/trunk d16d5c4f2 -> a9b93c257
Fix row tombstones in larger-than-memory compactions patch by thobbs; reviewed by jbellis for CASSANDRA-6008 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3edb62bf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3edb62bf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3edb62bf Branch: refs/heads/trunk Commit: 3edb62bf773617aeb3a348edc5667a6b0bad0ffe Parents: e6eb550 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Dec 12 23:28:13 2013 +0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Dec 13 00:17:33 2013 +0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/AbstractThreadUnsafeSortedColumns.java | 6 +- .../cassandra/db/AtomicSortedColumns.java | 4 +- .../org/apache/cassandra/db/ColumnFamily.java | 11 ++- .../apache/cassandra/db/ColumnFamilyStore.java | 21 ++++- .../org/apache/cassandra/db/ColumnIndex.java | 2 +- .../org/apache/cassandra/db/DeletionInfo.java | 76 +++++++++++++----- .../org/apache/cassandra/db/DeletionTime.java | 16 ++++ .../apache/cassandra/db/RangeTombstoneList.java | 2 +- .../db/compaction/LazilyCompactedRow.java | 54 +++++++------ test/unit/org/apache/cassandra/Util.java | 6 +- .../org/apache/cassandra/db/KeyCacheTest.java | 2 +- .../db/compaction/CompactionsPurgeTest.java | 84 +++++++++++++++++--- .../streaming/StreamingTransferTest.java | 4 +- 14 files changed, 220 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 30f863e..d573e37 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.4 + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008) * Fix cleanup ClassCastException (CASSANDRA-6462) * Reduce 
gossip memory use by interning VersionedValue strings (CASSANDRA-6410) * Allow specifying datacenters to participate in a repair (CASSANDRA-6218) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java index 1b245eb..36b051b 100644 --- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java @@ -59,7 +59,11 @@ public abstract class AbstractThreadUnsafeSortedColumns extends ColumnFamily deletionInfo = newInfo; } - public void maybeResetDeletionTimes(int gcBefore) + /** + * Purges any tombstones with a local deletion time before gcBefore. + * @param gcBefore a timestamp (in seconds) before which tombstones should be purged + */ + public void purgeTombstones(int gcBefore) { deletionInfo.purge(gcBefore); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index f6a6b83..b44d8bf 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -120,12 +120,12 @@ public class AtomicSortedColumns extends ColumnFamily ref.set(ref.get().with(newInfo)); } - public void maybeResetDeletionTimes(int gcBefore) + public void purgeTombstones(int gcBefore) { while (true) { Holder current = ref.get(); - if (!current.deletionInfo.hasIrrelevantData(gcBefore)) + if (!current.deletionInfo.hasPurgeableTombstones(gcBefore)) break; DeletionInfo purgedInfo 
= current.deletionInfo.copy(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index 47b14b9..2c00071 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -185,7 +185,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry public abstract void delete(DeletionTime deletionTime); protected abstract void delete(RangeTombstone tombstone); - public abstract void maybeResetDeletionTimes(int gcBefore); + /** + * Purges top-level and range tombstones whose localDeletionTime is older than gcBefore. + * @param gcBefore a timestamp (in seconds) before which tombstones should be purged + */ + public abstract void purgeTombstones(int gcBefore); /** * Adds a column to this column map. @@ -268,6 +272,9 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry */ public abstract boolean isInsertReversed(); + /** + * If `columns` has any tombstones (top-level or range tombstones), they will be applied to this set of columns. + */ public void delete(ColumnFamily columns) { delete(columns.deletionInfo()); @@ -459,7 +466,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry public boolean hasIrrelevantData(int gcBefore) { // Do we have gcable deletion infos? - if (deletionInfo().hasIrrelevantData(gcBefore)) + if (deletionInfo().hasPurgeableTombstones(gcBefore)) return true; // Do we have colums that are either deleted by the container or gcable tombstone? 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index eb715ac..d585407 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -862,12 +862,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } + /** + * Purges gc-able top-level and range tombstones, returning `cf` if there are any columns or tombstones left, + * null otherwise. + * @param gcBefore a timestamp (in seconds); tombstones with a localDeletionTime before this will be purged + */ public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore) { - cf.maybeResetDeletionTimes(gcBefore); + // purge old top-level and range tombstones + cf.purgeTombstones(gcBefore); + + // if there are no columns or tombstones left, return null return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf; } + /** + * Removes deleted columns and purges gc-able tombstones. + * @return an updated `cf` if any columns or tombstones remain, null otherwise + */ public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore) { return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater); @@ -890,7 +902,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return removeDeletedCF(cf, gcBefore); } - private static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) + /** + * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or + * columns that have been dropped from the schema (for CQL3 tables only). 
+ * @return a long summarizing the removed cells (presumably their total size -- confirm); `cf` itself is modified in place + */ + public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) { Iterator<Column> iter = cf.iterator(); DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 75a06d7..f6b38b2 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -67,7 +67,7 @@ public class ColumnIndex private final RangeTombstone.Tracker tombstoneTracker; private int atomCount; private final ByteBuffer key; - private final DeletionInfo deletionInfo; + private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size public Builder(ColumnFamily cf, ByteBuffer key, http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index 4e1d68d..13fc824 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -29,15 +29,32 @@ import com.google.common.collect.Iterators; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.IVersionedSerializer; +/** + * A combination of a top-level (or row) tombstone and range tombstones describing the deletions + * within a {@link ColumnFamily} (or row). 
+ */ public class DeletionInfo { private static final Serializer serializer = new Serializer(); - // We don't have way to represent the full interval of keys (Interval don't support the minimum token as the right bound), - // so we keep the topLevel deletion info separatly. This also slightly optimize the case of full row deletion which is rather common. + /** + * This represents a deletion of the entire row. We can't represent this within the RangeTombstoneList, so it's + * kept separately. This also slightly optimizes the common case of a full row deletion. + */ private DeletionTime topLevel; - private RangeTombstoneList ranges; // null if no range tombstones (to save an allocation since it's a common case). + /** + * A list of range tombstones within the row. This is left as null if there are no range tombstones + * (to save an allocation, since it's a common case). + */ + private RangeTombstoneList ranges; + + /** + * Creates a DeletionInfo with only a top-level (row) tombstone. + * @param markedForDeleteAt the time after which the entire row should be considered deleted + * @param localDeletionTime what time the deletion write was applied locally (for purposes of + * purging the tombstone after gc_grace_seconds). 
+ */ + public static DeletionInfo live() + { + return new DeletionInfo(DeletionTime.LIVE); + } + public static Serializer serializer() { return serializer; @@ -93,8 +113,7 @@ public class DeletionInfo } /** - * Return whether a given column is deleted by the container having this - * deletion info. + * Return whether a given column is deleted by the container having this deletion info. * * @param column the column to check. * @return true if the column is deleted, false otherwise @@ -137,8 +156,7 @@ public class DeletionInfo /** * Purge every tombstones that are older than {@code gcbefore}. * - * @param gcBefore timestamp (in seconds) before which tombstones should - * be purged + * @param gcBefore timestamp (in seconds) before which tombstones should be purged */ public void purge(int gcBefore) { @@ -152,14 +170,24 @@ public class DeletionInfo } } - public boolean hasIrrelevantData(int gcBefore) + /** + * Returns true if {@code purge} would remove the top-level tombstone or any of the range + * tombstones, false otherwise. + * @param gcBefore timestamp (in seconds) before which tombstones should be purged + */ + public boolean hasPurgeableTombstones(int gcBefore) { if (topLevel.localDeletionTime < gcBefore) return true; - return ranges != null && ranges.hasIrrelevantData(gcBefore); + return ranges != null && ranges.hasPurgeableTombstones(gcBefore); } + /** + * Potentially replaces the top-level tombstone with another, keeping whichever has the higher markedForDeleteAt + * timestamp. + * @param newInfo + */ public void add(DeletionTime newInfo) { if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt) @@ -175,7 +203,9 @@ public class DeletionInfo } /** - * Adds the provided deletion infos to the current ones. + * Combines another DeletionInfo with this one and returns the result. Whichever top-level tombstone + * has the higher markedForDeleteAt timestamp will be kept, along with its localDeletionTime. The + * range tombstones will be combined. 
* * @return this object. */ @@ -191,6 +221,9 @@ public class DeletionInfo return this; } + /** + * Returns the minimum timestamp in any of the range tombstones or the top-level tombstone. + */ public long minTimestamp() { return ranges == null @@ -199,7 +232,7 @@ public class DeletionInfo } /** - * The maximum timestamp mentioned by this DeletionInfo. + * Returns the maximum timestamp in any of the range tombstones or the top-level tombstone. */ public long maxTimestamp() { @@ -208,6 +241,9 @@ public class DeletionInfo : Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt()); } + /** + * Returns the top-level (or "row") tombstone. + */ public DeletionTime getTopLevelDeletion() { return topLevel; @@ -326,7 +362,7 @@ public class DeletionInfo /** * This object allow testing whether a given column (name/timestamp) is deleted - * or not by this DeletionInfo, assuming that the column given to this + * or not by this DeletionInfo, assuming that the columns given to this * object are passed in forward or reversed comparator sorted order. * * This is more efficient that calling DeletionInfo.isDeleted() repeatedly @@ -336,9 +372,9 @@ public class DeletionInfo { /* * Note that because because range tombstone are added to this DeletionInfo while we iterate, - * ranges may be null initially and we need to wait the first range to create the tester (once - * created the test will pick up new tombstones however). We do are guaranteed that a range tombstone - * will be added *before* we test any column that it may delete so this is ok. + * `ranges` may be null initially and we need to wait for the first range to create the tester (once + * created the test will pick up new tombstones however). We are guaranteed that a range tombstone + * will be added *before* we test any column that it may delete, so this is ok. 
*/ private RangeTombstoneList.InOrderTester tester; private final boolean reversed; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index 3d6fad4..b80422c 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -27,11 +27,27 @@ import com.google.common.base.Objects; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.utils.ObjectSizes; +/** + * A top-level (row) tombstone. + */ public class DeletionTime implements Comparable<DeletionTime> { + /** + * A special DeletionTime that signifies that there is no top-level (row) tombstone. + */ public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE); + /** + * A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which + * data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked + * for deletion at all. + */ public final long markedForDeleteAt; + + /** + * The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is + * only used for purposes of purging the tombstone after gc_grace_seconds have elapsed. 
+ */ public final int localDeletionTime; public static final ISerializer<DeletionTime> serializer = new Serializer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/RangeTombstoneList.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java index fe61916..dad9004 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java +++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java @@ -305,7 +305,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone> /** * Returns whether {@code purge(gcBefore)} would remove something or not. */ - public boolean hasIrrelevantData(int gcBefore) + public boolean hasPurgeableTombstones(int gcBefore) { for (int i = 0; i < size; i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 0cdbbb7..3b7a3d4 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -59,6 +59,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable private ColumnIndex.Builder indexBuilder; private final SecondaryIndexManager.Updater indexer; private long maxTombstoneTimestamp; + private DeletionInfo deletionInfo; public LazilyCompactedRow(CompactionController controller, List<? 
extends OnDiskAtomIterator> rows) { @@ -67,17 +68,15 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable this.controller = controller; indexer = controller.cfs.indexManager.updaterFor(key); - ColumnFamily rawCf = null; + // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be + // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge + deletionInfo = DeletionInfo.live(); maxTombstoneTimestamp = Long.MIN_VALUE; for (OnDiskAtomIterator row : rows) { - ColumnFamily cf = row.getColumnFamily(); - maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp()); - - if (rawCf == null) - rawCf = cf; - else - rawCf.delete(cf); + DeletionInfo delInfo = row.getColumnFamily().deletionInfo(); + maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp()); + deletionInfo = deletionInfo.add(delInfo); } // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with @@ -86,12 +85,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable // no other versions of this row present. this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE); - // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted - // to get rid of redundant row-level and range tombstones - assert rawCf != null; - int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE; - ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore); - emptyColumnFamily = purgedCf == null ? 
ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf; + emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata); + emptyColumnFamily.setDeletionInfo(deletionInfo.copy()); + if (shouldPurge) + emptyColumnFamily.purgeTombstones(controller.gcBefore); } public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException @@ -103,14 +100,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable { indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out); columnsIndex = indexBuilder.buildForCompaction(iterator()); - if (columnsIndex.columnsIndex.isEmpty()) - { - boolean cfIrrelevant = shouldPurge - ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null - : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant - if (cfIrrelevant) - return null; - } + + // if there aren't any columns or tombstones, return null + if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete()) + return null; } catch (IOException e) { @@ -128,7 +121,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable ); reducer = null; + // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone indexBuilder.maybeWriteEmptyRowHeader(); + out.writeShort(SSTableWriter.END_OF_ROW); close(); @@ -201,7 +196,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable // in the container; we just want to leverage the conflict resolution code from CF ColumnFamily container = emptyColumnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory, false); - // tombstone reference; will be reconciled w/ column during getReduced + // tombstone reference; will be reconciled w/ column during getReduced. Note that the top-level (row) tombstone + // is held by LCR.deletionInfo. 
RangeTombstone tombstone; int columns = 0; @@ -212,11 +208,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable List<ByteBuffer> minColumnNameSeen = Collections.emptyList(); List<ByteBuffer> maxColumnNameSeen = Collections.emptyList(); + /** + * Called once per version of a cell that we need to merge, after which getReduced() is called. In other words, + * this will be called one or more times with cells that share the same column name. + */ public void reduce(OnDiskAtom current) { if (current instanceof RangeTombstone) { - tombstone = (RangeTombstone)current; + if (tombstone == null || current.maxTimestamp() >= tombstone.maxTimestamp()) + tombstone = (RangeTombstone)current; } else { @@ -235,6 +236,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable } } + /** + * Called after reduce() has been called for each cell sharing the same name. + */ protected OnDiskAtom getReduced() { if (tombstone != null) @@ -253,6 +257,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable } else { + // when we clear() the container, it removes the deletion info, so this needs to be reset each time + container.setDeletionInfo(deletionInfo); ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container); if (purged == null || !purged.iterator().hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index b4a375a..a71dc48 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -245,12 +245,12 @@ public class Util assertTrue(ss.getTokenMetadata().isMember(hosts.get(i))); } - public static Future<?> compactAll(ColumnFamilyStore cfs) + public static Future<?> 
compactAll(ColumnFamilyStore cfs, int gcBefore) { - List<Descriptor> descriptors = new ArrayList<Descriptor>(); + List<Descriptor> descriptors = new ArrayList<>(); for (SSTableReader sstable : cfs.getSSTables()) descriptors.add(sstable.descriptor); - return CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE); + return CompactionManager.instance.submitUserDefined(cfs, descriptors, gcBefore); } public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index fd685b4..1f41860 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -145,7 +145,7 @@ public class KeyCacheTest extends SchemaLoader assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1); - Util.compactAll(cfs).get(); + Util.compactAll(cfs, Integer.MAX_VALUE).get(); // after compaction cache should have entries for // new SSTables, if we had 2 keys in cache previously it should become 4 assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index 48c0b3c..8461023 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -21,12 +21,12 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; import java.util.Collection; 
import java.util.concurrent.ExecutionException; - -import org.junit.Assert; +import java.util.concurrent.Future; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.DecoratedKey; @@ -38,7 +38,11 @@ import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.Util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.apache.cassandra.db.KeyspaceTest.assertColumns; +import static org.apache.cassandra.cql3.QueryProcessor.processInternal; + import org.apache.cassandra.utils.ByteBufferUtil; @@ -144,7 +148,7 @@ public class CompactionsPurgeTest extends SchemaLoader // verify that minor compaction still GC when key is present // in a non-compacted sstable but the timestamp ensures we won't miss anything cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis())); - Assert.assertEquals(1, cf.getColumnCount()); + assertEquals(1, cf.getColumnCount()); } @Test @@ -181,8 +185,8 @@ public class CompactionsPurgeTest extends SchemaLoader // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone // so it would be invalid to assume we can throw out the c1 entry. 
ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis())); - Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis())); - Assert.assertEquals(2, cf.getColumnCount()); + assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis())); + assertEquals(2, cf.getColumnCount()); } @Test @@ -216,7 +220,7 @@ public class CompactionsPurgeTest extends SchemaLoader assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable // compact and test that the row is completely gone - Util.compactAll(cfs).get(); + Util.compactAll(cfs, Integer.MAX_VALUE).get(); assert cfs.getSSTables().isEmpty(); ColumnFamily cf = keyspace.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis())); assert cf == null : cf; @@ -253,7 +257,7 @@ public class CompactionsPurgeTest extends SchemaLoader // flush and major compact cfs.forceBlockingFlush(); - Util.compactAll(cfs).get(); + Util.compactAll(cfs, Integer.MAX_VALUE).get(); // re-inserts with timestamp lower than delete rm = new RowMutation(keyspaceName, key.key); @@ -282,6 +286,7 @@ public class CompactionsPurgeTest extends SchemaLoader DecoratedKey key = Util.dk("key3"); RowMutation rm; + QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()); // inserts rm = new RowMutation(keyspaceName, key.key); @@ -295,10 +300,13 @@ public class CompactionsPurgeTest extends SchemaLoader rm = new RowMutation(keyspaceName, key.key); rm.delete(cfName, 4); rm.apply(); + ColumnFamily cf = cfs.getColumnFamily(filter); + assertTrue(cf.isMarkedForDelete()); // flush and major compact (with tombstone purging) cfs.forceBlockingFlush(); - Util.compactAll(cfs).get(); + Util.compactAll(cfs, Integer.MAX_VALUE).get(); + assertFalse(cfs.getColumnFamily(filter).isMarkedForDelete()); // re-inserts 
with timestamp lower than delete rm = new RowMutation(keyspaceName, key.key); @@ -308,10 +316,66 @@ public class CompactionsPurgeTest extends SchemaLoader } rm.apply(); - // Check that the second insert did went in - ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis())); + // Check that the second insert went in + cf = cfs.getColumnFamily(filter); assertEquals(10, cf.getColumnCount()); for (Column c : cf) assert !c.isMarkedForDelete(System.currentTimeMillis()); } + + @Test + public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException + { + String keyspace = "cql_keyspace"; + String table = "table1"; + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.disableAutoCompaction(); + + // write a row out to one sstable + processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)", + keyspace, table, 1, "foo", 1)); + cfs.forceBlockingFlush(); + + UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(1, result.size()); + + // write a row tombstone out to a second sstable + processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1)); + cfs.forceBlockingFlush(); + + // basic check that the row is considered deleted + assertEquals(2, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(0, result.size()); + + // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged + Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000); + future.get(); + + // the data should be gone, but the tombstone should still exist + assertEquals(1, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, 
table, 1)); + assertEquals(0, result.size()); + + // write a row out to one sstable + processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)", + keyspace, table, 1, "foo", 1)); + cfs.forceBlockingFlush(); + assertEquals(2, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(1, result.size()); + + // write a row tombstone out to a different sstable + processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1)); + cfs.forceBlockingFlush(); + + // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged + future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000); + future.get(); + + // both the data and the tombstone should be gone this time + assertEquals(0, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(0, result.size()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 7787314..7622728 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -136,7 +136,7 @@ public class StreamingTransferTest extends SchemaLoader for (int i = 1; i <= 3; i++) mutator.mutate("key" + i, "col" + i, timestamp); cfs.forceBlockingFlush(); - Util.compactAll(cfs).get(); + Util.compactAll(cfs, Integer.MAX_VALUE).get(); assertEquals(1, cfs.getSSTables().size()); // transfer the first and last key @@ -468,7 +468,7 @@ public class StreamingTransferTest extends SchemaLoader 
for (int i = 1; i <= 6000; i++) mutator.mutate("key" + i, "col" + i, System.currentTimeMillis()); cfs.forceBlockingFlush(); - Util.compactAll(cfs).get(); + Util.compactAll(cfs, Integer.MAX_VALUE).get(); SSTableReader sstable = cfs.getSSTables().iterator().next(); cfs.clearUnsafe();