Author: jbellis
Date: Thu Oct  7 18:12:13 2010
New Revision: 1005551

URL: http://svn.apache.org/viewvc?rev=1005551&view=rev
Log:
Allow dynamic secondary index creation  and destruction
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1532

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
    cassandra/trunk/test/system/test_thrift_server.py

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Oct  7 18:12:13 2010
@@ -20,6 +20,7 @@ dev
  * add cache save/load ability (CASSANDRA-1417)
  * add StorageService.getDrainProgress (CASSANDRA-1588)
  * Disallow bootstrap to an in-use token (CASSANDRA-1561)
+ * Allow dynamic secondary index creation and destruction (CASSANDRA-1532)
 
 
 0.7-beta2

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu 
Oct  7 18:12:13 2010
@@ -547,6 +547,20 @@ public final class CFMetaData
 
         validateMinMaxCompactionThresholds(cf_def);
 
+        Map<byte[], ColumnDefinition> metadata = new HashMap<byte[], 
ColumnDefinition>();
+        if (cf_def.column_metadata == null)
+        {
+            metadata = column_metadata;
+        }
+        else
+        {
+            for (org.apache.cassandra.thrift.ColumnDef def : 
cf_def.column_metadata)
+            {
+                ColumnDefinition cd = new ColumnDefinition(def.name, 
def.validation_class, def.index_type, def.index_name);
+                metadata.put(cd.name, cd);
+            }
+        }
+
         return new CFMetaData(tableName, 
                               cfName, 
                               cfType, 
@@ -564,7 +578,7 @@ public final class CFMetaData
                               cf_def.min_compaction_threshold,
                               cf_def.max_compaction_threshold,
                               cfId,
-                              column_metadata,
+                              metadata,
                               rowCacheSavePeriodInSeconds,
                               keyCacheSavePeriodInSeconds);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Oct  
7 18:12:13 2010
@@ -345,4 +345,20 @@ public class SystemTable
 
         forceBlockingFlush(INDEX_CF);
     }
+
+    public static void setIndexRemoved(String table, String indexName)
+    {
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
table.getBytes(UTF_8));
+        rm.delete(new QueryPath(INDEX_CF, null, indexName.getBytes(UTF_8)), 
new TimestampClock(System.currentTimeMillis()));
+        try
+        {
+            rm.apply();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+
+        forceBlockingFlush(INDEX_CF);
+    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
 Thu Oct  7 18:12:13 2010
@@ -1,10 +1,8 @@
 package org.apache.cassandra.db.migration;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
@@ -12,6 +10,7 @@ import org.apache.cassandra.utils.UUIDGe
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -51,10 +50,6 @@ public class UpdateColumnFamily extends 
         this.oldCfm = oldCfm;
         this.newCfm = newCfm;
         
-        // we'll allow this eventually.
-        if (!oldCfm.column_metadata.equals(newCfm.column_metadata))
-            throw new ConfigurationException("Column meta information is not 
identical.");
-        
         // clone ksm but include the new cf def.
         KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
         rm = Migration.makeDefinitionMutation(newKsm, null, newVersion);
@@ -78,12 +73,30 @@ public class UpdateColumnFamily extends 
 
     void applyModels() throws IOException
     {
-        // all we really need to do is reload the cfstore.
+        logger.debug("Updating " + oldCfm + " to " + newCfm);
         KSMetaData newKsm = 
makeNewKeyspaceDefinition(DatabaseDescriptor.getTableDefinition(newCfm.tableName));
         DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
         
         if (!clientMode)
-            Table.open(oldCfm.tableName).reloadCf(newCfm.cfId);
+        {
+            Table table = Table.open(oldCfm.tableName);
+            ColumnFamilyStore oldCfs = 
table.getColumnFamilyStore(oldCfm.cfName);
+            table.reloadCf(newCfm.cfId);
+
+            // clean up obsolete index data files
+            for (Map.Entry<byte[], ColumnDefinition> entry : 
oldCfm.column_metadata.entrySet())
+            {
+                byte[] column = entry.getKey();
+                ColumnDefinition def = entry.getValue();
+                if (def.index_type != null
+                    && (!newCfm.column_metadata.containsKey(column) || 
newCfm.column_metadata.get(column).index_type == null))
+                {
+                    ColumnFamilyStore indexCfs = 
oldCfs.getIndexedColumnFamilyStore(column);
+                    SystemTable.setIndexRemoved(table.name, 
indexCfs.columnFamily);
+                    indexCfs.removeAllSSTables();
+                }
+            }
+        }
     }
 
     public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Thu Oct  7 18:12:13 2010
@@ -1364,6 +1364,46 @@ class TestMutations(ThriftTester):
         assert 'NewColumnFamily' not in [x.name for x in ks1.cf_defs]
         assert 'Standard1' in [x.name for x in ks1.cf_defs]
 
+    def test_dynamic_indexes_with_system_update_cf(self):
+        _set_keyspace('Keyspace1')
+        cd = ColumnDef('birthdate', 'BytesType', None, None)
+        newcf = CfDef('Keyspace1', 'ToBeIndexed', 
default_validation_class='LongType', column_metadata=[cd])
+        client.system_add_column_family(newcf)
+
+        client.insert('key1', ColumnParent('ToBeIndexed'), Column('birthdate', 
_i64(1), 0), ConsistencyLevel.ONE)
+        client.insert('key2', ColumnParent('ToBeIndexed'), Column('birthdate', 
_i64(2), 0), ConsistencyLevel.ONE)
+        client.insert('key2', ColumnParent('ToBeIndexed'), Column('b', 
_i64(2), 0), ConsistencyLevel.ONE)
+        client.insert('key3', ColumnParent('ToBeIndexed'), Column('birthdate', 
_i64(3), 0), ConsistencyLevel.ONE)
+        client.insert('key3', ColumnParent('ToBeIndexed'), Column('b', 
_i64(3), 0), ConsistencyLevel.ONE)
+
+        # Should fail without index
+        cp = ColumnParent('ToBeIndexed')
+        sp = SlicePredicate(slice_range=SliceRange('', ''))
+        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, 
_i64(1))], '')
+        _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, 
ConsistencyLevel.ONE), InvalidRequestException)
+
+        # add an index on 'birthdate'
+        ks1 = client.describe_keyspace('Keyspace1')
+        cfid = [x.id for x in ks1.cf_defs if x.name=='ToBeIndexed'][0]
+        modified_cd = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, None)
+        modified_cf = CfDef('Keyspace1', 'ToBeIndexed', 
column_metadata=[modified_cd])
+        modified_cf.id = cfid
+        client.system_update_column_family(modified_cf)
+        ks1 = client.describe_keyspace('Keyspace1')
+        server_cf = [x for x in ks1.cf_defs if x.name=='ToBeIndexed'][0]
+        assert server_cf
+        assert server_cf.column_metadata[0].index_type == 
modified_cd.index_type
+        assert server_cf.column_metadata[0].index_name == 
modified_cd.index_name
+ 
+        # simple query on one index expression
+        cp = ColumnParent('ToBeIndexed')
+        sp = SlicePredicate(slice_range=SliceRange('', ''))
+        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, 
_i64(1))], '')
+        result = client.get_indexed_slices(cp, clause, sp, 
ConsistencyLevel.ONE)
+        assert len(result) == 1, result
+        assert result[0].key == 'key1'
+        assert len(result[0].columns) == 1, result[0].columns
+
     def test_system_super_column_family_operations(self):
         _set_keyspace('Keyspace1')
         


Reply via email to