http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 19d1f30..4636022 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -54,7 +55,7 @@ public abstract class Cell extends ColumnData
 
     public static final Serializer serializer = new BufferCell.Serializer();
 
-    protected Cell(ColumnDefinition column)
+    protected Cell(ColumnMetadata column)
     {
         super(column);
     }
@@ -130,7 +131,7 @@ public abstract class Cell extends ColumnData
      */
     public abstract CellPath path();
 
-    public abstract Cell withUpdatedColumn(ColumnDefinition newColumn);
+    public abstract Cell withUpdatedColumn(ColumnMetadata newColumn);
 
     public abstract Cell withUpdatedValue(ByteBuffer newValue);
 
@@ -171,7 +172,7 @@ public abstract class Cell extends ColumnData
         private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Wether the cell has the same timestamp than the row this is a cell of.
         private final static int USE_ROW_TTL_MASK            = 0x10; // Wether the cell has the same ttl than the row this is a cell of.
 
-        public void serialize(Cell cell, ColumnDefinition column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
+        public void serialize(Cell cell, ColumnMetadata column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
         {
             assert cell != null;
             boolean hasValue = cell.value().hasRemaining();
@@ -210,7 +211,7 @@ public abstract class Cell extends ColumnData
                 header.getType(column).writeValue(cell.value(), out);
         }
 
-        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
+        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnMetadata column, SerializationHeader header, SerializationHelper helper) throws IOException
         {
             int flags = in.readUnsignedByte();
             boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
@@ -251,7 +252,7 @@ public abstract class Cell extends ColumnData
             return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
         }
 
-        public long serializedSize(Cell cell, ColumnDefinition column, LivenessInfo rowLiveness, SerializationHeader header)
+        public long serializedSize(Cell cell, ColumnMetadata column, LivenessInfo rowLiveness, SerializationHeader header)
         {
             long size = 1; // flags
             boolean hasValue = cell.value().hasRemaining();
@@ -278,7 +279,7 @@ public abstract class Cell extends ColumnData
         }
 
         // Returns if the skipped cell was an actual cell (i.e. it had its presence flag).
-        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
+        public boolean skip(DataInputPlus in, ColumnMetadata column, SerializationHeader header) throws IOException
         {
             int flags = in.readUnsignedByte();
             boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/Cells.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
index 38bde16..7f2772c 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.Conflicts;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
@@ -197,7 +197,7 @@ public abstract class Cells
      * of cells from {@code existing} and {@code update} having the same cell path is empty, this
      * returns {@code Long.MAX_VALUE}.
      */
-    public static long reconcileComplex(ColumnDefinition column,
+    public static long reconcileComplex(ColumnMetadata column,
                                         Iterator<Cell> existing,
                                         Iterator<Cell> update,
                                         DeletionTime deletion,
@@ -282,7 +282,7 @@ public abstract class Cells
      * because deleted cells always have precedence on timestamp equality and deciding if a
      * cell is a live or not depends on the current time due to expiring cells).
      */
-    public static void addNonShadowedComplex(ColumnDefinition column,
+    public static void addNonShadowedComplex(ColumnMetadata column,
                                              Iterator<Cell> existing,
                                              Iterator<Cell> update,
                                              DeletionTime deletion,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 933da6a..166a805 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.security.MessageDigest;
 import java.util.Comparator;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.serializers.MarshalException;
@@ -35,8 +35,8 @@ public abstract class ColumnData
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
-    protected final ColumnDefinition column;
-    protected ColumnData(ColumnDefinition column)
+    protected final ColumnMetadata column;
+    protected ColumnData(ColumnMetadata column)
     {
         this.column = column;
     }
@@ -46,7 +46,7 @@ public abstract class ColumnData
      *
      * @return the column this is a data for.
      */
-    public final ColumnDefinition column() { return column; }
+    public final ColumnMetadata column() { return column; }
 
     /**
      * The size of the data hold by this {@code ColumnData}.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index b5195c5..8f9b63f 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -24,14 +24,14 @@ import java.util.Objects;
 
 import com.google.common.base.Function;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.LivenessInfo;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.ByteType;
 import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.DroppedColumn;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 
@@ -43,7 +43,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
 {
     static final Cell[] NO_CELLS = new Cell[0];
 
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnDefinition.regularDef("", "", "", SetType.getInstance(ByteType.instance, true)), NO_CELLS, new DeletionTime(0, 0)));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnMetadata.regularColumn("", "", "", SetType.getInstance(ByteType.instance, true)), NO_CELLS, new DeletionTime(0, 0)));
 
     // The cells for 'column' sorted by cell path.
     private final Object[] cells;
@@ -51,7 +51,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
     private final DeletionTime complexDeletion;
 
     // Only ArrayBackedRow should call this.
-    ComplexColumnData(ColumnDefinition column, Object[] cells, DeletionTime complexDeletion)
+    ComplexColumnData(ColumnMetadata column, Object[] cells, DeletionTime complexDeletion)
     {
         super(column);
         assert column.isComplex();
@@ -143,7 +143,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
         return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared);
     }
 
-    public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped, LivenessInfo rowLiveness)
+    public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, DroppedColumn dropped, LivenessInfo rowLiveness)
     {
         ColumnFilter.Tester cellTester = filter.newTester(column);
         if (cellTester == null && activeDeletion.isLive() && dropped == null)
@@ -235,10 +235,10 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
     public static class Builder
     {
         private DeletionTime complexDeletion;
-        private ColumnDefinition column;
+        private ColumnMetadata column;
         private BTree.Builder<Cell> builder;
 
-        public void newColumn(ColumnDefinition column)
+        public void newColumn(ColumnMetadata column)
         {
             this.column = column;
             this.complexDeletion = DeletionTime.LIVE; // default if writeComplexDeletion is not called

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
index 504d60e..d8bd36f 100644
--- a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
@@ -17,9 +17,9 @@
  */
 package org.apache.cassandra.db.rows;
 
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.AbstractIterator;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 
 /**
@@ -53,13 +53,13 @@ public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIte
         return iterator != null;
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         maybeInit();
         return iterator.metadata();
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         maybeInit();
         return iterator.columns();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/NativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index 5930332..10ccf88 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -61,7 +61,7 @@ public class NativeCell extends AbstractCell
 
     public NativeCell(NativeAllocator allocator,
                       OpOrder.Group writeOp,
-                      ColumnDefinition column,
+                      ColumnMetadata column,
                       long timestamp,
                       int ttl,
                       int localDeletionTime,
@@ -143,7 +143,7 @@ public class NativeCell extends AbstractCell
         throw new UnsupportedOperationException();
     }
 
-    public Cell withUpdatedColumn(ColumnDefinition column)
+    public Cell withUpdatedColumn(ColumnMetadata column)
     {
         return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), value(), path());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index 45e594b..71b2315 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -124,7 +124,7 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
         deletion.digest(digest);
     }
 
-    public String toString(CFMetaData metadata)
+    public String toString(TableMetadata metadata)
     {
         return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index fd41bea..b6ecd9d 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -146,7 +146,7 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
         startDeletion.digest(digest);
     }
 
-    public String toString(CFMetaData metadata)
+    public String toString(TableMetadata metadata)
     {
         return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index c19d55e..9e7e50d 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -23,10 +23,10 @@ import java.util.function.Consumer;
 
 import com.google.common.base.Predicate;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MergeIterator;
@@ -60,7 +60,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      * An in-natural-order collection of the columns for which data (incl. simple tombstones)
      * is present in this row.
      */
-    public Collection<ColumnDefinition> columns();
+    public Collection<ColumnMetadata> columns();
 
     /**
      * The row deletion.
@@ -115,7 +115,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      * @param c the simple column for which to fetch the cell.
      * @return the corresponding cell or {@code null} if the row has no such cell.
      */
-    public Cell getCell(ColumnDefinition c);
+    public Cell getCell(ColumnMetadata c);
 
     /**
      * Return a cell for a given complex column and cell path.
@@ -124,7 +124,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      * @param path the cell path for which to fetch the cell.
      * @return the corresponding cell or {@code null} if the row has no such cell.
      */
-    public Cell getCell(ColumnDefinition c, CellPath path);
+    public Cell getCell(ColumnMetadata c, CellPath path);
 
     /**
      * The data for a complex column.
@@ -134,7 +134,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      * @param c the complex column for which to return the complex data.
      * @return the data for {@code c} or {@code null} is the row has no data for this column.
      */
-    public ComplexColumnData getComplexColumnData(ColumnDefinition c);
+    public ComplexColumnData getComplexColumnData(ColumnMetadata c);
 
     /**
      * An iterable over the cells of this row.
@@ -156,7 +156,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      * @param reversed if cells should returned in reverse order.
      * @return an iterable over the cells of this row in "legacy order".
      */
-    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean reversed);
+    public Iterable<Cell> cellsInLegacyOrder(TableMetadata metadata, boolean reversed);
 
     /**
      * Whether the row stores any (non-live) complex deletion for any complex column.
@@ -180,14 +180,14 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      *
      * @return a search iterator for the cells of this row.
      */
-    public SearchIterator<ColumnDefinition, ColumnData> searchIterator();
+    public SearchIterator<ColumnMetadata, ColumnData> searchIterator();
 
     /**
      * Returns a copy of this row that:
      *   1) only includes the data for the column included by {@code filter}.
      *   2) doesn't include any data that belongs to a dropped column (recorded in {@code metadata}).
      */
-    public Row filter(ColumnFilter filter, CFMetaData metadata);
+    public Row filter(ColumnFilter filter, TableMetadata metadata);
 
     /**
      * Returns a copy of this row that:
@@ -196,7 +196,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
      *   3) doesn't include any data that is shadowed/deleted by {@code activeDeletion}.
      *   4) uses {@code activeDeletion} as row deletion iff {@code setActiveDeletionToRow} and {@code activeDeletion} supersedes the row deletion.
      */
-    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata);
+    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, TableMetadata metadata);
 
     /**
      * Returns a copy of this row without any deletion info that should be purged according to {@code purger}.
@@ -249,7 +249,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
 
     public long unsharedHeapSizeExcludingData();
 
-    public String toString(CFMetaData metadata, boolean fullDetails);
+    public String toString(TableMetadata metadata, boolean fullDetails);
 
     /**
      * Apply a function to every column in a row
@@ -475,7 +475,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
          * @param column the column for which to add the {@code complexDeletion}.
          * @param complexDeletion the complex deletion time to add.
          */
-        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion);
+        public void addComplexDeletion(ColumnMetadata column, DeletionTime complexDeletion);
 
         /**
          * Builds and return built row.
@@ -690,7 +690,7 @@ public interface Row extends Unfiltered, Collection<ColumnData>
         {
             private final int nowInSec;
 
-            private ColumnDefinition column;
+            private ColumnMetadata column;
             private final List<ColumnData> versions;
 
             private DeletionTime activeDeletion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
index 5d7d8fe..e08b370 100644
--- a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.db.rows;
 import java.util.Comparator;
 import java.util.Iterator;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 
@@ -51,7 +52,7 @@ public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator
     // The currently open tombstone. Note that unless this is null, there is no point in checking nextRange.
     private RangeTombstone openRange;
 
-    public RowAndDeletionMergeIterator(CFMetaData metadata,
+    public RowAndDeletionMergeIterator(TableMetadata metadata,
                                        DecoratedKey partitionKey,
                                        DeletionTime partitionLevelDeletion,
                                        ColumnFilter selection,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
index 0c7e32b..88a9bae 100644
--- a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
+++ b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.db.rows;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.*;
 
 /**
@@ -63,7 +63,7 @@ public interface RowDiffListener
      * @param original the complex deletion of input {@code i} for column {@code column}. May be {@code null} if input {@code i}
      * had no complex deletion but the merged row has.
      */
-    public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original);
+    public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original);
 
     /**
      * Called for any cell that is either in the merged row or in input {@code i}.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index bce6a7d..b155d68 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -22,9 +22,9 @@ import java.security.MessageDigest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -76,12 +76,12 @@ public abstract class RowIterators
      */
     public static RowIterator loggingIterator(RowIterator iterator, final String id)
     {
-        CFMetaData metadata = iterator.metadata();
+        TableMetadata metadata = iterator.metadata();
         logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}",
                     id,
-                    metadata.ksName,
-                    metadata.cfName,
-                    metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+                    metadata.keyspace,
+                    metadata.name,
+                    metadata.partitionKeyType.getString(iterator.partitionKey().getKey()),
                     iterator.isReverseOrder());
 
         class Log extends Transformation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index a92bdac..8c7143a 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -22,11 +22,10 @@ import java.util.*;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
-import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.WrappedInt;
 
@@ -71,7 +70,7 @@ public abstract class Rows
      * only argument.
      * @return a newly created builder.
      */
-    public static Row.SimpleBuilder simpleBuilder(CFMetaData metadata, Object... clusteringValues)
+    public static Row.SimpleBuilder simpleBuilder(TableMetadata metadata, Object... clusteringValues)
     {
         return new SimpleBuilders.RowBuilder(metadata, clusteringValues);
     }
@@ -173,7 +172,7 @@ public abstract class Rows
                     ColumnData input = inputDatas[i];
                     if (mergedData != null || input != null)
                     {
-                        ColumnDefinition column = (mergedData != null ? mergedData : input).column;
+                        ColumnMetadata column = (mergedData != null ? mergedData : input).column;
                         if (column.isSimple())
                         {
                             diffListener.onCell(i, clustering, (Cell) mergedData, (Cell) input);
@@ -298,7 +297,7 @@ public abstract class Rows
             int comparison = nexta == null ? 1 : nextb == null ? -1 : nexta.column.compareTo(nextb.column);
             ColumnData cura = comparison <= 0 ? nexta : null;
             ColumnData curb = comparison >= 0 ? nextb : null;
-            ColumnDefinition column = (cura != null ? cura : curb).column;
+            ColumnMetadata column = (cura != null ? cura : curb).column;
             if (column.isSimple())
             {
                 timeDelta = Math.min(timeDelta, Cells.reconcile((Cell) cura, (Cell) curb, deletion, builder, nowInSec));
@@ -367,7 +366,7 @@ public abstract class Rows
             if (comparison <= 0)
             {
                 ColumnData cura = nexta;
-                ColumnDefinition column = cura.column;
+                ColumnMetadata column = cura.column;
                 ColumnData curb = comparison == 0 ? nextb : null;
                 if (column.isSimple())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index e40a1e1..c7fb8e4 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -20,11 +20,12 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.schema.DroppedColumn;
 
 public class SerializationHelper
 {
@@ -50,24 +51,24 @@ public class SerializationHelper
     private final ColumnFilter columnsToFetch;
     private ColumnFilter.Tester tester;
 
-    private final Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns;
-    private CFMetaData.DroppedColumn currentDroppedComplex;
+    private final Map<ByteBuffer, DroppedColumn> droppedColumns;
+    private DroppedColumn currentDroppedComplex;
 
 
-    public SerializationHelper(CFMetaData metadata, int version, Flag flag, ColumnFilter columnsToFetch)
+    public SerializationHelper(TableMetadata metadata, int version, Flag flag, ColumnFilter columnsToFetch)
     {
         this.flag = flag;
         this.version = version;
         this.columnsToFetch = columnsToFetch;
-        this.droppedColumns = metadata.getDroppedColumns();
+        this.droppedColumns = metadata.droppedColumns;
     }
 
-    public SerializationHelper(CFMetaData metadata, int version, Flag flag)
+    public SerializationHelper(TableMetadata metadata, int version, Flag flag)
     {
         this(metadata, version, flag, null);
     }
 
-    public boolean includes(ColumnDefinition column)
+    public boolean includes(ColumnMetadata column)
     {
         return columnsToFetch == null || columnsToFetch.fetches(column);
     }
@@ -83,7 +84,7 @@ public class SerializationHelper
         // actually requested by the user however (canSkipValue), we can skip 
the full cell if the cell
         // timestamp is lower than the row one, because in that case, the row 
timestamp is enough proof
         // of the liveness of the row. Otherwise, we'll only be able to skip 
the values of those cells.
-        ColumnDefinition column = cell.column();
+        ColumnMetadata column = cell.column();
         if (column.isComplex())
         {
             if (!includes(cell.path()))
@@ -102,7 +103,7 @@ public class SerializationHelper
         return path == null || tester == null || tester.fetches(path);
     }
 
-    public boolean canSkipValue(ColumnDefinition column)
+    public boolean canSkipValue(ColumnMetadata column)
     {
         return columnsToFetch != null && 
!columnsToFetch.fetchedColumnIsQueried(column);
     }
@@ -112,7 +113,7 @@ public class SerializationHelper
         return path != null && tester != null && 
!tester.fetchedCellIsQueried(path);
     }
 
-    public void startOfComplexColumn(ColumnDefinition column)
+    public void startOfComplexColumn(ColumnMetadata column)
     {
         this.tester = columnsToFetch == null ? null : 
columnsToFetch.newTester(column);
         this.currentDroppedComplex = droppedColumns.get(column.name.bytes);
@@ -125,7 +126,7 @@ public class SerializationHelper
 
     public boolean isDropped(Cell cell, boolean isComplex)
     {
-        CFMetaData.DroppedColumn dropped = isComplex ? currentDroppedComplex : 
droppedColumns.get(cell.column().name.bytes);
+        DroppedColumn dropped = isComplex ? currentDroppedComplex : 
droppedColumns.get(cell.column().name.bytes);
         return dropped != null && cell.timestamp() <= dropped.droppedTime;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index 37ad447..39d9e75 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.security.MessageDigest;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.Clusterable;
 
 /**
@@ -53,11 +53,11 @@ public interface Unfiltered extends Clusterable
      * invalid (some value is invalid for its column type, or some field
      * is nonsensical).
      */
-    public void validateData(CFMetaData metadata);
+    public void validateData(TableMetadata metadata);
 
-    public String toString(CFMetaData metadata);
-    public String toString(CFMetaData metadata, boolean fullDetails);
-    public String toString(CFMetaData metadata, boolean includeClusterKeys, 
boolean fullDetails);
+    public String toString(TableMetadata metadata);
+    public String toString(TableMetadata metadata, boolean fullDetails);
+    public String toString(TableMetadata metadata, boolean includeClusterKeys, 
boolean fullDetails);
 
     default boolean isRow()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 45c026f..7cac5e6 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -23,11 +23,11 @@ import java.io.IOError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -177,14 +177,14 @@ public class UnfilteredRowIteratorSerializer
         return size;
     }
 
-    public Header deserializeHeader(CFMetaData metadata, ColumnFilter 
selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws 
IOException
+    public Header deserializeHeader(TableMetadata metadata, ColumnFilter 
selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws 
IOException
     {
-        DecoratedKey key = 
metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
+        DecoratedKey key = 
metadata.partitioner.decorateKey(ByteBufferUtil.readWithVIntLength(in));
         int flags = in.readUnsignedByte();
         boolean isReversed = (flags & IS_REVERSED) != 0;
         if ((flags & IS_EMPTY) != 0)
         {
-            SerializationHeader sh = new SerializationHeader(false, metadata, 
PartitionColumns.NONE, EncodingStats.NO_STATS);
+            SerializationHeader sh = new SerializationHeader(false, metadata, 
RegularAndStaticColumns.NONE, EncodingStats.NO_STATS);
             return new Header(sh, key, isReversed, true, null, null, 0);
         }
 
@@ -204,7 +204,7 @@ public class UnfilteredRowIteratorSerializer
         return new Header(header, key, isReversed, false, partitionDeletion, 
staticRow, rowEstimate);
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws 
IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
TableMetadata metadata, SerializationHelper.Flag flag, Header header) throws 
IOException
     {
         if (header.isEmpty)
             return EmptyIterators.unfilteredRow(metadata, header.key, 
header.isReversed);
@@ -230,7 +230,7 @@ public class UnfilteredRowIteratorSerializer
         };
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) 
throws IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
TableMetadata metadata, ColumnFilter selection, SerializationHelper.Flag flag) 
throws IOException
     {
         return deserialize(in, version, metadata, flag, 
deserializeHeader(metadata, selection, in, version, flag));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index 8dbc606..74ade1e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -108,8 +108,8 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
         if (lowerBound != null && ret != null)
             assert comparator().compare(lowerBound, ret.clustering()) <= 0
                 : String.format("Lower bound [%s ]is bigger than first 
returned value [%s] for sstable %s",
-                                lowerBound.toString(sstable.metadata),
-                                ret.toString(sstable.metadata),
+                                lowerBound.toString(metadata()),
+                                ret.toString(metadata()),
                                 sstable.getFilename());
 
         return ret;
@@ -117,13 +117,13 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
 
     private Comparator<Clusterable> comparator()
     {
-        return filter.isReversed() ? sstable.metadata.comparator.reversed() : 
sstable.metadata.comparator;
+        return filter.isReversed() ? metadata().comparator.reversed() : 
metadata().comparator;
     }
 
     @Override
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
-        return sstable.metadata;
+        return sstable.metadata();
     }
 
     @Override
@@ -133,7 +133,7 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
     }
 
     @Override
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         return selectedColumns.fetchedColumns();
     }
@@ -184,10 +184,10 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
         {
             IndexInfo column = 
onHeapRetriever.columnsIndex(filter.isReversed() ? 
rowIndexEntry.columnsIndexCount() - 1 : 0);
             ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? 
column.lastName : column.firstName;
-            assert lowerBoundPrefix.getRawValues().length <= 
sstable.metadata.comparator.size() :
+            assert lowerBoundPrefix.getRawValues().length <= 
metadata().comparator.size() :
             String.format("Unexpected number of clustering values %d, expected 
%d or fewer for %s",
                           lowerBoundPrefix.getRawValues().length,
-                          sstable.metadata.comparator.size(),
+                          metadata().comparator.size(),
                           sstable.getFilename());
             return ClusteringBound.inclusiveOpen(filter.isReversed(), 
lowerBoundPrefix.getRawValues());
         }
@@ -217,10 +217,10 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
 
         final StatsMetadata m = sstable.getSSTableMetadata();
         List<ByteBuffer> vals = filter.isReversed() ? m.maxClusteringValues : 
m.minClusteringValues;
-        assert vals.size() <= sstable.metadata.comparator.size() :
+        assert vals.size() <= metadata().comparator.size() :
         String.format("Unexpected number of clustering values %d, expected %d 
or fewer for %s",
                       vals.size(),
-                      sstable.metadata.comparator.size(),
+                      metadata().comparator.size(),
                       sstable.getFilename());
         return  ClusteringBound.inclusiveOpen(filter.isReversed(), 
vals.toArray(new ByteBuffer[vals.size()]));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 004783e..b03799a 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -23,16 +23,15 @@ import java.security.MessageDigest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.transform.FilteredRows;
 import org.apache.cassandra.db.transform.MoreRows;
 import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.IMergeIterator;
 import org.apache.cassandra.utils.MergeIterator;
@@ -135,9 +134,9 @@ public abstract class UnfilteredRowIterators
     /**
      * Returns an empty unfiltered iterator for a given partition.
      */
-    public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, 
final DecoratedKey partitionKey, final Row staticRow, final DeletionTime 
partitionDeletion, final boolean isReverseOrder)
+    public static UnfilteredRowIterator noRowsIterator(final TableMetadata 
metadata, final DecoratedKey partitionKey, final Row staticRow, final 
DeletionTime partitionDeletion, final boolean isReverseOrder)
     {
-        return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, 
staticRow, partitionDeletion);
+        return EmptyIterators.unfilteredRow(metadata, partitionKey, 
isReverseOrder, staticRow, partitionDeletion);
     }
 
     /**
@@ -199,7 +198,7 @@ public abstract class UnfilteredRowIterators
      */
     public static UnfilteredRowIterator concat(final UnfilteredRowIterator 
iter1, final UnfilteredRowIterator iter2)
     {
-        assert iter1.metadata().cfId.equals(iter2.metadata().cfId)
+        assert iter1.metadata().id.equals(iter2.metadata().id)
             && iter1.partitionKey().equals(iter2.partitionKey())
             && 
iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion())
             && iter1.isReverseOrder() == iter2.isReverseOrder()
@@ -310,12 +309,12 @@ public abstract class UnfilteredRowIterators
      */
     public static UnfilteredRowIterator loggingIterator(UnfilteredRowIterator 
iterator, final String id, final boolean fullDetails)
     {
-        CFMetaData metadata = iterator.metadata();
+        TableMetadata metadata = iterator.metadata();
         logger.info("[{}] Logging iterator on {}.{}, partition key={}, 
reversed={}, deletion={}",
                     id,
-                    metadata.ksName,
-                    metadata.cfName,
-                    
metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+                    metadata.keyspace,
+                    metadata.name,
+                    
metadata.partitionKeyType.getString(iterator.partitionKey().getKey()),
                     iterator.isReverseOrder(),
                     iterator.partitionLevelDeletion().markedForDeleteAt());
 
@@ -354,9 +353,9 @@ public abstract class UnfilteredRowIterators
         private final IMergeIterator<Unfiltered, Unfiltered> mergeIterator;
         private final MergeListener listener;
 
-        private UnfilteredRowMergeIterator(CFMetaData metadata,
+        private UnfilteredRowMergeIterator(TableMetadata metadata,
                                            List<UnfilteredRowIterator> 
iterators,
-                                           PartitionColumns columns,
+                                           RegularAndStaticColumns columns,
                                            DeletionTime partitionDeletion,
                                            int nowInSec,
                                            boolean reversed,
@@ -413,7 +412,7 @@ public abstract class UnfilteredRowIterators
             for (int i = 1; i < iterators.size(); i++)
             {
                 UnfilteredRowIterator iter = iterators.get(i);
-                assert first.metadata().cfId.equals(iter.metadata().cfId);
+                assert first.metadata().id.equals(iter.metadata().id);
                 assert first.partitionKey().equals(iter.partitionKey());
                 assert first.isReverseOrder() == iter.isReverseOrder();
             }
@@ -463,20 +462,20 @@ public abstract class UnfilteredRowIterators
             return merged;
         }
 
-        private static PartitionColumns 
collectColumns(List<UnfilteredRowIterator> iterators)
+        private static RegularAndStaticColumns 
collectColumns(List<UnfilteredRowIterator> iterators)
         {
-            PartitionColumns first = iterators.get(0).columns();
+            RegularAndStaticColumns first = iterators.get(0).columns();
             Columns statics = first.statics;
             Columns regulars = first.regulars;
             for (int i = 1; i < iterators.size(); i++)
             {
-                PartitionColumns cols = iterators.get(i).columns();
+                RegularAndStaticColumns cols = iterators.get(i).columns();
                 statics = statics.mergeTo(cols.statics);
                 regulars = regulars.mergeTo(cols.regulars);
             }
             return statics == first.statics && regulars == first.regulars
                  ? first
-                 : new PartitionColumns(statics, regulars);
+                 : new RegularAndStaticColumns(statics, regulars);
         }
 
         private static EncodingStats mergeStats(List<UnfilteredRowIterator> 
iterators)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index c79cc8c..ecfd1c0 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import com.google.common.collect.Collections2;
 
 import net.nicoulaj.compilecommand.annotations.Inline;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -224,7 +224,7 @@ public class UnfilteredSerializer
         if ((flags & HAS_ALL_COLUMNS) == 0)
             Columns.serializer.serializeSubset(Collections2.transform(row, 
ColumnData::column), headerColumns, out);
 
-        SearchIterator<ColumnDefinition, ColumnDefinition> si = 
headerColumns.iterator();
+        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
headerColumns.iterator();
 
         try
         {
@@ -232,9 +232,9 @@ public class UnfilteredSerializer
                 // We can obtain the column for data directly from 
data.column(). However, if the cell/complex data
                 // originates from a sstable, the column we'll get will have 
the type used when the sstable was serialized,
                 // and if that type have been recently altered, that may not 
be the type we want to serialize the column
-                // with. So we use the ColumnDefinition from the "header" 
which is "current". Also see #11810 for what
+                // with. So we use the ColumnMetadata from the "header" which 
is "current". Also see #11810 for what
                 // happens if we don't do that.
-                ColumnDefinition column = si.next(cd.column());
+                ColumnMetadata column = si.next(cd.column());
                 assert column != null : cd.column.toString();
 
                 try
@@ -259,7 +259,7 @@ public class UnfilteredSerializer
         }
     }
 
-    private void writeComplexColumn(ComplexColumnData data, ColumnDefinition 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header, DataOutputPlus out)
+    private void writeComplexColumn(ComplexColumnData data, ColumnMetadata 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header, DataOutputPlus out)
     throws IOException
     {
         if (hasComplexDeletion)
@@ -347,10 +347,10 @@ public class UnfilteredSerializer
         if (!hasAllColumns)
             size += 
Columns.serializer.serializedSubsetSize(Collections2.transform(row, 
ColumnData::column), header.columns(isStatic));
 
-        SearchIterator<ColumnDefinition, ColumnDefinition> si = 
headerColumns.iterator();
+        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
headerColumns.iterator();
         for (ColumnData data : row)
         {
-            ColumnDefinition column = si.next(data.column());
+            ColumnMetadata column = si.next(data.column());
             assert column != null;
 
             if (data.column.isSimple())
@@ -362,7 +362,7 @@ public class UnfilteredSerializer
         return size;
     }
 
-    private long sizeOfComplexColumn(ComplexColumnData data, ColumnDefinition 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header)
+    private long sizeOfComplexColumn(ComplexColumnData data, ColumnMetadata 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header)
     {
         long size = 0;
 
@@ -602,7 +602,7 @@ public class UnfilteredSerializer
         }
     }
 
-    private void readSimpleColumn(ColumnDefinition column, DataInputPlus in, 
SerializationHeader header, SerializationHelper helper, Row.Builder builder, 
LivenessInfo rowLiveness)
+    private void readSimpleColumn(ColumnMetadata column, DataInputPlus in, 
SerializationHeader header, SerializationHelper helper, Row.Builder builder, 
LivenessInfo rowLiveness)
     throws IOException
     {
         if (helper.includes(column))
@@ -617,7 +617,7 @@ public class UnfilteredSerializer
         }
     }
 
-    private void readComplexColumn(ColumnDefinition column, DataInputPlus in, 
SerializationHeader header, SerializationHelper helper, boolean 
hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness)
+    private void readComplexColumn(ColumnMetadata column, DataInputPlus in, 
SerializationHeader header, SerializationHelper helper, boolean 
hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness)
     throws IOException
     {
         if (helper.includes(column))
@@ -667,7 +667,7 @@ public class UnfilteredSerializer
         in.skipBytesFully(markerSize);
     }
 
-    private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, 
SerializationHeader header, boolean hasComplexDeletion)
+    private void skipComplexColumn(DataInputPlus in, ColumnMetadata column, 
SerializationHeader header, boolean hasComplexDeletion)
     throws IOException
     {
         if (hasComplexDeletion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
index dcf0891..ecae172 100644
--- a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
+++ b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.db.rows;
 
-import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.transform.Transformation;
 
@@ -36,7 +36,7 @@ public class WithOnlyQueriedData<I extends BaseRowIterator<?>> extends Transform
     }
 
     @Override
-    protected PartitionColumns applyToPartitionColumns(PartitionColumns 
columns)
+    protected RegularAndStaticColumns 
applyToPartitionColumns(RegularAndStaticColumns columns)
     {
         return filter.queriedColumns();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
index 411950e..e38be09 100644
--- a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import com.google.common.collect.UnmodifiableIterator;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 
 /**
@@ -40,12 +40,12 @@ public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator
         this.wrapped = wrapped;
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         return wrapped.metadata();
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         return wrapped.columns();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/transform/BaseRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java b/src/java/org/apache/cassandra/db/transform/BaseRows.java
index 9bf878f..0f03bbb 100644
--- a/src/java/org/apache/cassandra/db/transform/BaseRows.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java
@@ -20,9 +20,9 @@
  */
 package org.apache.cassandra.db.transform;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.rows.*;
 
 import static org.apache.cassandra.utils.Throwables.merge;
@@ -50,7 +50,7 @@ implements BaseRowIterator<R>
         partitionKey = copyFrom.partitionKey();
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         return input.metadata();
     }
@@ -60,7 +60,7 @@ implements BaseRowIterator<R>
         return input.isReverseOrder();
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         return input.columns();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/transform/Transformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java b/src/java/org/apache/cassandra/db/transform/Transformation.java
index 33c1fe7..811932c 100644
--- a/src/java/org/apache/cassandra/db/transform/Transformation.java
+++ b/src/java/org/apache/cassandra/db/transform/Transformation.java
@@ -22,7 +22,7 @@ package org.apache.cassandra.db.transform;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
@@ -116,7 +116,7 @@ public abstract class Transformation<I extends BaseRowIterator<?>>
      * NOTE: same remark than for applyToDeletion: it is only applied to the 
first iterator in a sequence of iterators
      * filled by MoreContents.
      */
-    protected PartitionColumns applyToPartitionColumns(PartitionColumns 
columns)
+    protected RegularAndStaticColumns 
applyToPartitionColumns(RegularAndStaticColumns columns)
     {
         return columns;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
index de0a51b..f0f295f 100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
@@ -20,7 +20,7 @@
  */
 package org.apache.cassandra.db.transform;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
@@ -32,7 +32,7 @@ final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, U
         super(input);
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         return input.metadata();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
index ba86066..a3dc96e 100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
@@ -21,20 +21,20 @@
 package org.apache.cassandra.db.transform;
 
 import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> 
implements UnfilteredRowIterator
 {
-    private PartitionColumns partitionColumns;
+    private RegularAndStaticColumns regularAndStaticColumns;
     private DeletionTime partitionLevelDeletion;
 
     public UnfilteredRows(UnfilteredRowIterator input)
     {
         super(input);
-        partitionColumns = input.columns();
+        regularAndStaticColumns = input.columns();
         partitionLevelDeletion = input.partitionLevelDeletion();
     }
 
@@ -42,14 +42,14 @@ final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> i
     void add(Transformation add)
     {
         super.add(add);
-        partitionColumns = add.applyToPartitionColumns(partitionColumns);
+        regularAndStaticColumns = 
add.applyToPartitionColumns(regularAndStaticColumns);
         partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion);
     }
 
     @Override
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
-        return partitionColumns;
+        return regularAndStaticColumns;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index 7ee5aab..0579429 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -26,13 +26,16 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.btree.BTreeSet;
@@ -43,16 +46,16 @@ import org.apache.cassandra.utils.btree.BTreeSet;
  */
 public class TableViews extends AbstractCollection<View>
 {
-    private final CFMetaData baseTableMetadata;
+    private final TableMetadataRef baseTableMetadata;
 
     // We need this to be thread-safe, but the number of times this is changed 
(when a view is created in the keyspace)
     // is massively exceeded by the number of times it's read (for every 
mutation on the keyspace), so a copy-on-write
     // list is the best option.
     private final List<View> views = new CopyOnWriteArrayList();
 
-    public TableViews(CFMetaData baseTableMetadata)
+    public TableViews(TableId id)
     {
-        this.baseTableMetadata = baseTableMetadata;
+        baseTableMetadata = Schema.instance.getTableMetadataRef(id);
     }
 
     public int size()
@@ -79,8 +82,8 @@ public class TableViews extends AbstractCollection<View>
 
     public Iterable<ColumnFamilyStore> allViewsCfs()
     {
-        Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName);
-        return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName));
+        Keyspace keyspace = Keyspace.open(baseTableMetadata.keyspace);
+        return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().name));
     }
 
     public void forceBlockingFlush()
@@ -119,7 +122,7 @@ public class TableViews extends AbstractCollection<View>
      */
     public void pushViewReplicaUpdates(PartitionUpdate update, boolean 
writeCommitLog, AtomicLong baseComplete)
     {
-        assert update.metadata().cfId.equals(baseTableMetadata.cfId);
+        assert update.metadata().id.equals(baseTableMetadata.id);
 
         Collection<View> views = updatedViews(update);
         if (views.isEmpty())
@@ -168,7 +171,7 @@ public class TableViews extends AbstractCollection<View>
                                                               int nowInSec,
                                                               boolean 
separateUpdates)
     {
-        assert updates.metadata().cfId.equals(baseTableMetadata.cfId);
+        assert updates.metadata().id.equals(baseTableMetadata.id);
 
         List<ViewUpdateGenerator> generators = new ArrayList<>(views.size());
         for (View view : views)
@@ -191,7 +194,7 @@ public class TableViews extends AbstractCollection<View>
 
             Row existingRow;
             Row updateRow;
-            int cmp = baseTableMetadata.comparator.compare(update, existing);
+            int cmp = baseTableMetadata.get().comparator.compare(update, existing);
             if (cmp < 0)
             {
                 // We have an update where there was nothing before
@@ -261,7 +264,7 @@ public class TableViews extends AbstractCollection<View>
 
         if (separateUpdates)
         {
-            final Collection<Mutation> firstBuild = 
buildMutations(baseTableMetadata, generators);
+            final Collection<Mutation> firstBuild = 
buildMutations(baseTableMetadata.get(), generators);
 
             return new Iterator<Collection<Mutation>>()
             {
@@ -288,7 +291,7 @@ public class TableViews extends AbstractCollection<View>
                         // If the updates have been filtered, then we won't 
have any mutations; we need to make sure that we
                         // only return if the mutations are empty. Otherwise, 
we continue to search for an update which is
                         // not filtered
-                        Collection<Mutation> mutations = 
buildMutations(baseTableMetadata, generators);
+                        Collection<Mutation> mutations = 
buildMutations(baseTableMetadata.get(), generators);
                         if (!mutations.isEmpty())
                             return mutations;
                     }
@@ -325,7 +328,7 @@ public class TableViews extends AbstractCollection<View>
                 addToViewUpdateGenerators(emptyRow(updateRow.clustering(), 
DeletionTime.LIVE), updateRow, generators, nowInSec);
             }
 
-            return 
Iterators.singletonIterator(buildMutations(baseTableMetadata, generators));
+            return 
Iterators.singletonIterator(buildMutations(baseTableMetadata.get(), 
generators));
         }
     }
 
@@ -363,7 +366,7 @@ public class TableViews extends AbstractCollection<View>
     {
         Slices.Builder sliceBuilder = null;
         DeletionInfo deletionInfo = updates.deletionInfo();
-        CFMetaData metadata = updates.metadata();
+        TableMetadata metadata = updates.metadata();
         DecoratedKey key = updates.partitionKey();
         // TODO: This is subtle: we need to gather all the slices that we have 
to fetch between partition del, range tombstones and rows.
         if (!deletionInfo.isLive())
@@ -484,7 +487,7 @@ public class TableViews extends AbstractCollection<View>
      * @param generators the generators from which to extract the view 
mutations from.
      * @return the mutations created by all the generators in {@code 
generators}.
      */
-    private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, 
List<ViewUpdateGenerator> generators)
+    private Collection<Mutation> buildMutations(TableMetadata 
baseTableMetadata, List<ViewUpdateGenerator> generators)
     {
         // One view is probably common enough and we can optimize a bit easily
         if (generators.size() == 1)
@@ -508,7 +511,7 @@ public class TableViews extends AbstractCollection<View>
                 Mutation mutation = mutations.get(key);
                 if (mutation == null)
                 {
-                    mutation = new Mutation(baseTableMetadata.ksName, key);
+                    mutation = new Mutation(baseTableMetadata.keyspace, key);
                     mutations.put(key, mutation);
                 }
                 mutation.add(update);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 0b8de9e..1cde464 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -28,10 +28,13 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
@@ -47,11 +50,11 @@ public class View
     private static final Logger logger = LoggerFactory.getLogger(View.class);
 
     public final String name;
-    private volatile ViewDefinition definition;
+    private volatile ViewMetadata definition;
 
     private final ColumnFamilyStore baseCfs;
 
-    public volatile List<ColumnDefinition> baseNonPKColumnsInViewPK;
+    public volatile List<ColumnMetadata> baseNonPKColumnsInViewPK;
 
     private final boolean includeAllColumns;
     private ViewBuilder builder;
@@ -63,33 +66,33 @@ public class View
     private SelectStatement select;
     private ReadQuery query;
 
-    public View(ViewDefinition definition,
+    public View(ViewMetadata definition,
                 ColumnFamilyStore baseCfs)
     {
         this.baseCfs = baseCfs;
-        this.name = definition.viewName;
+        this.name = definition.name;
         this.includeAllColumns = definition.includeAllColumns;
         this.rawSelect = definition.select;
 
         updateDefinition(definition);
     }
 
-    public ViewDefinition getDefinition()
+    public ViewMetadata getDefinition()
     {
         return definition;
     }
 
     /**
-     * This updates the columns stored which are dependent on the base 
CFMetaData.
+     * This updates the columns stored which are dependent on the base 
TableMetadata.
      */
-    public void updateDefinition(ViewDefinition definition)
+    public void updateDefinition(ViewMetadata definition)
     {
         this.definition = definition;
 
-        List<ColumnDefinition> nonPKDefPartOfViewPK = new ArrayList<>();
-        for (ColumnDefinition baseColumn : baseCfs.metadata.allColumns())
+        List<ColumnMetadata> nonPKDefPartOfViewPK = new ArrayList<>();
+        for (ColumnMetadata baseColumn : baseCfs.metadata().columns())
         {
-            ColumnDefinition viewColumn = getViewColumn(baseColumn);
+            ColumnMetadata viewColumn = getViewColumn(baseColumn);
             if (viewColumn != null && !baseColumn.isPrimaryKeyColumn() && 
viewColumn.isPrimaryKeyColumn())
                 nonPKDefPartOfViewPK.add(baseColumn);
         }
@@ -100,18 +103,18 @@ public class View
      * The view column corresponding to the provided base column. This 
<b>can</b>
      * return {@code null} if the column is denormalized in the view.
      */
-    public ColumnDefinition getViewColumn(ColumnDefinition baseColumn)
+    public ColumnMetadata getViewColumn(ColumnMetadata baseColumn)
     {
-        return definition.metadata.getColumnDefinition(baseColumn.name);
+        return definition.metadata.getColumn(baseColumn.name);
     }
 
     /**
      * The base column corresponding to the provided view column. This should
      * never return {@code null} since a view can't have its "own" columns.
      */
-    public ColumnDefinition getBaseColumn(ColumnDefinition viewColumn)
+    public ColumnMetadata getBaseColumn(ColumnMetadata viewColumn)
     {
-        ColumnDefinition baseColumn = 
baseCfs.metadata.getColumnDefinition(viewColumn.name);
+        ColumnMetadata baseColumn = 
baseCfs.metadata().getColumn(viewColumn.name);
         assert baseColumn != null;
         return baseColumn;
     }
@@ -148,7 +151,7 @@ public class View
 
         for (ColumnData data : update)
         {
-            if (definition.metadata.getColumnDefinition(data.column().name) != 
null)
+            if (definition.metadata.getColumn(data.column().name) != null)
                 return true;
         }
         return false;
@@ -169,7 +172,7 @@ public class View
     public boolean matchesViewFilter(DecoratedKey partitionKey, Row baseRow, 
int nowInSec)
     {
         return getReadQuery().selectsClustering(partitionKey, 
baseRow.clustering())
-            && 
getSelectStatement().rowFilterForInternalCalls().isSatisfiedBy(baseCfs.metadata,
 partitionKey, baseRow, nowInSec);
+            && 
getSelectStatement().rowFilterForInternalCalls().isSatisfiedBy(baseCfs.metadata(),
 partitionKey, baseRow, nowInSec);
     }
 
     /**
@@ -214,23 +217,22 @@ public class View
     }
 
     @Nullable
-    public static CFMetaData findBaseTable(String keyspace, String viewName)
+    public static TableMetadataRef findBaseTable(String keyspace, String 
viewName)
     {
-        ViewDefinition view = Schema.instance.getView(keyspace, viewName);
-        return (view == null) ? null : 
Schema.instance.getCFMetaData(view.baseTableId);
+        ViewMetadata view = Schema.instance.getView(keyspace, viewName);
+        return (view == null) ? null : 
Schema.instance.getTableMetadataRef(view.baseTableId);
     }
 
-    public static Iterable<ViewDefinition> findAll(String keyspace, String 
baseTable)
+    public static Iterable<ViewMetadata> findAll(String keyspace, String 
baseTable)
     {
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
-        final UUID baseId = Schema.instance.getId(keyspace, baseTable);
-        return Iterables.filter(ksm.views, view -> 
view.baseTableId.equals(baseId));
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace);
+        return Iterables.filter(ksm.views, view -> 
view.baseTableName.equals(baseTable));
     }
 
     /**
      * Builds the string text for a materialized view's SELECT statement.
      */
-    public static String buildSelectStatement(String cfName, 
Collection<ColumnDefinition> includedColumns, String whereClause)
+    public static String buildSelectStatement(String cfName, 
Collection<ColumnMetadata> includedColumns, String whereClause)
     {
          StringBuilder rawSelect = new StringBuilder("SELECT ");
         if (includedColumns == null || includedColumns.isEmpty())
@@ -251,7 +253,7 @@ public class View
             if (rel.isMultiColumn())
             {
                 sb.append(((MultiColumnRelation) rel).getEntities().stream()
-                        .map(ColumnDefinition.Raw::toString)
+                        .map(ColumnMetadata.Raw::toString)
                         .collect(Collectors.joining(", ", "(", ")")));
             }
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 9550e1e..6250ae7 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -80,13 +80,13 @@ public class ViewBuilder extends CompactionInfo.Holder
 
         // We're rebuilding everything from what's on disk, so we read 
everything, consider that as new updates
         // and pretend that there is nothing pre-existing.
-        UnfilteredRowIterator empty = 
UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, 
Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
+        UnfilteredRowIterator empty = 
UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, 
Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
 
         try (ReadExecutionController orderGroup = 
command.executionController();
              UnfilteredRowIterator data = 
UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), 
command))
         {
             Iterator<Collection<Mutation>> mutations = 
baseCfs.keyspace.viewManager
-                                                      
.forTable(baseCfs.metadata)
+                                                      
.forTable(baseCfs.metadata.id)
                                                       
.generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
 
             AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
@@ -96,9 +96,9 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
-        logger.trace("Running view builder for {}.{}", 
baseCfs.metadata.ksName, view.name);
+        logger.trace("Running view builder for {}.{}", 
baseCfs.metadata.keyspace, view.name);
         UUID localHostId = SystemKeyspace.getLocalHostId();
-        String ksname = baseCfs.metadata.ksName, viewName = view.name;
+        String ksname = baseCfs.metadata.keyspace, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
         {
@@ -107,7 +107,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             return;
         }
 
-        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+        Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(baseCfs.metadata.keyspace);
         final Pair<Integer, Token> buildStatus = 
SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, 
Iterable<SSTableReader>> function;
@@ -222,7 +222,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
-        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, 
rangesLeft, rangesTotal, "ranges", compactionId);
+        return new CompactionInfo(baseCfs.metadata(), 
OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
     }
 
     public void stop()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index b30ad2a..84738b1 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -26,9 +26,9 @@ import com.google.common.util.concurrent.Striped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.repair.SystemDistributedKeyspace;
@@ -58,7 +58,7 @@ public class ViewManager
     private static final boolean enableCoordinatorBatchlog = 
Boolean.getBoolean("cassandra.mv_enable_coordinator_batchlog");
 
     private final ConcurrentMap<String, View> viewsByName = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<UUID, TableViews> viewsByBaseTable = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<TableId, TableViews> viewsByBaseTable = new 
ConcurrentHashMap<>();
     private final Keyspace keyspace;
 
     public ViewManager(Keyspace keyspace)
@@ -75,12 +75,12 @@ public class ViewManager
         {
             for (PartitionUpdate update : mutation.getPartitionUpdates())
             {
-                assert keyspace.getName().equals(update.metadata().ksName);
+                assert keyspace.getName().equals(update.metadata().keyspace);
 
                 if (coordinatorBatchlog && 
keyspace.getReplicationStrategy().getReplicationFactor() == 1)
                     continue;
 
-                if 
(!forTable(update.metadata()).updatedViews(update).isEmpty())
+                if 
(!forTable(update.metadata().id).updatedViews(update).isEmpty())
                     return true;
             }
         }
@@ -93,24 +93,12 @@ public class ViewManager
         return viewsByName.values();
     }
 
-    public void update(String viewName)
-    {
-        View view = viewsByName.get(viewName);
-        assert view != null : "When updating a view, it should already be in 
the ViewManager";
-        view.build();
-
-        // We provide the new definition from the base metadata
-        Optional<ViewDefinition> viewDefinition = 
keyspace.getMetadata().views.get(viewName);
-        assert viewDefinition.isPresent() : "When updating a view, it should 
still be in the Keyspaces views";
-        view.updateDefinition(viewDefinition.get());
-    }
-
     public void reload()
     {
-        Map<String, ViewDefinition> newViewsByName = new HashMap<>();
-        for (ViewDefinition definition : keyspace.getMetadata().views)
+        Map<String, ViewMetadata> newViewsByName = new HashMap<>();
+        for (ViewMetadata definition : keyspace.getMetadata().views)
         {
-            newViewsByName.put(definition.viewName, definition);
+            newViewsByName.put(definition.name, definition);
         }
 
         for (String viewName : viewsByName.keySet())
@@ -119,7 +107,7 @@ public class ViewManager
                 removeView(viewName);
         }
 
-        for (Map.Entry<String, ViewDefinition> entry : 
newViewsByName.entrySet())
+        for (Map.Entry<String, ViewMetadata> entry : newViewsByName.entrySet())
         {
             if (!viewsByName.containsKey(entry.getKey()))
                 addView(entry.getValue());
@@ -147,11 +135,11 @@ public class ViewManager
         }
     }
 
-    public void addView(ViewDefinition definition)
+    public void addView(ViewMetadata definition)
     {
         View view = new View(definition, 
keyspace.getColumnFamilyStore(definition.baseTableId));
-        forTable(view.getDefinition().baseTableMetadata()).add(view);
-        viewsByName.put(definition.viewName, view);
+        forTable(view.getDefinition().baseTableId).add(view);
+        viewsByName.put(definition.name, view);
     }
 
     public void removeView(String name)
@@ -161,7 +149,7 @@ public class ViewManager
         if (view == null)
             return;
 
-        forTable(view.getDefinition().baseTableMetadata()).removeByName(name);
+        forTable(view.getDefinition().baseTableId).removeByName(name);
         SystemKeyspace.setViewRemoved(keyspace.getName(), view.name);
         SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), 
view.name);
     }
@@ -172,14 +160,13 @@ public class ViewManager
             view.build();
     }
 
-    public TableViews forTable(CFMetaData metadata)
+    public TableViews forTable(TableId id)
     {
-        UUID baseId = metadata.cfId;
-        TableViews views = viewsByBaseTable.get(baseId);
+        TableViews views = viewsByBaseTable.get(id);
         if (views == null)
         {
-            views = new TableViews(metadata);
-            TableViews previous = viewsByBaseTable.putIfAbsent(baseId, views);
+            views = new TableViews(id);
+            TableViews previous = viewsByBaseTable.putIfAbsent(id, views);
             if (previous != null)
                 views = previous;
         }

Reply via email to