merge from 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0bfa210d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0bfa210d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0bfa210d Branch: refs/heads/trunk Commit: 0bfa210d071b664b37d6ba5ee4eda280f47d7b0e Parents: d53c838 4e9a7b8 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Dec 13 00:34:09 2013 +0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Dec 13 00:34:09 2013 +0600 ---------------------------------------------------------------------- .../db/compaction/LazilyCompactedRow.java | 30 ++++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bfa210d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 8237ff5,0d33b22..23457bc --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@@ -56,9 -58,7 +56,9 @@@ public class LazilyCompactedRow extend private boolean closed; private ColumnIndex.Builder indexBuilder; private final SecondaryIndexManager.Updater indexer; + private final Reducer reducer; + private final Iterator<OnDiskAtom> merger; - private DeletionInfo deletionInfo; + private DeletionTime maxRowTombstone; public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows) { @@@ -69,36 -69,25 +69,40 @@@ // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge - deletionInfo = DeletionInfo.live(); + maxRowTombstone = DeletionTime.LIVE; for (OnDiskAtomIterator row : rows) - deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo()); + { + DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion(); + if (maxRowTombstone.compareTo(rowTombstone) < 0) + maxRowTombstone = rowTombstone; + } - - // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with - // tombstones newer than the row-level tombstones we've seen -- but we won't know that - // until we iterate over them. By passing MAX_VALUE we will only purge if there are - // no other versions of this row present. - this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE); + // tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable + // containing `key` outside of the set of sstables involved in this compaction. + maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key); - emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata); - emptyColumnFamily.setDeletionInfo(deletionInfo.copy()); - if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp) + emptyColumnFamily = EmptyColumns.factory.create(controller.cfs.metadata); + emptyColumnFamily.delete(maxRowTombstone); - if (shouldPurge) ++ if (maxRowTombstone.markedForDeleteAt < maxPurgeableTimestamp) emptyColumnFamily.purgeTombstones(controller.gcBefore); + + reducer = new Reducer(); + merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull()); + } + + private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf) + { + // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that + // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this + // without purging tombstones. + int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE; + ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key)); + + // if we have counters, remove old shards + if (cf.metadata().getDefaultValidator().isCommutative()) + CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore); + + return cf; } public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException @@@ -252,19 -257,18 +257,20 @@@ } else { + boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp; // when we clear() the container, it removes the deletion info, so this needs to be reset each time - container.setDeletionInfo(deletionInfo); - ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container); - if (purged == null || !purged.iterator().hasNext()) + container.delete(maxRowTombstone); - ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container); - if (purged == null || !purged.iterator().hasNext()) ++ removeDeletedAndOldShards(key, shouldPurge, controller, container); ++ Iterator<Column> iter = container.iterator(); ++ if (!iter.hasNext()) { container.clear(); return null; } -- Column reduced = purged.iterator().next(); ++ Column reduced = iter.next(); container.clear(); - // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times, + // removeDeletedAndOldShards have only checked the top-level CF deletion times, // not the range tombstone. For that we use the columnIndexer tombstone tracker. if (indexBuilder.tombstoneTracker().isDeleted(reduced)) {