avoid duplicate index entries in PrecompactedRow and ParallelCompactionIterable patch by jbellis; reviewed by Sam Tunnicliffe for CASSANDRA-5395
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a143966e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a143966e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a143966e Branch: refs/heads/cassandra-1.2 Commit: a143966e1cfe3f782f6237d67d216ef8bc2d4713 Parents: 0ab3d60 Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Mar 27 16:17:53 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Mar 29 12:04:08 2013 -0500 ---------------------------------------------------------------------- .../db/compaction/LazilyCompactedRow.java | 6 +- .../db/compaction/ParallelCompactionIterable.java | 25 +++--- .../cassandra/db/compaction/PrecompactedRow.java | 68 +++++++++++---- .../db/compaction/CompactionsPurgeTest.java | 5 +- 4 files changed, 70 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 76d5100..8d59898 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -254,8 +254,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable { IColumn column = (IColumn) current; container.addColumn(column); - if (container.getColumn(column.name()) != column) + if (indexer != SecondaryIndexManager.nullUpdater + && !column.isMarkedForDelete() + && container.getColumn(column.name()) != column) + { indexer.remove(column); + } } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 7e1983c..24b1d00 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -184,6 +184,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable executor.shutdown(); } + /** + * Merges a set of in-memory rows + */ private class MergeTask implements Callable<ColumnFamily> { private final List<Row> rows; @@ -195,23 +198,17 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable public ColumnFamily call() throws Exception { - ColumnFamily cf = null; + final ColumnFamily returnCF = ColumnFamily.create(controller.cfs.metadata, ArrayBackedSortedColumns.factory()); + + List<CloseableIterator<IColumn>> data = new ArrayList<CloseableIterator<IColumn>>(rows.size()); for (Row row : rows) { - ColumnFamily thisCF = row.cf; - if (cf == null) - { - cf = thisCF; - } - else - { - // addAll is ok even if cf is an ArrayBackedSortedColumns - SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.updaterFor(row.key, false); - cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer); - } + returnCF.delete(row.cf); + data.add(FBUtilities.closeableIterator(row.cf.iterator())); } - return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, cf); + PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key, false)); + return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF); } } @@ -300,7 +297,7 @@ public class 
ParallelCompactionIterable extends AbstractCompactionIterable else { logger.debug("parallel eager deserialize from " + iter.getPath()); - queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(TreeMapBackedSortedColumns.factory())))); + queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory())))); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java index be4b20e..0de9f42 100644 --- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java @@ -20,16 +20,21 @@ package org.apache.cassandra.db.compaction; import java.io.DataOutput; import java.io.IOException; import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; -import com.google.common.base.Functions; - import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.utils.HeapAllocator; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MergeIterator; /** * PrecompactedRow merges its rows in its constructor in memory. 
@@ -97,34 +102,61 @@ public class PrecompactedRow extends AbstractCompactedRow private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller) { assert !rows.isEmpty(); - ColumnFamily cf = null; - SecondaryIndexManager.Updater indexer = null; + + final ColumnFamily returnCF = ColumnFamily.create(controller.cfs.metadata, ArrayBackedSortedColumns.factory()); + + // transform into iterators that MergeIterator will like, and apply row-level tombstones + List<CloseableIterator<IColumn>> data = new ArrayList<CloseableIterator<IColumn>>(rows.size()); for (SSTableIdentityIterator row : rows) { - ColumnFamily thisCF; try { - // use a map for the first once since that will be the one we merge into - ISortedColumns.Factory factory = cf == null ? TreeMapBackedSortedColumns.factory() : ArrayBackedSortedColumns.factory(); - thisCF = row.getColumnFamilyWithColumns(factory); + ColumnFamily cf = row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory()); + returnCF.delete(cf); + data.add(FBUtilities.closeableIterator(cf.iterator())); } catch (IOException e) { - throw new RuntimeException("Failed merge of rows on row with key: " + row.getKey(), e); + throw new RuntimeException(e); } + } + + merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey(), false)); + + return returnCF; + } - if (cf == null) + // returnCF should already have row-level tombstones applied + public static void merge(final ColumnFamily returnCF, List<CloseableIterator<IColumn>> data, final SecondaryIndexManager.Updater indexer) + { + IDiskAtomFilter filter = new IdentityQueryFilter(); + Comparator<IColumn> fcomp = filter.getColumnComparator(returnCF.getComparator()); + + MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>() + { + ColumnFamily container = returnCF.cloneMeShallow(); + + public void reduce(IColumn column) { - cf = thisCF; - indexer = controller.cfs.indexManager.updaterFor(row.getKey(), 
false); // only init indexer once + container.addColumn(column); + if (indexer != SecondaryIndexManager.nullUpdater + && !column.isMarkedForDelete() + && container.getColumn(column.name()) != column) + { + indexer.remove(column); + } } - else + + protected IColumn getReduced() { - // addAll is ok even if cf is an ArrayBackedSortedColumns - cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer); + IColumn c = container.iterator().next(); + container.clear(); + return c; } - } - return cf; + }; + + Iterator<IColumn> reduced = MergeIterator.get(data, fcomp, reducer); + filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC); } public long write(DataOutput out) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index ea9763a..9629017 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -153,6 +153,7 @@ public class CompactionsPurgeTest extends SchemaLoader @Test public void testMinTimestampPurge() throws IOException, ExecutionException, InterruptedException { + // verify that we don't drop tombstones during a minor compaction that might still be relevant CompactionManager.instance.disableAutoCompaction(); Table table = Table.open(TABLE2); String cfName = "Standard1"; @@ -180,8 +181,10 @@ public class CompactionsPurgeTest extends SchemaLoader cfs.forceBlockingFlush(); cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); + // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone + // 
so it would be invalid to assume we can throw out the c1 entry. ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, new QueryPath(cfName))); - Assert.assertTrue(!cf.getColumn(ByteBufferUtil.bytes("c2")).isLive()); + Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive()); Assert.assertEquals(2, cf.getColumnCount()); }