http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 2fd7b06..d4c4bb4 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -20,10 +20,8 @@ package org.apache.cassandra.schema;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
@@ -32,21 +30,15 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
@@ -73,12 +65,10 @@ public class SchemaKeyspaceTest
         {
             for (ColumnFamilyStore cfs : 
Keyspace.open(keyspaceName).getColumnFamilyStores())
             {
-                CFMetaData cfm = cfs.metadata;
-                checkInverses(cfm);
+                checkInverses(cfs.metadata());
 
                 // Testing with compression to catch #3558
-                CFMetaData withCompression = cfm.copy();
-                withCompression.compression(CompressionParams.snappy(32768));
+                TableMetadata withCompression = cfs.metadata().unbuild().compression(CompressionParams.snappy(32768)).build();
                 checkInverses(withCompression);
             }
         }
@@ -91,44 +81,44 @@ public class SchemaKeyspaceTest
 
         createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)");
 
-        CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, "test");
+        TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, "test");
         assertTrue("extensions should be empty", metadata.params.extensions.isEmpty());
 
         ImmutableMap<String, ByteBuffer> extensions = ImmutableMap.of("From ... with Love",
                                                                       ByteBuffer.wrap(new byte[]{0, 0, 7}));
 
-        CFMetaData copy = metadata.copy().extensions(extensions);
+        TableMetadata copy = metadata.unbuild().extensions(extensions).build();
 
         updateTable(keyspace, metadata, copy);
 
-        metadata = Schema.instance.getCFMetaData(keyspace, "test");
+        metadata = Schema.instance.getTableMetadata(keyspace, "test");
         assertEquals(extensions, metadata.params.extensions);
     }
 
-    private static void updateTable(String keyspace, CFMetaData oldTable, CFMetaData newTable)
+    private static void updateTable(String keyspace, TableMetadata oldTable, TableMetadata newTable)
     {
         KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceInstance(keyspace).getMetadata();
         Mutation mutation = SchemaKeyspace.makeUpdateTableMutation(ksm, 
oldTable, newTable, FBUtilities.timestampMicros()).build();
-        SchemaKeyspace.mergeSchema(Collections.singleton(mutation));
+        Schema.instance.merge(Collections.singleton(mutation));
     }
 
     private static void createTable(String keyspace, String cql)
     {
-        CFMetaData table = CFMetaData.compile(cql, keyspace);
+        TableMetadata table = CreateTableStatement.parse(cql, keyspace).build();
 
         KeyspaceMetadata ksm = KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1), Tables.of(table));
         Mutation mutation = SchemaKeyspace.makeCreateTableMutation(ksm, table, 
FBUtilities.timestampMicros()).build();
-        SchemaKeyspace.mergeSchema(Collections.singleton(mutation));
+        Schema.instance.merge(Collections.singleton(mutation));
     }
 
-    private static void checkInverses(CFMetaData cfm) throws Exception
+    private static void checkInverses(TableMetadata metadata) throws Exception
     {
-        KeyspaceMetadata keyspace = Schema.instance.getKSMetaData(cfm.ksName);
+        KeyspaceMetadata keyspace = 
Schema.instance.getKeyspaceMetadata(metadata.keyspace);
 
         // Test schema conversion
-        Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()).build();
-        PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES));
-        PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS));
+        Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, metadata, FBUtilities.timestampMicros()).build();
+        PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getTableMetadata(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES));
+        PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getTableMetadata(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS));
 
         UntypedResultSet.Row tableRow = 
QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", 
SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES),
                                                                  
UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), 
FBUtilities.nowInSeconds()))
@@ -137,11 +127,11 @@ public class SchemaKeyspaceTest
 
         UntypedResultSet columnsRows = 
QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", 
SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS),
                                                                 
UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), 
FBUtilities.nowInSeconds()));
-        Set<ColumnDefinition> columns = new HashSet<>();
+        Set<ColumnMetadata> columns = new HashSet<>();
         for (UntypedResultSet.Row row : columnsRows)
             columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none()));
 
-        assertEquals(cfm.params, params);
-        assertEquals(new HashSet<>(cfm.allColumns()), columns);
+        assertEquals(metadata.params, params);
+        assertEquals(new HashSet<>(metadata.columns()), columns);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/schema/SchemaTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaTest.java 
b/test/unit/org/apache/cassandra/schema/SchemaTest.java
new file mode 100644
index 0000000..32a5620
--- /dev/null
+++ b/test/unit/org/apache/cassandra/schema/SchemaTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.gms.Gossiper;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class SchemaTest
+{
+    @BeforeClass
+    public static void setupDatabaseDescriptor()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    @Test
+    public void testTransKsMigration() throws IOException
+    {
+        SchemaLoader.cleanupAndLeaveDirs();
+        Schema.instance.loadFromDisk();
+        assertEquals(0, Schema.instance.getNonSystemKeyspaces().size());
+
+        Gossiper.instance.start((int)(System.currentTimeMillis() / 1000));
+        Keyspace.setInitialized();
+
+        try
+        {
+            // add a few.
+            MigrationManager.announceNewKeyspace(KeyspaceMetadata.create("ks0", KeyspaceParams.simple(3)));
+            MigrationManager.announceNewKeyspace(KeyspaceMetadata.create("ks1", KeyspaceParams.simple(3)));
+
+            assertNotNull(Schema.instance.getKeyspaceMetadata("ks0"));
+            assertNotNull(Schema.instance.getKeyspaceMetadata("ks1"));
+
+            Schema.instance.unload(Schema.instance.getKeyspaceMetadata("ks0"));
+            Schema.instance.unload(Schema.instance.getKeyspaceMetadata("ks1"));
+
+            assertNull(Schema.instance.getKeyspaceMetadata("ks0"));
+            assertNull(Schema.instance.getKeyspaceMetadata("ks1"));
+
+            Schema.instance.loadFromDisk();
+
+            assertNotNull(Schema.instance.getKeyspaceMetadata("ks0"));
+            assertNotNull(Schema.instance.getKeyspaceMetadata("ks1"));
+        }
+        finally
+        {
+            Gossiper.instance.stop();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/schema/ValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/ValidationTest.java 
b/test/unit/org/apache/cassandra/schema/ValidationTest.java
new file mode 100644
index 0000000..8eb1247
--- /dev/null
+++ b/test/unit/org/apache/cassandra/schema/ValidationTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.*;
+
+import org.apache.cassandra.db.marshal.*;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ValidationTest
+{
+    @Test
+    public void testIsNameValidPositive()
+    {
+         assertTrue(SchemaConstants.isValidName("abcdefghijklmnopqrstuvwxyz"));
+         assertTrue(SchemaConstants.isValidName("ABCDEFGHIJKLMNOPQRSTUVWXYZ"));
+         assertTrue(SchemaConstants.isValidName("_01234567890"));
+    }
+    
+    @Test
+    public void testIsNameValidNegative()
+    {
+        assertFalse(SchemaConstants.isValidName(null));
+        assertFalse(SchemaConstants.isValidName(""));
+        assertFalse(SchemaConstants.isValidName(" "));
+        assertFalse(SchemaConstants.isValidName("@"));
+        assertFalse(SchemaConstants.isValidName("!"));
+    }
+
+    private static Set<String> primitiveTypes =
+        new HashSet<>(Arrays.asList(new String[] { "ascii", "bigint", "blob", 
"boolean", "date",
+                                                   "duration", "decimal", 
"double", "float",
+                                                   "inet", "int", "smallint", 
"text", "time",
+                                                   "timestamp", "timeuuid", 
"tinyint", "uuid",
+                                                   "varchar", "varint" }));
+
+    @Test
+    public void typeCompatibilityTest()
+    {
+        Map<String, Set<String>> compatibilityMap = new HashMap<>();
+        compatibilityMap.put("bigint", new HashSet<>(Arrays.asList(new 
String[] {"timestamp"})));
+        compatibilityMap.put("blob", new HashSet<>(Arrays.asList(new String[] 
{"ascii", "bigint", "boolean", "date", "decimal", "double", "duration",
+                                                                               
"float", "inet", "int", "smallint", "text", "time", "timestamp",
+                                                                               
"timeuuid", "tinyint", "uuid", "varchar", "varint"})));
+        compatibilityMap.put("date", new HashSet<>(Arrays.asList(new String[] 
{"int"})));
+        compatibilityMap.put("time", new HashSet<>(Arrays.asList(new String[] 
{"bigint"})));
+        compatibilityMap.put("text", new HashSet<>(Arrays.asList(new String[] 
{"ascii", "varchar"})));
+        compatibilityMap.put("timestamp", new HashSet<>(Arrays.asList(new 
String[] {"bigint"})));
+        compatibilityMap.put("varchar", new HashSet<>(Arrays.asList(new 
String[] {"ascii", "text"})));
+        compatibilityMap.put("varint", new HashSet<>(Arrays.asList(new 
String[] {"bigint", "int", "timestamp"})));
+        compatibilityMap.put("uuid", new HashSet<>(Arrays.asList(new String[] 
{"timeuuid"})));
+
+        for (String sourceTypeString: primitiveTypes)
+        {
+            AbstractType sourceType = CQLTypeParser.parse("KEYSPACE", 
sourceTypeString, Types.none());
+            for (String destinationTypeString: primitiveTypes)
+            {
+                AbstractType destinationType = CQLTypeParser.parse("KEYSPACE", 
destinationTypeString, Types.none());
+
+                if (compatibilityMap.get(destinationTypeString) != null &&
+                    
compatibilityMap.get(destinationTypeString).contains(sourceTypeString) ||
+                    sourceTypeString.equals(destinationTypeString))
+                {
+                    assertTrue(sourceTypeString + " should be compatible with 
" + destinationTypeString,
+                               
destinationType.isValueCompatibleWith(sourceType));
+                }
+                else
+                {
+                    assertFalse(sourceTypeString + " should not be compatible 
with " + destinationTypeString,
+                                
destinationType.isValueCompatibleWith(sourceType));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void clusteringColumnTypeCompatibilityTest() throws Throwable
+    {
+        Map<String, Set<String>> compatibilityMap = new HashMap<>();
+        compatibilityMap.put("blob", new HashSet<>(Arrays.asList(new String[] 
{"ascii", "text", "varchar"})));
+        compatibilityMap.put("text", new HashSet<>(Arrays.asList(new String[] 
{"ascii", "varchar"})));
+        compatibilityMap.put("varchar", new HashSet<>(Arrays.asList(new 
String[] {"ascii", "text" })));
+
+        for (String sourceTypeString: primitiveTypes)
+        {
+            AbstractType sourceType = CQLTypeParser.parse("KEYSPACE", 
sourceTypeString, Types.none());
+            for (String destinationTypeString: primitiveTypes)
+            {
+                AbstractType destinationType = CQLTypeParser.parse("KEYSPACE", 
destinationTypeString, Types.none());
+
+                if (compatibilityMap.get(destinationTypeString) != null &&
+                    
compatibilityMap.get(destinationTypeString).contains(sourceTypeString) ||
+                    sourceTypeString.equals(destinationTypeString))
+                {
+                    assertTrue(sourceTypeString + " should be compatible with 
" + destinationTypeString,
+                               destinationType.isCompatibleWith(sourceType));
+                }
+                else
+                {
+                    assertFalse(sourceTypeString + " should not be compatible 
with " + destinationTypeString,
+                                destinationType.isCompatibleWith(sourceType));
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java 
b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 2c1a8d2..44bd58c 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -234,10 +234,10 @@ public class ActiveRepairServiceTest
         UUID prsId = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, 0, false);
         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
-        prs.markSSTablesRepairing(store.metadata.cfId, prsId);
+        prs.markSSTablesRepairing(store.metadata.id, prsId);
 
         //retrieve all sstable references from parent repair sessions
-        Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId);
+        Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId);
         Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
         assertEquals(original, retrieved);
         refs.release();
@@ -256,7 +256,7 @@ public class ActiveRepairServiceTest
         }, OperationType.COMPACTION, null);
 
         //retrieve sstable references from parent repair session again - 
removed sstable must not be present
-        refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId);
+        refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId);
         retrieved = Sets.newHashSet(refs.iterator());
         assertEquals(newLiveSet, retrieved);
         assertFalse(retrieved.contains(removed));
@@ -272,8 +272,8 @@ public class ActiveRepairServiceTest
         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, System.currentTimeMillis(), true);
 
         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
-        prs.markSSTablesRepairing(store.metadata.cfId, prsId);
-        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+        prs.markSSTablesRepairing(store.metadata.id, prsId);
+        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId))
         {
             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
             assertEquals(original, retrieved);
@@ -284,7 +284,7 @@ public class ActiveRepairServiceTest
         {
             UUID newPrsId = UUID.randomUUID();
             ActiveRepairService.instance.registerParentRepairSession(newPrsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, System.currentTimeMillis(), true);
-            
ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId,
 newPrsId);
+            
ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.id,
 newPrsId);
         }
         catch (Throwable t)
         {
@@ -292,7 +292,7 @@ public class ActiveRepairServiceTest
         }
         assertTrue(exception);
 
-        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId))
         {
             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
             assertEquals(original, retrieved);
@@ -306,19 +306,19 @@ public class ActiveRepairServiceTest
         UUID prsId = UUID.randomUUID();
         Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
-        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
+        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id,
 prsId);
 
         UUID prsId2 = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
         createSSTables(store, 2);
-        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
-        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
+        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id,
 prsId);
+        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id,
 prsId))
         {
             assertEquals(original, Sets.newHashSet(refs.iterator()));
         }
         store.forceMajorCompaction();
         // after a major compaction the original sstables will be gone and we 
will have no sstables to anticompact:
-        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
+        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id,
 prsId))
         {
             assertEquals(0, refs.size());
         }
@@ -331,21 +331,21 @@ public class ActiveRepairServiceTest
         Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
         UUID prsId = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
-        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
+        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id,
 prsId);
 
         UUID prsId2 = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
         boolean exception = false;
         try
         {
-            
ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId,
 prsId2);
+            
ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.id,
 prsId2);
         }
         catch (Throwable t)
         {
             exception = true;
         }
         assertTrue(exception);
-        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
+        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id,
 prsId))
         {
             assertEquals(original, Sets.newHashSet(refs.iterator()));
         }
@@ -368,7 +368,7 @@ public class ActiveRepairServiceTest
         {
             for (int j = 0; j < 10; j++)
             {
-                new RowUpdateBuilder(cfs.metadata, timestamp, 
Integer.toString(j))
+                new RowUpdateBuilder(cfs.metadata(), timestamp, 
Integer.toString(j))
                 .clustering("c")
                 .add("val", "val")
                 .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java 
b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index dba3e95..413f032 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -29,8 +29,8 @@ import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
@@ -69,9 +69,9 @@ public class DataResolverTest
     private Keyspace ks;
     private ColumnFamilyStore cfs;
     private ColumnFamilyStore cfs2;
-    private CFMetaData cfm;
-    private CFMetaData cfm2;
-    private ColumnDefinition m;
+    private TableMetadata cfm;
+    private TableMetadata cfm2;
+    private ColumnMetadata m;
     private int nowInSec;
     private ReadCommand command;
     private MessageRecorder messageRecorder;
@@ -81,23 +81,23 @@ public class DataResolverTest
     public static void defineSchema() throws ConfigurationException
     {
         DatabaseDescriptor.daemonInitialization();
-        CFMetaData cfMetadata = CFMetaData.Builder.create(KEYSPACE1, 
CF_STANDARD)
-                                                  .addPartitionKey("key", 
BytesType.instance)
-                                                  .addClusteringColumn("col1", 
AsciiType.instance)
-                                                  .addRegularColumn("c1", 
AsciiType.instance)
-                                                  .addRegularColumn("c2", 
AsciiType.instance)
-                                                  .addRegularColumn("one", 
AsciiType.instance)
-                                                  .addRegularColumn("two", 
AsciiType.instance)
-                                                  .build();
-
-        CFMetaData cfMetaData2 = CFMetaData.Builder.create(KEYSPACE1, 
CF_COLLECTION)
-                                                   .addPartitionKey("k", 
ByteType.instance)
-                                                   .addRegularColumn("m", 
MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
-                                                   .build();
+
+        TableMetadata.Builder builder1 =
+            TableMetadata.builder(KEYSPACE1, CF_STANDARD)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col1", AsciiType.instance)
+                         .addRegularColumn("c1", AsciiType.instance)
+                         .addRegularColumn("c2", AsciiType.instance)
+                         .addRegularColumn("one", AsciiType.instance)
+                         .addRegularColumn("two", AsciiType.instance);
+
+        TableMetadata.Builder builder2 =
+            TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
+                         .addPartitionKeyColumn("k", ByteType.instance)
+                         .addRegularColumn("m", 
MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
+
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    cfMetadata, cfMetaData2);
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), 
builder1, builder2);
     }
 
     @Before
@@ -106,10 +106,10 @@ public class DataResolverTest
         dk = Util.dk("key1");
         ks = Keyspace.open(KEYSPACE1);
         cfs = ks.getColumnFamilyStore(CF_STANDARD);
-        cfm = cfs.metadata;
+        cfm = cfs.metadata();
         cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
-        cfm2 = cfs2.metadata;
-        m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false));
+        cfm2 = cfs2.metadata();
+        m = cfm2.getColumn(new ColumnIdentifier("m", false));
 
         nowInSec = FBUtilities.nowInSeconds();
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
@@ -631,7 +631,7 @@ public class DataResolverTest
 
         MessageOut<Mutation> msg;
         msg = getSentMessage(peer1);
-        Iterator<Row> rowIter = 
msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+        Iterator<Row> rowIter = 
msg.payload.getPartitionUpdate(cfm2).iterator();
         assertTrue(rowIter.hasNext());
         Row row = rowIter.next();
         assertFalse(rowIter.hasNext());
@@ -676,7 +676,7 @@ public class DataResolverTest
 
         MessageOut<Mutation> msg;
         msg = getSentMessage(peer1);
-        Iterator<Row> rowIter = 
msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+        Iterator<Row> rowIter = 
msg.payload.getPartitionUpdate(cfm2).iterator();
         assertTrue(rowIter.hasNext());
         Row row = rowIter.next();
         assertFalse(rowIter.hasNext());
@@ -728,7 +728,7 @@ public class DataResolverTest
 
         MessageOut<Mutation> msg;
         msg = getSentMessage(peer2);
-        Iterator<Row> rowIter = 
msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+        Iterator<Row> rowIter = 
msg.payload.getPartitionUpdate(cfm2).iterator();
         assertTrue(rowIter.hasNext());
         Row row = rowIter.next();
         assertFalse(rowIter.hasNext());
@@ -779,7 +779,7 @@ public class DataResolverTest
 
         MessageOut<Mutation> msg;
         msg = getSentMessage(peer1);
-        Row row = 
Iterators.getOnlyElement(msg.payload.getPartitionUpdate(cfm2.cfId).iterator());
+        Row row = 
Iterators.getOnlyElement(msg.payload.getPartitionUpdate(cfm2).iterator());
 
         ComplexColumnData cd = row.getComplexColumnData(m);
 
@@ -857,8 +857,8 @@ public class DataResolverTest
     {
         assertEquals(MessagingService.Verb.READ_REPAIR, message.verb);
         PartitionUpdate update = ((Mutation)message.payload).getPartitionUpdates().iterator().next();
-        assertEquals(update.metadata().ksName, cfm.ksName);
-        assertEquals(update.metadata().cfName, cfm.cfName);
+        assertEquals(update.metadata().keyspace, cfm.keyspace);
+        assertEquals(update.metadata().name, cfm.name);
     }
 
 
@@ -891,7 +891,7 @@ public class DataResolverTest
         return new RangeTombstone(Slice.make(startBound, endBound), new 
DeletionTime(markedForDeleteAt, localDeletionTime));
     }
 
-    private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec)
+    private UnfilteredPartitionIterator fullPartitionDelete(TableMetadata cfm, DecoratedKey dk, long timestamp, int nowInSec)
     {
         return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(cfm, dk, timestamp, nowInSec).unfilteredIterator());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java 
b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
index 866910e..c2aeb56 100644
--- a/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
+++ b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
@@ -48,7 +48,7 @@ public class JoinTokenRingTest
         ss.joinRing();
 
         SecondaryIndexManager indexManager = ColumnFamilyStore.getIfExists("JoinTokenRingTestKeyspace7", "Indexed1").indexManager;
-        StubIndex stub = (StubIndex) indexManager.getIndexByName("value_index");
+        StubIndex stub = (StubIndex) indexManager.getIndexByName("Indexed1_value_index");
         Assert.assertTrue(stub.preJoinInvocation);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java 
b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 754def9..aaa2594 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.Util.PartitionerSwitcher;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
@@ -715,7 +715,7 @@ public class LeaveAndBootstrapTest
 
     private AbstractReplicationStrategy getStrategy(String keyspaceName, 
TokenMetadata tmd)
     {
-        KeyspaceMetadata ksmd = Schema.instance.getKSMetaData(keyspaceName);
+        KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName);
         return AbstractReplicationStrategy.createReplicationStrategy(
                 keyspaceName,
                 ksmd.params.replication.klass,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java 
b/test/unit/org/apache/cassandra/service/MoveTest.java
index 90d546c..df4e294 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -30,12 +30,13 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
@@ -141,14 +142,17 @@ public class MoveTest
         });
 
         final TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-                tmd.clearUnsafe();
-                tmd.updateHostId(UUID.randomUUID(), 
InetAddress.getByName("127.0.0.1"));
-                tmd.updateHostId(UUID.randomUUID(), 
InetAddress.getByName("127.0.0.2"));
+
+        tmd.clearUnsafe();
+        tmd.updateHostId(UUID.randomUUID(), 
InetAddress.getByName("127.0.0.1"));
+        tmd.updateHostId(UUID.randomUUID(), 
InetAddress.getByName("127.0.0.2"));
 
         KeyspaceMetadata keyspace =  KeyspaceMetadata.create(keyspaceName,
                                                              
KeyspaceParams.nts(configOptions(replicas)),
-                                                             
Tables.of(CFMetaData.Builder.create(keyspaceName, "CF1")
-                                                                               
          .addPartitionKey("key", BytesType.instance).build()));
+                                                             
Tables.of(TableMetadata.builder(keyspaceName, "CF1")
+                                                                               
     .addPartitionKeyColumn("key", BytesType.instance)
+                                                                               
     .build()));
+
         MigrationManager.announceNewKeyspace(keyspace);
     }
 
@@ -1007,7 +1011,7 @@ public class MoveTest
 
     private AbstractReplicationStrategy getStrategy(String keyspaceName, 
TokenMetadata tmd)
     {
-        KeyspaceMetadata ksmd = Schema.instance.getKSMetaData(keyspaceName);
+        KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName);
         return AbstractReplicationStrategy.createReplicationStrategy(
                 keyspaceName,
                 ksmd.params.replication.klass,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/PaxosStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/PaxosStateTest.java 
b/test/unit/org/apache/cassandra/service/PaxosStateTest.java
index 8054c61..6c12001 100644
--- a/test/unit/org/apache/cassandra/service/PaxosStateTest.java
+++ b/test/unit/org/apache/cassandra/service/PaxosStateTest.java
@@ -59,7 +59,7 @@ public class PaxosStateTest
         ColumnFamilyStore cfs = 
Keyspace.open("PaxosStateTestKeyspace1").getColumnFamilyStore("Standard1");
         String key = "key" + System.nanoTime();
         ByteBuffer value = ByteBufferUtil.bytes(0);
-        RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
+        RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), FBUtilities.timestampMicros(), key);
         builder.clustering("a").add("val", value);
         PartitionUpdate update = 
Iterables.getOnlyElement(builder.build().getPartitionUpdates());
 
@@ -78,7 +78,7 @@ public class PaxosStateTest
         assertNoDataPresent(cfs, Util.dk(key));
 
         // Now try again with a ballot created after the truncation
-        long timestamp = SystemKeyspace.getTruncatedAt(update.metadata().cfId) + 1;
+        long timestamp = SystemKeyspace.getTruncatedAt(update.metadata().id) + 1;
         Commit afterTruncate = newProposal(timestamp, update);
         PaxosState.commit(afterTruncate);
         assertDataPresent(cfs, Util.dk(key), "val", value);
@@ -93,7 +93,7 @@ public class PaxosStateTest
     {
         Row row = Util.getOnlyRowUnfiltered(Util.cmd(cfs, key).build());
         assertEquals(0, ByteBufferUtil.compareUnsigned(value,
-                row.getCell(cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes(name))).value()));
+                row.getCell(cfs.metadata().getColumn(ByteBufferUtil.bytes(name))).value()));
     }
 
     private void assertNoDataPresent(ColumnFamilyStore cfs, DecoratedKey key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/QueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java 
b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
index 2104d14..d0b7704 100644
--- a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -27,7 +27,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.*;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
@@ -59,16 +60,18 @@ public class QueryPagerTest
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD));
+
         SchemaLoader.createKeyspace(KEYSPACE_CQL,
                                     KeyspaceParams.simple(1),
-                                    CFMetaData.compile("CREATE TABLE " + 
CF_CQL + " ("
-                                            + "k text,"
-                                            + "c text,"
-                                            + "v text,"
-                                            + "PRIMARY KEY (k, c))", 
KEYSPACE_CQL));
+                                    CreateTableStatement.parse("CREATE TABLE " 
+ CF_CQL + " ("
+                                                               + "k text,"
+                                                               + "c text,"
+                                                               + "v text,"
+                                                               + "PRIMARY KEY 
(k, c))", KEYSPACE_CQL));
         addData();
     }
 
@@ -101,7 +104,7 @@ public class QueryPagerTest
         {
             for (int j = 0; j < nbCols; j++)
             {
-                RowUpdateBuilder builder = new 
RowUpdateBuilder(cfs().metadata, FBUtilities.timestampMicros(), "k" + i);
+                RowUpdateBuilder builder = new 
RowUpdateBuilder(cfs().metadata(), FBUtilities.timestampMicros(), "k" + i);
                 builder.clustering("c" + j).add("val", 
"").build().applyUnsafe();
             }
         }
@@ -156,12 +159,12 @@ public class QueryPagerTest
     private static SinglePartitionReadCommand sliceQuery(String key, String 
start, String end, boolean reversed, int count)
     {
         ClusteringComparator cmp = cfs().getComparator();
-        CFMetaData metadata = cfs().metadata;
+        TableMetadata metadata = cfs().metadata();
 
         Slice slice = Slice.make(cmp.make(start), cmp.make(end));
         ClusteringIndexSliceFilter filter = new 
ClusteringIndexSliceFilter(Slices.with(cmp, slice), reversed);
 
-        return SinglePartitionReadCommand.create(cfs().metadata, nowInSec, 
ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, Util.dk(key), 
filter);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, 
ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, Util.dk(key), 
filter);
     }
 
     private static ReadCommand rangeNamesQuery(String keyStart, String keyEnd, 
int count, String... names)
@@ -427,7 +430,7 @@ public class QueryPagerTest
         for (int i = 0; i < 5; i++)
             executeInternal(String.format("INSERT INTO %s.%s (k, c, v) VALUES 
('k%d', 'c%d', null)", keyspace, table, 0, i));
 
-        ReadCommand command = SinglePartitionReadCommand.create(cfs.metadata, 
nowInSec, Util.dk("k0"), Slice.ALL);
+        ReadCommand command = 
SinglePartitionReadCommand.create(cfs.metadata(), nowInSec, Util.dk("k0"), 
Slice.ALL);
 
         QueryPager pager = command.getPager(null, ProtocolVersion.CURRENT);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/StartupChecksTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StartupChecksTest.java 
b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
index 224f3d9..e2c3e6b 100644
--- a/test/unit/org/apache/cassandra/service/StartupChecksTest.java
+++ b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
@@ -27,7 +27,7 @@ import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.FileUtils;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java 
b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 297d19d..3ef7bbb 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -35,9 +35,9 @@ import org.junit.runner.RunWith;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WindowsFailedSnapshotTracker;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -206,7 +206,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, configOptions));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
                                                                                
                             InetAddress.getByName("127.0.0.1"));
@@ -249,7 +249,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, configOptions));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
         assert primaryRanges.size() == 1;
@@ -286,7 +286,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, configOptions));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         // endpoints in DC1 should not have primary range
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
@@ -325,7 +325,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, configOptions));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         // endpoints in DC1 should not have primary range
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, 
InetAddress.getByName("127.0.0.1"));
@@ -377,7 +377,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, configOptions));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         // endpoints in DC1 should not have primary range
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
@@ -444,7 +444,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, configOptions));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         // endpoints in DC1 should have primary ranges which also cover DC2
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, 
InetAddress.getByName("127.0.0.1"));
@@ -503,7 +503,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.simpleTransient(2));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
         assert primaryRanges.size() == 1;
@@ -534,7 +534,7 @@ public class StorageServiceServerTest
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.simpleTransient(2));
-        Schema.instance.setKeyspaceMetadata(meta);
+        Schema.instance.load(meta);
 
         Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, 
InetAddress.getByName("127.0.0.1"));
         assert primaryRanges.size() == 1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java 
b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 80ef29d..ae18be9 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 
 import org.junit.Test;
 
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class SessionInfoTest
@@ -35,17 +36,17 @@ public class SessionInfoTest
     @Test
     public void testTotals()
     {
-        UUID cfId = UUID.randomUUID();
+        TableId tableId = TableId.generate();
         InetAddress local = FBUtilities.getLocalAddress();
 
         Collection<StreamSummary> summaries = new ArrayList<>();
         for (int i = 0; i < 10; i++)
         {
-            StreamSummary summary = new StreamSummary(cfId, i, (i + 1) * 10);
+            StreamSummary summary = new StreamSummary(tableId, i, (i + 1) * 
10);
             summaries.add(summary);
         }
 
-        StreamSummary sending = new StreamSummary(cfId, 10, 100);
+        StreamSummary sending = new StreamSummary(tableId, 10, 100);
         SessionInfo info = new SessionInfo(local, 0, local, summaries, 
Collections.singleton(sending), StreamSession.State.PREPARING);
 
         assert info.getTotalFilesToReceive() == 45;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 04be91a..edd9d9c 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -41,9 +41,11 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Ref;
+import org.hsqldb.Table;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -84,7 +86,7 @@ public class StreamTransferTaskTest
         }
 
         // create streaming task that streams those two sstables
-        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.id);
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
             List<Range<Token>> ranges = new ArrayList<>();
@@ -133,7 +135,7 @@ public class StreamTransferTaskTest
         }
 
         // create streaming task that streams those two sstables
-        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+        StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.id);
         List<Ref<SSTableReader>> refs = new 
ArrayList<>(cfs.getLiveSSTables().size());
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
@@ -146,7 +148,7 @@ public class StreamTransferTaskTest
         assertEquals(2, task.getTotalNumberOfFiles());
 
         //add task to stream session, so it is aborted when stream session 
fails
-        session.transfers.put(UUID.randomUUID(), task);
+        session.transfers.put(TableId.generate(), task);
 
         //make a copy of outgoing file messages, since task is cleared when 
it's aborted
         Collection<OutgoingFileMessage> files = new 
LinkedList<>(task.files.values());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 8f3061a..9219e18 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -35,7 +35,7 @@ import junit.framework.Assert;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -54,6 +54,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static org.apache.cassandra.SchemaLoader.compositeIndexCFMD;
+import static org.apache.cassandra.SchemaLoader.createKeyspace;
+import static org.apache.cassandra.SchemaLoader.standardCFMD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -83,25 +86,26 @@ public class StreamingTransferTest
     {
         SchemaLoader.prepareServer();
         StorageService.instance.initServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD),
-                                    CFMetaData.Builder.create(KEYSPACE1, 
CF_COUNTER, false, true, true)
-                                                      .addPartitionKey("key", 
BytesType.instance)
-                                                      .build(),
-                                    CFMetaData.Builder.create(KEYSPACE1, 
CF_STANDARDINT)
-                                                      .addPartitionKey("key", 
AsciiType.instance)
-                                                      
.addClusteringColumn("cols", Int32Type.instance)
-                                                      .addRegularColumn("val", 
BytesType.instance)
-                                                      .build(),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, 
CF_INDEX, true));
-        SchemaLoader.createKeyspace(KEYSPACE2,
-                                    KeyspaceParams.simple(1));
-        SchemaLoader.createKeyspace(KEYSPACE_CACHEKEY,
-                                    KeyspaceParams.simple(1),
-                                    
SchemaLoader.standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD),
-                                    
SchemaLoader.standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD2),
-                                    
SchemaLoader.standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD3));
+
+        createKeyspace(KEYSPACE1,
+                       KeyspaceParams.simple(1),
+                       standardCFMD(KEYSPACE1, CF_STANDARD),
+                       TableMetadata.builder(KEYSPACE1, CF_COUNTER)
+                                    .isCounter(true)
+                                    .addPartitionKeyColumn("key", 
BytesType.instance),
+                       TableMetadata.builder(KEYSPACE1, CF_STANDARDINT)
+                                    .addPartitionKeyColumn("key", 
AsciiType.instance)
+                                    .addClusteringColumn("cols", 
Int32Type.instance)
+                                    .addRegularColumn("val", 
BytesType.instance),
+                       compositeIndexCFMD(KEYSPACE1, CF_INDEX, true));
+
+        createKeyspace(KEYSPACE2, KeyspaceParams.simple(1));
+
+        createKeyspace(KEYSPACE_CACHEKEY,
+                       KeyspaceParams.simple(1),
+                       standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD),
+                       standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD2),
+                       standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD3));
     }
 
     /**
@@ -311,7 +315,7 @@ public class StreamingTransferTest
             {
                 long val = key.hashCode();
 
-                RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 
timestamp, key);
+                RowUpdateBuilder builder = new 
RowUpdateBuilder(cfs.metadata(), timestamp, key);
                 builder.clustering(col).add("birthdate", 
ByteBufferUtil.bytes(val));
                 builder.build().applyUnsafe();
             }
@@ -324,7 +328,7 @@ public class StreamingTransferTest
 
             // test we can search:
             UntypedResultSet result = 
QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE 
birthdate = %d",
-                    cfs.metadata.ksName, cfs.metadata.cfName, val));
+                                                                               
    cfs.metadata.keyspace, cfs.metadata.name, val));
             assertEquals(1, result.size());
 
             assert 
result.iterator().next().getBytes("key").equals(ByteBufferUtil.bytes(key));
@@ -346,7 +350,7 @@ public class StreamingTransferTest
         String key = "key1";
 
 
-        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, 
FBUtilities.timestampMicros(), key);
+        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata(), 
FBUtilities.timestampMicros(), key);
 
         // add columns of size slightly less than column_index_size to force 
insert column index
         updates.clustering(1)
@@ -354,7 +358,7 @@ public class StreamingTransferTest
                 .build()
                 .apply();
 
-        updates = new RowUpdateBuilder(cfs.metadata, 
FBUtilities.timestampMicros(), key);
+        updates = new RowUpdateBuilder(cfs.metadata(), 
FBUtilities.timestampMicros(), key);
         updates.clustering(6)
                 .add("val", ByteBuffer.wrap(new 
byte[DatabaseDescriptor.getColumnIndexSize()]))
                 .build()
@@ -367,7 +371,7 @@ public class StreamingTransferTest
         //        .apply();
 
 
-        updates = new RowUpdateBuilder(cfs.metadata, 
FBUtilities.timestampMicros() + 1, key);
+        updates = new RowUpdateBuilder(cfs.metadata(), 
FBUtilities.timestampMicros() + 1, key);
         updates.addRangeTombstone(5, 7)
                 .build()
                 .apply();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/tools/ToolsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/ToolsTester.java 
b/test/unit/org/apache/cassandra/tools/ToolsTester.java
index ead4e31..8dd7314 100644
--- a/test/unit/org/apache/cassandra/tools/ToolsTester.java
+++ b/test/unit/org/apache/cassandra/tools/ToolsTester.java
@@ -114,12 +114,12 @@ public abstract class ToolsTester
 
     public void assertSchemaNotLoaded()
     {
-        assertClassNotLoaded("org.apache.cassandra.config.Schema");
+        assertClassNotLoaded("org.apache.cassandra.schema.Schema");
     }
 
     public void assertSchemaLoaded()
     {
-        assertClassLoaded("org.apache.cassandra.config.Schema");
+        assertClassLoaded("org.apache.cassandra.schema.Schema");
     }
 
     public void assertKeyspaceNotLoaded()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java 
b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
index 90b4cdf..a796daf 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -23,7 +23,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.TriggerMetadata;
+import org.apache.cassandra.schema.Triggers;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -51,7 +52,7 @@ public class TriggerExecutorTest
     @Test
     public void sameKeySameCfColumnFamilies() throws ConfigurationException, 
InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName()));
         PartitionUpdate mutated = 
TriggerExecutor.instance.execute(makeCf(metadata, "k1", "v1", null));
 
         try (RowIterator rowIterator = 
UnfilteredRowIterators.filter(mutated.unfilteredIterator(),
@@ -67,21 +68,21 @@ public class TriggerExecutorTest
     @Test(expected = InvalidRequestException.class)
     public void sameKeyDifferentCfColumnFamilies() throws 
ConfigurationException, InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName()));
         TriggerExecutor.instance.execute(makeCf(metadata, "k1", "v1", null));
     }
 
     @Test(expected = InvalidRequestException.class)
     public void differentKeyColumnFamilies() throws ConfigurationException, 
InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", DifferentKeyTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", DifferentKeyTrigger.class.getName()));
         TriggerExecutor.instance.execute(makeCf(metadata, "k1", "v1", null));
     }
 
     @Test
     public void noTriggerMutations() throws ConfigurationException, 
InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", NoOpTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", NoOpTrigger.class.getName()));
         Mutation rm = new Mutation(makeCf(metadata, "k1", "v1", null));
         
assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm)));
     }
@@ -89,7 +90,7 @@ public class TriggerExecutorTest
     @Test
     public void sameKeySameCfRowMutations() throws ConfigurationException, 
InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
         Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
@@ -102,20 +103,20 @@ public class TriggerExecutorTest
         List<PartitionUpdate> mutatedCFs = new 
ArrayList<>(tmutations.get(0).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         Row row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
+        assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
 
         mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
+        assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
     }
 
     @Test
     public void sameKeySameCfPartialRowMutations() throws 
ConfigurationException, InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", SameKeySameCfPartialTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", SameKeySameCfPartialTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
         Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
@@ -128,20 +129,20 @@ public class TriggerExecutorTest
         List<PartitionUpdate> mutatedCFs = new 
ArrayList<>(tmutations.get(0).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         Row row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-        assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2"))));
+        assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+        assertNull(row.getCell(metadata.getColumn(bytes("c2"))));
 
         mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
+        assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
     }
 
     @Test
     public void sameKeyDifferentCfRowMutations() throws 
ConfigurationException, InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
         Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
@@ -155,17 +156,17 @@ public class TriggerExecutorTest
         assertEquals(2, mutatedCFs.size());
         for (PartitionUpdate update : mutatedCFs)
         {
-            if (update.metadata().cfName.equals("cf1"))
+            if (update.metadata().name.equals("cf1"))
             {
                 Row row = update.iterator().next();
-                assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-                
assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2"))));
+                assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+                assertNull(row.getCell(metadata.getColumn(bytes("c2"))));
             }
             else
             {
                 Row row = update.iterator().next();
-                
assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1"))));
-                assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
+                assertNull(row.getCell(metadata.getColumn(bytes("c1"))));
+                assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
             }
         }
 
@@ -174,17 +175,17 @@ public class TriggerExecutorTest
 
         for (PartitionUpdate update : mutatedCFs)
         {
-            if (update.metadata().cfName.equals("cf1"))
+            if (update.metadata().name.equals("cf1"))
             {
                 Row row = update.iterator().next();
-                assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-                
assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2"))));
+                assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+                assertNull(row.getCell(metadata.getColumn(bytes("c2"))));
             }
             else
             {
                 Row row = update.iterator().next();
-                
assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1"))));
-                assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
+                assertNull(row.getCell(metadata.getColumn(bytes("c1"))));
+                assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
             }
         }
     }
@@ -192,7 +193,7 @@ public class TriggerExecutorTest
     @Test
     public void sameKeyDifferentKsRowMutations() throws 
ConfigurationException, InvalidRequestException
     {
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", SameKeyDifferentKsTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", SameKeyDifferentKsTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
         PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
         Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
@@ -205,33 +206,33 @@ public class TriggerExecutorTest
         List<PartitionUpdate> mutatedCFs = new 
ArrayList<>(tmutations.get(0).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         Row row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-        assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2"))));
+        assertEquals(bytes("k1v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+        assertNull(row.getCell(metadata.getColumn(bytes("c2"))));
 
         mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-        assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2"))));
+        assertEquals(bytes("k2v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+        assertNull(row.getCell(metadata.getColumn(bytes("c2"))));
 
         mutatedCFs = new ArrayList<>(tmutations.get(2).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         row = mutatedCFs.get(0).iterator().next();
-        assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1"))));
-        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
+        assertNull(row.getCell(metadata.getColumn(bytes("c1"))));
+        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
 
         mutatedCFs = new ArrayList<>(tmutations.get(3).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         row = mutatedCFs.get(0).iterator().next();
-        assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1"))));
-        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
+        assertNull(row.getCell(metadata.getColumn(bytes("c1"))));
+        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
     }
 
     @Test
     public void differentKeyRowMutations() throws ConfigurationException, 
InvalidRequestException
     {
 
-        CFMetaData metadata = makeCfMetaData("ks1", "cf1", 
TriggerMetadata.create("test", DifferentKeyTrigger.class.getName()));
+        TableMetadata metadata = makeTableMetadata("ks1", "cf1", 
TriggerMetadata.create("test", DifferentKeyTrigger.class.getName()));
         PartitionUpdate cf1 = makeCf(metadata, "k1", "v1", null);
         Mutation rm = new Mutation("ks1", cf1.partitionKey()).add(cf1);
 
@@ -245,46 +246,39 @@ public class TriggerExecutorTest
         List<PartitionUpdate> mutatedCFs = new 
ArrayList<>(tmutations.get(0).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         Row row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("v1"), 
row.getCell(metadata.getColumnDefinition(bytes("c1"))).value());
-        assertNull(row.getCell(metadata.getColumnDefinition(bytes("c2"))));
+        assertEquals(bytes("v1"), 
row.getCell(metadata.getColumn(bytes("c1"))).value());
+        assertNull(row.getCell(metadata.getColumn(bytes("c2"))));
 
         mutatedCFs = new ArrayList<>(tmutations.get(1).getPartitionUpdates());
         assertEquals(1, mutatedCFs.size());
         row = mutatedCFs.get(0).iterator().next();
-        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumnDefinition(bytes("c2"))).value());
-        assertNull(row.getCell(metadata.getColumnDefinition(bytes("c1"))));
+        assertEquals(bytes("trigger"), 
row.getCell(metadata.getColumn(bytes("c2"))).value());
+        assertNull(row.getCell(metadata.getColumn(bytes("c1"))));
     }
 
-    private static CFMetaData makeCfMetaData(String ks, String cf, TriggerMetadata trigger)
+    private static TableMetadata makeTableMetadata(String ks, String cf, TriggerMetadata trigger)
     {
-        CFMetaData metadata = CFMetaData.Builder.create(ks, cf)
-                .addPartitionKey("pkey", UTF8Type.instance)
-                .addRegularColumn("c1", UTF8Type.instance)
-                .addRegularColumn("c2", UTF8Type.instance)
-                .build();
+        TableMetadata.Builder builder =
+            TableMetadata.builder(ks, cf)
+                         .addPartitionKeyColumn("pkey", UTF8Type.instance)
+                         .addRegularColumn("c1", UTF8Type.instance)
+                         .addRegularColumn("c2", UTF8Type.instance);
 
-        try
-        {
-            if (trigger != null)
-                metadata.triggers(metadata.getTriggers().with(trigger));
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new AssertionError(e);
-        }
+        if (trigger != null)
+            builder.triggers(Triggers.of(trigger));
 
-        return metadata;
+        return builder.build();
     }
 
-    private static PartitionUpdate makeCf(CFMetaData metadata, String key, String columnValue1, String columnValue2)
+    private static PartitionUpdate makeCf(TableMetadata metadata, String key, String columnValue1, String columnValue2)
     {
         Row.Builder builder = 
BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
         builder.newRow(Clustering.EMPTY);
         long ts = FBUtilities.timestampMicros();
         if (columnValue1 != null)
-            
builder.addCell(BufferCell.live(metadata.getColumnDefinition(bytes("c1")), ts, 
bytes(columnValue1)));
+            builder.addCell(BufferCell.live(metadata.getColumn(bytes("c1")), 
ts, bytes(columnValue1)));
         if (columnValue2 != null)
-            
builder.addCell(BufferCell.live(metadata.getColumnDefinition(bytes("c2")), ts, 
bytes(columnValue2)));
+            builder.addCell(BufferCell.live(metadata.getColumn(bytes("c2")), 
ts, bytes(columnValue2)));
 
         return PartitionUpdate.singleRowUpdate(metadata, Util.dk(key), 
builder.build());
     }
@@ -324,7 +318,7 @@ public class TriggerExecutorTest
     {
         public Collection<Mutation> augment(Partition partition)
         {
-            RowUpdateBuilder builder = new 
RowUpdateBuilder(makeCfMetaData(partition.metadata().ksName, "otherCf", null), 
FBUtilities.timestampMicros(), partition.partitionKey().getKey());
+            RowUpdateBuilder builder = new 
RowUpdateBuilder(makeTableMetadata(partition.metadata().keyspace, "otherCf", 
null), FBUtilities.timestampMicros(), partition.partitionKey().getKey());
             builder.add("c2", bytes("trigger"));
             return Collections.singletonList(builder.build());
         }
@@ -334,7 +328,7 @@ public class TriggerExecutorTest
     {
         public Collection<Mutation> augment(Partition partition)
         {
-            RowUpdateBuilder builder = new 
RowUpdateBuilder(makeCfMetaData("otherKs", "otherCf", null), 
FBUtilities.timestampMicros(), partition.partitionKey().getKey());
+            RowUpdateBuilder builder = new 
RowUpdateBuilder(makeTableMetadata("otherKs", "otherCf", null), 
FBUtilities.timestampMicros(), partition.partitionKey().getKey());
             builder.add("c2", bytes("trigger"));
             return Collections.singletonList(builder.build());
         }
@@ -344,7 +338,7 @@ public class TriggerExecutorTest
     {
         public Collection<Mutation> augment(Partition partition)
         {
-            RowUpdateBuilder builder = new 
RowUpdateBuilder(makeCfMetaData("otherKs", "otherCf", null), 
FBUtilities.timestampMicros(), "otherKey");
+            RowUpdateBuilder builder = new 
RowUpdateBuilder(makeTableMetadata("otherKs", "otherCf", null), 
FBUtilities.timestampMicros(), "otherKey");
             builder.add("c2", bytes("trigger"));
             return Collections.singletonList(builder.build());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/triggers/TriggersSchemaTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersSchemaTest.java b/test/unit/org/apache/cassandra/triggers/TriggersSchemaTest.java
index b6549bb..88f74a2 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersSchemaTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersSchemaTest.java
@@ -21,14 +21,16 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.schema.TriggerMetadata;
-import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.schema.Triggers;
+import org.apache.cassandra.schema.MigrationManager;
 
 import static org.junit.Assert.*;
 
@@ -49,15 +51,18 @@ public class TriggersSchemaTest
     public void newKsContainsCfWithTrigger() throws Exception
     {
         TriggerMetadata td = TriggerMetadata.create(triggerName, triggerClass);
-        CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", cfName), ksName);
-        cfm1.triggers(cfm1.getTriggers().with(td));
-        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, 
KeyspaceParams.simple(1), Tables.of(cfm1));
+        TableMetadata tm =
+            CreateTableStatement.parse(String.format("CREATE TABLE %s (k int 
PRIMARY KEY, v int)", cfName), ksName)
+                                .triggers(Triggers.of(td))
+                                .build();
+
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, 
KeyspaceParams.simple(1), Tables.of(tm));
         MigrationManager.announceNewKeyspace(ksm);
 
-        CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName);
-        assertFalse(cfm2.getTriggers().isEmpty());
-        assertEquals(1, cfm2.getTriggers().size());
-        assertEquals(td, cfm2.getTriggers().get(triggerName).get());
+        TableMetadata tm2 = Schema.instance.getTableMetadata(ksName, cfName);
+        assertFalse(tm2.triggers.isEmpty());
+        assertEquals(1, tm2.triggers.size());
+        assertEquals(td, tm2.triggers.get(triggerName).get());
     }
 
     @Test
@@ -66,50 +71,62 @@ public class TriggersSchemaTest
         KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, 
KeyspaceParams.simple(1));
         MigrationManager.announceNewKeyspace(ksm);
 
-        CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k 
int PRIMARY KEY, v int)", cfName), ksName);
-        TriggerMetadata td = TriggerMetadata.create(triggerName, triggerClass);
-        cfm1.triggers(cfm1.getTriggers().with(td));
+        TableMetadata metadata =
+            CreateTableStatement.parse(String.format("CREATE TABLE %s (k int 
PRIMARY KEY, v int)", cfName), ksName)
+                                
.triggers(Triggers.of(TriggerMetadata.create(triggerName, triggerClass)))
+                                .build();
 
-        MigrationManager.announceNewColumnFamily(cfm1);
+        MigrationManager.announceNewTable(metadata);
 
-        CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName);
-        assertFalse(cfm2.getTriggers().isEmpty());
-        assertEquals(1, cfm2.getTriggers().size());
-        assertEquals(td, cfm2.getTriggers().get(triggerName).get());
+        metadata = Schema.instance.getTableMetadata(ksName, cfName);
+        assertFalse(metadata.triggers.isEmpty());
+        assertEquals(1, metadata.triggers.size());
+        assertEquals(TriggerMetadata.create(triggerName, triggerClass), 
metadata.triggers.get(triggerName).get());
     }
 
     @Test
     public void addTriggerToCf() throws Exception
     {
-        CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k 
int PRIMARY KEY, v int)", cfName), ksName);
-        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, 
KeyspaceParams.simple(1), Tables.of(cfm1));
+        TableMetadata tm1 =
+            CreateTableStatement.parse(String.format("CREATE TABLE %s (k int 
PRIMARY KEY, v int)", cfName), ksName)
+                                .build();
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, 
KeyspaceParams.simple(1), Tables.of(tm1));
         MigrationManager.announceNewKeyspace(ksm);
 
-        CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName).copy();
         TriggerMetadata td = TriggerMetadata.create(triggerName, triggerClass);
-        cfm2.triggers(cfm2.getTriggers().with(td));
-        MigrationManager.announceColumnFamilyUpdate(cfm2);
-
-        CFMetaData cfm3 = Schema.instance.getCFMetaData(ksName, cfName);
-        assertFalse(cfm3.getTriggers().isEmpty());
-        assertEquals(1, cfm3.getTriggers().size());
-        assertEquals(td, cfm3.getTriggers().get(triggerName).get());
+        TableMetadata tm2 =
+            Schema.instance
+                  .getTableMetadata(ksName, cfName)
+                  .unbuild()
+                  .triggers(Triggers.of(td))
+                  .build();
+        MigrationManager.announceTableUpdate(tm2);
+
+        TableMetadata tm3 = Schema.instance.getTableMetadata(ksName, cfName);
+        assertFalse(tm3.triggers.isEmpty());
+        assertEquals(1, tm3.triggers.size());
+        assertEquals(td, tm3.triggers.get(triggerName).get());
     }
 
     @Test
     public void removeTriggerFromCf() throws Exception
     {
         TriggerMetadata td = TriggerMetadata.create(triggerName, triggerClass);
-        CFMetaData cfm1 = CFMetaData.compile(String.format("CREATE TABLE %s (k 
int PRIMARY KEY, v int)", cfName), ksName);
-        cfm1.triggers(cfm1.getTriggers().with(td));
-        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, 
KeyspaceParams.simple(1), Tables.of(cfm1));
+        TableMetadata tm =
+            CreateTableStatement.parse(String.format("CREATE TABLE %s (k int 
PRIMARY KEY, v int)", cfName), ksName)
+                                .triggers(Triggers.of(td))
+                                .build();
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, 
KeyspaceParams.simple(1), Tables.of(tm));
         MigrationManager.announceNewKeyspace(ksm);
 
-        CFMetaData cfm2 = Schema.instance.getCFMetaData(ksName, cfName).copy();
-        cfm2.triggers(cfm2.getTriggers().without(triggerName));
-        MigrationManager.announceColumnFamilyUpdate(cfm2);
+        TableMetadata tm1 = Schema.instance.getTableMetadata(ksName, cfName);
+        TableMetadata tm2 =
+            tm1.unbuild()
+               .triggers(tm1.triggers.without(triggerName))
+               .build();
+        MigrationManager.announceTableUpdate(tm2);
 
-        CFMetaData cfm3 = Schema.instance.getCFMetaData(ksName, cfName).copy();
-        assertTrue(cfm3.getTriggers().isEmpty());
+        TableMetadata tm3 = Schema.instance.getTableMetadata(ksName, cfName);
+        assertTrue(tm3.triggers.isEmpty());
     }
 }

Reply via email to