Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d53c838c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d53c838c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d53c838c Branch: refs/heads/trunk Commit: d53c838c9d2f89ac6c88c8306f2302f7fbc6b33d Parents: 448e4d4 3edb62b Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Dec 13 00:17:45 2013 +0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Dec 13 00:18:14 2013 +0600 ---------------------------------------------------------------------- .../db/AbstractThreadUnsafeSortedColumns.java | 6 +- .../cassandra/db/AtomicSortedColumns.java | 4 +- .../org/apache/cassandra/db/ColumnFamily.java | 11 ++- .../apache/cassandra/db/ColumnFamilyStore.java | 26 ++++++- .../org/apache/cassandra/db/ColumnIndex.java | 2 +- .../org/apache/cassandra/db/DeletionInfo.java | 76 ++++++++++++++----- .../org/apache/cassandra/db/DeletionTime.java | 16 ++++ .../apache/cassandra/db/RangeTombstoneList.java | 2 +- .../db/compaction/CompactionController.java | 5 +- .../db/compaction/LazilyCompactedRow.java | 77 +++++++++++--------- test/unit/org/apache/cassandra/Util.java | 6 +- .../org/apache/cassandra/db/KeyCacheTest.java | 2 +- .../db/compaction/CompactionsPurgeTest.java | 77 ++++++++++++++++++-- .../streaming/StreamingTransferTest.java | 4 +- 14 files changed, 236 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 396bbd3,d585407..4e54af0 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -885,6 -898,17 +897,16 @@@ public class ColumnFamilyStore implemen return null; } - removeDeletedColumnsOnly(cf, gcBefore, indexer); - return removeDeletedCF(cf, gcBefore); ++ return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore); + } + + /** + * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or + * columns that have been dropped from the schema (for CQL3 tables only). + * @return the updated ColumnFamily + */ - public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) ++ public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) + { Iterator<Column> iter = cf.iterator(); DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester(); boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty(); @@@ -899,10 -924,15 +921,10 @@@ { iter.remove(); indexer.remove(c); - removedBytes += c.dataSize(); } } - return removedBytes; - } - return removeDeletedCF(cf, gcBefore); - public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore) - { - return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater); ++ return cf; } // returns true if http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java index dc7730c,7edc60e..c4ce2e8 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@@ -148,24 -153,25 +148,25 @@@ public class CompactionControlle } /** - * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row - * older than @param maxDeletionTimestamp are included in the compaction set + * @return the largest timestamp before which it's okay to drop tombstones for the given partition; - * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be supressed - * in other sstables. ++ * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed ++ * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not ++ * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists. */ - public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp) + public long maxPurgeableTimestamp(DecoratedKey key) { List<SSTableReader> filteredSSTables = overlappingTree.search(key); + long min = Long.MAX_VALUE; for (SSTableReader sstable : filteredSSTables) { - if (sstable.getMinTimestamp() <= maxDeletionTimestamp) - { - // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing), - // we check index file instead. - if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null) - return false; - else if (sstable.getBloomFilter().isPresent(key.key)) - return false; - } + // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing), + // we check index file instead. + if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null) + min = Math.min(min, sstable.getMinTimestamp()); + else if (sstable.getBloomFilter().isPresent(key.key)) + min = Math.min(min, sstable.getMinTimestamp()); } - return true; + return min; } public void invalidateCachedRow(DecoratedKey key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 2cb014a,3b7a3d4..8237ff5 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@@ -56,8 -58,8 +56,9 @@@ public class LazilyCompactedRow extend private boolean closed; private ColumnIndex.Builder indexBuilder; private final SecondaryIndexManager.Updater indexer; - private long maxTombstoneTimestamp; + private final Reducer reducer; + private final Iterator<OnDiskAtom> merger; + private DeletionInfo deletionInfo; public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows) { @@@ -66,39 -68,27 +67,38 @@@ this.controller = controller; indexer = controller.cfs.indexManager.updaterFor(key); - ColumnFamily rawCf = null; + // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be + // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge + deletionInfo = DeletionInfo.live(); - maxTombstoneTimestamp = Long.MIN_VALUE; for (OnDiskAtomIterator row : rows) -- { - ColumnFamily cf = row.getColumnFamily(); - DeletionInfo delInfo = row.getColumnFamily().deletionInfo(); - maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp()); - deletionInfo = deletionInfo.add(delInfo); - } ++ deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo()); - if (rawCf == null) - rawCf = cf; - else - rawCf.delete(cf); - } - // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with - // tombstones newer than the row-level tombstones we've seen -- but we won't know that - // until we iterate over them. By passing MAX_VALUE we will only purge if there are - // no other versions of this row present. - this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE); ++ // tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable ++ // containing `key` outside of the set of sstables involved in this compaction. + maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key); - // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted - // to get rid of redundant row-level and range tombstones - assert rawCf != null; - int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE; - ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore); - emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf; + + emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata); + emptyColumnFamily.setDeletionInfo(deletionInfo.copy()); - if (shouldPurge) ++ if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp) + emptyColumnFamily.purgeTombstones(controller.gcBefore); + + reducer = new Reducer(); + merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull()); + } + - public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf) ++ private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf) + { - // We should only gc tombstone if shouldPurge == true. But otherwise, - // it is still ok to collect column that shadowed by their (deleted) - // container, which removeDeleted(cf, Integer.MAX_VALUE) will do - ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, - shouldPurge ? controller.gcBefore : Integer.MIN_VALUE, - controller.cfs.indexManager.updaterFor(key)); - if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative()) - CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore); - return compacted; ++ // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that ++ // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this ++ // without purging tombstones. ++ int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE; ++ ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key)); ++ ++ // if we have counters, remove old shards ++ if (cf.metadata().getDefaultValidator().isCommutative()) ++ CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore); ++ ++ return cf; } public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException @@@ -109,31 -99,31 +109,29 @@@ try { indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out); - columnsIndex = indexBuilder.buildForCompaction(iterator()); + columnsIndex = indexBuilder.buildForCompaction(merger); - if (columnsIndex.columnsIndex.isEmpty()) - { - boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp - ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null - : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant - if (cfIrrelevant) - return null; - } + + // if there aren't any columns or tombstones, return null + if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete()) + return null; } catch (IOException e) { throw new RuntimeException(e); } // reach into the reducer (created during iteration) to get column count, size, max column timestamp - // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null) - columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, - reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen, - reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen), - reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen, - reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones, - reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen, - reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen + columnStats = new ColumnStats(reducer.columns, + reducer.minTimestampSeen, + Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen), + reducer.maxLocalDeletionTimeSeen, + reducer.tombstones, + reducer.minColumnNameSeen, + reducer.maxColumnNameSeen ); - reducer = null; + // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone indexBuilder.maybeWriteEmptyRowHeader(); + out.writeShort(SSTableWriter.END_OF_ROW); close(); @@@ -245,8 -257,9 +252,10 @@@ } else { + boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp; + // when we clear() the container, it removes the deletion info, so this needs to be reset each time + container.setDeletionInfo(deletionInfo); - ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container); + ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container); if (purged == null || !purged.iterator().hasNext()) { container.clear(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index 18e637b,8461023..bd814d0 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@@ -38,8 -38,10 +38,11 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.Util; import static org.junit.Assert.assertEquals; +import static org.apache.cassandra.db.KeyspaceTest.assertColumns; import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertTrue; -import static org.apache.cassandra.db.KeyspaceTest.assertColumns; ++ + import static org.apache.cassandra.cql3.QueryProcessor.processInternal; import org.apache.cassandra.utils.ByteBufferUtil; @@@ -286,8 -283,10 +289,9 @@@ public class CompactionsPurgeTest exten String cfName = "Standard1"; Keyspace keyspace = Keyspace.open(keyspaceName); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName); - DecoratedKey key = Util.dk("key3"); RowMutation rm; + QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()); // inserts rm = new RowMutation(keyspaceName, key.key); @@@ -316,4 -322,60 +323,60 @@@ for (Column c : cf) assert !c.isMarkedForDelete(System.currentTimeMillis()); } - } + + @Test + public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException + { + String keyspace = "cql_keyspace"; + String table = "table1"; + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.disableAutoCompaction(); + + // write a row out to one sstable + processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)", + keyspace, table, 1, "foo", 1)); + cfs.forceBlockingFlush(); + + UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(1, result.size()); + + // write a row tombstone out to a second sstable + processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1)); + cfs.forceBlockingFlush(); + + // basic check that the row is considered deleted + assertEquals(2, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(0, result.size()); + + // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged + Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000); + future.get(); + + // the data should be gone, but the tombstone should still exist + assertEquals(1, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(0, result.size()); + + // write a row out to one sstable + processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)", + keyspace, table, 1, "foo", 1)); + cfs.forceBlockingFlush(); + assertEquals(2, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(1, result.size()); + + // write a row tombstone out to a different sstable + processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1)); + cfs.forceBlockingFlush(); + + // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged + future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000); + future.get(); + + // both the data and the tombstone should be gone this time + assertEquals(0, cfs.getSSTables().size()); + result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1)); + assertEquals(0, result.size()); + } -} ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ----------------------------------------------------------------------