http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 14903ba..bf65e96 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -25,25 +25,22 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Objects;
-import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.*;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -83,21 +80,21 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         return new CQLFilter(new ArrayList<>(capacity));
     }
 
-    public SimpleExpression add(ColumnDefinition def, Operator op, ByteBuffer 
value)
+    public SimpleExpression add(ColumnMetadata def, Operator op, ByteBuffer 
value)
     {
         SimpleExpression expression = new SimpleExpression(def, op, value);
         add(expression);
         return expression;
     }
 
-    public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator 
op, ByteBuffer value)
+    public void addMapEquality(ColumnMetadata def, ByteBuffer key, Operator 
op, ByteBuffer value)
     {
         add(new MapEqualityExpression(def, key, op, value));
     }
 
-    public void addCustomIndexExpression(CFMetaData cfm, IndexMetadata 
targetIndex, ByteBuffer value)
+    public void addCustomIndexExpression(TableMetadata metadata, IndexMetadata 
targetIndex, ByteBuffer value)
     {
-        add(new CustomExpression(cfm, targetIndex, value));
+        add(new CustomExpression(metadata, targetIndex, value));
     }
 
     private void add(Expression expression)
@@ -135,7 +132,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
      * @param nowInSec the current time in seconds (to know what is live and 
what isn't).
      * @return {@code true} if {@code row} in partition {@code partitionKey} 
satisfies this row filter.
      */
-    public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey 
partitionKey, Row row, int nowInSec)
+    public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey 
partitionKey, Row row, int nowInSec)
     {
         // We purge all tombstones as the expressions' isSatisfiedBy methods 
expect it
         Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
@@ -249,7 +246,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             if (expressions.isEmpty())
                 return iter;
 
-            final CFMetaData metadata = iter.metadata();
+            final TableMetadata metadata = iter.metadata();
 
             List<Expression> partitionLevelExpressions = new ArrayList<>();
             List<Expression> rowLevelExpressions = new ArrayList<>();
@@ -323,11 +320,11 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         protected enum Kind { SIMPLE, MAP_EQUALITY, UNUSED1, CUSTOM, USER }
 
         protected abstract Kind kind();
-        protected final ColumnDefinition column;
+        protected final ColumnMetadata column;
         protected final Operator operator;
         protected final ByteBuffer value;
 
-        protected Expression(ColumnDefinition column, Operator operator, 
ByteBuffer value)
+        protected Expression(ColumnMetadata column, Operator operator, 
ByteBuffer value)
         {
             this.column = column;
             this.operator = operator;
@@ -344,7 +341,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             return kind() == Kind.USER;
         }
 
-        public ColumnDefinition column()
+        public ColumnMetadata column()
         {
             return column;
         }
@@ -401,19 +398,21 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         /**
          * Returns whether the provided row satisfied this expression or not.
          *
+         *
+         * @param metadata the table metadata.
          * @param partitionKey the partition key for row to check.
          * @param row the row to check. It should *not* contain deleted cells
          * (i.e. it should come from a RowIterator).
          * @return whether the row is satisfied by this expression.
          */
-        public abstract boolean isSatisfiedBy(CFMetaData metadata, 
DecoratedKey partitionKey, Row row);
+        public abstract boolean isSatisfiedBy(TableMetadata metadata, 
DecoratedKey partitionKey, Row row);
 
-        protected ByteBuffer getValue(CFMetaData metadata, DecoratedKey 
partitionKey, Row row)
+        protected ByteBuffer getValue(TableMetadata metadata, DecoratedKey 
partitionKey, Row row)
         {
             switch (column.kind)
             {
                 case PARTITION_KEY:
-                    return metadata.getKeyValidator() instanceof CompositeType
+                    return metadata.partitionKeyType instanceof CompositeType
                          ? 
CompositeType.extractComponent(partitionKey.getKey(), column.position())
                          : partitionKey.getKey();
                 case CLUSTERING:
@@ -484,7 +483,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                 }
             }
 
-            public Expression deserialize(DataInputPlus in, int version, 
CFMetaData metadata) throws IOException
+            public Expression deserialize(DataInputPlus in, int version, 
TableMetadata metadata) throws IOException
             {
                 Kind kind = Kind.values()[in.readByte()];
 
@@ -501,7 +500,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
 
                 ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
                 Operator operator = Operator.readFrom(in);
-                ColumnDefinition column = metadata.getColumnDefinition(name);
+                ColumnMetadata column = metadata.getColumn(name);
 
                 if (!metadata.isCompactTable() && column == null)
                     throw new RuntimeException("Unknown (or dropped) column " 
+ UTF8Type.instance.getString(name) + " during deserialization");
@@ -556,12 +555,12 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
      */
     public static class SimpleExpression extends Expression
     {
-        SimpleExpression(ColumnDefinition column, Operator operator, 
ByteBuffer value)
+        SimpleExpression(ColumnMetadata column, Operator operator, ByteBuffer 
value)
         {
             super(column, operator, value);
         }
 
-        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey 
partitionKey, Row row)
+        public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey 
partitionKey, Row row)
         {
             // We support null conditions for LWT (in ColumnCondition) but not 
for RowFilter.
             // TODO: we should try to merge both code someday.
@@ -707,7 +706,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
     {
         private final ByteBuffer key;
 
-        public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, 
Operator operator, ByteBuffer value)
+        public MapEqualityExpression(ColumnMetadata column, ByteBuffer key, 
Operator operator, ByteBuffer value)
         {
             super(column, operator, value);
             assert column.type instanceof MapType && operator == Operator.EQ;
@@ -729,7 +728,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             return CompositeType.build(key, value);
         }
 
-        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey 
partitionKey, Row row)
+        public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey 
partitionKey, Row row)
         {
             assert key != null;
             // We support null conditions for LWT (in ColumnCondition) but not 
for RowFilter.
@@ -800,21 +799,21 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
     public static final class CustomExpression extends Expression
     {
         private final IndexMetadata targetIndex;
-        private final CFMetaData cfm;
+        private final TableMetadata table;
 
-        public CustomExpression(CFMetaData cfm, IndexMetadata targetIndex, 
ByteBuffer value)
+        public CustomExpression(TableMetadata table, IndexMetadata 
targetIndex, ByteBuffer value)
         {
             // The operator is not relevant, but Expression requires it so for 
now we just hardcode EQ
-            super(makeDefinition(cfm, targetIndex), Operator.EQ, value);
+            super(makeDefinition(table, targetIndex), Operator.EQ, value);
             this.targetIndex = targetIndex;
-            this.cfm = cfm;
+            this.table = table;
         }
 
-        private static ColumnDefinition makeDefinition(CFMetaData cfm, 
IndexMetadata index)
+        private static ColumnMetadata makeDefinition(TableMetadata table, 
IndexMetadata index)
         {
             // Similarly to how we handle non-defined columns in thrift, we 
create a fake column definition to
             // represent the target index. This is definitely something that 
can be improved though.
-            return ColumnDefinition.regularDef(cfm, 
ByteBuffer.wrap(index.name.getBytes()), BytesType.instance);
+            return ColumnMetadata.regularColumn(table, 
ByteBuffer.wrap(index.name.getBytes()), BytesType.instance);
         }
 
         public IndexMetadata getTargetIndex()
@@ -831,7 +830,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         {
             return String.format("expr(%s, %s)",
                                  targetIndex.name,
-                                 Keyspace.openAndGetStore(cfm)
+                                 Keyspace.openAndGetStore(table)
                                          .indexManager
                                          .getIndex(targetIndex)
                                          .customExpressionValueType());
@@ -843,7 +842,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         }
 
         // Filtering by custom expressions isn't supported yet, so just accept 
any row
-        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey 
partitionKey, Row row)
+        public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey 
partitionKey, Row row)
         {
             return true;
         }
@@ -900,7 +899,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         {
             protected abstract UserExpression deserialize(DataInputPlus in,
                                                           int version,
-                                                          CFMetaData metadata) 
throws IOException;
+                                                          TableMetadata 
metadata) throws IOException;
         }
 
         public static void register(Class<? extends UserExpression> 
expressionClass, Deserializer deserializer)
@@ -908,7 +907,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             deserializers.registerUserExpressionClass(expressionClass, 
deserializer);
         }
 
-        private static UserExpression deserialize(DataInputPlus in, int 
version, CFMetaData metadata) throws IOException
+        private static UserExpression deserialize(DataInputPlus in, int 
version, TableMetadata metadata) throws IOException
         {
             int id = in.readInt();
             Deserializer deserializer = deserializers.getDeserializer(id);
@@ -929,7 +928,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             return 4 + expression.serializedSize(version);
         }
 
-        protected UserExpression(ColumnDefinition column, Operator operator, 
ByteBuffer value)
+        protected UserExpression(ColumnMetadata column, Operator operator, 
ByteBuffer value)
         {
             super(column, operator, value);
         }
@@ -954,7 +953,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
 
         }
 
-        public RowFilter deserialize(DataInputPlus in, int version, CFMetaData 
metadata) throws IOException
+        public RowFilter deserialize(DataInputPlus in, int version, 
TableMetadata metadata) throws IOException
         {
             in.readBoolean(); // Unused
             int size = (int)in.readUnsignedVInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java 
b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
index 98b539e..982ee1f 100644
--- 
a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
+++ 
b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
@@ -20,19 +20,19 @@ package org.apache.cassandra.db.filter;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 
 public class TombstoneOverwhelmingException extends RuntimeException
 {
-    public TombstoneOverwhelmingException(int numTombstones, String query, 
CFMetaData metadata, DecoratedKey lastPartitionKey, ClusteringPrefix 
lastClustering)
+    public TombstoneOverwhelmingException(int numTombstones, String query, 
TableMetadata metadata, DecoratedKey lastPartitionKey, ClusteringPrefix 
lastClustering)
     {
         super(String.format("Scanned over %d tombstones during query '%s' 
(last scanned row partion key was (%s)); query aborted",
                             numTombstones, query, makePKString(metadata, 
lastPartitionKey.getKey(), lastClustering)));
     }
 
-    private static String makePKString(CFMetaData metadata, ByteBuffer 
partitionKey, ClusteringPrefix clustering)
+    private static String makePKString(TableMetadata metadata, ByteBuffer 
partitionKey, ClusteringPrefix clustering)
     {
         StringBuilder sb = new StringBuilder();
 
@@ -40,7 +40,7 @@ public class TombstoneOverwhelmingException extends 
RuntimeException
             sb.append("(");
 
         // TODO: We should probably make that a lot easier/transparent for 
partition keys
-        AbstractType<?> pkType = metadata.getKeyValidator();
+        AbstractType<?> pkType = metadata.partitionKeyType;
         if (pkType instanceof CompositeType)
         {
             CompositeType ct = (CompositeType)pkType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index f1c2c44..89b82a1 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -29,7 +29,8 @@ import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -539,9 +540,9 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
         return 
LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories());
     }
 
-    public static boolean removeUnfinishedLeftovers(CFMetaData cfMetaData)
+    public static boolean removeUnfinishedLeftovers(TableMetadata metadata)
     {
-        return LogTransaction.removeUnfinishedLeftovers(cfMetaData);
+        return LogTransaction.removeUnfinishedLeftovers(metadata);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 75e82f3..face9fa 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SystemKeyspace;
@@ -410,7 +410,7 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
      * @return true if the leftovers of all transaction logs found were 
removed, false otherwise.
      *
      */
-    static boolean removeUnfinishedLeftovers(CFMetaData metadata)
+    static boolean removeUnfinishedLeftovers(TableMetadata metadata)
     {
         return removeUnfinishedLeftovers(new Directories(metadata, 
ColumnFamilyStore.getInitialDirectories()).getCFDirectories());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java 
b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index dfcd772..9eadd85 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -74,6 +75,11 @@ public class CompositeType extends AbstractCompositeType
         return getInstance(parser.getTypeParameters());
     }
 
+    public static CompositeType getInstance(Iterable<AbstractType<?>> types)
+    {
+        return getInstance(Lists.newArrayList(types));
+    }
+
     public static CompositeType getInstance(AbstractType... types)
     {
         return getInstance(Arrays.asList(types));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java 
b/src/java/org/apache/cassandra/db/marshal/UserType.java
index edfb5f3..f139edd 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -425,4 +425,9 @@ public class UserType extends TupleType
             sb.append(")");
         return sb.toString();
     }
+
+    public String toCQLString()
+    {
+        return String.format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), 
ColumnIdentifier.maybeQuote(getNameAsString()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 74ac033..465dec3 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.partitions;
 
 import java.util.Iterator;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
@@ -32,30 +32,28 @@ import static 
org.apache.cassandra.utils.btree.BTree.Dir.desc;
 
 public abstract class AbstractBTreePartition implements Partition, 
Iterable<Row>
 {
-    protected static final Holder EMPTY = new Holder(PartitionColumns.NONE, 
BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, 
EncodingStats.NO_STATS);
+    protected static final Holder EMPTY = new 
Holder(RegularAndStaticColumns.NONE, BTree.empty(), DeletionInfo.LIVE, 
Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
 
-    protected final CFMetaData metadata;
     protected final DecoratedKey partitionKey;
 
     protected abstract Holder holder();
     protected abstract boolean canHaveShadowedData();
 
-    protected AbstractBTreePartition(CFMetaData metadata, DecoratedKey 
partitionKey)
+    protected AbstractBTreePartition(DecoratedKey partitionKey)
     {
-        this.metadata = metadata;
         this.partitionKey = partitionKey;
     }
 
     protected static final class Holder
     {
-        final PartitionColumns columns;
+        final RegularAndStaticColumns columns;
         final DeletionInfo deletionInfo;
         // the btree of rows
         final Object[] tree;
         final Row staticRow;
         final EncodingStats stats;
 
-        Holder(PartitionColumns columns, Object[] tree, DeletionInfo 
deletionInfo, Row staticRow, EncodingStats stats)
+        Holder(RegularAndStaticColumns columns, Object[] tree, DeletionInfo 
deletionInfo, Row staticRow, EncodingStats stats)
         {
             this.columns = columns;
             this.tree = tree;
@@ -87,10 +85,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
         return !BTree.isEmpty(holder.tree);
     }
 
-    public CFMetaData metadata()
-    {
-        return metadata;
-    }
+    public abstract TableMetadata metadata();
 
     public DecoratedKey partitionKey()
     {
@@ -102,7 +97,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
         return holder().deletionInfo.getPartitionDeletion();
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         return holder().columns;
     }
@@ -126,7 +121,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
         if (columns.fetchedColumns().statics.isEmpty() || 
(current.staticRow.isEmpty() && partitionDeletion.isLive()))
             return Rows.EMPTY_STATIC_ROW;
 
-        Row row = current.staticRow.filter(columns, partitionDeletion, 
setActiveDeletionToRow, metadata);
+        Row row = current.staticRow.filter(columns, partitionDeletion, 
setActiveDeletionToRow, metadata());
         return row == null ? Rows.EMPTY_STATIC_ROW : row;
     }
 
@@ -136,7 +131,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
         final Holder current = holder();
         return new SearchIterator<Clustering, Row>()
         {
-            private final SearchIterator<Clustering, Row> rawIter = new 
BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
+            private final SearchIterator<Clustering, Row> rawIter = new 
BTreeSearchIterator<>(current.tree, metadata().comparator, desc(reversed));
             private final DeletionTime partitionDeletion = 
current.deletionInfo.getPartitionDeletion();
 
             public boolean hasNext()
@@ -162,7 +157,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
                 if (row == null)
                     return activeDeletion.isLive() ? null : 
BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
 
-                return row.filter(columns, activeDeletion, true, metadata);
+                return row.filter(columns, activeDeletion, true, metadata());
             }
         };
     }
@@ -183,7 +178,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
         if (slices.size() == 0)
         {
             DeletionTime partitionDeletion = 
current.deletionInfo.getPartitionDeletion();
-            return UnfilteredRowIterators.noRowsIterator(metadata, 
partitionKey(), staticRow, partitionDeletion, reversed);
+            return UnfilteredRowIterators.noRowsIterator(metadata(), 
partitionKey(), staticRow, partitionDeletion, reversed);
         }
 
         return slices.size() == 1
@@ -195,7 +190,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
     {
         ClusteringBound start = slice.start() == ClusteringBound.BOTTOM ? null 
: slice.start();
         ClusteringBound end = slice.end() == ClusteringBound.TOP ? null : 
slice.end();
-        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, 
start, true, end, true, desc(reversed));
+        Iterator<Row> rowIter = BTree.slice(current.tree, 
metadata().comparator, start, true, end, true, desc(reversed));
         Iterator<RangeTombstone> deleteIter = 
current.deletionInfo.rangeIterator(slice, reversed);
         return merge(rowIter, deleteIter, selection, reversed, current, 
staticRow);
     }
@@ -203,7 +198,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
     private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, 
Iterator<RangeTombstone> deleteIter,
                                               ColumnFilter selection, boolean 
reversed, Holder current, Row staticRow)
     {
-        return new RowAndDeletionMergeIterator(metadata, partitionKey(), 
current.deletionInfo.getPartitionDeletion(),
+        return new RowAndDeletionMergeIterator(metadata(), partitionKey(), 
current.deletionInfo.getPartitionDeletion(),
                                                selection, staticRow, reversed, 
current.stats,
                                                rowIter, deleteIter,
                                                canHaveShadowedData());
@@ -216,7 +211,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
 
         private AbstractIterator(Holder current, Row staticRow, ColumnFilter 
selection, boolean isReversed)
         {
-            super(AbstractBTreePartition.this.metadata,
+            super(AbstractBTreePartition.this.metadata(),
                   AbstractBTreePartition.this.partitionKey(),
                   current.deletionInfo.getPartitionDeletion(),
                   selection.fetchedColumns(), // non-selected columns will be 
filtered in subclasses by RowAndDeletionMergeIterator
@@ -276,8 +271,8 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
 
     protected static Holder build(UnfilteredRowIterator iterator, int 
initialRowCapacity, boolean ordered)
     {
-        CFMetaData metadata = iterator.metadata();
-        PartitionColumns columns = iterator.columns();
+        TableMetadata metadata = iterator.metadata();
+        RegularAndStaticColumns columns = iterator.columns();
         boolean reversed = iterator.isReverseOrder();
 
         BTree.Builder<Row> builder = BTree.builder(metadata.comparator, 
initialRowCapacity);
@@ -303,8 +298,8 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
     // passes a MutableDeletionInfo that it mutates later.
     protected static Holder build(RowIterator rows, DeletionInfo deletion, 
boolean buildEncodingStats, int initialRowCapacity)
     {
-        CFMetaData metadata = rows.metadata();
-        PartitionColumns columns = rows.columns();
+        TableMetadata metadata = rows.metadata();
+        RegularAndStaticColumns columns = rows.columns();
         boolean reversed = rows.isReverseOrder();
 
         BTree.Builder<Row> builder = BTree.builder(metadata.comparator, 
initialRowCapacity);
@@ -327,17 +322,16 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
     {
         StringBuilder sb = new StringBuilder();
 
-        sb.append(String.format("[%s.%s] key=%s columns=%s",
-                                metadata.ksName,
-                                metadata.cfName,
-                                
metadata.getKeyValidator().getString(partitionKey().getKey()),
+        sb.append(String.format("[%s] key=%s columns=%s",
+                                metadata().toString(),
+                                
metadata().partitionKeyType.getString(partitionKey().getKey()),
                                 columns()));
 
         if (staticRow() != Rows.EMPTY_STATIC_ROW)
-            sb.append("\n    ").append(staticRow().toString(metadata));
+            sb.append("\n    ").append(staticRow().toString(metadata()));
 
         for (Row row : this)
-            sb.append("\n    ").append(row.toString(metadata));
+            sb.append("\n    ").append(row.toString(metadata()));
 
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index c9c6006..ee94963 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -24,7 +24,8 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -50,7 +51,7 @@ import org.apache.cassandra.utils.memory.MemtableAllocator;
  */
 public class AtomicBTreePartition extends AbstractBTreePartition
 {
-    public static final long EMPTY_SIZE = ObjectSizes.measure(new 
AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
+    public static final long EMPTY_SIZE = ObjectSizes.measure(new 
AtomicBTreePartition(null,
                                                                                
        DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
                                                                                
        null));
 
@@ -83,10 +84,13 @@ public class AtomicBTreePartition extends AbstractBTreePartition
     private final MemtableAllocator allocator;
     private volatile Holder ref;
 
-    public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator)
+    private final TableMetadataRef metadata;
+
+    public AtomicBTreePartition(TableMetadataRef metadata, DecoratedKey partitionKey, MemtableAllocator allocator)
     {
         // involved in potential bug? partition columns may be a subset if we alter columns while it's in memtable
-        super(metadata, partitionKey);
+        super(partitionKey);
+        this.metadata = metadata;
         this.allocator = allocator;
         this.ref = EMPTY;
     }
@@ -96,6 +100,11 @@ public class AtomicBTreePartition extends AbstractBTreePartition
         return ref;
     }
 
+    public TableMetadata metadata()
+    {
+        return metadata.get();
+    }
+
     protected boolean canHaveShadowedData()
     {
         return true;
@@ -148,7 +157,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
                     deletionInfo = current.deletionInfo;
                 }
 
-                PartitionColumns columns = update.columns().mergeTo(current.columns);
+                RegularAndStaticColumns columns = update.columns().mergeTo(current.columns);
                 Row newStatic = update.staticRow();
                 Row staticRow = newStatic.isEmpty()
                               ? current.staticRow

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
index 90d64f4..48895dc 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.partitions;
 
 import java.io.IOException;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.*;
@@ -27,6 +26,9 @@ import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.btree.BTree;
 
 public class CachedBTreePartition extends ImmutableBTreePartition implements CachedPartition
@@ -36,7 +38,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
     private final int cachedLiveRows;
     private final int rowsWithNonExpiringCells;
 
-    private CachedBTreePartition(CFMetaData metadata,
+    private CachedBTreePartition(TableMetadata metadata,
                                  DecoratedKey partitionKey,
                                  Holder holder,
                                  int createdAtInSec,
@@ -150,7 +152,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
             out.writeInt(p.createdAtInSec);
             out.writeInt(p.cachedLiveRows);
             out.writeInt(p.rowsWithNonExpiringCells);
-            CFMetaData.serializer.serialize(partition.metadata(), out, version);
+            partition.metadata().id.serialize(out);
             try (UnfilteredRowIterator iter = p.unfilteredIterator())
             {
                 UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, p.rowCount());
@@ -173,7 +175,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
             int rowsWithNonExpiringCells = in.readInt();
 
 
-            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
             UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL);
             assert !header.isReversed && header.rowEstimate >= 0;
 
@@ -184,11 +186,11 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
             }
 
             return new CachedBTreePartition(metadata,
-                                                  header.key,
-                                                  holder,
-                                                  createdAtInSec,
-                                                  cachedLiveRows,
-                                                  rowsWithNonExpiringCells);
+                                            header.key,
+                                            holder,
+                                            createdAtInSec,
+                                            cachedLiveRows,
+                                            rowsWithNonExpiringCells);
 
         }
 
@@ -204,7 +206,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
                 return TypeSizes.sizeof(p.createdAtInSec)
                      + TypeSizes.sizeof(p.cachedLiveRows)
                      + TypeSizes.sizeof(p.rowsWithNonExpiringCells)
-                     + CFMetaData.serializer.serializedSize(partition.metadata(), version)
+                     + partition.metadata().id.serializedSize()
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount());
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 70a4678..5730076 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.db.partitions;
 
 import java.util.Iterator;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.rows.*;
 
 public class FilteredPartition extends ImmutableBTreePartition
@@ -48,9 +48,9 @@ public class FilteredPartition extends ImmutableBTreePartition
         final Iterator<Row> iter = iterator();
         return new RowIterator()
         {
-            public CFMetaData metadata()
+            public TableMetadata metadata()
             {
-                return metadata;
+                return FilteredPartition.this.metadata();
             }
 
             public boolean isReverseOrder()
@@ -58,7 +58,7 @@ public class FilteredPartition extends ImmutableBTreePartition
                 return false;
             }
 
-            public PartitionColumns columns()
+            public RegularAndStaticColumns columns()
             {
                 return FilteredPartition.this.columns();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
index 8d96f1e..5139d40 100644
--- a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
@@ -18,34 +18,37 @@
 */
 package org.apache.cassandra.db.partitions;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.rows.*;
 
 public class ImmutableBTreePartition extends AbstractBTreePartition
 {
 
     protected final Holder holder;
+    protected final TableMetadata metadata;
 
-    public ImmutableBTreePartition(CFMetaData metadata,
-                                      DecoratedKey partitionKey,
-                                      PartitionColumns columns,
-                                      Row staticRow,
-                                      Object[] tree,
-                                      DeletionInfo deletionInfo,
-                                      EncodingStats stats)
+    public ImmutableBTreePartition(TableMetadata metadata,
+                                   DecoratedKey partitionKey,
+                                   RegularAndStaticColumns columns,
+                                   Row staticRow,
+                                   Object[] tree,
+                                   DeletionInfo deletionInfo,
+                                   EncodingStats stats)
     {
-        super(metadata, partitionKey);
+        super(partitionKey);
+        this.metadata = metadata;
         this.holder = new Holder(columns, tree, deletionInfo, staticRow, stats);
     }
 
-    protected ImmutableBTreePartition(CFMetaData metadata,
+    protected ImmutableBTreePartition(TableMetadata metadata,
                                       DecoratedKey partitionKey,
                                       Holder holder)
     {
-        super(metadata, partitionKey);
+        super(partitionKey);
+        this.metadata = metadata;
         this.holder = holder;
     }
 
@@ -111,6 +114,11 @@ public class ImmutableBTreePartition extends AbstractBTreePartition
         return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered));
     }
 
+    public TableMetadata metadata()
+    {
+        return metadata;
+    }
+
     protected Holder holder()
     {
         return holder;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/Partition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java
index 04568e9..baeb6d5 100644
--- a/src/java/org/apache/cassandra/db/partitions/Partition.java
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.Slices;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -34,11 +34,11 @@ import org.apache.cassandra.utils.SearchIterator;
  */
 public interface Partition
 {
-    public CFMetaData metadata();
+    public TableMetadata metadata();
     public DecoratedKey partitionKey();
     public DeletionTime partitionLevelDeletion();
 
-    public PartitionColumns columns();
+    public RegularAndStaticColumns columns();
 
     public EncodingStats stats();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 1ed961f..33c9408 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -28,13 +28,14 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
@@ -74,48 +75,52 @@ public class PartitionUpdate extends AbstractBTreePartition
 
     private final boolean canHaveShadowedData;
 
-    private PartitionUpdate(CFMetaData metadata,
+    private final TableMetadata metadata;
+
+    private PartitionUpdate(TableMetadata metadata,
                             DecoratedKey key,
-                            PartitionColumns columns,
+                            RegularAndStaticColumns columns,
                             MutableDeletionInfo deletionInfo,
                             int initialRowCapacity,
                             boolean canHaveShadowedData)
     {
-        super(metadata, key);
+        super(key);
+        this.metadata = metadata;
         this.deletionInfo = deletionInfo;
         this.holder = new Holder(columns, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
         this.canHaveShadowedData = canHaveShadowedData;
         rowBuilder = builder(initialRowCapacity);
     }
 
-    private PartitionUpdate(CFMetaData metadata,
+    private PartitionUpdate(TableMetadata metadata,
                             DecoratedKey key,
                             Holder holder,
                             MutableDeletionInfo deletionInfo,
                             boolean canHaveShadowedData)
     {
-        super(metadata, key);
+        super(key);
+        this.metadata = metadata;
         this.holder = holder;
         this.deletionInfo = deletionInfo;
         this.isBuilt = true;
         this.canHaveShadowedData = canHaveShadowedData;
     }
 
-    public PartitionUpdate(CFMetaData metadata,
+    public PartitionUpdate(TableMetadata metadata,
                            DecoratedKey key,
-                           PartitionColumns columns,
+                           RegularAndStaticColumns columns,
                            int initialRowCapacity)
     {
         this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true);
     }
 
-    public PartitionUpdate(CFMetaData metadata,
+    public PartitionUpdate(TableMetadata metadata,
                            ByteBuffer key,
-                           PartitionColumns columns,
+                           RegularAndStaticColumns columns,
                            int initialRowCapacity)
     {
         this(metadata,
-             metadata.decorateKey(key),
+             metadata.partitioner.decorateKey(key),
              columns,
              initialRowCapacity);
     }
@@ -128,10 +133,10 @@ public class PartitionUpdate extends AbstractBTreePartition
      *
      * @return the newly created empty (and immutable) update.
      */
-    public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key)
+    public static PartitionUpdate emptyUpdate(TableMetadata metadata, DecoratedKey key)
     {
         MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
-        Holder holder = new Holder(PartitionColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+        Holder holder = new Holder(RegularAndStaticColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
         return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
     }
 
@@ -145,10 +150,10 @@ public class PartitionUpdate extends AbstractBTreePartition
      *
      * @return the newly created partition deletion update.
      */
-    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec)
+    public static PartitionUpdate fullPartitionDelete(TableMetadata metadata, DecoratedKey key, long timestamp, int nowInSec)
     {
         MutableDeletionInfo deletionInfo = new MutableDeletionInfo(timestamp, nowInSec);
-        Holder holder = new Holder(PartitionColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+        Holder holder = new Holder(RegularAndStaticColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
         return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
     }
 
@@ -161,17 +166,17 @@ public class PartitionUpdate extends AbstractBTreePartition
      *
      * @return the newly created partition update containing only {@code row}.
      */
-    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row)
+    public static PartitionUpdate singleRowUpdate(TableMetadata metadata, DecoratedKey key, Row row)
     {
         MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
         if (row.isStatic())
         {
-            Holder holder = new Holder(new PartitionColumns(Columns.from(row.columns()), Columns.NONE), BTree.empty(), deletionInfo, row, EncodingStats.NO_STATS);
+            Holder holder = new Holder(new RegularAndStaticColumns(Columns.from(row.columns()), Columns.NONE), BTree.empty(), deletionInfo, row, EncodingStats.NO_STATS);
             return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
         }
         else
         {
-            Holder holder = new Holder(new PartitionColumns(Columns.NONE, Columns.from(row.columns())), BTree.singleton(row), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+            Holder holder = new Holder(new RegularAndStaticColumns(Columns.NONE, Columns.from(row.columns())), BTree.singleton(row), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
             return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
         }
     }
@@ -185,9 +190,9 @@ public class PartitionUpdate extends AbstractBTreePartition
      *
      * @return the newly created partition update containing only {@code row}.
      */
-    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, ByteBuffer key, Row row)
+    public static PartitionUpdate singleRowUpdate(TableMetadata metadata, ByteBuffer key, Row row)
     {
-        return singleRowUpdate(metadata, metadata.decorateKey(key), row);
+        return singleRowUpdate(metadata, metadata.partitioner.decorateKey(key), row);
     }
 
     /**
@@ -289,9 +294,9 @@ public class PartitionUpdate extends AbstractBTreePartition
      *
      * @return the newly created partition deletion update.
      */
-    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, ByteBuffer key, long timestamp, int nowInSec)
+    public static PartitionUpdate fullPartitionDelete(TableMetadata metadata, ByteBuffer key, long timestamp, int nowInSec)
     {
-        return fullPartitionDelete(metadata, metadata.decorateKey(key), timestamp, nowInSec);
+        return fullPartitionDelete(metadata, metadata.partitioner.decorateKey(key), timestamp, nowInSec);
     }
 
     /**
@@ -371,8 +376,13 @@ public class PartitionUpdate extends AbstractBTreePartition
         return size;
     }
 
+    public TableMetadata metadata()
+    {
+        return metadata;
+    }
+
     @Override
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         // The superclass implementation calls holder(), but that triggers a build of the PartitionUpdate. But since
         // the columns are passed to the ctor, we know the holder always has the proper columns even if it doesn't have
@@ -415,7 +425,7 @@ public class PartitionUpdate extends AbstractBTreePartition
 
     private BTree.Builder<Row> builder(int initialCapacity)
     {
-        return BTree.<Row>builder(metadata.comparator, initialCapacity)
+        return BTree.<Row>builder(metadata().comparator, initialCapacity)
                     .setQuickResolver((a, b) ->
                                       Rows.merge(a, b, createdAtInSec));
     }
@@ -526,7 +536,7 @@ public class PartitionUpdate extends AbstractBTreePartition
     public void add(RangeTombstone range)
     {
         assertNotBuilt();
-        deletionInfo.add(range, metadata.comparator);
+        deletionInfo.add(range, metadata().comparator);
     }
 
     /**
@@ -582,7 +592,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         Holder holder = this.holder;
         Object[] cur = holder.tree;
         Object[] add = rowBuilder.build();
-        Object[] merged = BTree.<Row>merge(cur, add, metadata.comparator,
+        Object[] merged = BTree.<Row>merge(cur, add, metadata().comparator,
                                            UpdateFunction.Simple.of((a, b) -> 
Rows.merge(a, b, createdAtInSec)));
 
         assert deletionInfo == holder.deletionInfo;
@@ -604,10 +614,9 @@ public class PartitionUpdate extends AbstractBTreePartition
         // modified.
 
         StringBuilder sb = new StringBuilder();
-        sb.append(String.format("[%s.%s] key=%s columns=%s",
-                                metadata.ksName,
-                                metadata.cfName,
-                                metadata.getKeyValidator().getString(partitionKey().getKey()),
+        sb.append(String.format("[%s] key=%s columns=%s",
+                                metadata.toString(),
+                                metadata.partitionKeyType.getString(partitionKey().getKey()),
                                 columns()));
 
         sb.append("\n    deletionInfo=").append(deletionInfo);
@@ -624,7 +633,7 @@ public class PartitionUpdate extends AbstractBTreePartition
      * Int32Type, string for UTF8Type, ...). It is also allowed to pass a single {@code DecoratedKey} value directly.
      * @return a newly created builder.
      */
-    public static SimpleBuilder simpleBuilder(CFMetaData metadata, Object... partitionKeyValues)
+    public static SimpleBuilder simpleBuilder(TableMetadata metadata, Object... partitionKeyValues)
     {
         return new SimpleBuilders.PartitionUpdateBuilder(metadata, partitionKeyValues);
     }
@@ -640,7 +649,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         /**
          * The metadata of the table this is a builder on.
          */
-        public CFMetaData metadata();
+        public TableMetadata metadata();
 
         /**
          * Sets the timestamp to use for the following additions to this builder or any derived (row) builder.
@@ -777,14 +786,14 @@ public class PartitionUpdate extends AbstractBTreePartition
             {
                 assert !iter.isReverseOrder();
 
-                CFMetaData.serializer.serialize(update.metadata(), out, version);
+                update.metadata.id.serialize(out);
                 UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount());
             }
         }
 
         public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
         {
-            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
             UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag);
             if (header.isEmpty)
                 return emptyUpdate(metadata, header.key);
@@ -820,7 +829,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         {
             try (UnfilteredRowIterator iter = update.unfilteredIterator())
             {
-                return CFMetaData.serializer.serializedSize(update.metadata(), version)
+                return update.metadata.id.serializedSize()
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount());
             }
         }
@@ -834,10 +843,10 @@ public class PartitionUpdate extends AbstractBTreePartition
     public static class CounterMark
     {
         private final Row row;
-        private final ColumnDefinition column;
+        private final ColumnMetadata column;
         private final CellPath path;
 
-        private CounterMark(Row row, ColumnDefinition column, CellPath path)
+        private CounterMark(Row row, ColumnMetadata column, CellPath path)
         {
             this.row = row;
             this.column = column;
@@ -849,7 +858,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             return row.clustering();
         }
 
-        public ColumnDefinition column()
+        public ColumnMetadata column()
         {
             return column;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
index 3b968e6..b739e8b 100644
--- a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.db.partitions;
 
 import java.util.NoSuchElementException;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator
@@ -32,7 +32,7 @@ public class SingletonUnfilteredPartitionIterator implements UnfilteredPartition
         this.iter = iter;
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         return iter.metadata();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
index 872225f..cd8e47f 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 /**
@@ -30,5 +30,5 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  */
 public interface UnfilteredPartitionIterator extends BasePartitionIterator<UnfilteredRowIterator>
 {
-    public CFMetaData metadata();
+    public TableMetadata metadata();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 76420cf..40eaad1 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
@@ -31,7 +30,7 @@ import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.MergeIterator;
 
 /**
@@ -113,7 +112,7 @@ public abstract class UnfilteredPartitionIterators
         assert listener != null;
         assert !iterators.isEmpty();
 
-        final CFMetaData metadata = iterators.get(0).metadata();
+        final TableMetadata metadata = iterators.get(0).metadata();
 
         final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
         {
@@ -154,7 +153,7 @@ public abstract class UnfilteredPartitionIterators
 
         return new AbstractUnfilteredPartitionIterator()
         {
-            public CFMetaData metadata()
+            public TableMetadata metadata()
             {
                 return metadata;
             }
@@ -185,7 +184,7 @@ public abstract class UnfilteredPartitionIterators
         if (iterators.size() == 1)
             return iterators.get(0);
 
-        final CFMetaData metadata = iterators.get(0).metadata();
+        final TableMetadata metadata = iterators.get(0).metadata();
 
         final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
         {
@@ -215,7 +214,7 @@ public abstract class UnfilteredPartitionIterators
 
         return new AbstractUnfilteredPartitionIterator()
         {
-            public CFMetaData metadata()
+            public TableMetadata metadata()
             {
                 return metadata;
             }
@@ -304,7 +303,7 @@ public abstract class UnfilteredPartitionIterators
             out.writeBoolean(false);
         }
 
-        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
+        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final TableMetadata metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
         {
             // Skip now unused isForThrift boolean
             in.readBoolean();
@@ -315,7 +314,7 @@ public abstract class UnfilteredPartitionIterators
                 private boolean hasNext;
                 private boolean nextReturned = true;
 
-                public CFMetaData metadata()
+                public TableMetadata metadata()
                 {
                     return metadata;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index ca83783..a9b058e 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.context.CounterContext;
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  */
 public abstract class AbstractCell extends Cell
 {
-    protected AbstractCell(ColumnDefinition column)
+    protected AbstractCell(ColumnMetadata column)
     {
         super(column);
     }
@@ -139,7 +139,7 @@ public abstract class AbstractCell extends Cell
             throw new MarshalException("Shoud not have a TTL without an 
associated local deletion time");
 
         // non-frozen UDTs require both the cell path & value to validate,
-        // so that logic is pushed down into ColumnDefinition. Tombstone
+        // so that logic is pushed down into ColumnMetadata. Tombstone
         // validation is done there too as it also involves the cell path
         // for complex columns
         column().validateCell(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java 
b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
index 153243c..a7c48c1 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.ClusteringBoundOrBoundary;
 
 public abstract class AbstractRangeTombstoneMarker<B extends 
ClusteringBoundOrBoundary> implements RangeTombstoneMarker
@@ -56,7 +56,7 @@ public abstract class AbstractRangeTombstoneMarker<B extends 
ClusteringBoundOrBo
         return bound.isClose(reversed);
     }
 
-    public void validateData(CFMetaData metadata)
+    public void validateData(TableMetadata metadata)
     {
         ClusteringBoundOrBoundary bound = clustering();
         for (int i = 0; i < bound.size(); i++)
@@ -67,11 +67,11 @@ public abstract class AbstractRangeTombstoneMarker<B 
extends ClusteringBoundOrBo
         }
     }
 
-    public String toString(CFMetaData metadata, boolean fullDetails)
+    public String toString(TableMetadata metadata, boolean fullDetails)
     {
         return toString(metadata);
     }
-    public String toString(CFMetaData metadata, boolean includeClusteringKeys, 
boolean fullDetails)
+    public String toString(TableMetadata metadata, boolean 
includeClusteringKeys, boolean fullDetails)
     {
         return toString(metadata);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java 
b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 356722e..c869a1a 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -26,7 +26,7 @@ import java.util.stream.StreamSupport;
 
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.UserType;
@@ -71,7 +71,7 @@ public abstract class AbstractRow extends 
AbstractCollection<ColumnData> impleme
             cd.digest(digest);
     }
 
-    public void validateData(CFMetaData metadata)
+    public void validateData(TableMetadata metadata)
     {
         Clustering clustering = clustering();
         for (int i = 0; i < clustering.size(); i++)
@@ -89,17 +89,17 @@ public abstract class AbstractRow extends 
AbstractCollection<ColumnData> impleme
             cd.validate();
     }
 
-    public String toString(CFMetaData metadata)
+    public String toString(TableMetadata metadata)
     {
         return toString(metadata, false);
     }
 
-    public String toString(CFMetaData metadata, boolean fullDetails)
+    public String toString(TableMetadata metadata, boolean fullDetails)
     {
         return toString(metadata, true, fullDetails);
     }
 
-    public String toString(CFMetaData metadata, boolean includeClusterKeys, 
boolean fullDetails)
+    public String toString(TableMetadata metadata, boolean includeClusterKeys, 
boolean fullDetails)
     {
         StringBuilder sb = new StringBuilder();
         sb.append("Row");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java 
b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
index f2389a7..2c3f78f 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
@@ -17,25 +17,25 @@
  */
 package org.apache.cassandra.db.rows;
 
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.AbstractIterator;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 
 public abstract class AbstractUnfilteredRowIterator extends 
AbstractIterator<Unfiltered> implements UnfilteredRowIterator
 {
-    protected final CFMetaData metadata;
+    protected final TableMetadata metadata;
     protected final DecoratedKey partitionKey;
     protected final DeletionTime partitionLevelDeletion;
-    protected final PartitionColumns columns;
+    protected final RegularAndStaticColumns columns;
     protected final Row staticRow;
     protected final boolean isReverseOrder;
     protected final EncodingStats stats;
 
-    protected AbstractUnfilteredRowIterator(CFMetaData metadata,
+    protected AbstractUnfilteredRowIterator(TableMetadata metadata,
                                             DecoratedKey partitionKey,
                                             DeletionTime 
partitionLevelDeletion,
-                                            PartitionColumns columns,
+                                            RegularAndStaticColumns columns,
                                             Row staticRow,
                                             boolean isReverseOrder,
                                             EncodingStats stats)
@@ -49,12 +49,12 @@ public abstract class AbstractUnfilteredRowIterator extends 
AbstractIterator<Unf
         this.stats = stats;
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         return metadata;
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         return columns;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 54da16b..0fd6337 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -27,12 +27,13 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.DroppedColumn;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
@@ -191,7 +192,7 @@ public class BTreeRow extends AbstractRow
         return clustering;
     }
 
-    public Collection<ColumnDefinition> columns()
+    public Collection<ColumnMetadata> columns()
     {
         return Collections2.transform(this, ColumnData::column);
     }
@@ -213,13 +214,13 @@ public class BTreeRow extends AbstractRow
         return deletion;
     }
 
-    public Cell getCell(ColumnDefinition c)
+    public Cell getCell(ColumnMetadata c)
     {
         assert !c.isComplex();
-        return (Cell) BTree.<Object>find(btree, 
ColumnDefinition.asymmetricColumnDataComparator, c);
+        return (Cell) BTree.<Object>find(btree, 
ColumnMetadata.asymmetricColumnDataComparator, c);
     }
 
-    public Cell getCell(ColumnDefinition c, CellPath path)
+    public Cell getCell(ColumnMetadata c, CellPath path)
     {
         assert c.isComplex();
         ComplexColumnData cd = getComplexColumnData(c);
@@ -228,10 +229,10 @@ public class BTreeRow extends AbstractRow
         return cd.getCell(path);
     }
 
-    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
+    public ComplexColumnData getComplexColumnData(ColumnMetadata c)
     {
         assert c.isComplex();
-        return (ComplexColumnData) BTree.<Object>find(btree, 
ColumnDefinition.asymmetricColumnDataComparator, c);
+        return (ComplexColumnData) BTree.<Object>find(btree, 
ColumnMetadata.asymmetricColumnDataComparator, c);
     }
 
     public int size()
@@ -249,19 +250,19 @@ public class BTreeRow extends AbstractRow
         return CellIterator::new;
     }
 
-    public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    public BTreeSearchIterator<ColumnMetadata, ColumnData> searchIterator()
     {
-        return BTree.slice(btree, 
ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC);
+        return BTree.slice(btree, 
ColumnMetadata.asymmetricColumnDataComparator, BTree.Dir.ASC);
     }
 
-    public Row filter(ColumnFilter filter, CFMetaData metadata)
+    public Row filter(ColumnFilter filter, TableMetadata metadata)
     {
         return filter(filter, DeletionTime.LIVE, false, metadata);
     }
 
-    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, 
boolean setActiveDeletionToRow, CFMetaData metadata)
+    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, 
boolean setActiveDeletionToRow, TableMetadata metadata)
     {
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = 
metadata.getDroppedColumns();
+        Map<ByteBuffer, DroppedColumn> droppedColumns = 
metadata.droppedColumns;
 
         boolean mayFilterColumns = !filter.fetchesAllColumns(isStatic());
         boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time());
@@ -282,16 +283,16 @@ public class BTreeRow extends AbstractRow
         }
 
         Columns columns = filter.fetchedColumns().columns(isStatic());
-        Predicate<ColumnDefinition> inclusionTester = 
columns.inOrderInclusionTester();
-        Predicate<ColumnDefinition> queriedByUserTester = 
filter.queriedColumns().columns(isStatic()).inOrderInclusionTester();
+        Predicate<ColumnMetadata> inclusionTester = 
columns.inOrderInclusionTester();
+        Predicate<ColumnMetadata> queriedByUserTester = 
filter.queriedColumns().columns(isStatic()).inOrderInclusionTester();
         final LivenessInfo rowLiveness = newInfo;
         return transformAndFilter(newInfo, newDeletion, (cd) -> {
 
-            ColumnDefinition column = cd.column();
+            ColumnMetadata column = cd.column();
             if (!inclusionTester.test(column))
                 return null;
 
-            CFMetaData.DroppedColumn dropped = 
droppedColumns.get(column.name.bytes);
+            DroppedColumn dropped = droppedColumns.get(column.name.bytes);
             if (column.isComplex())
                 return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed 
? activeDeletion : DeletionTime.LIVE, dropped, rowLiveness);
 
@@ -313,7 +314,7 @@ public class BTreeRow extends AbstractRow
 
         return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> {
 
-            ColumnDefinition column = cd.column();
+            ColumnMetadata column = cd.column();
             if (column.isComplex())
                 return ((ComplexColumnData)cd).withOnlyQueriedData(filter);
 
@@ -455,16 +456,16 @@ public class BTreeRow extends AbstractRow
     // assumption that Row objects are immutable. This method should go away 
post-#6506 in particular.
     // This method is in particular not exposed by the Row API on purpose.
     // This method also *assumes* that the cell we're setting already exists.
-    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer 
value)
+    public void setValue(ColumnMetadata column, CellPath path, ByteBuffer 
value)
     {
-        ColumnData current = (ColumnData) BTree.<Object>find(btree, 
ColumnDefinition.asymmetricColumnDataComparator, column);
+        ColumnData current = (ColumnData) BTree.<Object>find(btree, 
ColumnMetadata.asymmetricColumnDataComparator, column);
         if (column.isSimple())
             BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) 
current).withUpdatedValue(value));
         else
             ((ComplexColumnData) current).setValue(path, value);
     }
 
-    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean 
reversed)
+    public Iterable<Cell> cellsInLegacyOrder(TableMetadata metadata, boolean 
reversed)
     {
         return () -> new CellInLegacyOrderIterator(metadata, reversed);
     }
@@ -508,9 +509,9 @@ public class BTreeRow extends AbstractRow
         private Iterator<Cell> complexCells;
         private final Object[] data;
 
-        private CellInLegacyOrderIterator(CFMetaData metadata, boolean 
reversed)
+        private CellInLegacyOrderIterator(TableMetadata metadata, boolean 
reversed)
         {
-            AbstractType<?> nameComparator = 
metadata.getColumnDefinitionNameComparator(isStatic() ? 
ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
+            AbstractType<?> nameComparator = 
metadata.columnDefinitionNameComparator(isStatic() ? ColumnMetadata.Kind.STATIC 
: ColumnMetadata.Kind.REGULAR);
             this.comparator = reversed ? 
Collections.reverseOrder(nameComparator) : nameComparator;
             this.reversed = reversed;
 
@@ -591,7 +592,7 @@ public class BTreeRow extends AbstractRow
         // a simple marker class that will sort to the beginning of a run of 
complex cells to store the deletion time
         private static class ComplexColumnDeletion extends BufferCell
         {
-            public ComplexColumnDeletion(ColumnDefinition column, DeletionTime 
deletionTime)
+            public ComplexColumnDeletion(ColumnMetadata column, DeletionTime 
deletionTime)
             {
                 super(column, deletionTime.markedForDeleteAt(), 0, 
deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, 
CellPath.BOTTOM);
             }
@@ -609,7 +610,7 @@ public class BTreeRow extends AbstractRow
             public ColumnData resolve(Object[] cells, int lb, int ub)
             {
                 Cell cell = (Cell) cells[lb];
-                ColumnDefinition column = cell.column;
+                ColumnMetadata column = cell.column;
                 if (cell.column.isSimple())
                 {
                     assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE;
@@ -738,7 +739,7 @@ public class BTreeRow extends AbstractRow
             hasComplex |= cell.column.isComplex();
         }
 
-        public void addComplexDeletion(ColumnDefinition column, DeletionTime 
complexDeletion)
+        public void addComplexDeletion(ColumnMetadata column, DeletionTime 
complexDeletion)
         {
             getCells().add(new ComplexColumnDeletion(column, complexDeletion));
             hasComplex = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java 
b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java
index ce37297..4033248 100644
--- a/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java
@@ -18,9 +18,9 @@
 */
 package org.apache.cassandra.db.rows;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.utils.CloseableIterator;
 
 /**
@@ -32,7 +32,7 @@ public interface BaseRowIterator<U extends Unfiltered> 
extends CloseableIterator
     /**
      * The metadata for the table this iterator is on.
      */
-    public CFMetaData metadata();
+    public TableMetadata metadata();
 
     /**
      * Whether or not the rows returned by this iterator are in reversed
@@ -44,7 +44,7 @@ public interface BaseRowIterator<U extends Unfiltered> 
extends CloseableIterator
      * A subset of the columns for the (static and regular) rows returned by 
this iterator.
      * Every row returned by this iterator must guarantee that it has only 
those columns.
      */
-    public PartitionColumns columns();
+    public RegularAndStaticColumns columns();
 
     /**
      * The partition key of the partition this is an iterator over.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java 
b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 9b31c16..76c6d3e 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.marshal.ByteType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -27,7 +27,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 public class BufferCell extends AbstractCell
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 
0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
BufferCell(ColumnMetadata.regularColumn("", "", "", ByteType.instance), 0L, 0, 
0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
 
     private final long timestamp;
     private final int ttl;
@@ -36,7 +36,7 @@ public class BufferCell extends AbstractCell
     private final ByteBuffer value;
     private final CellPath path;
 
-    public BufferCell(ColumnDefinition column, long timestamp, int ttl, int 
localDeletionTime, ByteBuffer value, CellPath path)
+    public BufferCell(ColumnMetadata column, long timestamp, int ttl, int 
localDeletionTime, ByteBuffer value, CellPath path)
     {
         super(column);
         assert !column.isPrimaryKeyColumn();
@@ -48,33 +48,33 @@ public class BufferCell extends AbstractCell
         this.path = path;
     }
 
-    public static BufferCell live(ColumnDefinition column, long timestamp, 
ByteBuffer value)
+    public static BufferCell live(ColumnMetadata column, long timestamp, 
ByteBuffer value)
     {
         return live(column, timestamp, value, null);
     }
 
-    public static BufferCell live(ColumnDefinition column, long timestamp, 
ByteBuffer value, CellPath path)
+    public static BufferCell live(ColumnMetadata column, long timestamp, 
ByteBuffer value, CellPath path)
     {
         return new BufferCell(column, timestamp, NO_TTL, NO_DELETION_TIME, 
value, path);
     }
 
-    public static BufferCell expiring(ColumnDefinition column, long timestamp, 
int ttl, int nowInSec, ByteBuffer value)
+    public static BufferCell expiring(ColumnMetadata column, long timestamp, 
int ttl, int nowInSec, ByteBuffer value)
     {
         return expiring(column, timestamp, ttl, nowInSec, value, null);
     }
 
-    public static BufferCell expiring(ColumnDefinition column, long timestamp, 
int ttl, int nowInSec, ByteBuffer value, CellPath path)
+    public static BufferCell expiring(ColumnMetadata column, long timestamp, 
int ttl, int nowInSec, ByteBuffer value, CellPath path)
     {
         assert ttl != NO_TTL;
         return new BufferCell(column, timestamp, ttl, nowInSec + ttl, value, 
path);
     }
 
-    public static BufferCell tombstone(ColumnDefinition column, long 
timestamp, int nowInSec)
+    public static BufferCell tombstone(ColumnMetadata column, long timestamp, 
int nowInSec)
     {
         return tombstone(column, timestamp, nowInSec, null);
     }
 
-    public static BufferCell tombstone(ColumnDefinition column, long 
timestamp, int nowInSec, CellPath path)
+    public static BufferCell tombstone(ColumnMetadata column, long timestamp, 
int nowInSec, CellPath path)
     {
         return new BufferCell(column, timestamp, NO_TTL, nowInSec, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
     }
@@ -104,7 +104,7 @@ public class BufferCell extends AbstractCell
         return path;
     }
 
-    public Cell withUpdatedColumn(ColumnDefinition newColumn)
+    public Cell withUpdatedColumn(ColumnMetadata newColumn)
     {
         return new BufferCell(newColumn, timestamp, ttl, localDeletionTime, 
value, path);
     }

Reply via email to