http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 2bf4805..c2efb6a 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sasi.SASIIndex;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
@@ -39,7 +40,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.StubIndex;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.*;
-import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -119,44 +120,31 @@ public class SchemaLoader
                 KeyspaceParams.simple(1),
                 Tables.of(
                 // Column Families
-                standardCFMD(ks1, 
"Standard1").compaction(CompactionParams.scts(compactionOptions)),
-                standardCFMD(ks1, "Standard2"),
-                standardCFMD(ks1, "Standard3"),
-                standardCFMD(ks1, "Standard4"),
-                standardCFMD(ks1, "StandardGCGS0").gcGraceSeconds(0),
-                standardCFMD(ks1, "StandardLong1"),
-                standardCFMD(ks1, "StandardLong2"),
-                //CFMetaData.Builder.create(ks1, "ValuesWithQuotes").build(),
-                superCFMD(ks1, "Super1", LongType.instance),
-                superCFMD(ks1, "Super2", LongType.instance),
-                superCFMD(ks1, "Super3", LongType.instance),
-                superCFMD(ks1, "Super4", UTF8Type.instance),
-                superCFMD(ks1, "Super5", bytes),
-                superCFMD(ks1, "Super6", LexicalUUIDType.instance, 
UTF8Type.instance),
-                keysIndexCFMD(ks1, "Indexed1", true),
-                keysIndexCFMD(ks1, "Indexed2", false),
-                //CFMetaData.Builder.create(ks1, 
"StandardInteger1").withColumnNameComparator(IntegerType.instance).build(),
-                //CFMetaData.Builder.create(ks1, 
"StandardLong3").withColumnNameComparator(IntegerType.instance).build(),
-                //CFMetaData.Builder.create(ks1, "Counter1", false, false, 
true).build(),
-                //CFMetaData.Builder.create(ks1, "SuperCounter1", false, 
false, true, true).build(),
-                superCFMD(ks1, "SuperDirectGC", 
BytesType.instance).gcGraceSeconds(0),
-//                jdbcCFMD(ks1, "JdbcInteger", 
IntegerType.instance).addColumnDefinition(integerColumn(ks1, "JdbcInteger")),
-                jdbcCFMD(ks1, "JdbcUtf8", 
UTF8Type.instance).addColumnDefinition(utf8Column(ks1, "JdbcUtf8")),
-                jdbcCFMD(ks1, "JdbcLong", LongType.instance),
-                jdbcCFMD(ks1, "JdbcBytes", bytes),
-                jdbcCFMD(ks1, "JdbcAscii", AsciiType.instance),
-                //CFMetaData.Builder.create(ks1, "StandardComposite", false, 
true, false).withColumnNameComparator(composite).build(),
-                //CFMetaData.Builder.create(ks1, "StandardComposite2", false, 
true, false).withColumnNameComparator(compositeMaxMin).build(),
-                //CFMetaData.Builder.create(ks1, "StandardDynamicComposite", 
false, true, false).withColumnNameComparator(dynamicComposite).build(),
-                standardCFMD(ks1, 
"StandardLeveled").compaction(CompactionParams.lcs(leveledOptions)),
-                standardCFMD(ks1, 
"legacyleveled").compaction(CompactionParams.lcs(leveledOptions)),
+                standardCFMD(ks1, 
"Standard1").compaction(CompactionParams.scts(compactionOptions)).build(),
+                standardCFMD(ks1, "Standard2").build(),
+                standardCFMD(ks1, "Standard3").build(),
+                standardCFMD(ks1, "Standard4").build(),
+                standardCFMD(ks1, "StandardGCGS0").gcGraceSeconds(0).build(),
+                standardCFMD(ks1, "StandardLong1").build(),
+                standardCFMD(ks1, "StandardLong2").build(),
+                superCFMD(ks1, "Super1", LongType.instance).build(),
+                superCFMD(ks1, "Super2", LongType.instance).build(),
+                superCFMD(ks1, "Super3", LongType.instance).build(),
+                superCFMD(ks1, "Super4", UTF8Type.instance).build(),
+                superCFMD(ks1, "Super5", bytes).build(),
+                superCFMD(ks1, "Super6", LexicalUUIDType.instance, 
UTF8Type.instance).build(),
+                keysIndexCFMD(ks1, "Indexed1", true).build(),
+                keysIndexCFMD(ks1, "Indexed2", false).build(),
+                superCFMD(ks1, "SuperDirectGC", 
BytesType.instance).gcGraceSeconds(0).build(),
+                jdbcCFMD(ks1, "JdbcUtf8", 
UTF8Type.instance).addColumn(utf8Column(ks1, "JdbcUtf8")).build(),
+                jdbcCFMD(ks1, "JdbcLong", LongType.instance).build(),
+                jdbcCFMD(ks1, "JdbcBytes", bytes).build(),
+                jdbcCFMD(ks1, "JdbcAscii", AsciiType.instance).build(),
+                standardCFMD(ks1, 
"StandardLeveled").compaction(CompactionParams.lcs(leveledOptions)).build(),
+                standardCFMD(ks1, 
"legacyleveled").compaction(CompactionParams.lcs(leveledOptions)).build(),
                 standardCFMD(ks1, 
"StandardLowIndexInterval").minIndexInterval(8)
                                                              
.maxIndexInterval(256)
-                                                             
.caching(CachingParams.CACHE_NOTHING)
-                //CFMetaData.Builder.create(ks1, 
"UUIDKeys").addPartitionKey("key",UUIDType.instance).build(),
-                //CFMetaData.Builder.create(ks1, 
"MixedTypes").withColumnNameComparator(LongType.instance).addPartitionKey("key",
 UUIDType.instance).build(),
-                //CFMetaData.Builder.create(ks1, "MixedTypesComposite", false, 
true, false).withColumnNameComparator(composite).addPartitionKey("key", 
composite).build(),
-                //CFMetaData.Builder.create(ks1, 
"AsciiKeys").addPartitionKey("key", AsciiType.instance).build()
+                                                             
.caching(CachingParams.CACHE_NOTHING).build()
         )));
 
         // Keyspace 2
@@ -164,118 +152,135 @@ public class SchemaLoader
                 KeyspaceParams.simple(1),
                 Tables.of(
                 // Column Families
-                standardCFMD(ks2, "Standard1"),
-                standardCFMD(ks2, "Standard3"),
-                superCFMD(ks2, "Super3", bytes),
-                superCFMD(ks2, "Super4", TimeUUIDType.instance),
-                keysIndexCFMD(ks2, "Indexed1", true),
-                compositeIndexCFMD(ks2, "Indexed2", true),
-                compositeIndexCFMD(ks2, "Indexed3", true).gcGraceSeconds(0))));
+                standardCFMD(ks2, "Standard1").build(),
+                standardCFMD(ks2, "Standard3").build(),
+                superCFMD(ks2, "Super3", bytes).build(),
+                superCFMD(ks2, "Super4", TimeUUIDType.instance).build(),
+                keysIndexCFMD(ks2, "Indexed1", true).build(),
+                compositeIndexCFMD(ks2, "Indexed2", true).build(),
+                compositeIndexCFMD(ks2, "Indexed3", 
true).gcGraceSeconds(0).build())));
 
         // Keyspace 3
         schema.add(KeyspaceMetadata.create(ks3,
                 KeyspaceParams.simple(5),
                 Tables.of(
-                standardCFMD(ks3, "Standard1"),
-                keysIndexCFMD(ks3, "Indexed1", true))));
+                standardCFMD(ks3, "Standard1").build(),
+                keysIndexCFMD(ks3, "Indexed1", true).build())));
 
         // Keyspace 4
         schema.add(KeyspaceMetadata.create(ks4,
                 KeyspaceParams.simple(3),
                 Tables.of(
-                standardCFMD(ks4, "Standard1"),
-                standardCFMD(ks4, "Standard3"),
-                superCFMD(ks4, "Super3", bytes),
-                superCFMD(ks4, "Super4", TimeUUIDType.instance),
-                superCFMD(ks4, "Super5", TimeUUIDType.instance, 
BytesType.instance))));
+                standardCFMD(ks4, "Standard1").build(),
+                standardCFMD(ks4, "Standard3").build(),
+                superCFMD(ks4, "Super3", bytes).build(),
+                superCFMD(ks4, "Super4", TimeUUIDType.instance).build(),
+                superCFMD(ks4, "Super5", TimeUUIDType.instance, 
BytesType.instance).build())));
 
         // Keyspace 5
         schema.add(KeyspaceMetadata.create(ks5,
                 KeyspaceParams.simple(2),
-                Tables.of(standardCFMD(ks5, "Standard1"))));
+                Tables.of(standardCFMD(ks5, "Standard1").build())));
 
         // Keyspace 6
         schema.add(KeyspaceMetadata.create(ks6,
                 KeyspaceParams.simple(1),
-                Tables.of(keysIndexCFMD(ks6, "Indexed1", true))));
+                Tables.of(keysIndexCFMD(ks6, "Indexed1", true).build())));
 
         // Keyspace 7
         schema.add(KeyspaceMetadata.create(ks7,
                 KeyspaceParams.simple(1),
-                Tables.of(customIndexCFMD(ks7, "Indexed1"))));
+                Tables.of(customIndexCFMD(ks7, "Indexed1").build())));
 
         // KeyCacheSpace
         schema.add(KeyspaceMetadata.create(ks_kcs,
                 KeyspaceParams.simple(1),
                 Tables.of(
-                standardCFMD(ks_kcs, "Standard1"),
-                standardCFMD(ks_kcs, "Standard2"),
-                standardCFMD(ks_kcs, "Standard3"))));
+                standardCFMD(ks_kcs, "Standard1").build(),
+                standardCFMD(ks_kcs, "Standard2").build(),
+                standardCFMD(ks_kcs, "Standard3").build())));
 
         // RowCacheSpace
         schema.add(KeyspaceMetadata.create(ks_rcs,
                 KeyspaceParams.simple(1),
                 Tables.of(
-                standardCFMD(ks_rcs, 
"CFWithoutCache").caching(CachingParams.CACHE_NOTHING),
-                standardCFMD(ks_rcs, 
"CachedCF").caching(CachingParams.CACHE_EVERYTHING),
-                standardCFMD(ks_rcs, "CachedNoClustering", 1, 
IntegerType.instance, IntegerType.instance, 
null).caching(CachingParams.CACHE_EVERYTHING),
-                standardCFMD(ks_rcs, "CachedIntCF").
-                        caching(new CachingParams(true, 100)))));
-
-        // CounterCacheSpace
-        /*schema.add(KeyspaceMetadata.testMetadata(ks_ccs,
-                simple,
-                opts_rf1,
-                CFMetaData.Builder.create(ks_ccs, "Counter1", false, false, 
true).build(),
-                CFMetaData.Builder.create(ks_ccs, "Counter1", false, false, 
true).build()));*/
+                standardCFMD(ks_rcs, 
"CFWithoutCache").caching(CachingParams.CACHE_NOTHING).build(),
+                standardCFMD(ks_rcs, 
"CachedCF").caching(CachingParams.CACHE_EVERYTHING).build(),
+                standardCFMD(ks_rcs, "CachedNoClustering", 1, 
IntegerType.instance, IntegerType.instance, 
null).caching(CachingParams.CACHE_EVERYTHING).build(),
+                standardCFMD(ks_rcs, "CachedIntCF").caching(new 
CachingParams(true, 100)).build())));
 
         schema.add(KeyspaceMetadata.create(ks_nocommit, 
KeyspaceParams.simpleTransient(1), Tables.of(
-                standardCFMD(ks_nocommit, "Standard1"))));
+                standardCFMD(ks_nocommit, "Standard1").build())));
 
         // CQLKeyspace
         schema.add(KeyspaceMetadata.create(ks_cql, KeyspaceParams.simple(1), 
Tables.of(
 
-                // Column Families
-                CFMetaData.compile("CREATE TABLE table1 ("
-                        + "k int PRIMARY KEY,"
-                        + "v1 text,"
-                        + "v2 int"
-                        + ")", ks_cql),
-
-                CFMetaData.compile("CREATE TABLE table2 ("
-                        + "k text,"
-                        + "c text,"
-                        + "v text,"
-                        + "PRIMARY KEY (k, c))", ks_cql),
-                CFMetaData.compile("CREATE TABLE foo ("
-                        + "bar text, "
-                        + "baz text, "
-                        + "qux text, "
-                        + "PRIMARY KEY(bar, baz) ) "
-                        + "WITH COMPACT STORAGE", ks_cql),
-                CFMetaData.compile("CREATE TABLE foofoo ("
-                        + "bar text, "
-                        + "baz text, "
-                        + "qux text, "
-                        + "quz text, "
-                        + "foo text, "
-                        + "PRIMARY KEY((bar, baz), qux, quz) ) "
-                        + "WITH COMPACT STORAGE", ks_cql)
+        // Column Families
+        CreateTableStatement.parse("CREATE TABLE table1 ("
+                                   + "k int PRIMARY KEY,"
+                                   + "v1 text,"
+                                   + "v2 int"
+                                   + ")", ks_cql)
+                            .build(),
+
+        CreateTableStatement.parse("CREATE TABLE table2 ("
+                                   + "k text,"
+                                   + "c text,"
+                                   + "v text,"
+                                   + "PRIMARY KEY (k, c))", ks_cql)
+                            .build(),
+
+        CreateTableStatement.parse("CREATE TABLE foo ("
+                                   + "bar text, "
+                                   + "baz text, "
+                                   + "qux text, "
+                                   + "PRIMARY KEY(bar, baz) ) "
+                                   + "WITH COMPACT STORAGE", ks_cql)
+                            .build(),
+
+        CreateTableStatement.parse("CREATE TABLE foofoo ("
+                                   + "bar text, "
+                                   + "baz text, "
+                                   + "qux text, "
+                                   + "quz text, "
+                                   + "foo text, "
+                                   + "PRIMARY KEY((bar, baz), qux, quz) ) "
+                                   + "WITH COMPACT STORAGE", ks_cql)
+                            .build()
         )));
 
         if (DatabaseDescriptor.getPartitioner() instanceof Murmur3Partitioner)
-            schema.add(KeyspaceMetadata.create("sasi", 
KeyspaceParams.simpleTransient(1), Tables.of(sasiCFMD("sasi", "test_cf"), 
clusteringSASICFMD("sasi", "clustering_test_cf"))));
-
-        if 
(Boolean.parseBoolean(System.getProperty("cassandra.test.compression", 
"false")))
-            useCompression(schema);
+        {
+            schema.add(KeyspaceMetadata.create("sasi",
+                                               
KeyspaceParams.simpleTransient(1),
+                                               Tables.of(sasiCFMD("sasi", 
"test_cf").build(),
+                                                         
clusteringSASICFMD("sasi", "clustering_test_cf").build())));
+        }
 
         // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
         // Schema.instance.load(schemaDefinition());
         for (KeyspaceMetadata ksm : schema)
             MigrationManager.announceNewKeyspace(ksm, false);
+
+        if 
(Boolean.parseBoolean(System.getProperty("cassandra.test.compression", 
"false")))
+            useCompression(schema);
+    }
+
+    public static void createKeyspace(String name, KeyspaceParams params)
+    {
+        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(name, 
params, Tables.of()), true);
     }
 
-    public static void createKeyspace(String name, KeyspaceParams params, 
CFMetaData... tables)
+    public static void createKeyspace(String name, KeyspaceParams params, 
TableMetadata.Builder... builders)
+    {
+        Tables.Builder tables = Tables.builder();
+        for (TableMetadata.Builder builder : builders)
+            tables.add(builder.build());
+
+        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(name, 
params, tables.build()), true);
+    }
+
+    public static void createKeyspace(String name, KeyspaceParams params, 
TableMetadata... tables)
     {
         MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(name, 
params, Tables.of(tables)), true);
     }
@@ -285,384 +290,374 @@ public class SchemaLoader
         MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(name, 
params, tables, Views.none(), types, Functions.none()), true);
     }
 
-    public static ColumnDefinition integerColumn(String ksName, String cfName)
+    public static ColumnMetadata integerColumn(String ksName, String cfName)
     {
-        return new ColumnDefinition(ksName,
-                                    cfName,
-                                    
ColumnIdentifier.getInterned(IntegerType.instance.fromString("42"), 
IntegerType.instance),
-                                    UTF8Type.instance,
-                                    ColumnDefinition.NO_POSITION,
-                                    ColumnDefinition.Kind.REGULAR);
+        return new ColumnMetadata(ksName,
+                                  cfName,
+                                  
ColumnIdentifier.getInterned(IntegerType.instance.fromString("42"), 
IntegerType.instance),
+                                  UTF8Type.instance,
+                                  ColumnMetadata.NO_POSITION,
+                                  ColumnMetadata.Kind.REGULAR);
     }
 
-    public static ColumnDefinition utf8Column(String ksName, String cfName)
+    public static ColumnMetadata utf8Column(String ksName, String cfName)
     {
-        return new ColumnDefinition(ksName,
-                                    cfName,
-                                    ColumnIdentifier.getInterned("fortytwo", 
true),
-                                    UTF8Type.instance,
-                                    ColumnDefinition.NO_POSITION,
-                                    ColumnDefinition.Kind.REGULAR);
+        return new ColumnMetadata(ksName,
+                                  cfName,
+                                  ColumnIdentifier.getInterned("fortytwo", 
true),
+                                  UTF8Type.instance,
+                                  ColumnMetadata.NO_POSITION,
+                                  ColumnMetadata.Kind.REGULAR);
     }
 
-    public static CFMetaData perRowIndexedCFMD(String ksName, String cfName)
+    public static TableMetadata perRowIndexedCFMD(String ksName, String cfName)
     {
-        final Map<String, String> indexOptions = Collections.singletonMap(
-                                                      
IndexTarget.CUSTOM_INDEX_OPTION_NAME,
-                                                      
StubIndex.class.getName());
+        ColumnMetadata indexedColumn = ColumnMetadata.regularColumn(ksName, 
cfName, "indexed", AsciiType.instance);
 
-        CFMetaData cfm =  CFMetaData.Builder.create(ksName, cfName)
-                .addPartitionKey("key", AsciiType.instance)
-                .build();
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ksName, cfName)
+                         .addPartitionKeyColumn("key", AsciiType.instance)
+                         .addColumn(indexedColumn);
 
-        ColumnDefinition indexedColumn = ColumnDefinition.regularDef(ksName, 
cfName, "indexed", AsciiType.instance);
-        cfm.addOrReplaceColumnDefinition(indexedColumn);
+        final Map<String, String> indexOptions = 
Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
StubIndex.class.getName());
+        builder.indexes(Indexes.of(IndexMetadata.fromIndexTargets(
+        Collections.singletonList(new IndexTarget(indexedColumn.name,
+                                                                               
                             IndexTarget.Type.VALUES)),
+                                                                  "indexe1",
+                                                                  
IndexMetadata.Kind.CUSTOM,
+                                                                  
indexOptions)));
 
-        cfm.indexes(
-            cfm.getIndexes()
-               .with(IndexMetadata.fromIndexTargets(cfm,
-                                                    
Collections.singletonList(new IndexTarget(indexedColumn.name,
-                                                                               
               IndexTarget.Type.VALUES)),
-                                                    "indexe1",
-                                                    IndexMetadata.Kind.CUSTOM,
-                                                    indexOptions)));
-        return cfm;
+        return builder.build();
     }
 
     private static void useCompression(List<KeyspaceMetadata> schema)
     {
         for (KeyspaceMetadata ksm : schema)
-            for (CFMetaData cfm : ksm.tablesAndViews())
-                cfm.compression(CompressionParams.snappy());
+            for (TableMetadata cfm : ksm.tablesAndViews())
+                
MigrationManager.announceTableUpdate(cfm.unbuild().compression(CompressionParams.snappy()).build(),
 true);
     }
 
-    public static CFMetaData counterCFMD(String ksName, String cfName)
+    public static TableMetadata.Builder counterCFMD(String ksName, String 
cfName)
     {
-        return CFMetaData.Builder.create(ksName, cfName, false, true, true)
-                .addPartitionKey("key", AsciiType.instance)
-                .addClusteringColumn("name", AsciiType.instance)
-                .addRegularColumn("val", CounterColumnType.instance)
-                .addRegularColumn("val2", CounterColumnType.instance)
-                .build()
-                .compression(getCompressionParameters());
+        return TableMetadata.builder(ksName, cfName)
+                            .isCounter(true)
+                            .addPartitionKeyColumn("key", AsciiType.instance)
+                            .addClusteringColumn("name", AsciiType.instance)
+                            .addRegularColumn("val", 
CounterColumnType.instance)
+                            .addRegularColumn("val2", 
CounterColumnType.instance)
+                            .compression(getCompressionParameters());
     }
 
-    public static CFMetaData standardCFMD(String ksName, String cfName)
+    public static TableMetadata.Builder standardCFMD(String ksName, String 
cfName)
     {
         return standardCFMD(ksName, cfName, 1, AsciiType.instance);
     }
 
-    public static CFMetaData standardCFMD(String ksName, String cfName, int 
columnCount, AbstractType<?> keyType)
+    public static TableMetadata.Builder standardCFMD(String ksName, String 
cfName, int columnCount, AbstractType<?> keyType)
     {
         return standardCFMD(ksName, cfName, columnCount, keyType, 
AsciiType.instance);
     }
 
-    public static CFMetaData standardCFMD(String ksName, String cfName, int 
columnCount, AbstractType<?> keyType, AbstractType<?> valType)
+    public static TableMetadata.Builder standardCFMD(String ksName, String 
cfName, int columnCount, AbstractType<?> keyType, AbstractType<?> valType)
     {
         return standardCFMD(ksName, cfName, columnCount, keyType, valType, 
AsciiType.instance);
     }
 
-    public static CFMetaData standardCFMD(String ksName, String cfName, int 
columnCount, AbstractType<?> keyType, AbstractType<?> valType, AbstractType<?> 
clusteringType)
+    public static TableMetadata.Builder standardCFMD(String ksName, String 
cfName, int columnCount, AbstractType<?> keyType, AbstractType<?> valType, 
AbstractType<?> clusteringType)
     {
-        CFMetaData.Builder builder;
-        builder = CFMetaData.Builder.create(ksName, cfName)
-                                    .addPartitionKey("key", keyType)
-                                    .addRegularColumn("val", valType);
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ksName, cfName)
+                         .addPartitionKeyColumn("key", keyType)
+                         .addRegularColumn("val", valType)
+                         .compression(getCompressionParameters());
 
-        if(clusteringType != null)
-            builder = builder.addClusteringColumn("name", clusteringType);
+        if (clusteringType != null)
+            builder.addClusteringColumn("name", clusteringType);
 
         for (int i = 0; i < columnCount; i++)
             builder.addRegularColumn("val" + i, AsciiType.instance);
 
-        return builder.build()
-                      .compression(getCompressionParameters());
+        return builder;
     }
 
 
-    public static CFMetaData denseCFMD(String ksName, String cfName)
+    public static TableMetadata.Builder denseCFMD(String ksName, String cfName)
     {
         return denseCFMD(ksName, cfName, AsciiType.instance);
     }
-    public static CFMetaData denseCFMD(String ksName, String cfName, 
AbstractType cc)
+    public static TableMetadata.Builder denseCFMD(String ksName, String 
cfName, AbstractType cc)
     {
         return denseCFMD(ksName, cfName, cc, null);
     }
-    public static CFMetaData denseCFMD(String ksName, String cfName, 
AbstractType cc, AbstractType subcc)
+    public static TableMetadata.Builder denseCFMD(String ksName, String 
cfName, AbstractType cc, AbstractType subcc)
     {
         AbstractType comp = cc;
         if (subcc != null)
             comp = CompositeType.getInstance(Arrays.asList(new 
AbstractType<?>[]{cc, subcc}));
 
-        return CFMetaData.Builder.createDense(ksName, cfName, subcc != null, 
false)
-            .addPartitionKey("key", AsciiType.instance)
-            .addClusteringColumn("cols", comp)
-            .addRegularColumn("val", AsciiType.instance)
-            .build()
-            .compression(getCompressionParameters());
+        return TableMetadata.builder(ksName, cfName)
+                            .isDense(true)
+                            .isCompound(subcc != null)
+                            .addPartitionKeyColumn("key", AsciiType.instance)
+                            .addClusteringColumn("cols", comp)
+                            .addRegularColumn("val", AsciiType.instance)
+                            .compression(getCompressionParameters());
     }
 
     // TODO: Fix superCFMD failing on legacy table creation. Seems to be applying composite comparator to partition key
-    public static CFMetaData superCFMD(String ksName, String cfName, 
AbstractType subcc)
+    public static TableMetadata.Builder superCFMD(String ksName, String 
cfName, AbstractType subcc)
     {
         return superCFMD(ksName, cfName, BytesType.instance, subcc);
     }
-    public static CFMetaData superCFMD(String ksName, String cfName, 
AbstractType cc, AbstractType subcc)
+    public static TableMetadata.Builder superCFMD(String ksName, String 
cfName, AbstractType cc, AbstractType subcc)
     {
         return superCFMD(ksName, cfName, "cols", cc, subcc);
     }
-    public static CFMetaData superCFMD(String ksName, String cfName, String 
ccName, AbstractType cc, AbstractType subcc)
+    public static TableMetadata.Builder superCFMD(String ksName, String 
cfName, String ccName, AbstractType cc, AbstractType subcc)
     {
-        //This is busted
-//        return CFMetaData.Builder.createSuper(ksName, cfName, false)
-//            .addPartitionKey("0", BytesType.instance)
-//            .addClusteringColumn("1", cc)
-//            .addClusteringColumn("2", subcc)
-//            .addRegularColumn("3", AsciiType.instance)
-//            .build();
         return standardCFMD(ksName, cfName);
 
     }
-    public static CFMetaData compositeIndexCFMD(String ksName, String cfName, 
boolean withRegularIndex) throws ConfigurationException
+    public static TableMetadata.Builder compositeIndexCFMD(String ksName, 
String cfName, boolean withRegularIndex) throws ConfigurationException
     {
         return compositeIndexCFMD(ksName, cfName, withRegularIndex, false);
     }
 
-    public static CFMetaData compositeIndexCFMD(String ksName, String cfName, 
boolean withRegularIndex, boolean withStaticIndex) throws ConfigurationException
+    public static TableMetadata.Builder compositeIndexCFMD(String ksName, 
String cfName, boolean withRegularIndex, boolean withStaticIndex) throws 
ConfigurationException
     {
         // the withIndex flag exists to allow tests index creation
         // on existing columns
-        CFMetaData cfm = CFMetaData.Builder.create(ksName, cfName)
-                .addPartitionKey("key", AsciiType.instance)
-                .addClusteringColumn("c1", AsciiType.instance)
-                .addRegularColumn("birthdate", LongType.instance)
-                .addRegularColumn("notbirthdate", LongType.instance)
-                .addStaticColumn("static", LongType.instance)
-                .build();
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ksName, cfName)
+                         .addPartitionKeyColumn("key", AsciiType.instance)
+                         .addClusteringColumn("c1", AsciiType.instance)
+                         .addRegularColumn("birthdate", LongType.instance)
+                         .addRegularColumn("notbirthdate", LongType.instance)
+                         .addStaticColumn("static", LongType.instance)
+                         .compression(getCompressionParameters());
+
+        Indexes.Builder indexes = Indexes.builder();
 
         if (withRegularIndex)
         {
-            cfm.indexes(
-                cfm.getIndexes()
-                   .with(IndexMetadata.fromIndexTargets(cfm,
-                                                        
Collections.singletonList(
-                                                            new 
IndexTarget(new ColumnIdentifier("birthdate", true),
-                                                                            
IndexTarget.Type.VALUES)),
-                                                        "birthdate_key_index",
-                                                        
IndexMetadata.Kind.COMPOSITES,
-                                                        
Collections.EMPTY_MAP)));
+            indexes.add(IndexMetadata.fromIndexTargets(
+            Collections.singletonList(
+                                                           new IndexTarget(new 
ColumnIdentifier("birthdate", true),
+                                                                           
IndexTarget.Type.VALUES)),
+                                                       cfName + 
"_birthdate_key_index",
+                                                       
IndexMetadata.Kind.COMPOSITES,
+                                                       Collections.EMPTY_MAP));
         }
 
         if (withStaticIndex)
         {
-            cfm.indexes(
-                    cfm.getIndexes()
-                       .with(IndexMetadata.fromIndexTargets(cfm,
-                                                            
Collections.singletonList(
-                                                                new 
IndexTarget(new ColumnIdentifier("static", true),
-                                                                               
 IndexTarget.Type.VALUES)),
-                                                            "static_index",
-                                                            
IndexMetadata.Kind.COMPOSITES,
-                                                            
Collections.EMPTY_MAP)));
+            indexes.add(IndexMetadata.fromIndexTargets(
+            Collections.singletonList(
+                                                           new IndexTarget(new 
ColumnIdentifier("static", true),
+                                                                           
IndexTarget.Type.VALUES)),
+                                                       cfName + 
"_static_index",
+                                                       
IndexMetadata.Kind.COMPOSITES,
+                                                       Collections.EMPTY_MAP));
         }
 
-        return cfm.compression(getCompressionParameters());
+        return builder.indexes(indexes.build());
     }
 
-    public static CFMetaData keysIndexCFMD(String ksName, String cfName, 
boolean withIndex) throws ConfigurationException
+    public static TableMetadata.Builder keysIndexCFMD(String ksName, String cfName, boolean withIndex)
     {
-        CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, 
false)
-                                           .addPartitionKey("key", 
AsciiType.instance)
-                                           .addClusteringColumn("c1", 
AsciiType.instance)
-                                           .addStaticColumn("birthdate", 
LongType.instance)
-                                           .addStaticColumn("notbirthdate", 
LongType.instance)
-                                           .addRegularColumn("value", 
LongType.instance)
-                                           .build();
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ksName, cfName)
+                         .isCompound(false)
+                         .isDense(true)
+                         .addPartitionKeyColumn("key", AsciiType.instance)
+                         .addClusteringColumn("c1", AsciiType.instance)
+                         .addStaticColumn("birthdate", LongType.instance)
+                         .addStaticColumn("notbirthdate", LongType.instance)
+                         .addRegularColumn("value", LongType.instance)
+                         .compression(getCompressionParameters());
 
         if (withIndex)
-            cfm.indexes(
-                cfm.getIndexes()
-                   .with(IndexMetadata.fromIndexTargets(cfm,
-                                                        
Collections.singletonList(
-                                                            new 
IndexTarget(new ColumnIdentifier("birthdate", true),
-                                                                            
IndexTarget.Type.VALUES)),
-                                                         
"birthdate_composite_index",
-                                                         
IndexMetadata.Kind.KEYS,
-                                                         
Collections.EMPTY_MAP)));
-
-
-        return cfm.compression(getCompressionParameters());
-    }
-
-    public static CFMetaData customIndexCFMD(String ksName, String cfName) 
throws ConfigurationException
-    {
-        CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, 
false)
-                                           .addPartitionKey("key", 
AsciiType.instance)
-                                           .addClusteringColumn("c1", 
AsciiType.instance)
-                                           .addRegularColumn("value", 
LongType.instance)
-                                           .build();
-
-            cfm.indexes(
-                cfm.getIndexes()
-                .with(IndexMetadata.fromIndexTargets(cfm,
-                                                     Collections.singletonList(
-                                                             new 
IndexTarget(new ColumnIdentifier("value", true),
-                                                                             
IndexTarget.Type.VALUES)),
-                                                     "value_index",
-                                                     IndexMetadata.Kind.CUSTOM,
-                                                     Collections.singletonMap(
-                                                             
IndexTarget.CUSTOM_INDEX_OPTION_NAME,
-                                                             
StubIndex.class.getName()))));
-
-
-        return cfm.compression(getCompressionParameters());
-    }
-
-    public static CFMetaData jdbcCFMD(String ksName, String cfName, 
AbstractType comp)
-    {
-        return CFMetaData.Builder.create(ksName, 
cfName).addPartitionKey("key", BytesType.instance)
-                                                        .build()
-                                                        
.compression(getCompressionParameters());
-    }
-
-    public static CFMetaData sasiCFMD(String ksName, String cfName)
-    {
-        CFMetaData cfm = CFMetaData.Builder.create(ksName, cfName)
-                                           .addPartitionKey("id", 
UTF8Type.instance)
-                                           .addRegularColumn("first_name", 
UTF8Type.instance)
-                                           .addRegularColumn("last_name", 
UTF8Type.instance)
-                                           .addRegularColumn("age", 
Int32Type.instance)
-                                           .addRegularColumn("height", 
Int32Type.instance)
-                                           .addRegularColumn("timestamp", 
LongType.instance)
-                                           .addRegularColumn("address", 
UTF8Type.instance)
-                                           .addRegularColumn("score", 
DoubleType.instance)
-                                           .addRegularColumn("comment", 
UTF8Type.instance)
-                                           
.addRegularColumn("comment_suffix_split", UTF8Type.instance)
-                                           
.addRegularColumn("/output/full-name/", UTF8Type.instance)
-                                           
.addRegularColumn("/data/output/id", UTF8Type.instance)
-                                           
.addRegularColumn("first_name_prefix", UTF8Type.instance)
-                                           .build();
-
-        cfm.indexes(cfm.getIndexes()
-                        .with(IndexMetadata.fromSchemaMetadata("first_name", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, "first_name");
-                            put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
-                        }}))
-                        .with(IndexMetadata.fromSchemaMetadata("last_name", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, "last_name");
-                            put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
-                        }}))
-                        .with(IndexMetadata.fromSchemaMetadata("age", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, "age");
-
-                        }}))
-                        .with(IndexMetadata.fromSchemaMetadata("timestamp", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, "timestamp");
-                            put("mode", 
OnDiskIndexBuilder.Mode.SPARSE.toString());
-
-                        }}))
-                        .with(IndexMetadata.fromSchemaMetadata("address", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put("analyzer_class", 
"org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer");
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, "address");
-                            put("mode", 
OnDiskIndexBuilder.Mode.PREFIX.toString());
-                            put("case_sensitive", "false");
-                        }}))
-                        .with(IndexMetadata.fromSchemaMetadata("score", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, "score");
-                        }}))
-                        .with(IndexMetadata.fromSchemaMetadata("comment", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, "comment");
-                            put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
-                            put("analyzed", "true");
-                        }}))
-                        
.with(IndexMetadata.fromSchemaMetadata("comment_suffix_split", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, 
"comment_suffix_split");
-                            put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
-                            put("analyzed", "false");
-                        }}))
-                        
.with(IndexMetadata.fromSchemaMetadata("output_full_name", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, 
"/output/full-name/");
-                            put("analyzed", "true");
-                            put("analyzer_class", 
"org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer");
-                            put("case_sensitive", "false");
-                        }}))
-                        
.with(IndexMetadata.fromSchemaMetadata("data_output_id", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, 
"/data/output/id");
-                            put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
-                        }}))
-                        
.with(IndexMetadata.fromSchemaMetadata("first_name_prefix", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
-                        {{
-                            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
-                            put(IndexTarget.TARGET_OPTION_NAME, 
"first_name_prefix");
-                            put("analyzed", "true");
-                            put("tokenization_normalize_lowercase", "true");
-                        }})));
-
-        return cfm;
-    }
-
-    public static CFMetaData clusteringSASICFMD(String ksName, String cfName)
-    {
-        return clusteringSASICFMD(ksName, cfName, "location", "age", "height", 
"score");
-    }
-
-    public static CFMetaData clusteringSASICFMD(String ksName, String cfName, 
String...indexedColumns)
-    {
-        CFMetaData cfm = CFMetaData.Builder.create(ksName, cfName)
-                                           .addPartitionKey("name", 
UTF8Type.instance)
-                                           .addClusteringColumn("location", 
UTF8Type.instance)
-                                           .addClusteringColumn("age", 
Int32Type.instance)
-                                           .addRegularColumn("height", 
Int32Type.instance)
-                                           .addRegularColumn("score", 
DoubleType.instance)
-                                           .addStaticColumn("nickname", 
UTF8Type.instance)
-                                           .build();
-
-        Indexes indexes = cfm.getIndexes();
+        {
+            IndexMetadata index =
+                IndexMetadata.fromIndexTargets(
+                Collections.singletonList(new IndexTarget(new 
ColumnIdentifier("birthdate", true),
+                                                                               
          IndexTarget.Type.VALUES)),
+                                                                               
          cfName + "_birthdate_composite_index",
+                                                                               
          IndexMetadata.Kind.KEYS,
+                                                                               
          Collections.EMPTY_MAP);
+            builder.indexes(Indexes.builder().add(index).build());
+        }
+
+        return builder;
+    }
+
+    public static TableMetadata.Builder customIndexCFMD(String ksName, String cfName)
+    {
+        TableMetadata.Builder builder  =
+            TableMetadata.builder(ksName, cfName)
+                         .isCompound(false)
+                         .isDense(true)
+                         .addPartitionKeyColumn("key", AsciiType.instance)
+                         .addClusteringColumn("c1", AsciiType.instance)
+                         .addRegularColumn("value", LongType.instance)
+                         .compression(getCompressionParameters());
+
+        IndexMetadata index =
+            IndexMetadata.fromIndexTargets(
+            Collections.singletonList(new IndexTarget(new 
ColumnIdentifier("value", true), IndexTarget.Type.VALUES)),
+                                           cfName + "_value_index",
+                                           IndexMetadata.Kind.CUSTOM,
+                                           
Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
StubIndex.class.getName()));
+
+        builder.indexes(Indexes.of(index));
+
+        return builder;
+    }
+
+    public static TableMetadata.Builder jdbcCFMD(String ksName, String cfName, AbstractType comp)
+    {
+        return TableMetadata.builder(ksName, cfName)
+                            .addPartitionKeyColumn("key", BytesType.instance)
+                            .compression(getCompressionParameters());
+    }
+
+    public static TableMetadata.Builder sasiCFMD(String ksName, String cfName)
+    {
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ksName, cfName)
+                         .addPartitionKeyColumn("id", UTF8Type.instance)
+                         .addRegularColumn("first_name", UTF8Type.instance)
+                         .addRegularColumn("last_name", UTF8Type.instance)
+                         .addRegularColumn("age", Int32Type.instance)
+                         .addRegularColumn("height", Int32Type.instance)
+                         .addRegularColumn("timestamp", LongType.instance)
+                         .addRegularColumn("address", UTF8Type.instance)
+                         .addRegularColumn("score", DoubleType.instance)
+                         .addRegularColumn("comment", UTF8Type.instance)
+                         .addRegularColumn("comment_suffix_split", 
UTF8Type.instance)
+                         .addRegularColumn("/output/full-name/", 
UTF8Type.instance)
+                         .addRegularColumn("/data/output/id", 
UTF8Type.instance)
+                         .addRegularColumn("first_name_prefix", 
UTF8Type.instance);
+
+        Indexes.Builder indexes = Indexes.builder();
+
+        indexes.add(IndexMetadata.fromSchemaMetadata(cfName + "_first_name", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "first_name");
+                        put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + "_last_name", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "last_name");
+                        put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + "_age", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "age");
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + "_timestamp", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "timestamp");
+                        put("mode", OnDiskIndexBuilder.Mode.SPARSE.toString());
+
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + "_address", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put("analyzer_class", 
"org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer");
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "address");
+                        put("mode", OnDiskIndexBuilder.Mode.PREFIX.toString());
+                        put("case_sensitive", "false");
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + "_score", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "score");
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + "_comment", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "comment");
+                        put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
+                        put("analyzed", "true");
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + 
"_comment_suffix_split", IndexMetadata.Kind.CUSTOM, new HashMap<String, 
String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, 
"comment_suffix_split");
+                        put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
+                        put("analyzed", "false");
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + 
"_output_full_name", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, 
"/output/full-name/");
+                        put("analyzed", "true");
+                        put("analyzer_class", 
"org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer");
+                        put("case_sensitive", "false");
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + 
"_data_output_id", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, "/data/output/id");
+                        put("mode", 
OnDiskIndexBuilder.Mode.CONTAINS.toString());
+                    }}))
+               .add(IndexMetadata.fromSchemaMetadata(cfName + 
"_first_name_prefix", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+                    {{
+                        put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
+                        put(IndexTarget.TARGET_OPTION_NAME, 
"first_name_prefix");
+                        put("analyzed", "true");
+                        put("tokenization_normalize_lowercase", "true");
+                    }}));
+
+    return builder.indexes(indexes.build());
+}
+
+public static TableMetadata.Builder clusteringSASICFMD(String ksName, String cfName)
+{
+    return clusteringSASICFMD(ksName, cfName, "location", "age", "height", "score");
+}
+
+    public static TableMetadata.Builder clusteringSASICFMD(String ksName, String cfName, String...indexedColumns)
+    {
+        Indexes.Builder indexes = Indexes.builder();
         for (String indexedColumn : indexedColumns)
         {
-            indexes = 
indexes.with(IndexMetadata.fromSchemaMetadata(indexedColumn, 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+            indexes.add(IndexMetadata.fromSchemaMetadata(cfName + "_" + 
indexedColumn, IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
             {{
                 put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
                 put(IndexTarget.TARGET_OPTION_NAME, indexedColumn);
                 put("mode", OnDiskIndexBuilder.Mode.PREFIX.toString());
             }}));
         }
-        cfm.indexes(indexes);
-        return cfm;
+
+        return TableMetadata.builder(ksName, cfName)
+                            .addPartitionKeyColumn("name", UTF8Type.instance)
+                            .addClusteringColumn("location", UTF8Type.instance)
+                            .addClusteringColumn("age", Int32Type.instance)
+                            .addRegularColumn("height", Int32Type.instance)
+                            .addRegularColumn("score", DoubleType.instance)
+                            .addStaticColumn("nickname", UTF8Type.instance)
+                            .indexes(indexes.build());
     }
 
-    public static CFMetaData staticSASICFMD(String ksName, String cfName)
+    public static TableMetadata.Builder staticSASICFMD(String ksName, String cfName)
     {
-        CFMetaData cfm = CFMetaData.Builder.create(ksName, cfName)
-                                           .addPartitionKey("sensor_id", 
Int32Type.instance)
-                                           .addStaticColumn("sensor_type", 
UTF8Type.instance)
-                                           .addClusteringColumn("date", 
LongType.instance)
-                                           .addRegularColumn("value", 
DoubleType.instance)
-                                           .addRegularColumn("variance", 
Int32Type.instance)
-                                           .build();
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ksName, cfName)
+                         .addPartitionKeyColumn("sensor_id", 
Int32Type.instance)
+                         .addStaticColumn("sensor_type", UTF8Type.instance)
+                         .addClusteringColumn("date", LongType.instance)
+                         .addRegularColumn("value", DoubleType.instance)
+                         .addRegularColumn("variance", Int32Type.instance);
+
+        Indexes.Builder indexes = Indexes.builder();
 
-        Indexes indexes = cfm.getIndexes();
-        indexes = indexes.with(IndexMetadata.fromSchemaMetadata("sensor_type", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+        indexes.add(IndexMetadata.fromSchemaMetadata(cfName + "_sensor_type", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
         {{
             put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
             put(IndexTarget.TARGET_OPTION_NAME, "sensor_type");
@@ -671,34 +666,34 @@ public class SchemaLoader
             put("case_sensitive", "false");
         }}));
 
-        indexes = indexes.with(IndexMetadata.fromSchemaMetadata("value", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+        indexes.add(IndexMetadata.fromSchemaMetadata(cfName + "_value", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
         {{
             put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
             put(IndexTarget.TARGET_OPTION_NAME, "value");
             put("mode", OnDiskIndexBuilder.Mode.PREFIX.toString());
         }}));
 
-        indexes = indexes.with(IndexMetadata.fromSchemaMetadata("variance", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+        indexes.add(IndexMetadata.fromSchemaMetadata(cfName + "_variance", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
         {{
             put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
             put(IndexTarget.TARGET_OPTION_NAME, "variance");
             put("mode", OnDiskIndexBuilder.Mode.PREFIX.toString());
         }}));
 
-        cfm.indexes(indexes);
-        return cfm;
+        return builder.indexes(indexes.build());
     }
 
-    public static CFMetaData fullTextSearchSASICFMD(String ksName, String 
cfName)
+    public static TableMetadata.Builder fullTextSearchSASICFMD(String ksName, String cfName)
     {
-        CFMetaData cfm = CFMetaData.Builder.create(ksName, cfName)
-                                           .addPartitionKey("song_id", 
UUIDType.instance)
-                                           .addRegularColumn("title", 
UTF8Type.instance)
-                                           .addRegularColumn("artist", 
UTF8Type.instance)
-                                           .build();
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ksName, cfName)
+                         .addPartitionKeyColumn("song_id", UUIDType.instance)
+                         .addRegularColumn("title", UTF8Type.instance)
+                         .addRegularColumn("artist", UTF8Type.instance);
+
+        Indexes.Builder indexes = Indexes.builder();
 
-        Indexes indexes = cfm.getIndexes();
-        indexes = indexes.with(IndexMetadata.fromSchemaMetadata("title", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+        indexes.add(IndexMetadata.fromSchemaMetadata(cfName + "_title", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
         {{
             put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
             put(IndexTarget.TARGET_OPTION_NAME, "title");
@@ -710,7 +705,7 @@ public class SchemaLoader
             put("tokenization_normalize_lowercase", "true");
         }}));
 
-        indexes = indexes.with(IndexMetadata.fromSchemaMetadata("artist", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+        indexes.add(IndexMetadata.fromSchemaMetadata(cfName + "_artist", 
IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
         {{
             put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, 
SASIIndex.class.getName());
             put(IndexTarget.TARGET_OPTION_NAME, "artist");
@@ -720,8 +715,7 @@ public class SchemaLoader
 
         }}));
 
-        cfm.indexes(indexes);
-        return cfm;
+        return builder.indexes(indexes.build());
     }
 
     public static CompressionParams getCompressionParameters()
@@ -784,7 +778,7 @@ public class SchemaLoader
 
     public static void insertData(String keyspace, String columnFamily, int offset, int numberOfRows)
     {
-        CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+        TableMetadata cfm = Schema.instance.getTableMetadata(keyspace, columnFamily);
 
         for (int i = offset; i < offset + numberOfRows; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/UpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/UpdateBuilder.java b/test/unit/org/apache/cassandra/UpdateBuilder.java
index 9fcda15..0f3c918 100644
--- a/test/unit/org/apache/cassandra/UpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/UpdateBuilder.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra;
 
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.utils.FBUtilities;
 
 
 /**
@@ -42,7 +39,7 @@ public class UpdateBuilder
         this.updateBuilder = updateBuilder;
     }
 
-    public static UpdateBuilder create(CFMetaData metadata, Object... partitionKey)
+    public static UpdateBuilder create(TableMetadata metadata, Object... partitionKey)
     {
         return new UpdateBuilder(PartitionUpdate.simpleBuilder(metadata, partitionKey));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index d77ca78..a01aab6 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -39,8 +39,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 
@@ -110,13 +111,6 @@ public class Util
         return PartitionPosition.ForKey.get(ByteBufferUtil.bytes(key), partitioner);
     }
 
-    public static Cell getRegularCell(CFMetaData metadata, Row row, String name)
-    {
-        ColumnDefinition column = metadata.getColumnDefinition(ByteBufferUtil.bytes(name));
-        assert column != null;
-        return row.getCell(column);
-    }
-
     public static Clustering clustering(ClusteringComparator comparator, Object... o)
     {
         return comparator.make(o);
@@ -179,12 +173,12 @@ public class Util
     {
         IMutation first = mutations.get(0);
         String keyspaceName = first.getKeyspaceName();
-        UUID cfid = first.getColumnFamilyIds().iterator().next();
+        TableId tableId = first.getTableIds().iterator().next();
 
         for (Mutation rm : mutations)
             rm.applyUnsafe();
 
-        ColumnFamilyStore store = Keyspace.open(keyspaceName).getColumnFamilyStore(cfid);
+        ColumnFamilyStore store = Keyspace.open(keyspaceName).getColumnFamilyStore(tableId);
         store.forceBlockingFlush();
         return store;
     }
@@ -270,7 +264,7 @@ public class Util
 
     public static AbstractReadCommandBuilder.SinglePartitionBuilder cmd(ColumnFamilyStore cfs, Object... partitionKey)
     {
-        return new AbstractReadCommandBuilder.SinglePartitionBuilder(cfs, makeKey(cfs.metadata, partitionKey));
+        return new AbstractReadCommandBuilder.SinglePartitionBuilder(cfs, makeKey(cfs.metadata(), partitionKey));
     }
 
     public static AbstractReadCommandBuilder.PartitionRangeBuilder cmd(ColumnFamilyStore cfs)
@@ -278,13 +272,13 @@ public class Util
         return new AbstractReadCommandBuilder.PartitionRangeBuilder(cfs);
     }
 
-    static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
+    static DecoratedKey makeKey(TableMetadata metadata, Object... partitionKey)
     {
         if (partitionKey.length == 1 && partitionKey[0] instanceof 
DecoratedKey)
             return (DecoratedKey)partitionKey[0];
 
-        ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
-        return metadata.decorateKey(key);
+        ByteBuffer key = metadata.partitionKeyAsClusteringComparator().make(partitionKey).serializeAsPartitionKey();
+        return metadata.partitioner.decorateKey(key);
     }
 
     public static void assertEmptyUnfiltered(ReadCommand command)
@@ -296,7 +290,7 @@ public class Util
             {
                 try (UnfilteredRowIterator partition = iterator.next())
                 {
-                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
+                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().partitionKeyType.getString(partition.partitionKey().getKey()));
                 }
             }
         }
@@ -311,7 +305,7 @@ public class Util
             {
                 try (RowIterator partition = iterator.next())
                 {
-                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
+                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().partitionKeyType.getString(partition.partitionKey().getKey()));
                 }
             }
         }
@@ -423,7 +417,7 @@ public class Util
 
     public static Cell cell(ColumnFamilyStore cfs, Row row, String columnName)
     {
-        ColumnDefinition def = 
cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes(columnName));
+        ColumnMetadata def = 
cfs.metadata().getColumn(ByteBufferUtil.bytes(columnName));
         assert def != null;
         return row.getCell(def);
     }
@@ -436,7 +430,7 @@ public class Util
     public static void assertCellValue(Object value, ColumnFamilyStore cfs, 
Row row, String columnName)
     {
         Cell cell = cell(cfs, row, columnName);
-        assert cell != null : "Row " + row.toString(cfs.metadata) + " has no cell for " + columnName;
+        assert cell != null : "Row " + row.toString(cfs.metadata()) + " has no cell for " + columnName;
         assertEquals(value, cell.column().type.compose(cell.value()));
     }
 
@@ -460,15 +454,6 @@ public class Util
         return size;
     }
 
-    public static CBuilder getCBuilderForCFM(CFMetaData cfm)
-    {
-        List<ColumnDefinition> clusteringColumns = cfm.clusteringColumns();
-        List<AbstractType<?>> types = new 
ArrayList<>(clusteringColumns.size());
-        for (ColumnDefinition def : clusteringColumns)
-            types.add(def.type);
-        return CBuilder.create(new ClusteringComparator(types));
-    }
-
     public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator 
b)
     {
         return Objects.equals(a.columns(), b.columns())
@@ -492,12 +477,12 @@ public class Util
 
     public static boolean sameContent(Mutation a, Mutation b)
     {
-        if (!a.key().equals(b.key()) || 
!a.getColumnFamilyIds().equals(b.getColumnFamilyIds()))
+        if (!a.key().equals(b.key()) || 
!a.getTableIds().equals(b.getTableIds()))
             return false;
 
-        for (UUID cfId : a.getColumnFamilyIds())
+        for (PartitionUpdate update : a.getPartitionUpdates())
         {
-            if (!sameContent(a.getPartitionUpdate(cfId).unfilteredIterator(), 
b.getPartitionUpdate(cfId).unfilteredIterator()))
+            if (!sameContent(update.unfilteredIterator(), 
b.getPartitionUpdate(update.metadata()).unfilteredIterator()))
                 return false;
         }
         return true;
@@ -521,9 +506,9 @@ public class Util
                         StringUtils.join(expectedColumnNames, ","));
     }
 
-    public static void assertColumn(CFMetaData cfm, Row row, String name, 
String value, long timestamp)
+    public static void assertColumn(TableMetadata cfm, Row row, String name, 
String value, long timestamp)
     {
-        Cell cell = row.getCell(cfm.getColumnDefinition(new 
ColumnIdentifier(name, true)));
+        Cell cell = row.getCell(cfm.getColumn(new ColumnIdentifier(name, 
true)));
         assertColumn(cell, value, timestamp);
     }
 
@@ -534,7 +519,7 @@ public class Util
         assertEquals(timestamp, cell.timestamp());
     }
 
-    public static void assertClustering(CFMetaData cfm, Row row, Object... 
clusteringValue)
+    public static void assertClustering(TableMetadata cfm, Row row, Object... 
clusteringValue)
     {
         assertEquals(row.clustering().size(), clusteringValue.length);
         assertEquals(0, cfm.comparator.compare(row.clustering(), 
cfm.comparator.make(clusteringValue)));
@@ -647,12 +632,12 @@ public class Util
     {
         Iterator<Unfiltered> content;
 
-        public UnfilteredSource(CFMetaData cfm, DecoratedKey partitionKey, Row 
staticRow, Iterator<Unfiltered> content)
+        public UnfilteredSource(TableMetadata metadata, DecoratedKey 
partitionKey, Row staticRow, Iterator<Unfiltered> content)
         {
-            super(cfm,
+            super(metadata,
                   partitionKey,
                   DeletionTime.LIVE,
-                  cfm.partitionColumns(),
+                  metadata.regularAndStaticColumns(),
                   staticRow != null ? staticRow : Rows.EMPTY_STATIC_ROW,
                   false,
                   EncodingStats.NO_STATS);
@@ -715,20 +700,20 @@ public class Util
 
     public static PagingState makeSomePagingState(ProtocolVersion 
protocolVersion)
     {
-        CFMetaData metadata = CFMetaData.Builder.create("ks", "tbl")
-                                                .addPartitionKey("k", 
AsciiType.instance)
-                                                .addClusteringColumn("c1", 
AsciiType.instance)
-                                                .addClusteringColumn("c1", 
Int32Type.instance)
-                                                .addRegularColumn("myCol", 
AsciiType.instance)
-                                                .build();
+        TableMetadata metadata =
+            TableMetadata.builder("ks", "tbl")
+                         .addPartitionKeyColumn("k", AsciiType.instance)
+                         .addClusteringColumn("c1", AsciiType.instance)
+                         .addClusteringColumn("c2", Int32Type.instance)
+                         .addRegularColumn("myCol", AsciiType.instance)
+                         .build();
 
         ByteBuffer pk = ByteBufferUtil.bytes("someKey");
 
-        ColumnDefinition def = metadata.getColumnDefinition(new 
ColumnIdentifier("myCol", false));
+        ColumnMetadata def = metadata.getColumn(new ColumnIdentifier("myCol", 
false));
         Clustering c = Clustering.make(ByteBufferUtil.bytes("c1"), 
ByteBufferUtil.bytes(42));
         Row row = BTreeRow.singleCellRow(c, BufferCell.live(def, 0, 
ByteBufferUtil.EMPTY_BYTE_BUFFER));
         PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, 
protocolVersion);
         return new PagingState(pk, mark, 10, 0);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/batchlog/BatchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchTest.java b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
index d2db9b9..8ed1811 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
@@ -28,8 +28,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RowUpdateBuilder;
@@ -45,7 +44,6 @@ import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class BatchTest
 {
@@ -64,7 +62,7 @@ public class BatchTest
     @Test
     public void testSerialization() throws IOException
     {
-        CFMetaData cfm = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata;
+        TableMetadata cfm = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata();
 
         long now = FBUtilities.timestampMicros();
         int version = MessagingService.current_version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
index 038255b..34902fe 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 
@@ -30,10 +29,10 @@ import org.junit.*;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.Util.PartitionerSwitcher;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -49,7 +48,6 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -106,7 +104,7 @@ public class BatchlogManagerTest
     public void testDelete()
     {
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
-        CFMetaData cfm = cfs.metadata;
+        TableMetadata cfm = cfs.metadata();
         new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), 
ByteBufferUtil.bytes("1234"))
                 .clustering("c")
                 .add("val", "val" + 1234)
@@ -134,7 +132,7 @@ public class BatchlogManagerTest
         long initialAllBatches = BatchlogManager.instance.countAllBatches();
         long initialReplayedBatches = 
BatchlogManager.instance.getTotalBatchesReplayed();
 
-        CFMetaData cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;
+        TableMetadata cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata();
 
         // Generate 1000 mutations (100 batches of 10 mutations each) and put them all into the batchlog.
         // Half batches (50) ready to be replayed, half not.
@@ -205,8 +203,8 @@ public class BatchlogManagerTest
     @Test
     public void testTruncatedReplay() throws InterruptedException, 
ExecutionException
     {
-        CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, 
CF_STANDARD2);
-        CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, 
CF_STANDARD3);
+        TableMetadata cf2 = Schema.instance.getTableMetadata(KEYSPACE1, 
CF_STANDARD2);
+        TableMetadata cf3 = Schema.instance.getTableMetadata(KEYSPACE1, 
CF_STANDARD3);
         // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
         // Each batchlog entry with a mutation for Standard2 and Standard3.
         // In the middle of the process, 'truncate' Standard2.
@@ -277,7 +275,7 @@ public class BatchlogManagerTest
     public void testAddBatch() throws IOException
     {
         long initialAllBatches = BatchlogManager.instance.countAllBatches();
-        CFMetaData cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+        TableMetadata cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata();
 
         long timestamp = (System.currentTimeMillis() - 
DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
         UUID uuid = UUIDGen.getTimeUUID();
@@ -309,7 +307,7 @@ public class BatchlogManagerTest
     public void testRemoveBatch()
     {
         long initialAllBatches = BatchlogManager.instance.countAllBatches();
-        CFMetaData cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+        TableMetadata cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata();
 
         long timestamp = (System.currentTimeMillis() - 
DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
         UUID uuid = UUIDGen.getTimeUUID();
@@ -351,7 +349,7 @@ public class BatchlogManagerTest
         long initialAllBatches = BatchlogManager.instance.countAllBatches();
         long initialReplayedBatches = 
BatchlogManager.instance.getTotalBatchesReplayed();
 
-        CFMetaData cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;
+        TableMetadata cfm = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata();
 
         long timestamp = (System.currentTimeMillis() - 
DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
         UUID uuid = UUIDGen.getTimeUUID();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index c952470..bb5129a 100644
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.cassandra.cache;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -45,10 +45,9 @@ public class AutoSavingCacheTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
-                                    CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
-                                                      .addPartitionKey("pKey", AsciiType.instance)
-                                                      .addRegularColumn("col1", AsciiType.instance)
-                                                      .build());
+                                    TableMetadata.builder(KEYSPACE1, CF_STANDARD1)
+                                                 .addPartitionKeyColumn("pKey", AsciiType.instance)
+                                                 .addRegularColumn("col1", AsciiType.instance));
     }
 
     @Test
@@ -71,8 +70,8 @@ public class AutoSavingCacheTest
         cfs.truncateBlocking();
         for (int i = 0; i < 2; i++)
         {
-            ColumnDefinition colDef = 
ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("col1"), 
AsciiType.instance);
-            RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata, 
System.currentTimeMillis(), "key1");
+            ColumnMetadata colDef = 
ColumnMetadata.regularColumn(cfs.metadata(), ByteBufferUtil.bytes("col1"), 
AsciiType.instance);
+            RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata(), 
System.currentTimeMillis(), "key1");
             rowBuilder.add(colDef, "val1");
             rowBuilder.build().apply();
             cfs.forceBlockingFlush();
@@ -95,6 +94,6 @@ public class AutoSavingCacheTest
         // then load saved
         keyCache.loadSavedAsync().get();
         for (SSTableReader sstable : cfs.getLiveSSTables())
-            Assert.assertNotNull(keyCache.get(new 
KeyCacheKey(cfs.metadata.ksAndCFName, sstable.descriptor, 
ByteBufferUtil.bytes("key1"))));
+            Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata(), 
sstable.descriptor, ByteBufferUtil.bytes("key1"))));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index f6a8acf..1aa536e 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -18,30 +18,33 @@
  */
 package org.apache.cassandra.cache;
 
-import com.github.benmanes.caffeine.cache.Weigher;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
 
+import com.github.benmanes.caffeine.cache.Weigher;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.utils.Pair;
-
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.CachedBTreePartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.FBUtilities;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -59,20 +62,20 @@ public class CacheProviderTest
     private static final String KEYSPACE1 = "CacheProviderTest1";
     private static final String CF_STANDARD1 = "Standard1";
 
-    private static CFMetaData cfm;
+    private static TableMetadata cfm;
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
 
-        cfm = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
-                                        .addPartitionKey("pKey", 
AsciiType.instance)
-                                        .addRegularColumn("col1", 
AsciiType.instance)
-                                        .build();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        cfm =
+            TableMetadata.builder(KEYSPACE1, CF_STANDARD1)
+                         .addPartitionKeyColumn("pKey", AsciiType.instance)
+                         .addRegularColumn("col1", AsciiType.instance)
+                         .build();
+
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), cfm);
     }
 
     private CachedBTreePartition createPartition()
@@ -158,16 +161,43 @@ public class CacheProviderTest
     @Test
     public void testKeys()
     {
-        Pair<String, String> ksAndCFName = Pair.create(KEYSPACE1, 
CF_STANDARD1);
+        TableId id1 = TableId.generate();
         byte[] b1 = {1, 2, 3, 4};
-        RowCacheKey key1 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b1));
+        RowCacheKey key1 = new RowCacheKey(id1, null, ByteBuffer.wrap(b1));
+        TableId id2 = TableId.fromString(id1.toString());
         byte[] b2 = {1, 2, 3, 4};
-        RowCacheKey key2 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b2));
+        RowCacheKey key2 = new RowCacheKey(id2, null, ByteBuffer.wrap(b2));
         assertEquals(key1, key2);
         assertEquals(key1.hashCode(), key2.hashCode());
 
+        TableMetadata tm = TableMetadata.builder("ks", "tab", id1)
+                                        .addPartitionKeyColumn("pk", 
UTF8Type.instance)
+                                        .build();
+
+        assertTrue(key1.sameTable(tm));
+
         byte[] b3 = {1, 2, 3, 5};
-        RowCacheKey key3 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b3));
+        RowCacheKey key3 = new RowCacheKey(id1, null, ByteBuffer.wrap(b3));
+        assertNotSame(key1, key3);
+        assertNotSame(key1.hashCode(), key3.hashCode());
+
+        // with index name
+
+        key1 = new RowCacheKey(id1, "indexFoo", ByteBuffer.wrap(b1));
+        assertNotSame(key1, key2);
+        assertNotSame(key1.hashCode(), key2.hashCode());
+
+        key2 = new RowCacheKey(id2, "indexFoo", ByteBuffer.wrap(b2));
+        assertEquals(key1, key2);
+        assertEquals(key1.hashCode(), key2.hashCode());
+
+        tm = TableMetadata.builder("ks", "tab.indexFoo", id1)
+                          .addPartitionKeyColumn("pk", UTF8Type.instance)
+                          .indexes(Indexes.of(IndexMetadata.fromSchemaMetadata("indexFoo", IndexMetadata.Kind.KEYS, Collections.emptyMap())))
+                          .build();
+        assertTrue(key1.sameTable(tm));
+
+        key3 = new RowCacheKey(id1, "indexFoo", ByteBuffer.wrap(b3));
         assertNotSame(key1, key3);
         assertNotSame(key1.hashCode(), key3.hashCode());
     }

Reply via email to