Author: jbellis Date: Wed Oct 6 17:19:26 2010 New Revision: 1005174 URL: http://svn.apache.org/viewvc?rev=1005174&view=rev Log: fix 2ary index support for deletions. Patch by jbellis; reviewed by Stu Hood for CASSANDRA-1546
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/Table.java cassandra/trunk/test/conf/cassandra.yaml cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1005174&r1=1005173&r2=1005174&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Oct 6 17:19:26 2010 @@ -12,6 +12,7 @@ dev * fix unbootstrap when no data is present in a transfer range (CASSANDRA-1573) * take advantage of AVRO-495 to simplify our avro IDL (CASSANDRA-1436) * extend authorization hierarchy to column family (CASSANDRA-1554) + * deletion support in secondary indexes (CASSANDRA-1571) 0.7-beta2 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1005174&r1=1005173&r2=1005174&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Oct 6 17:19:26 2010 @@ -354,7 +354,7 @@ public class Table SortedSet<byte[]> mutatedIndexedColumns = null; for (byte[] column : cfs.getIndexedColumns()) { - if (cf.getColumnNames().contains(column)) + if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete()) { if (mutatedIndexedColumns == null) mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator); @@ -367,8 +367,12 @@ public class Table ColumnFamily oldIndexedColumns = null; if (mutatedIndexedColumns != null) { + // with the raw data CF, we can just apply every update in any order and let + // read-time resolution throw out obsolete versions, thus avoiding read-before-write. 
+ // but for indexed data we need to make sure that we're not creating index entries + // for obsolete writes. oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns); - ignoreObsoleteMutations(cf, cfs.metadata.reconciler, mutatedIndexedColumns, oldIndexedColumns); + ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns); } Memtable fullMemtable = cfs.apply(key, cf); @@ -402,14 +406,22 @@ public class Table return memtablesToFlush; } - private static void ignoreObsoleteMutations(ColumnFamily cf, AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns) + private static void ignoreObsoleteMutations(ColumnFamily cf, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns) { if (oldIndexedColumns == null) return; + ColumnFamily cf2 = cf.cloneMe(); for (IColumn oldColumn : oldIndexedColumns) { - if (reconciler.reconcile((Column) oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn)) + cf2.addColumn(oldColumn); + } + ColumnFamily resolved = ColumnFamilyStore.removeDeleted(cf2, Integer.MAX_VALUE); + + for (IColumn oldColumn : oldIndexedColumns) + { + IColumn resolvedColumn = resolved == null ? null : resolved.getColumn(oldColumn.name()); + if (resolvedColumn != null && resolvedColumn.equals(oldColumn)) { cf.remove(oldColumn.name()); mutatedIndexedColumns.remove(oldColumn.name()); @@ -424,6 +436,10 @@ public class Table return cfs.getColumnFamily(filter); } + /** + * removes obsolete index entries and creates new ones for the given row key and mutated columns. 
+ * @return list of full (index CF) memtables + */ private static List<Memtable> applyIndexUpdates(byte[] key, ColumnFamily cf, ColumnFamilyStore cfs, @@ -436,6 +452,9 @@ public class Table for (byte[] columnName : mutatedIndexedColumns) { IColumn column = cf.getColumn(columnName); + if (column == null || column.isMarkedForDelete()) + continue; // null column == row deletion + DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value()); ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName); if (column instanceof ExpiringColumn) @@ -444,7 +463,9 @@ public class Table cfi.addColumn(new ExpiringColumn(key, ArrayUtils.EMPTY_BYTE_ARRAY, ec.clock(), ec.getTimeToLive(), ec.getLocalDeletionTime())); } else + { cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY, column.clock())); + } Memtable fullMemtable = cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi); if (fullMemtable != null) fullMemtables = addFullMemtable(fullMemtables, fullMemtable); @@ -458,6 +479,8 @@ public class Table { byte[] columnName = entry.getKey(); IColumn column = entry.getValue(); + if (column.isMarkedForDelete()) + continue; DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value()); ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName); cfi.deleteColumn(key, localDeletionTime, column.clock()); Modified: cassandra/trunk/test/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1005174&r1=1005173&r2=1005174&view=diff ============================================================================== --- cassandra/trunk/test/conf/cassandra.yaml (original) +++ cassandra/trunk/test/conf/cassandra.yaml Wed Oct 6 17:19:26 2010 @@ -105,14 +105,18 @@ keyspaces: replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy replication_factor: 5 column_families: - - name: Standard1 + - name: Indexed1 + column_metadata: + - name: birthdate + validator_class: LongType + index_type: KEYS 
+ - name: Keyspace4 replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy replication_factor: 3 column_families: - - name: Standard1 - name: Standard3 @@ -128,5 +132,4 @@ keyspaces: replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy replication_factor: 2 column_families: - - name: Standard1 Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1005174&r1=1005173&r2=1005174&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Oct 6 17:19:26 2010 @@ -177,7 +177,7 @@ public class ColumnFamilyStoreTest exten rm.add(new QueryPath("Indexed1", null, "notbirthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(0)); rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0)); rm.apply(); - + rm = new RowMutation("Keyspace1", "k4aaaa".getBytes()); rm.add(new QueryPath("Indexed1", null, "notbirthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(0)); rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(3L), new TimestampClock(0)); @@ -223,6 +223,55 @@ public class ColumnFamilyStoreTest exten } @Test + public void testIndexDeletions() throws IOException + { + ColumnFamilyStore cfs = Table.open("Keyspace3").getColumnFamilyStore("Indexed1"); + RowMutation rm; + + rm = new RowMutation("Keyspace3", "k1".getBytes()); + rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0)); + rm.apply(); + + IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, 
FBUtilities.toByteArray(1L)); + IndexClause clause = new IndexClause(Arrays.asList(expr), ArrayUtils.EMPTY_BYTE_ARRAY, 100); + IFilter filter = new IdentityQueryFilter(); + IPartitioner p = StorageService.getPartitioner(); + Range range = new Range(p.getMinimumToken(), p.getMinimumToken()); + List<Row> rows = cfs.scan(clause, range, filter); + assert rows.size() == 1 : StringUtils.join(rows, ","); + assert Arrays.equals("k1".getBytes(), rows.get(0).key.key); + + // delete the column directly + rm = new RowMutation("Keyspace3", "k1".getBytes()); + rm.delete(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), new TimestampClock(1)); + rm.apply(); + rows = cfs.scan(clause, range, filter); + assert rows.isEmpty(); + + // verify that it's not being indexed under the deletion column value either + IColumn deletion = rm.getColumnFamilies().iterator().next().iterator().next(); + IndexExpression expr0 = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, deletion.value()); + IndexClause clause0 = new IndexClause(Arrays.asList(expr0), ArrayUtils.EMPTY_BYTE_ARRAY, 100); + rows = cfs.scan(clause0, range, filter); + assert rows.isEmpty(); + + // resurrect w/ a newer timestamp + rm = new RowMutation("Keyspace3", "k1".getBytes()); + rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(2)); + rm.apply(); + rows = cfs.scan(clause, range, filter); + assert rows.size() == 1 : StringUtils.join(rows, ","); + assert Arrays.equals("k1".getBytes(), rows.get(0).key.key); + + // delete the entire row + rm = new RowMutation("Keyspace3", "k1".getBytes()); + rm.delete(new QueryPath("Indexed1"), new TimestampClock(3)); + rm.apply(); + rows = cfs.scan(clause, range, filter); + assert rows.isEmpty() : StringUtils.join(rows, ","); + } + + @Test public void testIndexUpdate() throws IOException { Table table = Table.open("Keyspace2");