http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 43c22a3..228af33 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -25,10 +25,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.ColumnDefinition.Raw;
-import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.ColumnMetadata.Raw;
+import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
 import org.apache.cassandra.cql3.conditions.ColumnConditions;
@@ -71,24 +72,24 @@ public abstract class ModificationStatement implements 
CQLStatement
     protected final StatementType type;
 
     private final int boundTerms;
-    public final CFMetaData cfm;
+    public final TableMetadata metadata;
     private final Attributes attrs;
 
     private final StatementRestrictions restrictions;
 
     private final Operations operations;
 
-    private final PartitionColumns updatedColumns;
+    private final RegularAndStaticColumns updatedColumns;
 
     private final Conditions conditions;
 
-    private final PartitionColumns conditionColumns;
+    private final RegularAndStaticColumns conditionColumns;
 
-    private final PartitionColumns requiresRead;
+    private final RegularAndStaticColumns requiresRead;
 
     public ModificationStatement(StatementType type,
                                  int boundTerms,
-                                 CFMetaData cfm,
+                                 TableMetadata metadata,
                                  Operations operations,
                                  StatementRestrictions restrictions,
                                  Conditions conditions,
@@ -96,7 +97,7 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         this.type = type;
         this.boundTerms = boundTerms;
-        this.cfm = cfm;
+        this.metadata = metadata;
         this.restrictions = restrictions;
         this.operations = operations;
         this.conditions = conditions;
@@ -104,17 +105,17 @@ public abstract class ModificationStatement implements 
CQLStatement
 
         if (!conditions.isEmpty())
         {
-            checkFalse(cfm.isCounter(), "Conditional updates are not supported 
on counter tables");
+            checkFalse(metadata.isCounter(), "Conditional updates are not 
supported on counter tables");
             checkFalse(attrs.isTimestampSet(), "Cannot provide custom 
timestamp for conditional updates");
         }
 
-        PartitionColumns.Builder conditionColumnsBuilder = 
PartitionColumns.builder();
-        Iterable<ColumnDefinition> columns = conditions.getColumns();
+        RegularAndStaticColumns.Builder conditionColumnsBuilder = 
RegularAndStaticColumns.builder();
+        Iterable<ColumnMetadata> columns = conditions.getColumns();
         if (columns != null)
             conditionColumnsBuilder.addAll(columns);
 
-        PartitionColumns.Builder updatedColumnsBuilder = 
PartitionColumns.builder();
-        PartitionColumns.Builder requiresReadBuilder = 
PartitionColumns.builder();
+        RegularAndStaticColumns.Builder updatedColumnsBuilder = 
RegularAndStaticColumns.builder();
+        RegularAndStaticColumns.Builder requiresReadBuilder = 
RegularAndStaticColumns.builder();
         for (Operation operation : operations)
         {
             updatedColumnsBuilder.add(operation.column);
@@ -127,13 +128,13 @@ public abstract class ModificationStatement implements 
CQLStatement
             }
         }
 
-        PartitionColumns modifiedColumns = updatedColumnsBuilder.build();
+        RegularAndStaticColumns modifiedColumns = 
updatedColumnsBuilder.build();
         // Compact tables have not row marker. So if we don't actually update 
any particular column,
         // this means that we're only updating the PK, which we allow if only 
those were declared in
         // the definition. In that case however, we do went to write the 
compactValueColumn (since again
         // we can't use a "row marker") so add it automatically.
-        if (cfm.isCompactTable() && modifiedColumns.isEmpty() && 
updatesRegularRows())
-            modifiedColumns = cfm.partitionColumns();
+        if (metadata.isCompactTable() && modifiedColumns.isEmpty() && 
updatesRegularRows())
+            modifiedColumns = metadata.regularAndStaticColumns();
 
         this.updatedColumns = modifiedColumns;
         this.conditionColumns = conditionColumnsBuilder.build();
@@ -155,6 +156,11 @@ public abstract class ModificationStatement implements 
CQLStatement
         conditions.addFunctionsTo(functions);
     }
 
+    public TableMetadata metadata()
+    {
+        return metadata;
+    }
+
     /*
      * May be used by QueryHandler implementations
      */
@@ -174,22 +180,22 @@ public abstract class ModificationStatement implements 
CQLStatement
 
     public String keyspace()
     {
-        return cfm.ksName;
+        return metadata.keyspace;
     }
 
     public String columnFamily()
     {
-        return cfm.cfName;
+        return metadata.name;
     }
 
     public boolean isCounter()
     {
-        return cfm.isCounter();
+        return metadata().isCounter();
     }
 
     public boolean isView()
     {
-        return cfm.isView();
+        return metadata().isView();
     }
 
     public long getTimestamp(long now, QueryOptions options) throws 
InvalidRequestException
@@ -204,23 +210,23 @@ public abstract class ModificationStatement implements 
CQLStatement
 
     public int getTimeToLive(QueryOptions options) throws 
InvalidRequestException
     {
-        return attrs.getTimeToLive(options, cfm.params.defaultTimeToLive);
+        return attrs.getTimeToLive(options, 
metadata().params.defaultTimeToLive);
     }
 
     public void checkAccess(ClientState state) throws InvalidRequestException, 
UnauthorizedException
     {
-        state.hasColumnFamilyAccess(cfm, Permission.MODIFY);
+        state.hasColumnFamilyAccess(metadata, Permission.MODIFY);
 
         // CAS updates can be used to simulate a SELECT query, so should 
require Permission.SELECT as well.
         if (hasConditions())
-            state.hasColumnFamilyAccess(cfm, Permission.SELECT);
+            state.hasColumnFamilyAccess(metadata, Permission.SELECT);
 
         // MV updates need to get the current state from the table, and might 
update the views
         // Require Permission.SELECT on the base table, and Permission.MODIFY 
on the views
-        Iterator<ViewDefinition> views = View.findAll(keyspace(), 
columnFamily()).iterator();
+        Iterator<ViewMetadata> views = View.findAll(keyspace(), 
columnFamily()).iterator();
         if (views.hasNext())
         {
-            state.hasColumnFamilyAccess(cfm, Permission.SELECT);
+            state.hasColumnFamilyAccess(metadata, Permission.SELECT);
             do
             {
                 state.hasColumnFamilyAccess(views.next().metadata, 
Permission.MODIFY);
@@ -239,12 +245,12 @@ public abstract class ModificationStatement implements 
CQLStatement
         checkFalse(isView(), "Cannot directly modify a materialized view");
     }
 
-    public PartitionColumns updatedColumns()
+    public RegularAndStaticColumns updatedColumns()
     {
         return updatedColumns;
     }
 
-    public PartitionColumns conditionColumns()
+    public RegularAndStaticColumns conditionColumns()
     {
         return conditionColumns;
     }
@@ -256,7 +262,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         // columns is if we set some static columns, and in that case no 
clustering
         // columns should be given. So in practice, it's enough to check if we 
have
         // either the table has no clustering or if it has at least one of 
them set.
-        return cfm.clusteringColumns().isEmpty() || 
restrictions.hasClusteringColumnsRestrictions();
+        return metadata().clusteringColumns().isEmpty() || 
restrictions.hasClusteringColumnsRestrictions();
     }
 
     public boolean updatesStaticRow()
@@ -279,7 +285,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         return operations;
     }
 
-    public Iterable<ColumnDefinition> getColumnsWithConditions()
+    public Iterable<ColumnMetadata> getColumnsWithConditions()
     {
          return conditions.getColumns();
     }
@@ -308,7 +314,7 @@ public abstract class ModificationStatement implements 
CQLStatement
     throws InvalidRequestException
     {
         if (appliesOnlyToStaticColumns() && 
!restrictions.hasClusteringColumnsRestrictions())
-            return FBUtilities.singleton(CBuilder.STATIC_BUILDER.build(), 
cfm.comparator);
+            return FBUtilities.singleton(CBuilder.STATIC_BUILDER.build(), 
metadata().comparator);
 
         return restrictions.getClusteringColumns(options);
     }
@@ -365,12 +371,12 @@ public abstract class ModificationStatement implements 
CQLStatement
         List<SinglePartitionReadCommand> commands = new 
ArrayList<>(partitionKeys.size());
         int nowInSec = FBUtilities.nowInSeconds();
         for (ByteBuffer key : partitionKeys)
-            commands.add(SinglePartitionReadCommand.create(cfm,
+            commands.add(SinglePartitionReadCommand.create(metadata(),
                                                            nowInSec,
                                                            
ColumnFilter.selection(this.requiresRead),
                                                            RowFilter.NONE,
                                                            limits,
-                                                           
cfm.decorateKey(key),
+                                                           
metadata().partitioner.decorateKey(key),
                                                            filter));
 
         SinglePartitionReadCommand.Group group = new 
SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
@@ -424,9 +430,9 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         ConsistencyLevel cl = options.getConsistency();
         if (isCounter())
-            cl.validateCounterForWrite(cfm);
+            cl.validateCounterForWrite(metadata());
         else
-            cl.validateForWrite(cfm.ksName);
+            cl.validateForWrite(metadata.keyspace);
 
         Collection<? extends IMutation> mutations = getMutations(options, 
false, options.getTimestamp(queryState), queryStartNanoTime);
         if (!mutations.isEmpty())
@@ -461,7 +467,7 @@ public abstract class ModificationStatement implements 
CQLStatement
                    "IN on the partition key is not supported with conditional 
%s",
                    type.isUpdate()? "updates" : "deletions");
 
-        DecoratedKey key = cfm.decorateKey(keys.get(0));
+        DecoratedKey key = metadata().partitioner.decorateKey(keys.get(0));
         long now = options.getTimestamp(queryState);
 
         checkFalse(restrictions.clusteringKeyRestrictionsHasIN(),
@@ -469,7 +475,7 @@ public abstract class ModificationStatement implements 
CQLStatement
                     type.isUpdate()? "updates" : "deletions");
 
         Clustering clustering = 
Iterables.getOnlyElement(createClustering(options));
-        CQL3CasRequest request = new CQL3CasRequest(cfm, key, false, 
conditionColumns(), updatesRegularRows(), updatesStaticRow());
+        CQL3CasRequest request = new CQL3CasRequest(metadata(), key, false, 
conditionColumns(), updatesRegularRows(), updatesStaticRow());
 
         addConditions(clustering, request, options);
         request.addRowUpdate(clustering, this, options, now);
@@ -487,7 +493,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         return buildCasResultSet(keyspace(), columnFamily(), partition, 
getColumnsWithConditions(), false, options);
     }
 
-    public static ResultSet buildCasResultSet(String ksName, String tableName, 
RowIterator partition, Iterable<ColumnDefinition> columnsWithConditions, 
boolean isBatch, QueryOptions options)
+    public static ResultSet buildCasResultSet(String ksName, String tableName, 
RowIterator partition, Iterable<ColumnMetadata> columnsWithConditions, boolean 
isBatch, QueryOptions options)
     throws InvalidRequestException
     {
         boolean success = partition == null;
@@ -523,35 +529,31 @@ public abstract class ModificationStatement implements 
CQLStatement
         return new ResultSet(new ResultSet.ResultMetadata(specs), rows);
     }
 
-    private static ResultSet buildCasFailureResultSet(RowIterator partition, 
Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions 
options)
+    private static ResultSet buildCasFailureResultSet(RowIterator partition, 
Iterable<ColumnMetadata> columnsWithConditions, boolean isBatch, QueryOptions 
options)
     throws InvalidRequestException
     {
-        CFMetaData cfm = partition.metadata();
+        TableMetadata metadata = partition.metadata();
         Selection selection;
         if (columnsWithConditions == null)
         {
-            selection = Selection.wildcard(cfm);
+            selection = Selection.wildcard(metadata);
         }
         else
         {
             // We can have multiple conditions on the same columns (for 
collections) so use a set
             // to avoid duplicate, but preserve the order just to it follows 
the order of IF in the query in general
-            Set<ColumnDefinition> defs = new LinkedHashSet<>();
+            Set<ColumnMetadata> defs = new LinkedHashSet<>();
             // Adding the partition key for batches to disambiguate if the 
conditions span multipe rows (we don't add them outside
             // of batches for compatibility sakes).
             if (isBatch)
-            {
-                defs.addAll(cfm.partitionKeyColumns());
-                defs.addAll(cfm.clusteringColumns());
-            }
-            for (ColumnDefinition def : columnsWithConditions)
-                defs.add(def);
-            selection = Selection.forColumns(cfm, new ArrayList<>(defs));
+                Iterables.addAll(defs, metadata.primaryKeyColumns());
+            Iterables.addAll(defs, columnsWithConditions);
+            selection = Selection.forColumns(metadata, new ArrayList<>(defs));
 
         }
 
         Selection.ResultSetBuilder builder = 
selection.resultSetBuilder(options, false);
-        SelectStatement.forSelection(cfm, 
selection).processPartition(partition,
+        SelectStatement.forSelection(metadata, 
selection).processPartition(partition,
                                                                       options,
                                                                       builder,
                                                                       
FBUtilities.nowInSeconds());
@@ -616,7 +618,7 @@ public abstract class ModificationStatement implements 
CQLStatement
      */
     private Collection<? extends IMutation> getMutations(QueryOptions options, 
boolean local, long now, long queryStartNanoTime)
     {
-        UpdatesCollector collector = new 
UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 1);
+        UpdatesCollector collector = new 
UpdatesCollector(Collections.singletonMap(metadata.id, updatedColumns), 1);
         addUpdates(collector, options, local, now, queryStartNanoTime);
         collector.validateIndexedColumns();
 
@@ -650,10 +652,10 @@ public abstract class ModificationStatement implements 
CQLStatement
                                                            queryStartNanoTime);
             for (ByteBuffer key : keys)
             {
-                Validation.validateKey(cfm, key);
-                DecoratedKey dk = cfm.decorateKey(key);
+                Validation.validateKey(metadata(), key);
+                DecoratedKey dk = metadata().partitioner.decorateKey(key);
 
-                PartitionUpdate upd = collector.getPartitionUpdate(cfm, dk, 
options.getConsistency());
+                PartitionUpdate upd = collector.getPartitionUpdate(metadata(), 
dk, options.getConsistency());
 
                 for (Slice slice : slices)
                     addUpdateForKey(upd, slice, params);
@@ -667,10 +669,10 @@ public abstract class ModificationStatement implements 
CQLStatement
 
             for (ByteBuffer key : keys)
             {
-                Validation.validateKey(cfm, key);
-                DecoratedKey dk = cfm.decorateKey(key);
+                Validation.validateKey(metadata(), key);
+                DecoratedKey dk = metadata().partitioner.decorateKey(key);
 
-                PartitionUpdate upd = collector.getPartitionUpdate(cfm, dk, 
options.getConsistency());
+                PartitionUpdate upd = collector.getPartitionUpdate(metadata, 
dk, options.getConsistency());
 
                 if (!restrictions.hasClusteringColumnsRestrictions())
                 {
@@ -738,14 +740,14 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         // Some lists operation requires reading
         Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, 
limits, local, options.getConsistency(), queryStartNanoTime);
-        return new UpdateParameters(cfm, updatedColumns(), options, 
getTimestamp(now, options), getTimeToLive(options), lists);
+        return new UpdateParameters(metadata(), updatedColumns(), options, 
getTimestamp(now, options), getTimeToLive(options), lists);
     }
 
     private Slices toSlices(SortedSet<ClusteringBound> startBounds, 
SortedSet<ClusteringBound> endBounds)
     {
         assert startBounds.size() == endBounds.size();
 
-        Slices.Builder builder = new Slices.Builder(cfm.comparator);
+        Slices.Builder builder = new Slices.Builder(metadata().comparator);
 
         Iterator<ClusteringBound> starts = startBounds.iterator();
         Iterator<ClusteringBound> ends = endBounds.iterator();
@@ -753,7 +755,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         while (starts.hasNext())
         {
             Slice slice = Slice.make(starts.next(), ends.next());
-            if (!slice.isEmpty(cfm.comparator))
+            if (!slice.isEmpty(metadata().comparator))
             {
                 builder.add(slice);
             }
@@ -766,21 +768,21 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         protected final StatementType type;
         private final Attributes.Raw attrs;
-        private final List<Pair<ColumnDefinition.Raw, ColumnCondition.Raw>> 
conditions;
+        private final List<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>> 
conditions;
         private final boolean ifNotExists;
         private final boolean ifExists;
 
         protected Parsed(CFName name,
                          StatementType type,
                          Attributes.Raw attrs,
-                         List<Pair<ColumnDefinition.Raw, ColumnCondition.Raw>> 
conditions,
+                         List<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>> 
conditions,
                          boolean ifNotExists,
                          boolean ifExists)
         {
             super(name);
             this.type = type;
             this.attrs = attrs;
-            this.conditions = conditions == null ? 
Collections.<Pair<ColumnDefinition.Raw, ColumnCondition.Raw>>emptyList() : 
conditions;
+            this.conditions = conditions == null ? 
Collections.<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>>emptyList() : 
conditions;
             this.ifNotExists = ifNotExists;
             this.ifExists = ifExists;
         }
@@ -789,23 +791,20 @@ public abstract class ModificationStatement implements 
CQLStatement
         {
             VariableSpecifications boundNames = getBoundVariables();
             ModificationStatement statement = prepare(boundNames);
-            CFMetaData cfm = Validation.validateColumnFamily(keyspace(), 
columnFamily());
-            return new ParsedStatement.Prepared(statement, boundNames, 
boundNames.getPartitionKeyBindIndexes(cfm));
+            TableMetadata metadata = Schema.instance.validateTable(keyspace(), 
columnFamily());
+            return new ParsedStatement.Prepared(statement, boundNames, 
boundNames.getPartitionKeyBindIndexes(metadata));
         }
 
         public ModificationStatement prepare(VariableSpecifications boundNames)
         {
-            CFMetaData metadata = Validation.validateColumnFamily(keyspace(), 
columnFamily());
+            TableMetadata metadata = Schema.instance.validateTable(keyspace(), 
columnFamily());
 
             Attributes preparedAttributes = attrs.prepare(keyspace(), 
columnFamily());
             preparedAttributes.collectMarkerSpecification(boundNames);
 
             Conditions preparedConditions = prepareConditions(metadata, 
boundNames);
 
-            return prepareInternal(metadata,
-                                   boundNames,
-                                   preparedConditions,
-                                   preparedAttributes);
+            return prepareInternal(metadata, boundNames, preparedConditions, 
preparedAttributes);
         }
 
         /**
@@ -815,7 +814,7 @@ public abstract class ModificationStatement implements 
CQLStatement
          * @param boundNames the bound names
          * @return the column conditions.
          */
-        private Conditions prepareConditions(CFMetaData metadata, 
VariableSpecifications boundNames)
+        private Conditions prepareConditions(TableMetadata metadata, 
VariableSpecifications boundNames)
         {
             // To have both 'IF EXISTS'/'IF NOT EXISTS' and some other 
conditions doesn't make sense.
             // So far this is enforced by the parser, but let's assert it for 
sanity if ever the parse changes.
@@ -846,15 +845,15 @@ public abstract class ModificationStatement implements 
CQLStatement
          * @param boundNames the bound names
          * @return the column conditions.
          */
-        private ColumnConditions prepareColumnConditions(CFMetaData metadata, 
VariableSpecifications boundNames)
+        private ColumnConditions prepareColumnConditions(TableMetadata 
metadata, VariableSpecifications boundNames)
         {
             checkNull(attrs.timestamp, "Cannot provide custom timestamp for 
conditional updates");
 
             ColumnConditions.Builder builder = ColumnConditions.newBuilder();
 
-            for (Pair<ColumnDefinition.Raw, ColumnCondition.Raw> entry : 
conditions)
+            for (Pair<ColumnMetadata.Raw, ColumnCondition.Raw> entry : 
conditions)
             {
-                ColumnDefinition def = entry.left.prepare(metadata);
+                ColumnMetadata def = entry.left.prepare(metadata);
                 ColumnCondition condition = entry.right.prepare(keyspace(), 
def, metadata);
                 condition.collectMarkerSpecification(boundNames);
 
@@ -864,7 +863,7 @@ public abstract class ModificationStatement implements 
CQLStatement
             return builder.build();
         }
 
-        protected abstract ModificationStatement prepareInternal(CFMetaData 
cfm,
+        protected abstract ModificationStatement prepareInternal(TableMetadata 
metadata,
                                                                  
VariableSpecifications boundNames,
                                                                  Conditions 
conditions,
                                                                  Attributes 
attrs);
@@ -872,14 +871,14 @@ public abstract class ModificationStatement implements 
CQLStatement
         /**
          * Creates the restrictions.
          *
-         * @param cfm the column family meta data
+         * @param metadata the column family meta data
          * @param boundNames the bound names
          * @param operations the column operations
          * @param where the where clause
          * @param conditions the conditions
          * @return the restrictions
          */
-        protected StatementRestrictions newRestrictions(CFMetaData cfm,
+        protected StatementRestrictions newRestrictions(TableMetadata metadata,
                                                         VariableSpecifications 
boundNames,
                                                         Operations operations,
                                                         WhereClause where,
@@ -889,19 +888,19 @@ public abstract class ModificationStatement implements 
CQLStatement
                 throw new 
InvalidRequestException(CUSTOM_EXPRESSIONS_NOT_ALLOWED);
 
             boolean applyOnlyToStaticColumns = 
appliesOnlyToStaticColumns(operations, conditions);
-            return new StatementRestrictions(type, cfm, where, boundNames, 
applyOnlyToStaticColumns, false, false);
+            return new StatementRestrictions(type, metadata, where, 
boundNames, applyOnlyToStaticColumns, false, false);
         }
 
         /**
-         * Retrieves the <code>ColumnDefinition</code> corresponding to the 
specified raw <code>ColumnIdentifier</code>.
+         * Retrieves the <code>ColumnMetadata</code> corresponding to the 
specified raw <code>ColumnIdentifier</code>.
          *
-         * @param cfm the column family meta data
+         * @param metadata the column family meta data
          * @param rawId the raw <code>ColumnIdentifier</code>
-         * @return the <code>ColumnDefinition</code> corresponding to the 
specified raw <code>ColumnIdentifier</code>
+         * @return the <code>ColumnMetadata</code> corresponding to the 
specified raw <code>ColumnIdentifier</code>
          */
-        protected static ColumnDefinition getColumnDefinition(CFMetaData cfm, 
Raw rawId)
+        protected static ColumnMetadata getColumnDefinition(TableMetadata 
metadata, Raw rawId)
         {
-            return rawId.prepare(cfm);
+            return rawId.prepare(metadata);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
index 3ae6bd8..8d508fc 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java
@@ -21,7 +21,7 @@ import java.util.Set;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.RoleName;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7e66dc4..652b549 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -26,8 +26,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
@@ -82,7 +84,7 @@ public class SelectStatement implements CQLStatement
     public static final int DEFAULT_PAGE_SIZE = 10000;
 
     private final int boundTerms;
-    public final CFMetaData cfm;
+    public final TableMetadata table;
     public final Parameters parameters;
     private final Selection selection;
     private final Term limit;
@@ -111,7 +113,7 @@ public class SelectStatement implements CQLStatement
                                                                        false,
                                                                        false);
 
-    public SelectStatement(CFMetaData cfm,
+    public SelectStatement(TableMetadata table,
                            int boundTerms,
                            Parameters parameters,
                            Selection selection,
@@ -122,7 +124,7 @@ public class SelectStatement implements CQLStatement
                            Term limit,
                            Term perPartitionLimit)
     {
-        this.cfm = cfm;
+        this.table = table;
         this.boundTerms = boundTerms;
         this.selection = selection;
         this.restrictions = restrictions;
@@ -159,11 +161,11 @@ public class SelectStatement implements CQLStatement
     private ColumnFilter gatherQueriedColumns()
     {
         if (selection.isWildcard())
-            return ColumnFilter.all(cfm);
+            return ColumnFilter.all(table);
 
-        ColumnFilter.Builder builder = 
ColumnFilter.allRegularColumnsBuilder(cfm);
+        ColumnFilter.Builder builder = 
ColumnFilter.allRegularColumnsBuilder(table);
         // Adds all selected columns
-        for (ColumnDefinition def : selection.getColumns())
+        for (ColumnMetadata def : selection.getColumns())
             if (!def.isPrimaryKeyColumn())
                 builder.add(def);
         // as well as any restricted column (so we can actually apply the 
restriction)
@@ -171,8 +173,8 @@ public class SelectStatement implements CQLStatement
 
         // In a number of cases, we want to distinguish between a partition 
truly empty and one with only static content
         // (but no rows). In those cases, we should force querying all static 
columns (to make the distinction).
-        if (cfm.hasStaticColumns() && 
returnStaticContentOnPartitionWithNoRows())
-            builder.addAll(cfm.partitionColumns().statics);
+        if (table.hasStaticColumns() && 
returnStaticContentOnPartitionWithNoRows())
+            builder.addAll(table.staticColumns());
 
         return builder.build();
     }
@@ -189,13 +191,13 @@ public class SelectStatement implements CQLStatement
     // Creates a simple select based on the given selection.
     // Note that the results select statement should not be used for actual 
queries, but only for processing already
     // queried data through processColumnFamily.
-    static SelectStatement forSelection(CFMetaData cfm, Selection selection)
+    static SelectStatement forSelection(TableMetadata table, Selection 
selection)
     {
-        return new SelectStatement(cfm,
+        return new SelectStatement(table,
                                    0,
                                    defaultParameters,
                                    selection,
-                                   
StatementRestrictions.empty(StatementType.SELECT, cfm),
+                                   
StatementRestrictions.empty(StatementType.SELECT, table),
                                    false,
                                    null,
                                    null,
@@ -215,15 +217,15 @@ public class SelectStatement implements CQLStatement
 
     public void checkAccess(ClientState state) throws InvalidRequestException, 
UnauthorizedException
     {
-        if (cfm.isView())
+        if (table.isView())
         {
-            CFMetaData baseTable = View.findBaseTable(keyspace(), 
columnFamily());
+            TableMetadataRef baseTable = View.findBaseTable(keyspace(), 
columnFamily());
             if (baseTable != null)
                 state.hasColumnFamilyAccess(baseTable, Permission.SELECT);
         }
         else
         {
-            state.hasColumnFamilyAccess(cfm, Permission.SELECT);
+            state.hasColumnFamilyAccess(table, Permission.SELECT);
         }
 
         for (Function function : getFunctions())
@@ -453,12 +455,12 @@ public class SelectStatement implements CQLStatement
 
     public String keyspace()
     {
-        return cfm.ksName;
+        return table.keyspace;
     }
 
     public String columnFamily()
     {
-        return cfm.cfName;
+        return table.name;
     }
 
     /**
@@ -495,8 +497,8 @@ public class SelectStatement implements CQLStatement
         for (ByteBuffer key : keys)
         {
             QueryProcessor.validateKey(key);
-            DecoratedKey dk = cfm.decorateKey(ByteBufferUtil.clone(key));
-            commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, 
queriedColumns, rowFilter, limit, dk, filter));
+            DecoratedKey dk = 
table.partitioner.decorateKey(ByteBufferUtil.clone(key));
+            commands.add(SinglePartitionReadCommand.create(table, nowInSec, 
queriedColumns, rowFilter, limit, dk, filter));
         }
 
         return new SinglePartitionReadCommand.Group(commands, limit);
@@ -517,7 +519,7 @@ public class SelectStatement implements CQLStatement
         if (filter instanceof ClusteringIndexSliceFilter)
             return ((ClusteringIndexSliceFilter)filter).requestedSlices();
 
-        Slices.Builder builder = new Slices.Builder(cfm.comparator);
+        Slices.Builder builder = new Slices.Builder(table.comparator);
         for (Clustering clustering: 
((ClusteringIndexNamesFilter)filter).requestedRows())
             builder.add(Slice.make(clustering));
         return builder.build();
@@ -532,7 +534,7 @@ public class SelectStatement implements CQLStatement
         QueryOptions options = 
QueryOptions.forInternalCalls(Collections.emptyList());
         ClusteringIndexFilter filter = makeClusteringIndexFilter(options);
         RowFilter rowFilter = getRowFilter(options);
-        return SinglePartitionReadCommand.create(cfm, nowInSec, 
queriedColumns, rowFilter, DataLimits.NONE, key, filter);
+        return SinglePartitionReadCommand.create(table, nowInSec, 
queriedColumns, rowFilter, DataLimits.NONE, key, filter);
     }
 
     /**
@@ -557,7 +559,7 @@ public class SelectStatement implements CQLStatement
         if (keyBounds == null)
             return ReadQuery.EMPTY;
 
-        PartitionRangeReadCommand command = new PartitionRangeReadCommand(cfm,
+        PartitionRangeReadCommand command = new 
PartitionRangeReadCommand(table,
                                                                           
nowInSec,
                                                                           
queriedColumns,
                                                                           
rowFilter,
@@ -622,12 +624,12 @@ public class SelectStatement implements CQLStatement
         {
             ClusteringBound start = startBounds.first();
             ClusteringBound end = endBounds.first();
-            return cfm.comparator.compare(start, end) > 0
+            return table.comparator.compare(start, end) > 0
                  ? Slices.NONE
-                 : Slices.with(cfm.comparator, Slice.make(start, end));
+                 : Slices.with(table.comparator, Slice.make(start, end));
         }
 
-        Slices.Builder builder = new Slices.Builder(cfm.comparator, 
startBounds.size());
+        Slices.Builder builder = new Slices.Builder(table.comparator, 
startBounds.size());
         Iterator<ClusteringBound> startIter = startBounds.iterator();
         Iterator<ClusteringBound> endIter = endBounds.iterator();
         while (startIter.hasNext() && endIter.hasNext())
@@ -636,7 +638,7 @@ public class SelectStatement implements CQLStatement
             ClusteringBound end = endIter.next();
 
             // Ignore slices that are nonsensical
-            if (cfm.comparator.compare(start, end) > 0)
+            if (table.comparator.compare(start, end) > 0)
                 continue;
 
             builder.add(start, end);
@@ -774,12 +776,12 @@ public class SelectStatement implements CQLStatement
         return cqlRows;
     }
 
-    public static ByteBuffer[] getComponents(CFMetaData cfm, DecoratedKey dk)
+    public static ByteBuffer[] getComponents(TableMetadata metadata, 
DecoratedKey dk)
     {
         ByteBuffer key = dk.getKey();
-        if (cfm.getKeyValidator() instanceof CompositeType)
+        if (metadata.partitionKeyType instanceof CompositeType)
         {
-            return ((CompositeType)cfm.getKeyValidator()).split(key);
+            return ((CompositeType)metadata.partitionKeyType).split(key);
         }
         else
         {
@@ -796,7 +798,7 @@ public class SelectStatement implements CQLStatement
         // query, then we include that content.
         // We make an exception for "static compact" table are from a CQL 
standpoint we always want to show their static
         // content for backward compatiblity.
-        return queriesFullPartitions() || cfm.isStaticCompactTable();
+        return queriesFullPartitions() || table.isStaticCompactTable();
     }
 
     // Used by ModificationStatement for CAS operations
@@ -805,7 +807,7 @@ public class SelectStatement implements CQLStatement
     {
         ProtocolVersion protocolVersion = options.getProtocolVersion();
 
-        ByteBuffer[] keyComponents = getComponents(cfm, 
partition.partitionKey());
+        ByteBuffer[] keyComponents = getComponents(table, 
partition.partitionKey());
 
         Row staticRow = partition.staticRow();
         // If there is no rows, we include the static content if we should and 
we're done.
@@ -814,7 +816,7 @@ public class SelectStatement implements CQLStatement
             if (!staticRow.isEmpty() && 
returnStaticContentOnPartitionWithNoRows())
             {
                 result.newRow(partition.partitionKey(), 
staticRow.clustering());
-                for (ColumnDefinition def : selection.getColumns())
+                for (ColumnMetadata def : selection.getColumns())
                 {
                     switch (def.kind)
                     {
@@ -837,7 +839,7 @@ public class SelectStatement implements CQLStatement
             Row row = partition.next();
             result.newRow( partition.partitionKey(), row.clustering());
             // Respect selection order
-            for (ColumnDefinition def : selection.getColumns())
+            for (ColumnMetadata def : selection.getColumns())
             {
                 switch (def.kind)
                 {
@@ -867,7 +869,7 @@ public class SelectStatement implements CQLStatement
         return !restrictions.hasClusteringColumnsRestrictions() && 
!restrictions.hasRegularColumnsRestrictions();
     }
 
-    private static void addValue(Selection.ResultSetBuilder result, 
ColumnDefinition def, Row row, int nowInSec, ProtocolVersion protocolVersion)
+    private static void addValue(Selection.ResultSetBuilder result, 
ColumnMetadata def, Row row, int nowInSec, ProtocolVersion protocolVersion)
     {
         if (def.isComplex())
         {
@@ -932,22 +934,22 @@ public class SelectStatement implements CQLStatement
 
         public ParsedStatement.Prepared prepare(boolean forView) throws 
InvalidRequestException
         {
-            CFMetaData cfm = Validation.validateColumnFamily(keyspace(), 
columnFamily());
+            TableMetadata table = Schema.instance.validateTable(keyspace(), 
columnFamily());
             VariableSpecifications boundNames = getBoundVariables();
 
             Selection selection = selectClause.isEmpty()
-                                  ? Selection.wildcard(cfm)
-                                  : Selection.fromSelectors(cfm, selectClause, 
boundNames, !parameters.groups.isEmpty());
+                                  ? Selection.wildcard(table)
+                                  : Selection.fromSelectors(table, 
selectClause, boundNames, !parameters.groups.isEmpty());
 
-            StatementRestrictions restrictions = prepareRestrictions(cfm, 
boundNames, selection, forView);
+            StatementRestrictions restrictions = prepareRestrictions(table, 
boundNames, selection, forView);
 
             if (parameters.isDistinct)
             {
                 checkNull(perPartitionLimit, "PER PARTITION LIMIT is not 
allowed with SELECT DISTINCT queries");
-                validateDistinctSelection(cfm, selection, restrictions);
+                validateDistinctSelection(table, selection, restrictions);
             }
 
-            AggregationSpecification aggregationSpec = 
getAggregationSpecification(cfm,
+            AggregationSpecification aggregationSpec = 
getAggregationSpecification(table,
                                                                                
    selection,
                                                                                
    restrictions,
                                                                                
    parameters.isDistinct);
@@ -962,15 +964,15 @@ public class SelectStatement implements CQLStatement
             {
                 assert !forView;
                 verifyOrderingIsAllowed(restrictions);
-                orderingComparator = getOrderingComparator(cfm, selection, 
restrictions);
-                isReversed = isReversed(cfm);
+                orderingComparator = getOrderingComparator(table, selection, 
restrictions);
+                isReversed = isReversed(table);
                 if (isReversed)
                     orderingComparator = 
Collections.reverseOrder(orderingComparator);
             }
 
             checkNeedsFiltering(restrictions);
 
-            SelectStatement stmt = new SelectStatement(cfm,
+            SelectStatement stmt = new SelectStatement(table,
                                                        boundNames.size(),
                                                        parameters,
                                                        selection,
@@ -981,25 +983,25 @@ public class SelectStatement implements CQLStatement
                                                        
prepareLimit(boundNames, limit, keyspace(), limitReceiver()),
                                                        
prepareLimit(boundNames, perPartitionLimit, keyspace(), 
perPartitionLimitReceiver()));
 
-            return new ParsedStatement.Prepared(stmt, boundNames, 
boundNames.getPartitionKeyBindIndexes(cfm));
+            return new ParsedStatement.Prepared(stmt, boundNames, 
boundNames.getPartitionKeyBindIndexes(table));
         }
 
         /**
          * Prepares the restrictions.
          *
-         * @param cfm the column family meta data
+         * @param metadata the column family meta data
          * @param boundNames the variable specifications
          * @param selection the selection
          * @return the restrictions
          * @throws InvalidRequestException if a problem occurs while building 
the restrictions
          */
-        private StatementRestrictions prepareRestrictions(CFMetaData cfm,
+        private StatementRestrictions prepareRestrictions(TableMetadata 
metadata,
                                                           
VariableSpecifications boundNames,
                                                           Selection selection,
                                                           boolean forView) 
throws InvalidRequestException
         {
             return new StatementRestrictions(StatementType.SELECT,
-                                             cfm,
+                                             metadata,
                                              whereClause,
                                              boundNames,
                                              
selection.containsOnlyStaticColumns(),
@@ -1025,17 +1027,17 @@ public class SelectStatement implements CQLStatement
             checkFalse(restrictions.isKeyRange(), "ORDER BY is only supported 
when the partition key is restricted by an EQ or an IN.");
         }
 
-        private static void validateDistinctSelection(CFMetaData cfm,
+        private static void validateDistinctSelection(TableMetadata metadata,
                                                       Selection selection,
                                                       StatementRestrictions 
restrictions)
                                                       throws 
InvalidRequestException
         {
             checkFalse(restrictions.hasClusteringColumnsRestrictions() ||
-                       (restrictions.hasNonPrimaryKeyRestrictions() && 
!restrictions.nonPKRestrictedColumns(true).stream().allMatch(ColumnDefinition::isStatic)),
+                       (restrictions.hasNonPrimaryKeyRestrictions() && 
!restrictions.nonPKRestrictedColumns(true).stream().allMatch(ColumnMetadata::isStatic)),
                        "SELECT DISTINCT with WHERE clause only supports 
restriction by partition key and/or static columns.");
 
-            Collection<ColumnDefinition> requestedColumns = 
selection.getColumns();
-            for (ColumnDefinition def : requestedColumns)
+            Collection<ColumnMetadata> requestedColumns = 
selection.getColumns();
+            for (ColumnMetadata def : requestedColumns)
                 checkFalse(!def.isPartitionKey() && !def.isStatic(),
                            "SELECT DISTINCT queries must only request 
partition key columns and/or static columns (not %s)",
                            def.name);
@@ -1045,7 +1047,7 @@ public class SelectStatement implements CQLStatement
             if (!restrictions.isKeyRange())
                 return;
 
-            for (ColumnDefinition def : cfm.partitionKeyColumns())
+            for (ColumnMetadata def : metadata.partitionKeyColumns())
                 checkTrue(requestedColumns.contains(def),
                           "SELECT DISTINCT queries must request all the 
partition key columns (missing %s)", def.name);
         }
@@ -1053,13 +1055,13 @@ public class SelectStatement implements CQLStatement
         /**
          * Creates the <code>AggregationSpecification</code>s used to make the 
aggregates.
          *
-         * @param cfm the column family metadata
+         * @param metadata the table metadata
          * @param selection the selection
          * @param restrictions the restrictions
          * @param isDistinct <code>true</code> if the query is a DISTINCT one. 
          * @return the <code>AggregationSpecification</code>s used to make the 
aggregates
          */
-        private AggregationSpecification 
getAggregationSpecification(CFMetaData cfm,
+        private AggregationSpecification 
getAggregationSpecification(TableMetadata metadata,
                                                                      Selection 
selection,
                                                                      
StatementRestrictions restrictions,
                                                                      boolean 
isDistinct)
@@ -1070,10 +1072,10 @@ public class SelectStatement implements CQLStatement
 
             int clusteringPrefixSize = 0;
 
-            Iterator<ColumnDefinition> pkColumns = 
cfm.primaryKeyColumns().iterator();
-            for (ColumnDefinition.Raw raw : parameters.groups)
+            Iterator<ColumnMetadata> pkColumns = 
metadata.primaryKeyColumns().iterator();
+            for (ColumnMetadata.Raw raw : parameters.groups)
             {
-                ColumnDefinition def = raw.prepare(cfm);
+                ColumnMetadata def = raw.prepare(metadata);
 
                 checkTrue(def.isPartitionKey() || def.isClusteringColumn(),
                           "Group by is currently only supported on the columns 
of the PRIMARY KEY, got %s", def.name);
@@ -1083,7 +1085,7 @@ public class SelectStatement implements CQLStatement
                     checkTrue(pkColumns.hasNext(),
                               "Group by currently only support groups of 
columns following their declared order in the PRIMARY KEY");
 
-                    ColumnDefinition pkColumn = pkColumns.next();
+                    ColumnMetadata pkColumn = pkColumns.next();
 
                     if (pkColumn.isClusteringColumn())
                         clusteringPrefixSize++;
@@ -1104,10 +1106,10 @@ public class SelectStatement implements CQLStatement
             checkFalse(clusteringPrefixSize > 0 && isDistinct,
                        "Grouping on clustering columns is not allowed for 
SELECT DISTINCT queries");
 
-            return AggregationSpecification.aggregatePkPrefix(cfm.comparator, 
clusteringPrefixSize);
+            return 
AggregationSpecification.aggregatePkPrefix(metadata.comparator, 
clusteringPrefixSize);
         }
 
-        private Comparator<List<ByteBuffer>> getOrderingComparator(CFMetaData 
cfm,
+        private Comparator<List<ByteBuffer>> 
getOrderingComparator(TableMetadata metadata,
                                                                    Selection 
selection,
                                                                    
StatementRestrictions restrictions)
                                                                    throws 
InvalidRequestException
@@ -1115,14 +1117,14 @@ public class SelectStatement implements CQLStatement
             if (!restrictions.keyIsInRelation())
                 return null;
 
-            Map<ColumnIdentifier, Integer> orderingIndexes = 
getOrderingIndex(cfm, selection);
+            Map<ColumnIdentifier, Integer> orderingIndexes = 
getOrderingIndex(metadata, selection);
 
             List<Integer> idToSort = new ArrayList<Integer>();
             List<Comparator<ByteBuffer>> sorters = new 
ArrayList<Comparator<ByteBuffer>>();
 
-            for (ColumnDefinition.Raw raw : parameters.orderings.keySet())
+            for (ColumnMetadata.Raw raw : parameters.orderings.keySet())
             {
-                ColumnDefinition orderingColumn = raw.prepare(cfm);
+                ColumnMetadata orderingColumn = raw.prepare(metadata);
                 idToSort.add(orderingIndexes.get(orderingColumn.name));
                 sorters.add(orderingColumn.type);
             }
@@ -1130,16 +1132,16 @@ public class SelectStatement implements CQLStatement
                     : new CompositeComparator(sorters, idToSort);
         }
 
-        private Map<ColumnIdentifier, Integer> getOrderingIndex(CFMetaData 
cfm, Selection selection)
+        private Map<ColumnIdentifier, Integer> getOrderingIndex(TableMetadata 
table, Selection selection)
                 throws InvalidRequestException
         {
             // If we order post-query (see orderResults), the sorted column 
needs to be in the ResultSet for sorting,
             // even if we don't
             // ultimately ship them to the client (CASSANDRA-4911).
             Map<ColumnIdentifier, Integer> orderingIndexes = new HashMap<>();
-            for (ColumnDefinition.Raw raw : parameters.orderings.keySet())
+            for (ColumnMetadata.Raw raw : parameters.orderings.keySet())
             {
-                final ColumnDefinition def = raw.prepare(cfm);
+                final ColumnMetadata def = raw.prepare(table);
                 int index = selection.getResultSetIndex(def);
                 if (index < 0)
                     index = selection.addColumnForOrdering(def);
@@ -1148,13 +1150,13 @@ public class SelectStatement implements CQLStatement
             return orderingIndexes;
         }
 
-        private boolean isReversed(CFMetaData cfm) throws 
InvalidRequestException
+        private boolean isReversed(TableMetadata table) throws 
InvalidRequestException
         {
-            Boolean[] reversedMap = new 
Boolean[cfm.clusteringColumns().size()];
+            Boolean[] reversedMap = new 
Boolean[table.clusteringColumns().size()];
             int i = 0;
-            for (Map.Entry<ColumnDefinition.Raw, Boolean> entry : 
parameters.orderings.entrySet())
+            for (Map.Entry<ColumnMetadata.Raw, Boolean> entry : 
parameters.orderings.entrySet())
             {
-                ColumnDefinition def = entry.getKey().prepare(cfm);
+                ColumnMetadata def = entry.getKey().prepare(table);
                 boolean reversed = entry.getValue();
 
                 checkTrue(def.isClusteringColumn(),
@@ -1223,14 +1225,14 @@ public class SelectStatement implements CQLStatement
     public static class Parameters
     {
         // Public because CASSANDRA-9858
-        public final Map<ColumnDefinition.Raw, Boolean> orderings;
-        public final List<ColumnDefinition.Raw> groups;
+        public final Map<ColumnMetadata.Raw, Boolean> orderings;
+        public final List<ColumnMetadata.Raw> groups;
         public final boolean isDistinct;
         public final boolean allowFiltering;
         public final boolean isJson;
 
-        public Parameters(Map<ColumnDefinition.Raw, Boolean> orderings,
-                          List<ColumnDefinition.Raw> groups,
+        public Parameters(Map<ColumnMetadata.Raw, Boolean> orderings,
+                          List<ColumnMetadata.Raw> groups,
                           boolean isDistinct,
                           boolean allowFiltering,
                           boolean isJson)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java 
b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
index 63e1eec..779b6c2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 import com.google.common.collect.ImmutableSet;
 
@@ -61,15 +60,15 @@ public final class TableAttributes extends 
PropertyDefinitions
     {
         if (getId() != null)
             throw new ConfigurationException("Cannot alter table id.");
-        return build(TableParams.builder(previous));
+        return build(previous.unbuild());
     }
 
-    public UUID getId() throws ConfigurationException
+    public TableId getId() throws ConfigurationException
     {
         String id = getSimple(KW_ID);
         try
         {
-            return id != null ? UUID.fromString(id) : null;
+            return id != null ? TableId.fromString(id) : null;
         }
         catch (IllegalArgumentException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 927cdda..300d8f4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -20,16 +20,16 @@ package org.apache.cassandra.cql3.statements;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class TruncateStatement extends CFStatement implements CQLStatement
 {
@@ -55,14 +55,14 @@ public class TruncateStatement extends CFStatement 
implements CQLStatement
 
     public void validate(ClientState state) throws InvalidRequestException
     {
-        Validation.validateColumnFamily(keyspace(), columnFamily());
+        Schema.instance.validateTable(keyspace(), columnFamily());
     }
 
     public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime) throws InvalidRequestException, TruncateException
     {
         try
         {
-            CFMetaData metaData = Schema.instance.getCFMetaData(keyspace(), 
columnFamily());
+            TableMetadata metaData = 
Schema.instance.getTableMetadata(keyspace(), columnFamily());
             if (metaData.isView())
                 throw new InvalidRequestException("Cannot TRUNCATE 
materialized view directly; must truncate base table instead");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 01d47bd..7a2a1ba 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -21,8 +21,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
 import org.apache.cassandra.cql3.conditions.Conditions;
@@ -31,6 +29,8 @@ import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.Slice;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -48,13 +48,13 @@ public class UpdateStatement extends ModificationStatement
 
     private UpdateStatement(StatementType type,
                             int boundTerms,
-                            CFMetaData cfm,
+                            TableMetadata metadata,
                             Operations operations,
                             StatementRestrictions restrictions,
                             Conditions conditions,
                             Attributes attrs)
     {
-        super(type, boundTerms, cfm, operations, restrictions, conditions, 
attrs);
+        super(type, boundTerms, metadata, operations, restrictions, 
conditions, attrs);
     }
 
     public boolean requireFullClusteringKey()
@@ -72,7 +72,7 @@ public class UpdateStatement extends ModificationStatement
             // We update the row timestamp (ex-row marker) only on INSERT 
(#6782)
             // Further, COMPACT tables semantic differs from "CQL3" ones in 
that a row exists only if it has
             // a non-null column, so we don't want to set the row timestamp 
for them.
-            if (type.isInsert() && cfm.isCQLTable())
+            if (type.isInsert() && metadata().isCQLTable())
                 params.addPrimaryKeyLivenessInfo();
 
             List<Operation> updates = getRegularOperations();
@@ -80,13 +80,13 @@ public class UpdateStatement extends ModificationStatement
             // For compact table, we don't accept an insert/update that only 
sets the PK unless the is no
             // declared non-PK columns (which we recognize because in that case
             // the compact value is of type "EmptyType").
-            if (cfm.isCompactTable() && updates.isEmpty())
+            if (metadata().isCompactTable() && updates.isEmpty())
             {
-                checkTrue(CompactTables.hasEmptyCompactValue(cfm),
+                checkTrue(CompactTables.hasEmptyCompactValue(metadata),
                           "Column %s is mandatory for this COMPACT STORAGE 
table",
-                          cfm.compactValueColumn().name);
+                          metadata().compactValueColumn.name);
 
-                updates = Collections.<Operation>singletonList(new 
Constants.Setter(cfm.compactValueColumn(), EMPTY));
+                updates = Collections.<Operation>singletonList(new 
Constants.Setter(metadata().compactValueColumn, EMPTY));
             }
 
             for (Operation op : updates)
@@ -112,7 +112,7 @@ public class UpdateStatement extends ModificationStatement
 
     public static class ParsedInsert extends ModificationStatement.Parsed
     {
-        private final List<ColumnDefinition.Raw> columnNames;
+        private final List<ColumnMetadata.Raw> columnNames;
         private final List<Term.Raw> columnValues;
 
         /**
@@ -126,7 +126,7 @@ public class UpdateStatement extends ModificationStatement
          */
         public ParsedInsert(CFName name,
                             Attributes.Raw attrs,
-                            List<ColumnDefinition.Raw> columnNames,
+                            List<ColumnMetadata.Raw> columnNames,
                             List<Term.Raw> columnValues,
                             boolean ifNotExists)
         {
@@ -136,14 +136,14 @@ public class UpdateStatement extends ModificationStatement
         }
 
         @Override
-        protected ModificationStatement prepareInternal(CFMetaData cfm,
+        protected ModificationStatement prepareInternal(TableMetadata metadata,
                                                         VariableSpecifications 
boundNames,
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
 
             // Created from an INSERT
-            checkFalse(cfm.isCounter(), "INSERT statements are not allowed on 
counter tables, use UPDATE instead");
+            checkFalse(metadata.isCounter(), "INSERT statements are not 
allowed on counter tables, use UPDATE instead");
 
             checkFalse(columnNames == null, "Column names for INSERT must be 
provided when using VALUES");
             checkFalse(columnNames.isEmpty(), "No columns provided to INSERT");
@@ -156,7 +156,7 @@ public class UpdateStatement extends ModificationStatement
 
             for (int i = 0; i < columnNames.size(); i++)
             {
-                ColumnDefinition def = getColumnDefinition(cfm, 
columnNames.get(i));
+                ColumnMetadata def = getColumnDefinition(metadata, 
columnNames.get(i));
 
                 if (def.isClusteringColumn())
                     hasClusteringColumnsSet = true;
@@ -169,7 +169,7 @@ public class UpdateStatement extends ModificationStatement
                 }
                 else
                 {
-                    Operation operation = new 
Operation.SetValue(value).prepare(cfm, def);
+                    Operation operation = new 
Operation.SetValue(value).prepare(metadata, def);
                     operation.collectMarkerSpecification(boundNames);
                     operations.add(operation);
                 }
@@ -178,7 +178,7 @@ public class UpdateStatement extends ModificationStatement
             boolean applyOnlyToStaticColumns = !hasClusteringColumnsSet && 
appliesOnlyToStaticColumns(operations, conditions);
 
             StatementRestrictions restrictions = new 
StatementRestrictions(type,
-                                                                           cfm,
+                                                                           
metadata,
                                                                            
whereClause.build(),
                                                                            
boundNames,
                                                                            
applyOnlyToStaticColumns,
@@ -187,7 +187,7 @@ public class UpdateStatement extends ModificationStatement
 
             return new UpdateStatement(type,
                                        boundNames.size(),
-                                       cfm,
+                                       metadata,
                                        operations,
                                        restrictions,
                                        conditions,
@@ -211,21 +211,21 @@ public class UpdateStatement extends ModificationStatement
         }
 
         @Override
-        protected ModificationStatement prepareInternal(CFMetaData cfm,
+        protected ModificationStatement prepareInternal(TableMetadata metadata,
                                                         VariableSpecifications 
boundNames,
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
-            checkFalse(cfm.isCounter(), "INSERT statements are not allowed on 
counter tables, use UPDATE instead");
+            checkFalse(metadata.isCounter(), "INSERT statements are not 
allowed on counter tables, use UPDATE instead");
 
-            Collection<ColumnDefinition> defs = cfm.allColumns();
-            Json.Prepared prepared = jsonValue.prepareAndCollectMarkers(cfm, 
defs, boundNames);
+            Collection<ColumnMetadata> defs = metadata.columns();
+            Json.Prepared prepared = 
jsonValue.prepareAndCollectMarkers(metadata, defs, boundNames);
 
             WhereClause.Builder whereClause = new WhereClause.Builder();
             Operations operations = new Operations(type);
             boolean hasClusteringColumnsSet = false;
 
-            for (ColumnDefinition def : defs)
+            for (ColumnMetadata def : defs)
             {
                 if (def.isClusteringColumn())
                     hasClusteringColumnsSet = true;
@@ -233,11 +233,11 @@ public class UpdateStatement extends ModificationStatement
                 Term.Raw raw = prepared.getRawTermForColumn(def, defaultUnset);
                 if (def.isPrimaryKeyColumn())
                 {
-                    whereClause.add(new 
SingleColumnRelation(ColumnDefinition.Raw.forColumn(def), Operator.EQ, raw));
+                    whereClause.add(new 
SingleColumnRelation(ColumnMetadata.Raw.forColumn(def), Operator.EQ, raw));
                 }
                 else
                 {
-                    Operation operation = new 
Operation.SetValue(raw).prepare(cfm, def);
+                    Operation operation = new 
Operation.SetValue(raw).prepare(metadata, def);
                     operation.collectMarkerSpecification(boundNames);
                     operations.add(operation);
                 }
@@ -246,7 +246,7 @@ public class UpdateStatement extends ModificationStatement
             boolean applyOnlyToStaticColumns = !hasClusteringColumnsSet && 
appliesOnlyToStaticColumns(operations, conditions);
 
             StatementRestrictions restrictions = new 
StatementRestrictions(type,
-                                                                           cfm,
+                                                                           
metadata,
                                                                            
whereClause.build(),
                                                                            
boundNames,
                                                                            
applyOnlyToStaticColumns,
@@ -255,7 +255,7 @@ public class UpdateStatement extends ModificationStatement
 
             return new UpdateStatement(type,
                                        boundNames.size(),
-                                       cfm,
+                                       metadata,
                                        operations,
                                        restrictions,
                                        conditions,
@@ -266,7 +266,7 @@ public class UpdateStatement extends ModificationStatement
     public static class ParsedUpdate extends ModificationStatement.Parsed
     {
         // Provided for an UPDATE
-        private final List<Pair<ColumnDefinition.Raw, Operation.RawUpdate>> 
updates;
+        private final List<Pair<ColumnMetadata.Raw, Operation.RawUpdate>> 
updates;
         private final WhereClause whereClause;
 
         /**
@@ -281,9 +281,9 @@ public class UpdateStatement extends ModificationStatement
          * */
         public ParsedUpdate(CFName name,
                             Attributes.Raw attrs,
-                            List<Pair<ColumnDefinition.Raw, 
Operation.RawUpdate>> updates,
+                            List<Pair<ColumnMetadata.Raw, 
Operation.RawUpdate>> updates,
                             WhereClause whereClause,
-                            List<Pair<ColumnDefinition.Raw, 
ColumnCondition.Raw>> conditions,
+                            List<Pair<ColumnMetadata.Raw, 
ColumnCondition.Raw>> conditions,
                             boolean ifExists)
         {
             super(name, StatementType.UPDATE, attrs, conditions, false, 
ifExists);
@@ -292,25 +292,25 @@ public class UpdateStatement extends ModificationStatement
         }
 
         @Override
-        protected ModificationStatement prepareInternal(CFMetaData cfm,
+        protected ModificationStatement prepareInternal(TableMetadata metadata,
                                                         VariableSpecifications 
boundNames,
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
             Operations operations = new Operations(type);
 
-            for (Pair<ColumnDefinition.Raw, Operation.RawUpdate> entry : 
updates)
+            for (Pair<ColumnMetadata.Raw, Operation.RawUpdate> entry : updates)
             {
-                ColumnDefinition def = getColumnDefinition(cfm, entry.left);
+                ColumnMetadata def = getColumnDefinition(metadata, entry.left);
 
                 checkFalse(def.isPrimaryKeyColumn(), "PRIMARY KEY part %s 
found in SET part", def.name);
 
-                Operation operation = entry.right.prepare(cfm, def);
+                Operation operation = entry.right.prepare(metadata, def);
                 operation.collectMarkerSpecification(boundNames);
                 operations.add(operation);
             }
 
-            StatementRestrictions restrictions = newRestrictions(cfm,
+            StatementRestrictions restrictions = newRestrictions(metadata,
                                                                  boundNames,
                                                                  operations,
                                                                  whereClause,
@@ -318,7 +318,7 @@ public class UpdateStatement extends ModificationStatement
 
             return new UpdateStatement(type,
                                        boundNames.size(),
-                                       cfm,
+                                       metadata,
                                        operations,
                                        restrictions,
                                        conditions,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java 
b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index 1d65a78..f16c619 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 
@@ -36,7 +37,7 @@ final class UpdatesCollector
     /**
      * The columns that will be updated for each table (keyed by the table ID).
      */
-    private final Map<UUID, PartitionColumns> updatedColumns;
+    private final Map<TableId, RegularAndStaticColumns> updatedColumns;
 
     /**
      * The estimated number of updated row.
@@ -48,7 +49,7 @@ final class UpdatesCollector
      */
     private final Map<String, Map<ByteBuffer, IMutation>> mutations = new 
HashMap<>();
 
-    public UpdatesCollector(Map<UUID, PartitionColumns> updatedColumns, int 
updatedRows)
+    public UpdatesCollector(Map<TableId, RegularAndStaticColumns> 
updatedColumns, int updatedRows)
     {
         super();
         this.updatedColumns = updatedColumns;
@@ -59,20 +60,20 @@ final class UpdatesCollector
      * Gets the <code>PartitionUpdate</code> for the specified column family 
and key. If the update does not
      * exist it will be created.
      *
-     * @param cfm the column family meta data
+     * @param metadata the column family meta data
      * @param dk the partition key
      * @param consistency the consistency level
      * @return the <code>PartitionUpdate</code> for the specified column 
family and key
      */
-    public PartitionUpdate getPartitionUpdate(CFMetaData cfm, DecoratedKey dk, 
ConsistencyLevel consistency)
+    public PartitionUpdate getPartitionUpdate(TableMetadata metadata, 
DecoratedKey dk, ConsistencyLevel consistency)
     {
-        Mutation mut = getMutation(cfm, dk, consistency);
-        PartitionUpdate upd = mut.get(cfm);
+        Mutation mut = getMutation(metadata, dk, consistency);
+        PartitionUpdate upd = mut.get(metadata);
         if (upd == null)
         {
-            PartitionColumns columns = updatedColumns.get(cfm.cfId);
+            RegularAndStaticColumns columns = updatedColumns.get(metadata.id);
             assert columns != null;
-            upd = new PartitionUpdate(cfm, dk, columns, updatedRows);
+            upd = new PartitionUpdate(metadata, dk, columns, updatedRows);
             mut.add(upd);
         }
         return upd;
@@ -90,18 +91,18 @@ final class UpdatesCollector
                     
Keyspace.openAndGetStore(update.metadata()).indexManager.validate(update);
     }
 
-    private Mutation getMutation(CFMetaData cfm, DecoratedKey dk, 
ConsistencyLevel consistency)
+    private Mutation getMutation(TableMetadata metadata, DecoratedKey dk, 
ConsistencyLevel consistency)
     {
-        String ksName = cfm.ksName;
+        String ksName = metadata.keyspace;
         IMutation mutation = keyspaceMap(ksName).get(dk.getKey());
         if (mutation == null)
         {
             Mutation mut = new Mutation(ksName, dk);
-            mutation = cfm.isCounter() ? new CounterMutation(mut, consistency) 
: mut;
+            mutation = metadata.isCounter() ? new CounterMutation(mut, 
consistency) : mut;
             keyspaceMap(ksName).put(dk.getKey(), mutation);
             return mut;
         }
-        return cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : 
(Mutation) mutation;
+        return metadata.isCounter() ? ((CounterMutation) 
mutation).getMutation() : (Mutation) mutation;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java 
b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 849e684..5a049d3 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -21,8 +21,8 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
@@ -64,28 +64,28 @@ public abstract class AbstractReadCommandBuilder
     public AbstractReadCommandBuilder fromIncl(Object... values)
     {
         assert lowerClusteringBound == null && clusterings == null;
-        this.lowerClusteringBound = 
ClusteringBound.create(cfs.metadata.comparator, true, true, values);
+        this.lowerClusteringBound = 
ClusteringBound.create(cfs.metadata().comparator, true, true, values);
         return this;
     }
 
     public AbstractReadCommandBuilder fromExcl(Object... values)
     {
         assert lowerClusteringBound == null && clusterings == null;
-        this.lowerClusteringBound = 
ClusteringBound.create(cfs.metadata.comparator, true, false, values);
+        this.lowerClusteringBound = 
ClusteringBound.create(cfs.metadata().comparator, true, false, values);
         return this;
     }
 
     public AbstractReadCommandBuilder toIncl(Object... values)
     {
         assert upperClusteringBound == null && clusterings == null;
-        this.upperClusteringBound = 
ClusteringBound.create(cfs.metadata.comparator, false, true, values);
+        this.upperClusteringBound = 
ClusteringBound.create(cfs.metadata().comparator, false, true, values);
         return this;
     }
 
     public AbstractReadCommandBuilder toExcl(Object... values)
     {
         assert upperClusteringBound == null && clusterings == null;
-        this.upperClusteringBound = 
ClusteringBound.create(cfs.metadata.comparator, false, false, values);
+        this.upperClusteringBound = 
ClusteringBound.create(cfs.metadata().comparator, false, false, values);
         return this;
     }
 
@@ -94,9 +94,9 @@ public abstract class AbstractReadCommandBuilder
         assert lowerClusteringBound == null && upperClusteringBound == null;
 
         if (this.clusterings == null)
-            this.clusterings = new TreeSet<>(cfs.metadata.comparator);
+            this.clusterings = new TreeSet<>(cfs.metadata().comparator);
 
-        this.clusterings.add(cfs.metadata.comparator.make(values));
+        this.clusterings.add(cfs.metadata().comparator.make(values));
         return this;
     }
 
@@ -163,7 +163,7 @@ public abstract class AbstractReadCommandBuilder
 
     public AbstractReadCommandBuilder filterOn(String column, Operator op, 
Object value)
     {
-        ColumnDefinition def = 
cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true));
+        ColumnMetadata def = 
cfs.metadata().getColumn(ColumnIdentifier.getInterned(column, true));
         assert def != null;
 
         AbstractType<?> type = def.type;
@@ -179,11 +179,11 @@ public abstract class AbstractReadCommandBuilder
     protected ColumnFilter makeColumnFilter()
     {
         if (columns == null || columns.isEmpty())
-            return ColumnFilter.all(cfs.metadata);
+            return ColumnFilter.all(cfs.metadata());
 
         ColumnFilter.Builder filter = ColumnFilter.selectionBuilder();
         for (ColumnIdentifier column : columns)
-            filter.add(cfs.metadata.getColumnDefinition(column));
+            filter.add(cfs.metadata().getColumn(column));
         return filter.build();
     }
 
@@ -197,7 +197,7 @@ public abstract class AbstractReadCommandBuilder
         {
             Slice slice = Slice.make(lowerClusteringBound == null ? 
ClusteringBound.BOTTOM : lowerClusteringBound,
                                      upperClusteringBound == null ? 
ClusteringBound.TOP : upperClusteringBound);
-            return new 
ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), 
reversed);
+            return new 
ClusteringIndexSliceFilter(Slices.with(cfs.metadata().comparator, slice), 
reversed);
         }
     }
 
@@ -224,7 +224,7 @@ public abstract class AbstractReadCommandBuilder
         @Override
         public ReadCommand build()
         {
-            return SinglePartitionReadCommand.create(cfs.metadata, 
nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, 
makeFilter());
+            return SinglePartitionReadCommand.create(cfs.metadata(), 
nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, 
makeFilter());
         }
     }
 
@@ -255,7 +255,7 @@ public abstract class AbstractReadCommandBuilder
         @Override
         public ReadCommand build()
         {
-            return SinglePartitionReadCommand.create(cfs.metadata, 
nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, 
makeFilter());
+            return SinglePartitionReadCommand.create(cfs.metadata(), 
nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, 
makeFilter());
         }
     }
 
@@ -275,7 +275,7 @@ public abstract class AbstractReadCommandBuilder
         {
             assert startKey == null;
             this.startInclusive = true;
-            this.startKey = makeKey(cfs.metadata, values);
+            this.startKey = makeKey(cfs.metadata(), values);
             return this;
         }
 
@@ -283,7 +283,7 @@ public abstract class AbstractReadCommandBuilder
         {
             assert startKey == null;
             this.startInclusive = false;
-            this.startKey = makeKey(cfs.metadata, values);
+            this.startKey = makeKey(cfs.metadata(), values);
             return this;
         }
 
@@ -291,7 +291,7 @@ public abstract class AbstractReadCommandBuilder
         {
             assert endKey == null;
             this.endInclusive = true;
-            this.endKey = makeKey(cfs.metadata, values);
+            this.endKey = makeKey(cfs.metadata(), values);
             return this;
         }
 
@@ -299,7 +299,7 @@ public abstract class AbstractReadCommandBuilder
         {
             assert endKey == null;
             this.endInclusive = false;
-            this.endKey = makeKey(cfs.metadata, values);
+            this.endKey = makeKey(cfs.metadata(), values);
             return this;
         }
 
@@ -329,16 +329,16 @@ public abstract class AbstractReadCommandBuilder
             else
                 bounds = new ExcludingBounds<>(start, end);
 
-            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, 
makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), 
Optional.empty());
+            return new PartitionRangeReadCommand(cfs.metadata(), nowInSeconds, 
makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), 
Optional.empty());
         }
 
-        static DecoratedKey makeKey(CFMetaData metadata, Object... 
partitionKey)
+        static DecoratedKey makeKey(TableMetadata metadata, Object... 
partitionKey)
         {
             if (partitionKey.length == 1 && partitionKey[0] instanceof 
DecoratedKey)
                 return (DecoratedKey)partitionKey[0];
 
-            ByteBuffer key = 
CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
-            return metadata.decorateKey(key);
+            ByteBuffer key = 
metadata.partitionKeyAsClusteringComparator().make(partitionKey).serializeAsPartitionKey();
+            return metadata.partitioner.decorateKey(key);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java 
b/src/java/org/apache/cassandra/db/Clustering.java
index fa38ce1..772961f 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -54,23 +54,23 @@ public interface Clustering extends ClusteringPrefix
         return new BufferClustering(newValues);
     }
 
-    public default String toString(CFMetaData metadata)
+    public default String toString(TableMetadata metadata)
     {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < size(); i++)
         {
-            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            ColumnMetadata c = metadata.clusteringColumns().get(i);
             sb.append(i == 0 ? "" : ", 
").append(c.name).append('=').append(get(i) == null ? "null" : 
c.type.getString(get(i)));
         }
         return sb.toString();
     }
 
-    public default String toCQLString(CFMetaData metadata)
+    public default String toCQLString(TableMetadata metadata)
     {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < size(); i++)
         {
-            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            ColumnMetadata c = metadata.clusteringColumns().get(i);
             sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
         }
         return sb.toString();
@@ -100,7 +100,7 @@ public interface Clustering extends ClusteringPrefix
         }
 
         @Override
-        public String toString(CFMetaData metadata)
+        public String toString(TableMetadata metadata)
         {
             return toString();
         }
@@ -110,7 +110,7 @@ public interface Clustering extends ClusteringPrefix
     public static final Clustering EMPTY = new 
BufferClustering(EMPTY_VALUES_ARRAY)
     {
         @Override
-        public String toString(CFMetaData metadata)
+        public String toString(TableMetadata metadata)
         {
             return "EMPTY";
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java 
b/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java
index 7a2cce1..84a9e30 100644
--- a/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java
+++ b/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -111,7 +111,7 @@ public abstract class ClusteringBoundOrBoundary extends 
AbstractBufferClustering
         return create(kind(), newValues);
     }
 
-    public String toString(CFMetaData metadata)
+    public String toString(TableMetadata metadata)
     {
         return toString(metadata.comparator);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java 
b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index 4374a46..50cf5bf 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -53,7 +53,7 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
         this(ImmutableList.copyOf(clusteringTypes));
     }
 
-    public ClusteringComparator(List<AbstractType<?>> clusteringTypes)
+    public ClusteringComparator(Iterable<AbstractType<?>> clusteringTypes)
     {
         // copy the list to ensure despatch is monomorphic
         this.clusteringTypes = ImmutableList.copyOf(clusteringTypes);
@@ -62,7 +62,7 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
         this.indexReverseComparator = (o1, o2) -> 
ClusteringComparator.this.compare(o1.firstName, o2.firstName);
         this.reverseComparator = (c1, c2) -> 
ClusteringComparator.this.compare(c2, c1);
         for (AbstractType<?> type : clusteringTypes)
-            type.checkComparable(); // this should already be enforced by 
CFMetaData.rebuild, but we check again for other constructors
+            type.checkComparable(); // this should already be enforced by 
TableMetadata.Builder.addColumn, but we check again for other constructors
     }
 
     /**

Reply via email to