Ensure that PerRowSecondaryIndex is notified of row-level deletes patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5445
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7eb146e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7eb146e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7eb146e Branch: refs/heads/cassandra-1.2 Commit: c7eb146e5669a8e97b1997ce9860b769a3cc7b32 Parents: 87b350f Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Apr 9 16:48:29 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Apr 9 16:48:29 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/AtomicSortedColumns.java | 2 +- src/java/org/apache/cassandra/db/Table.java | 2 +- .../db/compaction/LazilyCompactedRow.java | 2 +- .../db/compaction/ParallelCompactionIterable.java | 2 +- .../cassandra/db/compaction/PrecompactedRow.java | 4 +- .../cassandra/db/index/SecondaryIndexManager.java | 93 ++------------- test/unit/org/apache/cassandra/SchemaLoader.java | 2 +- .../db/index/PerRowSecondaryIndexTest.java | 55 +++++++++ 9 files changed, 75 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0bbc133..32aba15 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,7 @@ 1.2.5 * Include fatal errors in trace events (CASSANDRA-5447) + * Ensure that PerRowSecondaryIndex is notified of row-level deletes + (CASSANDRA-5445) 1.2.4 http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index 552ad6a..bdb2168 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -195,7 +195,7 @@ public class AtomicSortedColumns implements ISortedColumns } while (!ref.compareAndSet(current, modified)); - indexer.commit(); + indexer.updateRowLevelIndexes(); return sizeDelta; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index c718586..17c510b 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -387,7 +387,7 @@ public class Table } Tracing.trace("Adding to {} memtable", cf.metadata().cfName); - cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, true) : SecondaryIndexManager.nullUpdater); + cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater); } } finally http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 8d59898..22f5413 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -71,7 +71,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable super(rows.get(0).getKey()); this.rows = rows; this.controller = controller; - indexer = controller.cfs.indexManager.updaterFor(key, false); + indexer = controller.cfs.indexManager.updaterFor(key); long maxDelTimestamp = Long.MIN_VALUE; for (OnDiskAtomIterator row : rows) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 091b247..225393e 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -205,7 +205,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable data.add(FBUtilities.closeableIterator(row.cf.iterator())); } - PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key, false)); + PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key)); return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java index 0de9f42..b5dfed0 100644 --- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java @@ -87,7 +87,7 @@ public class PrecompactedRow extends AbstractCompactedRow // See comment in preceding method ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge ? controller.gcBefore : Integer.MIN_VALUE, - controller.cfs.indexManager.updaterFor(key, false)); + controller.cfs.indexManager.updaterFor(key)); if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative()) CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore); return compacted; @@ -121,7 +121,7 @@ public class PrecompactedRow extends AbstractCompactedRow } } - merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey(), false)); + merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey())); return returnCF; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index df7ceff..3b27614 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -52,7 +52,7 @@ public class SecondaryIndexManager public void remove(IColumn current) { } - public void commit() {} + public void updateRowLevelIndexes() {} }; /** @@ -480,11 +480,11 @@ public class SecondaryIndexManager * can get updated. Note: only a CF backed by AtomicSortedColumns implements this behaviour * fully, other types simply ignore the index updater. */ - public Updater updaterFor(final DecoratedKey key, boolean includeRowIndexes) + public Updater updaterFor(final DecoratedKey key) { - return (includeRowIndexes && !rowLevelIndexMap.isEmpty()) - ? new MixedIndexUpdater(key) - : indexesByColumn.isEmpty() ? nullUpdater : new PerColumnIndexUpdater(key); + return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty()) + ? nullUpdater + : new StandardUpdater(key); } /** @@ -589,65 +589,14 @@ public class SecondaryIndexManager public void remove(IColumn current); /** called after memtable updates are complete (CASSANDRA-5397) */ - public void commit(); + public void updateRowLevelIndexes(); } - private class PerColumnIndexUpdater implements Updater + private class StandardUpdater implements Updater { private final DecoratedKey key; - public PerColumnIndexUpdater(DecoratedKey key) - { - this.key = key; - } - - public void insert(IColumn column) - { - if (column.isMarkedForDelete()) - return; - - SecondaryIndex index = indexFor(column.name()); - if (index == null) - return; - - ((PerColumnSecondaryIndex) index).insert(key.key, column); - } - - public void update(IColumn oldColumn, IColumn column) - { - SecondaryIndex index = indexFor(column.name()); - if (index == null) - return; - - ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn); - if (!column.isMarkedForDelete()) - ((PerColumnSecondaryIndex) index).insert(key.key, column); - } - - public void remove(IColumn column) - { - if (column.isMarkedForDelete()) - return; - - SecondaryIndex index = indexFor(column.name()); - if (index == null) - return; - - ((PerColumnSecondaryIndex) index).delete(key.key, column); - } - - public void commit() - { - // this is a no-op as per-column index updates are applied immediately - } - } - - private class MixedIndexUpdater implements Updater - { - private final DecoratedKey key; - ConcurrentHashMap<SecondaryIndex, ByteBuffer> deferredUpdates = new ConcurrentHashMap<SecondaryIndex, ByteBuffer>(); - - public MixedIndexUpdater(DecoratedKey key) + public StandardUpdater(DecoratedKey key) { this.key = key; } @@ -662,13 +611,7 @@ public class SecondaryIndexManager return; if (index instanceof PerColumnSecondaryIndex) - { ((PerColumnSecondaryIndex) index).insert(key.key, column); - } - else - { - deferredUpdates.putIfAbsent(index, key.key); - } } public void update(IColumn oldColumn, IColumn column) @@ -683,10 +626,6 @@ public class SecondaryIndexManager if (!column.isMarkedForDelete()) ((PerColumnSecondaryIndex) index).insert(key.key, column); } - else - { - deferredUpdates.putIfAbsent(index, key.key); - } } public void remove(IColumn column) @@ -699,23 +638,13 @@ public class SecondaryIndexManager return; if (index instanceof PerColumnSecondaryIndex) - { ((PerColumnSecondaryIndex) index).delete(key.key, column); - } - else - { - // per-row secondary indexes are assumed to keep the index up-to-date at insert time, rather - // than performing lazy updates - } } - public void commit() + public void updateRowLevelIndexes() { - for (Map.Entry<SecondaryIndex, ByteBuffer> update : deferredUpdates.entrySet()) - { - assert update.getKey() instanceof PerRowSecondaryIndex; - ((PerRowSecondaryIndex) update.getKey()).index(update.getValue()); - } + for (SecondaryIndex index : rowLevelIndexMap.values()) + ((PerRowSecondaryIndex) index).index(key.key); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index cb17665..3db1fc5 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -323,7 +323,7 @@ public class SchemaLoader indexOptions, ByteBufferUtil.bytesToHex(cName), null)); - }}); + }}); } private static void useCompression(List<KSMetaData> schema) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java index 3a4f947..3080912 100644 --- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java @@ -26,14 +26,17 @@ import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.ByteBufferUtil; +import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Set; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class PerRowSecondaryIndexTest extends SchemaLoader { @@ -44,6 +47,12 @@ public class PerRowSecondaryIndexTest extends SchemaLoader // indexed & stashes it in a static variable for inspection // in the test. + @Before + public void clearTestStub() + { + TestIndex.reset(); + } + @Test public void testIndexInsertAndUpdate() throws IOException { @@ -65,11 +74,56 @@ public class PerRowSecondaryIndexTest extends SchemaLoader indexedRow = TestIndex.LAST_INDEXED_ROW; assertNotNull(indexedRow); assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value()); + assertTrue(Arrays.equals("k1".getBytes(), TestIndex.LAST_INDEXED_KEY.array())); + } + + @Test + public void testColumnDelete() throws IOException + { + // issue a column delete and test that the configured index instance was notified to update + RowMutation rm; + rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k2")); + rm.delete(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), 1); + rm.apply(); + + ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW; + assertNotNull(indexedRow); + + for (IColumn column : indexedRow.getSortedColumns()) + { + assertTrue(column.isMarkedForDelete()); + } + assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array())); + } + + @Test + public void testRowDelete() throws IOException + { + // issue a row level delete and test that the configured index instance was notified to update + RowMutation rm; + rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k3")); + rm.delete(new QueryPath("Indexed1"), 1); + rm.apply(); + + ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW; + assertNotNull(indexedRow); + for (IColumn column : indexedRow.getSortedColumns()) + { + assertTrue(column.isMarkedForDelete()); + } + assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array())); } public static class TestIndex extends PerRowSecondaryIndex { public static ColumnFamily LAST_INDEXED_ROW; + public static ByteBuffer LAST_INDEXED_KEY; + + public static void reset() + { + LAST_INDEXED_KEY = null; + LAST_INDEXED_ROW = null; + } @Override public void index(ByteBuffer rowKey, ColumnFamily cf) @@ -82,6 +136,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey), new QueryPath(baseCfs.getColumnFamilyName())); LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter); + LAST_INDEXED_KEY = rowKey; } @Override