Ensure that PerRowSecondaryIndex is notified of row-level deletes
patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5445


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7eb146e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7eb146e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7eb146e

Branch: refs/heads/cassandra-1.2
Commit: c7eb146e5669a8e97b1997ce9860b769a3cc7b32
Parents: 87b350f
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Tue Apr 9 16:48:29 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue Apr 9 16:48:29 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../apache/cassandra/db/AtomicSortedColumns.java   |    2 +-
 src/java/org/apache/cassandra/db/Table.java        |    2 +-
 .../db/compaction/LazilyCompactedRow.java          |    2 +-
 .../db/compaction/ParallelCompactionIterable.java  |    2 +-
 .../cassandra/db/compaction/PrecompactedRow.java   |    4 +-
 .../cassandra/db/index/SecondaryIndexManager.java  |   93 ++-------------
 test/unit/org/apache/cassandra/SchemaLoader.java   |    2 +-
 .../db/index/PerRowSecondaryIndexTest.java         |   55 +++++++++
 9 files changed, 75 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0bbc133..32aba15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,7 @@
 1.2.5
  * Include fatal errors in trace events (CASSANDRA-5447)
+ * Ensure that PerRowSecondaryIndex is notified of row-level deletes
+   (CASSANDRA-5445)
 
 
 1.2.4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 552ad6a..bdb2168 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -195,7 +195,7 @@ public class AtomicSortedColumns implements ISortedColumns
         }
         while (!ref.compareAndSet(current, modified));
 
-        indexer.commit();
+        indexer.updateRowLevelIndexes();
 
         return sizeDelta;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index c718586..17c510b 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -387,7 +387,7 @@ public class Table
                 }
 
                 Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
-                cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, true) : SecondaryIndexManager.nullUpdater);
+                cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater);
             }
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 8d59898..22f5413 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -71,7 +71,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         super(rows.get(0).getKey());
         this.rows = rows;
         this.controller = controller;
-        indexer = controller.cfs.indexManager.updaterFor(key, false);
+        indexer = controller.cfs.indexManager.updaterFor(key);
 
         long maxDelTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 091b247..225393e 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -205,7 +205,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                     data.add(FBUtilities.closeableIterator(row.cf.iterator()));
                 }
 
-                PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key, false));
+                PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
                 return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 0de9f42..b5dfed0 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -87,7 +87,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         // See comment in preceding method
         ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
                                                                  shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
-                                                                 controller.cfs.indexManager.updaterFor(key, false));
+                                                                 controller.cfs.indexManager.updaterFor(key));
         if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
             CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
         return compacted;
@@ -121,7 +121,7 @@ public class PrecompactedRow extends AbstractCompactedRow
             }
         }
 
-        merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey(), false));
+        merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey()));
 
         return returnCF;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index df7ceff..3b27614 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -52,7 +52,7 @@ public class SecondaryIndexManager
 
         public void remove(IColumn current) { }
 
-        public void commit() {}
+        public void updateRowLevelIndexes() {}
     };
 
     /**
@@ -480,11 +480,11 @@ public class SecondaryIndexManager
      * can get updated. Note: only a CF backed by AtomicSortedColumns implements this behaviour
      * fully, other types simply ignore the index updater.
      */
-    public Updater updaterFor(final DecoratedKey key, boolean includeRowIndexes)
+    public Updater updaterFor(final DecoratedKey key)
     {
-        return (includeRowIndexes && !rowLevelIndexMap.isEmpty())
-               ? new MixedIndexUpdater(key)
-               : indexesByColumn.isEmpty() ? nullUpdater : new PerColumnIndexUpdater(key);
+        return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty())
+                ? nullUpdater
+                : new StandardUpdater(key);
     }
 
     /**
@@ -589,65 +589,14 @@ public class SecondaryIndexManager
         public void remove(IColumn current);
 
         /** called after memtable updates are complete (CASSANDRA-5397) */
-        public void commit();
+        public void updateRowLevelIndexes();
     }
 
-    private class PerColumnIndexUpdater implements Updater
+    private class StandardUpdater implements Updater
     {
         private final DecoratedKey key;
 
-        public PerColumnIndexUpdater(DecoratedKey key)
-        {
-            this.key = key;
-        }
-
-        public void insert(IColumn column)
-        {
-            if (column.isMarkedForDelete())
-                return;
-
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            ((PerColumnSecondaryIndex) index).insert(key.key, column);
-        }
-
-        public void update(IColumn oldColumn, IColumn column)
-        {
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
-            if (!column.isMarkedForDelete())
-                ((PerColumnSecondaryIndex) index).insert(key.key, column);
-        }
-
-        public void remove(IColumn column)
-        {
-            if (column.isMarkedForDelete())
-                return;
-
-            SecondaryIndex index = indexFor(column.name());
-            if (index == null)
-                return;
-
-            ((PerColumnSecondaryIndex) index).delete(key.key, column);
-        }
-
-        public void commit()
-        {
-            // this is a no-op as per-column index updates are applied immediately
-        }
-    }
-
-    private class MixedIndexUpdater implements Updater
-    {
-        private final DecoratedKey key;
-        ConcurrentHashMap<SecondaryIndex, ByteBuffer> deferredUpdates = new ConcurrentHashMap<SecondaryIndex, ByteBuffer>();
-
-        public MixedIndexUpdater(DecoratedKey key)
+        public StandardUpdater(DecoratedKey key)
         {
             this.key = key;
         }
@@ -662,13 +611,7 @@ public class SecondaryIndexManager
                 return;
 
             if (index instanceof PerColumnSecondaryIndex)
-            {
                 ((PerColumnSecondaryIndex) index).insert(key.key, column);
-            }
-            else
-            {
-                deferredUpdates.putIfAbsent(index, key.key);
-            }
         }
 
         public void update(IColumn oldColumn, IColumn column)
@@ -683,10 +626,6 @@ public class SecondaryIndexManager
                 if (!column.isMarkedForDelete())
                     ((PerColumnSecondaryIndex) index).insert(key.key, column);
             }
-            else
-            {
-                deferredUpdates.putIfAbsent(index, key.key);
-            }
         }
 
         public void remove(IColumn column)
@@ -699,23 +638,13 @@ public class SecondaryIndexManager
                 return;
 
             if (index instanceof PerColumnSecondaryIndex)
-            {
                 ((PerColumnSecondaryIndex) index).delete(key.key, column);
-            }
-            else
-            {
-                // per-row secondary indexes are assumed to keep the index up-to-date at insert time, rather
-                // than performing lazy updates
-            }
         }
 
-        public void commit()
+        public void updateRowLevelIndexes()
         {
-            for (Map.Entry<SecondaryIndex, ByteBuffer> update : deferredUpdates.entrySet())
-            {
-                assert update.getKey() instanceof PerRowSecondaryIndex;
-                ((PerRowSecondaryIndex) update.getKey()).index(update.getValue());
-            }
+            for (SecondaryIndex index : rowLevelIndexMap.values())
+                ((PerRowSecondaryIndex) index).index(key.key);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index cb17665..3db1fc5 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -323,7 +323,7 @@ public class SchemaLoader
                                 indexOptions,
                                 ByteBufferUtil.bytesToHex(cName),
                                 null));
-                    }});
+                }});
     }
 
     private static void useCompression(List<KSMetaData> schema)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7eb146e/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
index 3a4f947..3080912 100644
--- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -26,14 +26,17 @@ import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Set;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class PerRowSecondaryIndexTest extends SchemaLoader
 {
@@ -44,6 +47,12 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
     // indexed & stashes it in a static variable for inspection
     // in the test.
 
+    @Before
+    public void clearTestStub()
+    {
+        TestIndex.reset();
+    }
+
     @Test
     public void testIndexInsertAndUpdate() throws IOException
     {
@@ -65,11 +74,56 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
         indexedRow = TestIndex.LAST_INDEXED_ROW;
         assertNotNull(indexedRow);
         assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value());
+        assertTrue(Arrays.equals("k1".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
+    }
+
+    @Test
+    public void testColumnDelete() throws IOException
+    {
+        // issue a column delete and test that the configured index instance was notified to update
+        RowMutation rm;
+        rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k2"));
+        rm.delete(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), 1);
+        rm.apply();
+
+        ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+        assertNotNull(indexedRow);
+
+        for (IColumn column : indexedRow.getSortedColumns())
+        {
+            assertTrue(column.isMarkedForDelete());
+        }
+        assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
+    }
+
+    @Test
+    public void testRowDelete() throws IOException
+    {
+        // issue a row level delete and test that the configured index instance was notified to update
+        RowMutation rm;
+        rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k3"));
+        rm.delete(new QueryPath("Indexed1"), 1);
+        rm.apply();
+
+        ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+        assertNotNull(indexedRow);
+        for (IColumn column : indexedRow.getSortedColumns())
+        {
+            assertTrue(column.isMarkedForDelete());
+        }
+        assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
     }
 
     public static class TestIndex extends PerRowSecondaryIndex
     {
         public static ColumnFamily LAST_INDEXED_ROW;
+        public static ByteBuffer LAST_INDEXED_KEY;
+
+        public static void reset()
+        {
+            LAST_INDEXED_KEY = null;
+            LAST_INDEXED_ROW = null;
+        }
 
         @Override
         public void index(ByteBuffer rowKey, ColumnFamily cf)
@@ -82,6 +136,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
             QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey),
                                                                new QueryPath(baseCfs.getColumnFamilyName()));
             LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter);
+            LAST_INDEXED_KEY = rowKey;
         }
 
         @Override

Reply via email to