Author: jbellis Date: Thu Oct 7 18:12:13 2010 New Revision: 1005551 URL: http://svn.apache.org/viewvc?rev=1005551&view=rev Log: Allow dynamic secondary index creation and destruction patch by jbellis; reviewed by gdusbabek for CASSANDRA-1532
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java cassandra/trunk/test/system/test_thrift_server.py Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1005551&r1=1005550&r2=1005551&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Oct 7 18:12:13 2010 @@ -20,6 +20,7 @@ dev * add cache save/load ability (CASSANDRA-1417) * add StorageService.getDrainProgress (CASSANDRA-1588) * Disallow bootstrap to an in-use token (CASSANDRA-1561) + * Allow dynamic secondary index creation and destruction (CASSANDRA-1532) 0.7-beta2 Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1005551&r1=1005550&r2=1005551&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Oct 7 18:12:13 2010 @@ -547,6 +547,20 @@ public final class CFMetaData validateMinMaxCompactionThresholds(cf_def); + Map<byte[], ColumnDefinition> metadata = new HashMap<byte[], ColumnDefinition>(); + if (cf_def.column_metadata == null) + { + metadata = column_metadata; + } + else + { + for (org.apache.cassandra.thrift.ColumnDef def : cf_def.column_metadata) + { + ColumnDefinition cd = new ColumnDefinition(def.name, def.validation_class, def.index_type, def.index_name); + metadata.put(cd.name, cd); + } + } + return new CFMetaData(tableName, cfName, cfType, @@ -564,7 +578,7 @@ public final class CFMetaData cf_def.min_compaction_threshold, cf_def.max_compaction_threshold, cfId, - column_metadata, + metadata, rowCacheSavePeriodInSeconds, keyCacheSavePeriodInSeconds); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1005551&r1=1005550&r2=1005551&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Oct 7 18:12:13 2010 @@ -345,4 +345,20 @@ public class SystemTable forceBlockingFlush(INDEX_CF); } + + public static void setIndexRemoved(String table, String indexName) + { + RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, table.getBytes(UTF_8)); + rm.delete(new QueryPath(INDEX_CF, null, indexName.getBytes(UTF_8)), new TimestampClock(System.currentTimeMillis())); + try + { + rm.apply(); + } + catch (IOException e) + { + throw new IOError(e); + } + + forceBlockingFlush(INDEX_CF); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1005551&r1=1005550&r2=1005551&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Thu Oct 7 18:12:13 2010 @@ -1,10 +1,8 @@ package org.apache.cassandra.db.migration; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ConfigurationException; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.*; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemTable; import org.apache.cassandra.db.Table; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; @@ -12,6 +10,7 @@ import org.apache.cassandra.utils.UUIDGe import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Licensed to the Apache Software Foundation (ASF) under one @@ -51,10 +50,6 @@ public class UpdateColumnFamily extends this.oldCfm = oldCfm; this.newCfm = newCfm; - // we'll allow this eventually. - if (!oldCfm.column_metadata.equals(newCfm.column_metadata)) - throw new ConfigurationException("Column meta information is not identical."); - // clone ksm but include the new cf def. KSMetaData newKsm = makeNewKeyspaceDefinition(ksm); rm = Migration.makeDefinitionMutation(newKsm, null, newVersion); @@ -78,12 +73,30 @@ public class UpdateColumnFamily extends void applyModels() throws IOException { - // all we really need to do is reload the cfstore. + logger.debug("Updating " + oldCfm + " to " + newCfm); KSMetaData newKsm = makeNewKeyspaceDefinition(DatabaseDescriptor.getTableDefinition(newCfm.tableName)); DatabaseDescriptor.setTableDefinition(newKsm, newVersion); if (!clientMode) - Table.open(oldCfm.tableName).reloadCf(newCfm.cfId); + { + Table table = Table.open(oldCfm.tableName); + ColumnFamilyStore oldCfs = table.getColumnFamilyStore(oldCfm.cfName); + table.reloadCf(newCfm.cfId); + + // clean up obsolete index data files + for (Map.Entry<byte[], ColumnDefinition> entry : oldCfm.column_metadata.entrySet()) + { + byte[] column = entry.getKey(); + ColumnDefinition def = entry.getValue(); + if (def.index_type != null + && (!newCfm.column_metadata.containsKey(column) || newCfm.column_metadata.get(column).index_type == null)) + { + ColumnFamilyStore indexCfs = oldCfs.getIndexedColumnFamilyStore(column); + SystemTable.setIndexRemoved(table.name, indexCfs.columnFamily); + indexCfs.removeAllSSTables(); + } + } + } } public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi) Modified: cassandra/trunk/test/system/test_thrift_server.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1005551&r1=1005550&r2=1005551&view=diff ============================================================================== --- cassandra/trunk/test/system/test_thrift_server.py (original) +++ cassandra/trunk/test/system/test_thrift_server.py Thu Oct 7 18:12:13 2010 @@ -1364,6 +1364,46 @@ class TestMutations(ThriftTester): assert 'NewColumnFamily' not in [x.name for x in ks1.cf_defs] assert 'Standard1' in [x.name for x in ks1.cf_defs] + def test_dynamic_indexes_with_system_update_cf(self): + _set_keyspace('Keyspace1') + cd = ColumnDef('birthdate', 'BytesType', None, None) + newcf = CfDef('Keyspace1', 'ToBeIndexed', default_validation_class='LongType', column_metadata=[cd]) + client.system_add_column_family(newcf) + + client.insert('key1', ColumnParent('ToBeIndexed'), Column('birthdate', _i64(1), 0), ConsistencyLevel.ONE) + client.insert('key2', ColumnParent('ToBeIndexed'), Column('birthdate', _i64(2), 0), ConsistencyLevel.ONE) + client.insert('key2', ColumnParent('ToBeIndexed'), Column('b', _i64(2), 0), ConsistencyLevel.ONE) + client.insert('key3', ColumnParent('ToBeIndexed'), Column('birthdate', _i64(3), 0), ConsistencyLevel.ONE) + client.insert('key3', ColumnParent('ToBeIndexed'), Column('b', _i64(3), 0), ConsistencyLevel.ONE) + + # Should fail without index + cp = ColumnParent('ToBeIndexed') + sp = SlicePredicate(slice_range=SliceRange('', '')) + clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '') + _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException) + + # add an index on 'birthdate' + ks1 = client.describe_keyspace('Keyspace1') + cfid = [x.id for x in ks1.cf_defs if x.name=='ToBeIndexed'][0] + modified_cd = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, None) + modified_cf = CfDef('Keyspace1', 'ToBeIndexed', column_metadata=[modified_cd]) + modified_cf.id = cfid + client.system_update_column_family(modified_cf) + ks1 = client.describe_keyspace('Keyspace1') + server_cf = [x for x in ks1.cf_defs if x.name=='ToBeIndexed'][0] + assert server_cf + assert server_cf.column_metadata[0].index_type == modified_cd.index_type + assert server_cf.column_metadata[0].index_name == modified_cd.index_name + + # simple query on one index expression + cp = ColumnParent('ToBeIndexed') + sp = SlicePredicate(slice_range=SliceRange('', '')) + clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '') + result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE) + assert len(result) == 1, result + assert result[0].key == 'key1' + assert len(result[0].columns) == 1, result[0].columns + def test_system_super_column_family_operations(self): _set_keyspace('Keyspace1')