http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeyRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeyRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeyRestrictions.java
index 1ff45d0..b1edf94 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeyRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeyRestrictions.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.cql3.restrictions;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.Bound;
 
@@ -53,16 +53,16 @@ interface PartitionKeyRestrictions extends Restrictions
     /**
      * checks if specified restrictions require filtering
      *
-     * @param cfm column family metadata
+     * @param table column family metadata
      * @return <code>true</code> if filtering is required, <code>false</code> otherwise
      */
-    public boolean needFiltering(CFMetaData cfm);
+    public boolean needFiltering(TableMetadata table);
 
     /**
      * Checks if the partition key has unrestricted components.
      *
-     * @param cfm column family metadata
+     * @param table column family metadata
      * @return <code>true</code> if the partition key has unrestricted components, <code>false</code> otherwise.
      */
-    public boolean hasUnrestrictedPartitionKeyComponents(CFMetaData cfm);
+    public boolean hasUnrestrictedPartitionKeyComponents(TableMetadata table);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
index b34ff54..5113667 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.cql3.restrictions;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.ClusteringComparator;
@@ -59,7 +59,7 @@ final class PartitionKeySingleRestrictionSet extends RestrictionSetWrapper imple
     {
         List<ByteBuffer> l = new ArrayList<>(clusterings.size());
         for (ClusteringPrefix clustering : clusterings)
-            l.add(CFMetaData.serializePartitionKey(clustering));
+            l.add(clustering.serializeAsPartitionKey());
         return l;
     }
 
@@ -131,18 +131,18 @@ final class PartitionKeySingleRestrictionSet extends RestrictionSetWrapper imple
     }
 
     @Override
-    public boolean needFiltering(CFMetaData cfm)
+    public boolean needFiltering(TableMetadata table)
     {
         if (isEmpty())
             return false;
         // slice or has unrestricted key component
-        return hasUnrestrictedPartitionKeyComponents(cfm) || hasSlice();
+        return hasUnrestrictedPartitionKeyComponents(table) || hasSlice();
     }
 
     @Override
-    public boolean hasUnrestrictedPartitionKeyComponents(CFMetaData cfm)
+    public boolean hasUnrestrictedPartitionKeyComponents(TableMetadata table)
     {
-        return size() < cfm.partitionKeyColumns().size();
+        return size() < table.partitionKeyColumns().size();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
index fc7f5bc..daace46 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.restrictions;
 
 import java.util.List;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -39,19 +39,19 @@ public interface Restriction
      * Returns the definition of the first column.
      * @return the definition of the first column.
      */
-    public ColumnDefinition getFirstColumn();
+    public ColumnMetadata getFirstColumn();
 
     /**
      * Returns the definition of the last column.
      * @return the definition of the last column.
      */
-    public ColumnDefinition getLastColumn();
+    public ColumnMetadata getLastColumn();
 
     /**
      * Returns the column definitions in position order.
      * @return the column definitions in position order.
      */
-    public List<ColumnDefinition> getColumnDefs();
+    public List<ColumnMetadata> getColumnDefs();
 
     /**
      * Adds all functions (native and user-defined) used by any component of the restriction

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index 3a1bcb1..ee3df98 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -21,7 +21,7 @@ import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction.ContainsRestriction;
@@ -39,10 +39,10 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
     /**
      * The comparator used to sort the <code>Restriction</code>s.
      */
-    private static final Comparator<ColumnDefinition> COLUMN_DEFINITION_COMPARATOR = new Comparator<ColumnDefinition>()
+    private static final Comparator<ColumnMetadata> COLUMN_DEFINITION_COMPARATOR = new Comparator<ColumnMetadata>()
     {
         @Override
-        public int compare(ColumnDefinition column, ColumnDefinition otherColumn)
+        public int compare(ColumnMetadata column, ColumnMetadata otherColumn)
         {
             int value = Integer.compare(column.position(), otherColumn.position());
             return value != 0 ? value : column.name.bytes.compareTo(otherColumn.name.bytes);
@@ -52,7 +52,7 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
     /**
      * The restrictions per column.
      */
-    protected final TreeMap<ColumnDefinition, SingleRestriction> restrictions;
+    protected final TreeMap<ColumnMetadata, SingleRestriction> restrictions;
 
     /**
      * {@code true} if it contains multi-column restrictions, {@code false} 
otherwise.
@@ -61,10 +61,10 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
 
     public RestrictionSet()
     {
-        this(new TreeMap<ColumnDefinition, 
SingleRestriction>(COLUMN_DEFINITION_COMPARATOR), false);
+        this(new TreeMap<ColumnMetadata, 
SingleRestriction>(COLUMN_DEFINITION_COMPARATOR), false);
     }
 
-    private RestrictionSet(TreeMap<ColumnDefinition, SingleRestriction> 
restrictions,
+    private RestrictionSet(TreeMap<ColumnMetadata, SingleRestriction> 
restrictions,
                            boolean hasMultiColumnRestrictions)
     {
         this.restrictions = restrictions;
@@ -79,7 +79,7 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
     }
 
     @Override
-    public List<ColumnDefinition> getColumnDefs()
+    public List<ColumnMetadata> getColumnDefs()
     {
         return new ArrayList<>(restrictions.keySet());
     }
@@ -108,9 +108,9 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
      * @param kind the column kind
      * @return {@code true} if one of the restrictions applies to a column of 
the specific kind, {@code false} otherwise.
      */
-    public boolean hasRestrictionFor(ColumnDefinition.Kind kind)
+    public boolean hasRestrictionFor(ColumnMetadata.Kind kind)
     {
-        for (ColumnDefinition column : restrictions.keySet())
+        for (ColumnMetadata column : restrictions.keySet())
         {
             if (column.kind == kind)
                 return true;
@@ -127,19 +127,19 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
     public RestrictionSet addRestriction(SingleRestriction restriction)
     {
         // RestrictionSet is immutable so we need to clone the restrictions 
map.
-        TreeMap<ColumnDefinition, SingleRestriction> newRestrictions = new 
TreeMap<>(this.restrictions);
+        TreeMap<ColumnMetadata, SingleRestriction> newRestrictions = new 
TreeMap<>(this.restrictions);
         return new RestrictionSet(mergeRestrictions(newRestrictions, 
restriction), hasMultiColumnRestrictions || restriction.isMultiColumn());
     }
 
-    private TreeMap<ColumnDefinition, SingleRestriction> 
mergeRestrictions(TreeMap<ColumnDefinition, SingleRestriction> restrictions,
-                                                                           
SingleRestriction restriction)
+    private TreeMap<ColumnMetadata, SingleRestriction> 
mergeRestrictions(TreeMap<ColumnMetadata, SingleRestriction> restrictions,
+                                                                         
SingleRestriction restriction)
     {
-        Collection<ColumnDefinition> columnDefs = restriction.getColumnDefs();
+        Collection<ColumnMetadata> columnDefs = restriction.getColumnDefs();
         Set<SingleRestriction> existingRestrictions = 
getRestrictions(columnDefs);
 
         if (existingRestrictions.isEmpty())
         {
-            for (ColumnDefinition columnDef : columnDefs)
+            for (ColumnMetadata columnDef : columnDefs)
                 restrictions.put(columnDef, restriction);
         }
         else
@@ -148,7 +148,7 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
             {
                 SingleRestriction newRestriction = mergeRestrictions(existing, 
restriction);
 
-                for (ColumnDefinition columnDef : columnDefs)
+                for (ColumnMetadata columnDef : columnDefs)
                     restrictions.put(columnDef, newRestriction);
             }
         }
@@ -157,7 +157,7 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
     }
 
     @Override
-    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    public Set<Restriction> getRestrictions(ColumnMetadata columnDef)
     {
         Restriction existing = restrictions.get(columnDef);
         return existing == null ? Collections.emptySet() : 
Collections.singleton(existing);
@@ -169,10 +169,10 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
      * @param columnDefs the column definitions
      * @return all the restrictions applied to the specified columns
      */
-    private Set<SingleRestriction> 
getRestrictions(Collection<ColumnDefinition> columnDefs)
+    private Set<SingleRestriction> getRestrictions(Collection<ColumnMetadata> 
columnDefs)
     {
         Set<SingleRestriction> set = new HashSet<>();
-        for (ColumnDefinition columnDef : columnDefs)
+        for (ColumnMetadata columnDef : columnDefs)
         {
             SingleRestriction existing = restrictions.get(columnDef);
             if (existing != null)
@@ -198,19 +198,19 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
      * @param columnDef the column for which the next one need to be found
      * @return the column after the specified one.
      */
-    ColumnDefinition nextColumn(ColumnDefinition columnDef)
+    ColumnMetadata nextColumn(ColumnMetadata columnDef)
     {
         return restrictions.tailMap(columnDef, false).firstKey();
     }
 
     @Override
-    public ColumnDefinition getFirstColumn()
+    public ColumnMetadata getFirstColumn()
     {
         return isEmpty() ? null : this.restrictions.firstKey();
     }
 
     @Override
-    public ColumnDefinition getLastColumn()
+    public ColumnMetadata getLastColumn()
     {
         return isEmpty() ? null : this.restrictions.lastKey();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
index 996a1c4..e9a62de 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.cql3.restrictions;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -49,7 +49,7 @@ class RestrictionSetWrapper implements Restrictions
         restrictions.addRowFilterTo(filter, indexManager, options);
     }
 
-    public List<ColumnDefinition> getColumnDefs()
+    public List<ColumnMetadata> getColumnDefs()
     {
         return restrictions.getColumnDefs();
     }
@@ -74,12 +74,12 @@ class RestrictionSetWrapper implements Restrictions
         return restrictions.hasSupportingIndex(indexManager);
     }
 
-    public ColumnDefinition getFirstColumn()
+    public ColumnMetadata getFirstColumn()
     {
         return restrictions.getFirstColumn();
     }
 
-    public ColumnDefinition getLastColumn()
+    public ColumnMetadata getLastColumn()
     {
         return restrictions.getLastColumn();
     }
@@ -99,7 +99,7 @@ class RestrictionSetWrapper implements Restrictions
         return restrictions.hasOnlyEqualityRestrictions();
     }
 
-    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    public Set<Restriction> getRestrictions(ColumnMetadata columnDef)
     {
         return restrictions.getRestrictions(columnDef);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
index 8a5140a..8ccb1b2 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.restrictions;
 
 import java.util.Set;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 
 /**
  * Sets of restrictions
@@ -32,7 +32,7 @@ public interface Restrictions extends Restriction
      * @param columnDef the column definition
      * @return the restrictions applied to the specified column
      */
-    Set<Restriction> getRestrictions(ColumnDefinition columnDef);
+    Set<Restriction> getRestrictions(ColumnMetadata columnDef);
 
     /**
      * Checks if this <code>Restrictions</code> is empty or not.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 1d84331..44f95a8 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.Term.Terminal;
 import org.apache.cassandra.cql3.functions.Function;
@@ -45,27 +45,27 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
     /**
      * The definition of the column to which apply the restriction.
      */
-    protected final ColumnDefinition columnDef;
+    protected final ColumnMetadata columnDef;
 
-    public SingleColumnRestriction(ColumnDefinition columnDef)
+    public SingleColumnRestriction(ColumnMetadata columnDef)
     {
         this.columnDef = columnDef;
     }
 
     @Override
-    public List<ColumnDefinition> getColumnDefs()
+    public List<ColumnMetadata> getColumnDefs()
     {
         return Collections.singletonList(columnDef);
     }
 
     @Override
-    public ColumnDefinition getFirstColumn()
+    public ColumnMetadata getFirstColumn()
     {
         return columnDef;
     }
 
     @Override
-    public ColumnDefinition getLastColumn()
+    public ColumnMetadata getLastColumn()
     {
         return columnDef;
     }
@@ -125,7 +125,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
     {
         private final Term value;
 
-        public EQRestriction(ColumnDefinition columnDef, Term value)
+        public EQRestriction(ColumnMetadata columnDef, Term value)
         {
             super(columnDef);
             this.value = value;
@@ -187,7 +187,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
 
     public static abstract class INRestriction extends SingleColumnRestriction
     {
-        public INRestriction(ColumnDefinition columnDef)
+        public INRestriction(ColumnMetadata columnDef)
         {
             super(columnDef);
         }
@@ -234,7 +234,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
     {
         protected final List<Term> values;
 
-        public InRestrictionWithValues(ColumnDefinition columnDef, List<Term> 
values)
+        public InRestrictionWithValues(ColumnMetadata columnDef, List<Term> 
values)
         {
             super(columnDef);
             this.values = values;
@@ -272,7 +272,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
     {
         protected final AbstractMarker marker;
 
-        public InRestrictionWithMarker(ColumnDefinition columnDef, 
AbstractMarker marker)
+        public InRestrictionWithMarker(ColumnMetadata columnDef, 
AbstractMarker marker)
         {
             super(columnDef);
             this.marker = marker;
@@ -310,7 +310,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
     {
         private final TermSlice slice;
 
-        public SliceRestriction(ColumnDefinition columnDef, Bound bound, 
boolean inclusive, Term term)
+        public SliceRestriction(ColumnMetadata columnDef, Bound bound, boolean 
inclusive, Term term)
         {
             super(columnDef);
             slice = TermSlice.newInstance(bound, inclusive, term);
@@ -404,7 +404,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
             return String.format("SLICE%s", slice);
         }
 
-        private SliceRestriction(ColumnDefinition columnDef, TermSlice slice)
+        private SliceRestriction(ColumnMetadata columnDef, TermSlice slice)
         {
             super(columnDef);
             this.slice = slice;
@@ -419,7 +419,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
         private List<Term> entryKeys = new ArrayList<>(); // for map[key] = 
value
         private List<Term> entryValues = new ArrayList<>(); // for map[key] = 
value
 
-        public ContainsRestriction(ColumnDefinition columnDef, Term t, boolean 
isKey)
+        public ContainsRestriction(ColumnMetadata columnDef, Term t, boolean 
isKey)
         {
             super(columnDef);
             if (isKey)
@@ -428,7 +428,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
                 values.add(t);
         }
 
-        public ContainsRestriction(ColumnDefinition columnDef, Term mapKey, 
Term mapValue)
+        public ContainsRestriction(ColumnMetadata columnDef, Term mapKey, Term 
mapValue)
         {
             super(columnDef);
             entryKeys.add(mapKey);
@@ -583,7 +583,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
             to.entryValues.addAll(from.entryValues);
         }
 
-        private ContainsRestriction(ColumnDefinition columnDef)
+        private ContainsRestriction(ColumnMetadata columnDef)
         {
             super(columnDef);
         }
@@ -591,7 +591,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
 
     public static final class IsNotNullRestriction extends 
SingleColumnRestriction
     {
-        public IsNotNullRestriction(ColumnDefinition columnDef)
+        public IsNotNullRestriction(ColumnMetadata columnDef)
         {
             super(columnDef);
         }
@@ -652,7 +652,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
         private final Operator operator;
         private final Term value;
 
-        public LikeRestriction(ColumnDefinition columnDef, Operator operator, 
Term value)
+        public LikeRestriction(ColumnMetadata columnDef, Operator operator, 
Term value)
         {
             super(columnDef);
             this.operator = operator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 38aeb18..8431490 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -22,9 +22,6 @@ import java.util.*;
 
 import com.google.common.base.Joiner;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.ColumnDefinition.Kind;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.statements.Bound;
@@ -36,7 +33,8 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.SecondaryIndexManager;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@ -61,7 +59,7 @@ public final class StatementRestrictions
     /**
      * The Column Family meta data
      */
-    public final CFMetaData cfm;
+    public final TableMetadata table;
 
     /**
      * Restrictions on partitioning columns
@@ -78,7 +76,7 @@ public final class StatementRestrictions
      */
     private RestrictionSet nonPrimaryKeyRestrictions;
 
-    private Set<ColumnDefinition> notNullColumns;
+    private Set<ColumnMetadata> notNullColumns;
 
     /**
      * The restrictions used to build the row filter
@@ -105,40 +103,40 @@ public final class StatementRestrictions
      * Creates a new empty <code>StatementRestrictions</code>.
      *
      * @param type the type of statement
-     * @param cfm the column family meta data
+     * @param table the column family meta data
      * @return a new empty <code>StatementRestrictions</code>.
      */
-    public static StatementRestrictions empty(StatementType type, CFMetaData cfm)
+    public static StatementRestrictions empty(StatementType type, TableMetadata table)
     {
-        return new StatementRestrictions(type, cfm, false);
+        return new StatementRestrictions(type, table, false);
     }
 
-    private StatementRestrictions(StatementType type, CFMetaData cfm, boolean allowFiltering)
+    private StatementRestrictions(StatementType type, TableMetadata table, boolean allowFiltering)
     {
         this.type = type;
-        this.cfm = cfm;
-        this.partitionKeyRestrictions = new PartitionKeySingleRestrictionSet(cfm.getKeyValidatorAsClusteringComparator());
-        this.clusteringColumnsRestrictions = new ClusteringColumnRestrictions(cfm, allowFiltering);
+        this.table = table;
+        this.partitionKeyRestrictions = new PartitionKeySingleRestrictionSet(table.partitionKeyAsClusteringComparator());
+        this.clusteringColumnsRestrictions = new ClusteringColumnRestrictions(table, allowFiltering);
         this.nonPrimaryKeyRestrictions = new RestrictionSet();
         this.notNullColumns = new HashSet<>();
     }
 
     public StatementRestrictions(StatementType type,
-                                 CFMetaData cfm,
+                                 TableMetadata table,
                                  WhereClause whereClause,
                                  VariableSpecifications boundNames,
                                  boolean selectsOnlyStaticColumns,
                                  boolean allowFiltering,
                                  boolean forView)
     {
-        this(type, cfm, allowFiltering);
+        this(type, table, allowFiltering);
 
         ColumnFamilyStore cfs;
         SecondaryIndexManager secondaryIndexManager = null;
 
         if (type.allowUseOfSecondaryIndices())
         {
-            cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
+            cfs = Keyspace.open(table.keyspace).getColumnFamilyStore(table.name);
             secondaryIndexManager = cfs.indexManager;
         }
 
@@ -158,12 +156,12 @@ public final class StatementRestrictions
                 if (!forView)
                     throw new InvalidRequestException("Unsupported 
restriction: " + relation);
 
-                for (ColumnDefinition def : relation.toRestriction(cfm, 
boundNames).getColumnDefs())
+                for (ColumnMetadata def : relation.toRestriction(table, 
boundNames).getColumnDefs())
                     this.notNullColumns.add(def);
             }
             else if (relation.isLIKE())
             {
-                Restriction restriction = relation.toRestriction(cfm, 
boundNames);
+                Restriction restriction = relation.toRestriction(table, 
boundNames);
 
                 if (!type.allowUseOfSecondaryIndices() || 
!restriction.hasSupportingIndex(secondaryIndexManager))
                     throw new InvalidRequestException(String.format("LIKE 
restriction is only supported on properly " +
@@ -174,11 +172,11 @@ public final class StatementRestrictions
             }
             else
             {
-                addRestriction(relation.toRestriction(cfm, boundNames));
+                addRestriction(relation.toRestriction(table, boundNames));
             }
         }
 
-        hasRegularColumnsRestrictions = 
nonPrimaryKeyRestrictions.hasRestrictionFor(Kind.REGULAR);
+        hasRegularColumnsRestrictions = 
nonPrimaryKeyRestrictions.hasRestrictionFor(ColumnMetadata.Kind.REGULAR);
 
         boolean hasQueriableClusteringColumnIndex = false;
         boolean hasQueriableIndex = false;
@@ -200,7 +198,7 @@ public final class StatementRestrictions
 
         // Some but not all of the partition key columns have been specified;
         // hence we need turn these restrictions into a row filter.
-        if (usesSecondaryIndexing || 
partitionKeyRestrictions.needFiltering(cfm))
+        if (usesSecondaryIndexing || 
partitionKeyRestrictions.needFiltering(table))
             filterRestrictions.add(partitionKeyRestrictions);
 
         if (selectsOnlyStaticColumns && hasClusteringColumnsRestrictions())
@@ -241,7 +239,7 @@ public final class StatementRestrictions
             if (!type.allowNonPrimaryKeyInWhereClause())
             {
                 Collection<ColumnIdentifier> nonPrimaryKeyColumns =
-                        
ColumnDefinition.toIdentifiers(nonPrimaryKeyRestrictions.getColumnDefs());
+                        
ColumnMetadata.toIdentifiers(nonPrimaryKeyRestrictions.getColumnDefs());
 
                 throw invalidRequest("Non PRIMARY KEY columns found in where 
clause: %s ",
                                      Joiner.on(", 
").join(nonPrimaryKeyColumns));
@@ -260,7 +258,7 @@ public final class StatementRestrictions
 
     private void addRestriction(Restriction restriction)
     {
-        ColumnDefinition def = restriction.getFirstColumn();
+        ColumnMetadata def = restriction.getFirstColumn();
         if (def.isPartitionKey())
             partitionKeyRestrictions = 
partitionKeyRestrictions.mergeWith(restriction);
         else if (def.isClusteringColumn())
@@ -287,19 +285,19 @@ public final class StatementRestrictions
      * by an IS NOT NULL restriction will be included, otherwise they will not 
be included (unless another restriction
      * applies to them).
      */
-    public Set<ColumnDefinition> nonPKRestrictedColumns(boolean 
includeNotNullRestrictions)
+    public Set<ColumnMetadata> nonPKRestrictedColumns(boolean 
includeNotNullRestrictions)
     {
-        Set<ColumnDefinition> columns = new HashSet<>();
+        Set<ColumnMetadata> columns = new HashSet<>();
         for (Restrictions r : filterRestrictions.getRestrictions())
         {
-            for (ColumnDefinition def : r.getColumnDefs())
+            for (ColumnMetadata def : r.getColumnDefs())
                 if (!def.isPrimaryKeyColumn())
                     columns.add(def);
         }
 
         if (includeNotNullRestrictions)
         {
-            for (ColumnDefinition def : notNullColumns)
+            for (ColumnMetadata def : notNullColumns)
             {
                 if (!def.isPrimaryKeyColumn())
                     columns.add(def);
@@ -312,7 +310,7 @@ public final class StatementRestrictions
     /**
      * @return the set of columns that have an IS NOT NULL restriction on them
      */
-    public Set<ColumnDefinition> notNullColumns()
+    public Set<ColumnMetadata> notNullColumns()
     {
         return notNullColumns;
     }
@@ -320,7 +318,7 @@ public final class StatementRestrictions
     /**
      * @return true if column is restricted by some restriction, false 
otherwise
      */
-    public boolean isRestricted(ColumnDefinition column)
+    public boolean isRestricted(ColumnMetadata column)
     {
         if (notNullColumns.contains(column))
             return true;
@@ -356,7 +354,7 @@ public final class StatementRestrictions
      * @return <code>true</code> if the specified column is restricted by an 
EQ restiction, <code>false</code>
      * otherwise.
      */
-    public boolean isColumnRestrictedByEq(ColumnDefinition columnDef)
+    public boolean isColumnRestrictedByEq(ColumnMetadata columnDef)
     {
         Set<Restriction> restrictions = 
getRestrictions(columnDef.kind).getRestrictions(columnDef);
         return restrictions.stream()
@@ -370,7 +368,7 @@ public final class StatementRestrictions
      * @param kind the column type
      * @return the <code>Restrictions</code> for the specified type of columns
      */
-    private Restrictions getRestrictions(ColumnDefinition.Kind kind)
+    private Restrictions getRestrictions(ColumnMetadata.Kind kind)
     {
         switch (kind)
         {
@@ -397,7 +395,7 @@ public final class StatementRestrictions
             checkFalse(partitionKeyRestrictions.isOnToken(),
                        "The token function cannot be used in WHERE clauses for 
%s statements", type);
 
-            if 
(partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(cfm))
+            if 
(partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(table))
                 throw invalidRequest("Some partition key parts are missing: 
%s",
                                      Joiner.on(", 
").join(getPartitionKeyUnrestrictedComponents()));
 
@@ -412,7 +410,7 @@ public final class StatementRestrictions
             if (partitionKeyRestrictions.isOnToken())
                 isKeyRange = true;
 
-            if (partitionKeyRestrictions.isEmpty() && 
partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(cfm))
+            if (partitionKeyRestrictions.isEmpty() && 
partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(table))
             {
                 isKeyRange = true;
                 usesSecondaryIndexing = hasQueriableIndex;
@@ -424,10 +422,10 @@ public final class StatementRestrictions
             // - Is it queriable without 2ndary index, which is always more 
efficient
             // If a component of the partition key is restricted by a 
relation, all preceding
             // components must have a EQ. Only the last partition key 
component can be in IN relation.
-            if (partitionKeyRestrictions.needFiltering(cfm))
+            if (partitionKeyRestrictions.needFiltering(table))
             {
                 if (!allowFiltering && !forView && !hasQueriableIndex
-                    && 
(partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(cfm) || 
partitionKeyRestrictions.hasSlice()))
+                    && 
(partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(table) || 
partitionKeyRestrictions.hasSlice()))
                     throw new 
InvalidRequestException(REQUIRES_ALLOW_FILTERING_MESSAGE);
 
                 if (partitionKeyRestrictions.hasIN())
@@ -459,9 +457,9 @@ public final class StatementRestrictions
      */
     private Collection<ColumnIdentifier> 
getPartitionKeyUnrestrictedComponents()
     {
-        List<ColumnDefinition> list = new 
ArrayList<>(cfm.partitionKeyColumns());
+        List<ColumnMetadata> list = new 
ArrayList<>(table.partitionKeyColumns());
         list.removeAll(partitionKeyRestrictions.getColumnDefs());
-        return ColumnDefinition.toIdentifiers(list);
+        return ColumnMetadata.toIdentifiers(list);
     }
 
     /**
@@ -502,7 +500,7 @@ public final class StatementRestrictions
                    "Slice restrictions are not supported on the clustering 
columns in %s statements", type);
 
         if (!type.allowClusteringColumnSlices()
-               && (!cfm.isCompactTable() || (cfm.isCompactTable() && 
!hasClusteringColumnsRestrictions())))
+               && (!table.isCompactTable() || (table.isCompactTable() && 
!hasClusteringColumnsRestrictions())))
         {
             if (!selectsOnlyStaticColumns && 
hasUnrestrictedClusteringColumns())
                 throw invalidRequest("Some clustering keys are missing: %s",
@@ -522,13 +520,13 @@ public final class StatementRestrictions
                 }
                 else if (!allowFiltering)
                 {
-                    List<ColumnDefinition> clusteringColumns = 
cfm.clusteringColumns();
-                    List<ColumnDefinition> restrictedColumns = new 
LinkedList<>(clusteringColumnsRestrictions.getColumnDefs());
+                    List<ColumnMetadata> clusteringColumns = 
table.clusteringColumns();
+                    List<ColumnMetadata> restrictedColumns = new 
LinkedList<>(clusteringColumnsRestrictions.getColumnDefs());
 
                     for (int i = 0, m = restrictedColumns.size(); i < m; i++)
                     {
-                        ColumnDefinition clusteringColumn = 
clusteringColumns.get(i);
-                        ColumnDefinition restrictedColumn = 
restrictedColumns.get(i);
+                        ColumnMetadata clusteringColumn = 
clusteringColumns.get(i);
+                        ColumnMetadata restrictedColumn = 
restrictedColumns.get(i);
 
                         if (!clusteringColumn.equals(restrictedColumn))
                         {
@@ -550,9 +548,9 @@ public final class StatementRestrictions
      */
     private Collection<ColumnIdentifier> getUnrestrictedClusteringColumns()
     {
-        List<ColumnDefinition> missingClusteringColumns = new 
ArrayList<>(cfm.clusteringColumns());
+        List<ColumnMetadata> missingClusteringColumns = new 
ArrayList<>(table.clusteringColumns());
         missingClusteringColumns.removeAll(new 
LinkedList<>(clusteringColumnsRestrictions.getColumnDefs()));
-        return ColumnDefinition.toIdentifiers(missingClusteringColumns);
+        return ColumnMetadata.toIdentifiers(missingClusteringColumns);
     }
 
     /**
@@ -561,7 +559,7 @@ public final class StatementRestrictions
      */
     private boolean hasUnrestrictedClusteringColumns()
     {
-        return cfm.clusteringColumns().size() != 
clusteringColumnsRestrictions.size();
+        return table.clusteringColumns().size() != 
clusteringColumnsRestrictions.size();
     }
 
     private void processCustomIndexExpressions(List<CustomIndexExpression> 
expressions,
@@ -575,16 +573,16 @@ public final class StatementRestrictions
 
         CFName cfName = expression.targetIndex.getCfName();
         if (cfName.hasKeyspace()
-            && !expression.targetIndex.getKeyspace().equals(cfm.ksName))
-            throw IndexRestrictions.invalidIndex(expression.targetIndex, cfm);
+            && !expression.targetIndex.getKeyspace().equals(table.keyspace))
+            throw IndexRestrictions.invalidIndex(expression.targetIndex, 
table);
 
-        if (cfName.getColumnFamily() != null && 
!cfName.getColumnFamily().equals(cfm.cfName))
-            throw IndexRestrictions.invalidIndex(expression.targetIndex, cfm);
+        if (cfName.getColumnFamily() != null && 
!cfName.getColumnFamily().equals(table.name))
+            throw IndexRestrictions.invalidIndex(expression.targetIndex, 
table);
 
-        if (!cfm.getIndexes().has(expression.targetIndex.getIdx()))
-            throw IndexRestrictions.indexNotFound(expression.targetIndex, cfm);
+        if (!table.indexes.has(expression.targetIndex.getIdx()))
+            throw IndexRestrictions.indexNotFound(expression.targetIndex, 
table);
 
-        Index index = 
indexManager.getIndex(cfm.getIndexes().get(expression.targetIndex.getIdx()).get());
+        Index index = 
indexManager.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get());
 
         if (!index.getIndexMetadata().isCustom())
             throw 
IndexRestrictions.nonCustomIndexInExpression(expression.targetIndex);
@@ -593,7 +591,7 @@ public final class StatementRestrictions
         if (expressionType == null)
             throw 
IndexRestrictions.customExpressionNotSupported(expression.targetIndex);
 
-        expression.prepareValue(cfm, expressionType, boundNames);
+        expression.prepareValue(table, expressionType, boundNames);
 
         filterRestrictions.add(expression);
     }
@@ -608,7 +606,7 @@ public final class StatementRestrictions
             restrictions.addRowFilterTo(filter, indexManager, options);
 
         for (CustomIndexExpression expression : 
filterRestrictions.getCustomIndexExpressions())
-            expression.addToRowFilter(filter, cfm, options);
+            expression.addToRowFilter(filter, table, options);
 
         return filter;
     }
@@ -645,7 +643,7 @@ public final class StatementRestrictions
      */
     public AbstractBounds<PartitionPosition> 
getPartitionKeyBounds(QueryOptions options)
     {
-        IPartitioner p = cfm.partitioner;
+        IPartitioner p = table.partitioner;
 
         if (partitionKeyRestrictions.isOnToken())
         {
@@ -660,7 +658,7 @@ public final class StatementRestrictions
     {
         // Deal with unrestricted partition key components (special-casing is 
required to deal with 2i queries on the
         // first component of a composite partition key) queries that filter 
on the partition key.
-        if (partitionKeyRestrictions.needFiltering(cfm))
+        if (partitionKeyRestrictions.needFiltering(table))
             return new Range<>(p.getMinimumToken().minKeyBound(), 
p.getMinimumToken().maxKeyBound());
 
         ByteBuffer startKeyBytes = getPartitionKeyBound(Bound.START, options);
@@ -746,8 +744,8 @@ public final class StatementRestrictions
         // If this is a names command and the table is a static compact one, 
then as far as CQL is concerned we have
         // only a single row which internally correspond to the static parts. 
In which case we want to return an empty
         // set (since that's what ClusteringIndexNamesFilter expects).
-        if (cfm.isStaticCompactTable())
-            return BTreeSet.empty(cfm.comparator);
+        if (table.isStaticCompactTable())
+            return BTreeSet.empty(table.comparator);
 
         return clusteringColumnsRestrictions.valuesAsClustering(options);
     }
@@ -774,7 +772,7 @@ public final class StatementRestrictions
         // For static compact tables we want to ignore the fake clustering 
column (note that if we weren't special casing,
         // this would mean a 'SELECT *' on a static compact table would query 
whole partitions, even though we'll only return
         // the static part as far as CQL is concerned. This is thus mostly an 
optimization to use the query-by-name path).
-        int numberOfClusteringColumns = cfm.isStaticCompactTable() ? 0 : 
cfm.clusteringColumns().size();
+        int numberOfClusteringColumns = table.isStaticCompactTable() ? 0 : 
table.clusteringColumns().size();
         // it is a range query if it has at least one the column alias for 
which no relation is defined or is not EQ or IN.
         return clusteringColumnsRestrictions.size() < numberOfClusteringColumns
             || !clusteringColumnsRestrictions.hasOnlyEqualityRestrictions();
@@ -816,7 +814,7 @@ public final class StatementRestrictions
     public boolean hasAllPKColumnsRestrictedByEqualities()
     {
         return !isPartitionKeyRestrictionsOnToken()
-                && 
!partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(cfm)
+                && 
!partitionKeyRestrictions.hasUnrestrictedPartitionKeyComponents(table)
                 && (partitionKeyRestrictions.hasOnlyEqualityRestrictions())
                 && !hasUnrestrictedClusteringColumns()
                 && 
(clusteringColumnsRestrictions.hasOnlyEqualityRestrictions());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java 
b/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java
index d6543b8..100fcef 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.restrictions;
 
 import java.util.List;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.functions.Function;
@@ -155,7 +155,7 @@ final class TermSlice
      * @return <code>true</code> this type of <code>TermSlice</code> is 
supported by the specified index,
      * <code>false</code> otherwise.
      */
-    public boolean isSupportedBy(ColumnDefinition column, Index index)
+    public boolean isSupportedBy(ColumnMetadata column, Index index)
     {
         boolean supported = false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java 
b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index 400706b..b2d8509 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@ -25,8 +25,8 @@ import com.google.common.collect.ImmutableRangeSet;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.statements.Bound;
@@ -73,7 +73,7 @@ final class TokenFilter implements PartitionKeyRestrictions
     }
 
     @Override
-    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    public Set<Restriction> getRestrictions(ColumnMetadata columnDef)
     {
         Set<Restriction> set = new HashSet<>();
         set.addAll(restrictions.getRestrictions(columnDef));
@@ -243,19 +243,19 @@ final class TokenFilter implements 
PartitionKeyRestrictions
     }
 
     @Override
-    public ColumnDefinition getFirstColumn()
+    public ColumnMetadata getFirstColumn()
     {
         return restrictions.getFirstColumn();
     }
 
     @Override
-    public ColumnDefinition getLastColumn()
+    public ColumnMetadata getLastColumn()
     {
         return restrictions.getLastColumn();
     }
 
     @Override
-    public List<ColumnDefinition> getColumnDefs()
+    public List<ColumnMetadata> getColumnDefs()
     {
         return restrictions.getColumnDefs();
     }
@@ -291,15 +291,15 @@ final class TokenFilter implements 
PartitionKeyRestrictions
     }
 
     @Override
-    public boolean needFiltering(CFMetaData cfm)
+    public boolean needFiltering(TableMetadata table)
     {
-        return restrictions.needFiltering(cfm);
+        return restrictions.needFiltering(table);
     }
 
     @Override
-    public boolean hasUnrestrictedPartitionKeyComponents(CFMetaData cfm)
+    public boolean hasUnrestrictedPartitionKeyComponents(TableMetadata table)
     {
-        return restrictions.hasUnrestrictedPartitionKeyComponents(cfm);
+        return restrictions.hasUnrestrictedPartitionKeyComponents(table);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index e90319d..7f4b9ff 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -22,8 +22,8 @@ import java.util.*;
 
 import com.google.common.base.Joiner;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.functions.Function;
@@ -42,16 +42,16 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
     /**
      * The definition of the columns to which apply the token restriction.
      */
-    protected final List<ColumnDefinition> columnDefs;
+    protected final List<ColumnMetadata> columnDefs;
 
-    protected final CFMetaData metadata;
+    protected final TableMetadata metadata;
 
     /**
      * Creates a new <code>TokenRestriction</code> that apply to the specified 
columns.
      *
      * @param columnDefs the definition of the columns to which apply the 
token restriction
      */
-    public TokenRestriction(CFMetaData metadata, List<ColumnDefinition> 
columnDefs)
+    public TokenRestriction(TableMetadata metadata, List<ColumnMetadata> 
columnDefs)
     {
         this.columnDefs = columnDefs;
         this.metadata = metadata;
@@ -68,7 +68,7 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
     }
 
     @Override
-    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    public Set<Restriction> getRestrictions(ColumnMetadata columnDef)
     {
         return Collections.singleton(this);
     }
@@ -80,7 +80,7 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
     }
 
     @Override
-    public boolean needFiltering(CFMetaData cfm)
+    public boolean needFiltering(TableMetadata table)
     {
         return false;
     }
@@ -92,25 +92,25 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
     }
 
     @Override
-    public boolean hasUnrestrictedPartitionKeyComponents(CFMetaData cfm)
+    public boolean hasUnrestrictedPartitionKeyComponents(TableMetadata table)
     {
         return false;
     }
 
     @Override
-    public List<ColumnDefinition> getColumnDefs()
+    public List<ColumnMetadata> getColumnDefs()
     {
         return columnDefs;
     }
 
     @Override
-    public ColumnDefinition getFirstColumn()
+    public ColumnMetadata getFirstColumn()
     {
         return columnDefs.get(0);
     }
 
     @Override
-    public ColumnDefinition getLastColumn()
+    public ColumnMetadata getLastColumn()
     {
         return columnDefs.get(columnDefs.size() - 1);
     }
@@ -146,7 +146,7 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
      */
     protected final String getColumnNamesAsString()
     {
-        return Joiner.on(", 
").join(ColumnDefinition.toIdentifiers(columnDefs));
+        return Joiner.on(", ").join(ColumnMetadata.toIdentifiers(columnDefs));
     }
 
     @Override
@@ -176,16 +176,16 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
         if (restriction instanceof PartitionKeyRestrictions)
             return (PartitionKeyRestrictions) restriction;
 
-        return new 
PartitionKeySingleRestrictionSet(metadata.getKeyValidatorAsClusteringComparator()).mergeWith(restriction);
+        return new 
PartitionKeySingleRestrictionSet(metadata.partitionKeyAsClusteringComparator()).mergeWith(restriction);
     }
 
     public static final class EQRestriction extends TokenRestriction
     {
         private final Term value;
 
-        public EQRestriction(CFMetaData cfm, List<ColumnDefinition> 
columnDefs, Term value)
+        public EQRestriction(TableMetadata table, List<ColumnMetadata> 
columnDefs, Term value)
         {
-            super(cfm, columnDefs);
+            super(table, columnDefs);
             this.value = value;
         }
 
@@ -199,7 +199,7 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
         protected PartitionKeyRestrictions doMergeWith(TokenRestriction 
otherRestriction) throws InvalidRequestException
         {
             throw invalidRequest("%s cannot be restricted by more than one 
relation if it includes an Equal",
-                                 Joiner.on(", 
").join(ColumnDefinition.toIdentifiers(columnDefs)));
+                                 Joiner.on(", 
").join(ColumnMetadata.toIdentifiers(columnDefs)));
         }
 
         @Override
@@ -231,9 +231,9 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
     {
         private final TermSlice slice;
 
-        public SliceRestriction(CFMetaData cfm, List<ColumnDefinition> 
columnDefs, Bound bound, boolean inclusive, Term term)
+        public SliceRestriction(TableMetadata table, List<ColumnMetadata> 
columnDefs, Bound bound, boolean inclusive, Term term)
         {
-            super(cfm, columnDefs);
+            super(table, columnDefs);
             slice = TermSlice.newInstance(bound, inclusive, term);
         }
 
@@ -291,7 +291,7 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
                 throw invalidRequest("More than one restriction was found for 
the end bound on %s",
                                      getColumnNamesAsString());
 
-            return new SliceRestriction(metadata, columnDefs,  
slice.merge(otherSlice.slice));
+            return new SliceRestriction(metadata, columnDefs, 
slice.merge(otherSlice.slice));
         }
 
         @Override
@@ -299,9 +299,9 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
         {
             return String.format("SLICE%s", slice);
         }
-        private SliceRestriction(CFMetaData cfm, List<ColumnDefinition> 
columnDefs, TermSlice slice)
+        private SliceRestriction(TableMetadata table, List<ColumnMetadata> 
columnDefs, TermSlice slice)
         {
-            super(cfm, columnDefs);
+            super(table, columnDefs);
             this.slice = slice;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
index 498cf0f..115729a 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
@@ -22,7 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
@@ -70,7 +70,7 @@ abstract class AbstractFunctionSelector<T extends Function> 
extends Selector
                 if (tmpMapping.getMappings().get(resultsColumn).isEmpty())
                     // add a null mapping for cases where there are no
                     // further selectors, such as no-arg functions and count
-                    mapping.addMapping(resultsColumn, (ColumnDefinition)null);
+                    mapping.addMapping(resultsColumn, (ColumnMetadata)null);
                 else
                     // collate the mapped columns from the child factories & 
add those
                     mapping.addMapping(resultsColumn, 
tmpMapping.getMappings().values());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/CollectionFactory.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/CollectionFactory.java 
b/src/java/org/apache/cassandra/cql3/selection/CollectionFactory.java
index de5fd93..88885dd 100644
--- a/src/java/org/apache/cassandra/cql3/selection/CollectionFactory.java
+++ b/src/java/org/apache/cassandra/cql3/selection/CollectionFactory.java
@@ -19,11 +19,11 @@ package org.apache.cassandra.cql3.selection;
 
 import java.util.List;
 
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.selection.Selector.Factory;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.ColumnMetadata;
 
 /**
  * A base <code>Selector.Factory</code> for collections or tuples.
@@ -83,7 +83,7 @@ abstract class CollectionFactory extends Factory
 
         if (tmpMapping.getMappings().get(resultsColumn).isEmpty())
             // add a null mapping for cases where the collection is empty
-            mapping.addMapping(resultsColumn, (ColumnDefinition)null);
+            mapping.addMapping(resultsColumn, (ColumnMetadata)null);
         else
             // collate the mapped columns from the child factories & add those
             mapping.addMapping(resultsColumn, 
tmpMapping.getMappings().values());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/ListSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/ListSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/ListSelector.java
index 940bd9c..b540ec9 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ListSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ListSelector.java
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.Lists;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/MapSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/MapSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/MapSelector.java
index 8bbae8c..97dfc30 100644
--- a/src/java/org/apache/cassandra/cql3/selection/MapSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/MapSelector.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.Maps;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -33,6 +32,7 @@ import 
org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.Pair;
@@ -78,7 +78,7 @@ final class MapSelector extends Selector
 
                 if (tmpMapping.getMappings().get(resultsColumn).isEmpty())
                     // add a null mapping for cases where the collection is 
empty
-                    mapping.addMapping(resultsColumn, (ColumnDefinition)null);
+                    mapping.addMapping(resultsColumn, (ColumnMetadata)null);
                 else
                     // collate the mapped columns from the child factories & 
add those
                     mapping.addMapping(resultsColumn, 
tmpMapping.getMappings().values());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/RawSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/RawSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/RawSelector.java
index 7d5543f..fbf7c30 100644
--- a/src/java/org/apache/cassandra/cql3/selection/RawSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/RawSelector.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.cql3.selection;
 
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
 
 public class RawSelector
 {
@@ -43,15 +43,9 @@ public class RawSelector
      * @param raws the <code>RawSelector</code>s to converts.
      * @return a list of <code>Selectable</code>s
      */
-    public static List<Selectable> toSelectables(List<RawSelector> raws, final 
CFMetaData cfm)
+    public static List<Selectable> toSelectables(List<RawSelector> raws, final 
TableMetadata table)
     {
-        return Lists.transform(raws, new Function<RawSelector, Selectable>()
-        {
-            public Selectable apply(RawSelector raw)
-            {
-                return raw.selectable.prepare(cfm);
-            }
-        });
+        return Lists.transform(raws, raw -> raw.selectable.prepare(table));
     }
 
     public boolean processesSelection()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/Selectable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selectable.java 
b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
index 20719ea..b2526a5 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
@@ -23,13 +23,13 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.text.StrBuilder;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.cql3.selection.Selector.Factory;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.Pair;
 
 import static 
org.apache.cassandra.cql3.selection.SelectorFactories.createFactoriesAndCollectColumnDefinitions;
@@ -37,7 +37,7 @@ import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidReq
 
 public interface Selectable extends AssignmentTestable
 {
-    public Selector.Factory newSelectorFactory(CFMetaData cfm, AbstractType<?> 
expectedType, List<ColumnDefinition> defs, VariableSpecifications boundNames);
+    public Selector.Factory newSelectorFactory(TableMetadata table, 
AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications 
boundNames);
 
     /**
      * The type of the {@code Selectable} if it can be infered.
@@ -58,7 +58,7 @@ public interface Selectable extends AssignmentTestable
         return type == null ? TestResult.NOT_ASSIGNABLE : 
type.testAssignment(keyspace, receiver);
     }
 
-    default int addAndGetIndex(ColumnDefinition def, List<ColumnDefinition> l)
+    default int addAndGetIndex(ColumnMetadata def, List<ColumnMetadata> l)
     {
         int idx = l.indexOf(def);
         if (idx < 0)
@@ -71,7 +71,7 @@ public interface Selectable extends AssignmentTestable
 
     public static abstract class Raw
     {
-        public abstract Selectable prepare(CFMetaData cfm);
+        public abstract Selectable prepare(TableMetadata table);
 
         /**
          * Returns true if any processing is performed on the selected column.
@@ -111,7 +111,7 @@ public interface Selectable extends AssignmentTestable
             return rawTerm.testAssignment(keyspace, receiver);
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames) throws InvalidRequestException
+        public Selector.Factory newSelectorFactory(TableMetadata table, 
AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications 
boundNames) throws InvalidRequestException
         {
             /*
              * expectedType will be null if we have no constraint on what the 
type should be. For instance, if this term is a bind marker:
@@ -133,7 +133,7 @@ public interface Selectable extends AssignmentTestable
              * Lastly, note that if the term is a terminal literal, we don't 
have to check it's compatibility with 'expectedType' as any incompatibility
              * would have been found at preparation time.
              */
-            AbstractType<?> type = getExactTypeIfKnown(cfm.ksName);
+            AbstractType<?> type = getExactTypeIfKnown(table.keyspace);
             if (type == null)
             {
                 type = expectedType;
@@ -145,7 +145,7 @@ public interface Selectable extends AssignmentTestable
             // selection will have this name. Which isn't terribly helpful, 
but it's unclear how to provide
             // something a lot more helpful and in practice user can bind 
those markers by position or, even better,
             // use bind markers.
-            Term term = rawTerm.prepare(cfm.ksName, new ColumnSpecification(cfm.ksName, cfm.cfName, bindMarkerNameInSelection, type));
+            Term term = rawTerm.prepare(table.keyspace, new ColumnSpecification(table.keyspace, table.name, bindMarkerNameInSelection, type));
             term.collectMarkerSpecification(boundNames);
             return TermSelector.newFactory(rawTerm.getText(), term, type);
         }
@@ -171,7 +171,7 @@ public interface Selectable extends AssignmentTestable
                 this.term = term;
             }
 
-            public Selectable prepare(CFMetaData cfm)
+            public Selectable prepare(TableMetadata table)
             {
                 return new WithTerm(term);
             }
@@ -180,10 +180,10 @@ public interface Selectable extends AssignmentTestable
 
     public static class WritetimeOrTTL implements Selectable
     {
-        public final ColumnDefinition column;
+        public final ColumnMetadata column;
         public final boolean isWritetime;
 
-        public WritetimeOrTTL(ColumnDefinition column, boolean isWritetime)
+        public WritetimeOrTTL(ColumnMetadata column, boolean isWritetime)
         {
             this.column = column;
             this.isWritetime = isWritetime;
@@ -195,9 +195,9 @@ public interface Selectable extends AssignmentTestable
             return (isWritetime ? "writetime" : "ttl") + "(" + column.name + 
")";
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm,
+        public Selector.Factory newSelectorFactory(TableMetadata table,
                                                    AbstractType<?> 
expectedType,
-                                                   List<ColumnDefinition> defs,
+                                                   List<ColumnMetadata> defs,
                                                    VariableSpecifications 
boundNames)
         {
             if (column.isPrimaryKeyColumn())
@@ -219,18 +219,18 @@ public interface Selectable extends AssignmentTestable
 
         public static class Raw extends Selectable.Raw
         {
-            private final ColumnDefinition.Raw id;
+            private final ColumnMetadata.Raw id;
             private final boolean isWritetime;
 
-            public Raw(ColumnDefinition.Raw id, boolean isWritetime)
+            public Raw(ColumnMetadata.Raw id, boolean isWritetime)
             {
                 this.id = id;
                 this.isWritetime = isWritetime;
             }
 
-            public WritetimeOrTTL prepare(CFMetaData cfm)
+            public WritetimeOrTTL prepare(TableMetadata table)
             {
-                return new WritetimeOrTTL(id.prepare(cfm), isWritetime);
+                return new WritetimeOrTTL(id.prepare(table), isWritetime);
             }
         }
     }
@@ -252,9 +252,9 @@ public interface Selectable extends AssignmentTestable
             return 
function.columnName(args.stream().map(Object::toString).collect(Collectors.toList()));
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
+        public Selector.Factory newSelectorFactory(TableMetadata table, 
AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications 
boundNames)
         {
-            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, 
function.argTypes(), cfm, defs, boundNames);
+            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, 
function.argTypes(), table, defs, boundNames);
             return AbstractFunctionSelector.newFactory(function, factories);
         }
 
@@ -292,11 +292,11 @@ public interface Selectable extends AssignmentTestable
                                Collections.singletonList(arg));
             }
 
-            public Selectable prepare(CFMetaData cfm)
+            public Selectable prepare(TableMetadata table)
             {
                 List<Selectable> preparedArgs = new ArrayList<>(args.size());
                 for (Selectable.Raw arg : args)
-                    preparedArgs.add(arg.prepare(cfm));
+                    preparedArgs.add(arg.prepare(table));
 
                 FunctionName name = functionName;
                 // We need to circumvent the normal function lookup process 
for toJson() because instances of the function
@@ -319,7 +319,7 @@ public interface Selectable extends AssignmentTestable
                     preparedArgs = Collections.emptyList();
                 }
 
-                Function fun = FunctionResolver.get(cfm.ksName, name, preparedArgs, cfm.ksName, cfm.cfName, null);
+                Function fun = FunctionResolver.get(table.keyspace, name, preparedArgs, table.keyspace, table.name, null);
 
                 if (fun == null)
                     throw new InvalidRequestException(String.format("Unknown 
function '%s'", functionName));
@@ -351,9 +351,9 @@ public interface Selectable extends AssignmentTestable
                                    .toString();
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
+        public Selector.Factory newSelectorFactory(TableMetadata table, 
AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications 
boundNames)
         {
-            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, null, cfm, 
defs, boundNames);
+            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, null, table, 
defs, boundNames);
             Function fun = ToJsonFct.getInstance(factories.getReturnTypes());
             return AbstractFunctionSelector.newFactory(fun, factories);
         }
@@ -381,10 +381,10 @@ public interface Selectable extends AssignmentTestable
             return String.format("cast(%s as %s)", arg, 
type.toString().toLowerCase());
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
+        public Selector.Factory newSelectorFactory(TableMetadata table, 
AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications 
boundNames)
         {
             List<Selectable> args = Collections.singletonList(arg);
-            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, null, cfm, 
defs, boundNames);
+            SelectorFactories factories = 
SelectorFactories.createFactoriesAndCollectColumnDefinitions(args, null, table, 
defs, boundNames);
 
             Selector.Factory factory = factories.get(0);
 
@@ -393,7 +393,7 @@ public interface Selectable extends AssignmentTestable
                 return factory;
 
             FunctionName name = 
FunctionName.nativeFunction(CastFcts.getFunctionName(type));
-            Function fun = FunctionResolver.get(cfm.ksName, name, args, 
cfm.ksName, cfm.cfName, null);
+            Function fun = FunctionResolver.get(table.keyspace, name, args, 
table.keyspace, table.name, null);
 
             if (fun == null)
             {
@@ -420,9 +420,9 @@ public interface Selectable extends AssignmentTestable
                 this.type = type;
             }
 
-            public WithCast prepare(CFMetaData cfm)
+            public WithCast prepare(TableMetadata table)
             {
-                return new WithCast(arg.prepare(cfm), type);
+                return new WithCast(arg.prepare(table), type);
             }
         }
     }
@@ -444,7 +444,7 @@ public interface Selectable extends AssignmentTestable
             return String.format("%s.%s", selected, field);
         }
 
-        public Selector.Factory newSelectorFactory(CFMetaData cfm, 
AbstractType<?> expectedType, List<ColumnDefinition> defs, 
VariableSpecifications boundNames)
+        public Selector.Factory newSelectorFactory(TableMetadata table, 
AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications 
boundNames)
         {
             AbstractType<?> expectedUdtType = null;
 
@@ -452,11 +452,11 @@ public interface Selectable extends AssignmentTestable
             if (selected instanceof BetweenParenthesesOrWithTuple)
             {
                 BetweenParenthesesOrWithTuple betweenParentheses = 
(BetweenParenthesesOrWithTuple) selected;
-                expectedUdtType = 
betweenParentheses.selectables.get(0).getExactTypeIfKnown(cfm.ksName);
+                expectedUdtType = 
betweenParentheses.selectables.get(0).getExactTypeIfKnown(table.keyspace);
             }
 
-            Selector.Factory factory = selected.newSelectorFactory(cfm, 
expectedUdtType, defs, boundNames);
-            AbstractType<?> type = factory.getColumnSpecification(cfm).type;
+            Selector.Factory factory = selected.newSelectorFactory(table, 
expectedUdtType, defs, boundNames);
+            AbstractType<?> type = factory.getColumnSpecification(table).type;
             if (!type.isUDT())
             {
                 throw new InvalidRequestException(
@@ -501,9 +501,9 @@ public interface Selectable extends AssignmentTestable
                 this.field = field;
             }
 
-            public WithFieldSelection prepare(CFMetaData cfm)
+            public WithFieldSelection prepare(TableMetadata table)
             {
-                return new WithFieldSelection(selected.prepare(cfm), field);
+                return new WithFieldSelection(selected.prepare(table), field);
             }
         }
     }
@@ -535,12 +535,12 @@ public interface Selectable extends AssignmentTestable
         }
 
         @Override
-        public Factory newSelectorFactory(CFMetaData cfm,
+        public Factory newSelectorFactory(TableMetadata cfm,
                                           AbstractType<?> expectedType,
-                                          List<ColumnDefinition> defs,
+                                          List<ColumnMetadata> defs,
                                           VariableSpecifications boundNames)
         {
-            AbstractType<?> type = getExactTypeIfKnown(cfm.ksName);
+            AbstractType<?> type = getExactTypeIfKnown(cfm.keyspace);
             if (type == null)
             {
                 type = expectedType;
@@ -555,9 +555,9 @@ public interface Selectable extends AssignmentTestable
             return newTupleSelectorFactory(cfm, (TupleType) type, defs, 
boundNames);
         }
 
-        private Factory newBetweenParenthesesSelectorFactory(CFMetaData cfm,
+        private Factory newBetweenParenthesesSelectorFactory(TableMetadata cfm,
                                                              AbstractType<?> 
expectedType,
-                                                             
List<ColumnDefinition> defs,
+                                                             
List<ColumnMetadata> defs,
                                                              
VariableSpecifications boundNames)
         {
             Selectable selectable = selectables.get(0);
@@ -577,9 +577,9 @@ public interface Selectable extends AssignmentTestable
             };
         }
 
-        private Factory newTupleSelectorFactory(CFMetaData cfm,
+        private Factory newTupleSelectorFactory(TableMetadata cfm,
                                                 TupleType tupleType,
-                                                List<ColumnDefinition> defs,
+                                                List<ColumnMetadata> defs,
                                                 VariableSpecifications 
boundNames)
         {
             SelectorFactories factories = 
createFactoriesAndCollectColumnDefinitions(selectables,
@@ -617,7 +617,7 @@ public interface Selectable extends AssignmentTestable
                 this.raws = raws;
             }
 
-            public Selectable prepare(CFMetaData cfm)
+            public Selectable prepare(TableMetadata cfm)
             {
                 return new BetweenParenthesesOrWithTuple(raws.stream().map(p 
-> p.prepare(cfm)).collect(Collectors.toList()));
             }
@@ -646,12 +646,12 @@ public interface Selectable extends AssignmentTestable
         }
 
         @Override
-        public Factory newSelectorFactory(CFMetaData cfm,
+        public Factory newSelectorFactory(TableMetadata cfm,
                                           AbstractType<?> expectedType,
-                                          List<ColumnDefinition> defs,
+                                          List<ColumnMetadata> defs,
                                           VariableSpecifications boundNames)
         {
-            AbstractType<?> type = getExactTypeIfKnown(cfm.ksName);
+            AbstractType<?> type = getExactTypeIfKnown(cfm.keyspace);
             if (type == null)
             {
                 type = expectedType;
@@ -695,7 +695,7 @@ public interface Selectable extends AssignmentTestable
                 this.raws = raws;
             }
 
-            public Selectable prepare(CFMetaData cfm)
+            public Selectable prepare(TableMetadata cfm)
             {
                 return new WithList(raws.stream().map(p -> 
p.prepare(cfm)).collect(Collectors.toList()));
             }
@@ -724,12 +724,12 @@ public interface Selectable extends AssignmentTestable
         }
 
         @Override
-        public Factory newSelectorFactory(CFMetaData cfm,
+        public Factory newSelectorFactory(TableMetadata cfm,
                                           AbstractType<?> expectedType,
-                                          List<ColumnDefinition> defs,
+                                          List<ColumnMetadata> defs,
                                           VariableSpecifications boundNames)
         {
-            AbstractType<?> type = getExactTypeIfKnown(cfm.ksName);
+            AbstractType<?> type = getExactTypeIfKnown(cfm.keyspace);
             if (type == null)
             {
                 type = expectedType;
@@ -781,7 +781,7 @@ public interface Selectable extends AssignmentTestable
                 this.raws = raws;
             }
 
-            public Selectable prepare(CFMetaData cfm)
+            public Selectable prepare(TableMetadata cfm)
             {
                 return new WithSet(raws.stream().map(p -> 
p.prepare(cfm)).collect(Collectors.toList()));
             }
@@ -791,7 +791,7 @@ public interface Selectable extends AssignmentTestable
     /**
      * {@code Selectable} for literal Maps or UDTs.
      * <p>The parser cannot differentiate between a Map or a UDT in the 
selection cause because a
-     * {@code ColumnDefinition} is equivalent to a {@code FieldIdentifier} 
from a syntax point of view.
+     * {@code ColumnMetadata} is equivalent to a {@code FieldIdentifier} from 
a syntax point of view.
      * By consequence, we are forced to wait until the type is known to be 
able to differentiate them.</p>
      */
     public static class WithMapOrUdt implements Selectable
@@ -800,14 +800,14 @@ public interface Selectable extends AssignmentTestable
          * The column family metadata. We need to store them to be able to 
build the proper data once the type has been
          * identified.
          */
-        private final CFMetaData cfm;
+        private final TableMetadata cfm;
 
         /**
          * The Map or UDT raw elements.
          */
         private final List<Pair<Selectable.Raw, Selectable.Raw>> raws;
 
-        public WithMapOrUdt(CFMetaData cfm, List<Pair<Selectable.Raw, 
Selectable.Raw>> raws)
+        public WithMapOrUdt(TableMetadata cfm, List<Pair<Selectable.Raw, 
Selectable.Raw>> raws)
         {
             this.cfm = cfm;
             this.raws = raws;
@@ -821,12 +821,12 @@ public interface Selectable extends AssignmentTestable
         }
 
         @Override
-        public Factory newSelectorFactory(CFMetaData cfm,
+        public Factory newSelectorFactory(TableMetadata cfm,
                                           AbstractType<?> expectedType,
-                                          List<ColumnDefinition> defs,
+                                          List<ColumnMetadata> defs,
                                           VariableSpecifications boundNames)
         {
-            AbstractType<?> type = getExactTypeIfKnown(cfm.ksName);
+            AbstractType<?> type = getExactTypeIfKnown(cfm.keyspace);
             if (type == null)
             {
                 type = expectedType;
@@ -841,8 +841,8 @@ public interface Selectable extends AssignmentTestable
             return newMapSelectorFactory(cfm, defs, boundNames, type);
         }
 
-        private Factory newMapSelectorFactory(CFMetaData cfm,
-                                              List<ColumnDefinition> defs,
+        private Factory newMapSelectorFactory(TableMetadata cfm,
+                                              List<ColumnMetadata> defs,
                                               VariableSpecifications 
boundNames,
                                               AbstractType<?> type)
         {
@@ -857,9 +857,9 @@ public interface Selectable extends AssignmentTestable
                                                                   
.collect(Collectors.toList()));
         }
 
-        private Factory newUdtSelectorFactory(CFMetaData cfm,
+        private Factory newUdtSelectorFactory(TableMetadata cfm,
                                               AbstractType<?> expectedType,
-                                              List<ColumnDefinition> defs,
+                                              List<ColumnMetadata> defs,
                                               VariableSpecifications 
boundNames)
         {
             UserType ut = (UserType) expectedType;
@@ -905,7 +905,7 @@ public interface Selectable extends AssignmentTestable
                        .collect(Collectors.joining(", ", "{", "}"));
         }
 
-        private List<Pair<Selectable, Selectable>> getMapEntries(CFMetaData 
cfm)
+        private List<Pair<Selectable, Selectable>> getMapEntries(TableMetadata 
cfm)
         {
             return raws.stream()
                        .map(p -> Pair.create(p.left.prepare(cfm), 
p.right.prepare(cfm)))
@@ -946,7 +946,7 @@ public interface Selectable extends AssignmentTestable
                 this.raws = raws;
             }
 
-            public Selectable prepare(CFMetaData cfm)
+            public Selectable prepare(TableMetadata cfm)
             {
                 return new WithMapOrUdt(cfm, raws);
             }
@@ -993,14 +993,14 @@ public interface Selectable extends AssignmentTestable
         }
 
         @Override
-        public Factory newSelectorFactory(CFMetaData cfm,
+        public Factory newSelectorFactory(TableMetadata cfm,
                                           AbstractType<?> expectedType,
-                                          List<ColumnDefinition> defs,
+                                          List<ColumnMetadata> defs,
                                           VariableSpecifications boundNames)
         {
-            final ColumnSpecification receiver = new 
ColumnSpecification(cfm.ksName, cfm.cfName, new ColumnIdentifier(toString(), 
true), type);
+            final ColumnSpecification receiver = new 
ColumnSpecification(cfm.keyspace, cfm.name, new ColumnIdentifier(toString(), 
true), type);
 
-            if (!selectable.testAssignment(cfm.ksName, 
receiver).isAssignable())
+            if (!selectable.testAssignment(cfm.keyspace, 
receiver).isAssignable())
                 throw new InvalidRequestException(String.format("Cannot assign 
value %s to %s of type %s", this, receiver.name, receiver.type.asCQL3Type()));
 
             final Factory factory = selectable.newSelectorFactory(cfm, type, 
defs, boundNames);
@@ -1048,10 +1048,10 @@ public interface Selectable extends AssignmentTestable
                 this.raw = raw;
             }
 
-            public Selectable prepare(CFMetaData cfm)
+            public Selectable prepare(TableMetadata cfm)
             {
                 Selectable selectable = raw.prepare(cfm);
-                AbstractType<?> type = 
this.typeRaw.prepare(cfm.ksName).getType();
+                AbstractType<?> type = 
this.typeRaw.prepare(cfm.keyspace).getType();
                 if (type.isFreezable())
                     type = type.freeze();
                 return new WithTypeHint(typeRaw.toString(), type, selectable);
@@ -1062,7 +1062,7 @@ public interface Selectable extends AssignmentTestable
     /**
      * In the selection clause, the parser cannot differentiate between Maps 
and UDTs as a column identifier and field
      * identifier have the same syntax. By consequence, we need to wait until 
the type is known to create the proper
-     * Object: {@code ColumnDefinition} or {@code FieldIdentifier}.
+     * Object: {@code ColumnMetadata} or {@code FieldIdentifier}.
      */
     public static final class RawIdentifier extends Selectable.Raw
     {
@@ -1093,10 +1093,10 @@ public interface Selectable extends AssignmentTestable
         }
 
         @Override
-        public Selectable prepare(CFMetaData cfm)
+        public Selectable prepare(TableMetadata cfm)
         {
-            ColumnDefinition.Raw raw = quoted ? 
ColumnDefinition.Raw.forQuoted(text)
-                                              : 
ColumnDefinition.Raw.forUnquoted(text);
+            ColumnMetadata.Raw raw = quoted ? 
ColumnMetadata.Raw.forQuoted(text)
+                                            : 
ColumnMetadata.Raw.forUnquoted(text);
             return raw.prepare(cfm);
         }
 

Reply via email to