http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
deleted file mode 100644
index 8b3e121..0000000
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Collection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-/**
- * Called when node receives updated schema state from the schema migration coordinator node.
- * Such happens when user makes local schema migration on one of the nodes in the ring
- * (which is going to act as coordinator) and that node sends (pushes) it's updated schema state
- * (in form of mutations) to all the alive nodes in the cluster.
- */
-public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Mutation>>
-{
-    private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
-
-    public void doVerb(final MessageIn<Collection<Mutation>> message, int id)
-    {
-        logger.trace("Received schema mutation push from {}", message.from);
-
-        StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
-        {
-            public void runMayThrow() throws ConfigurationException
-            {
-                SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
-            }
-        });
-    }
-}
\ No newline at end of file
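
The deleted handler above only did two things: log the push and hand the collection of schema mutations to the single-threaded MIGRATION stage so merges are applied serially. A minimal sketch of that shape, using a plain JDK executor and hypothetical names (SchemaPushSketch, SchemaMerger) rather than Cassandra's Stage/StageManager API:

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical names throughout; this only mirrors the shape of the deleted handler:
    // acknowledge the push on the messaging thread, run the merge on one dedicated thread
    // so concurrent schema pushes are applied one at a time.
    final class SchemaPushSketch
    {
        interface SchemaMerger { void mergeAndAnnounce(List<?> schemaMutations); }

        private static final ExecutorService MIGRATION_STAGE = Executors.newSingleThreadExecutor();

        private final SchemaMerger merger;

        SchemaPushSketch(SchemaMerger merger) { this.merger = merger; }

        void onSchemaPush(String fromEndpoint, List<?> schemaMutations)
        {
            // Defer the (potentially slow) merge off the receiving thread.
            MIGRATION_STAGE.submit(() -> merger.mergeAndAnnounce(schemaMutations));
        }
    }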

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 2bb4784..a3e80e5 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiFunction;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
@@ -43,8 +42,8 @@ import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.DirectorySizeCalculator;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -60,13 +59,13 @@ import org.apache.cassandra.utils.Pair;
  * } </pre>
  *
  * Until v2.0, {@code <cf dir>} is just column family name.
- * Since v2.1, {@code <cf dir>} has column family ID(cfId) added to its end.
+ * Since v2.1, {@code <cf dir>} has column family ID(tableId) added to its end.
  *
  * SSTables from secondary indexes were put in the same directory as their parent.
  * Since v2.2, they have their own directory under the parent directory whose name is index name.
  * Upon startup, those secondary index files are moved to new directory when upgrading.
  *
- * For backward compatibility, Directories can use directory without cfId if exists.
+ * For backward compatibility, Directories can use directory without tableId if exists.
  *
  * In addition, more that one 'root' data directory can be specified so that
  * {@code <path_to_data_dir>} potentially represents multiple locations.
@@ -174,16 +173,16 @@ public class Directories
         }
     }
 
-    private final CFMetaData metadata;
+    private final TableMetadata metadata;
     private final DataDirectory[] paths;
     private final File[] dataPaths;
 
-    public Directories(final CFMetaData metadata)
+    public Directories(final TableMetadata metadata)
     {
         this(metadata, dataDirectories);
     }
 
-    public Directories(final CFMetaData metadata, Collection<DataDirectory> paths)
+    public Directories(final TableMetadata metadata, Collection<DataDirectory> paths)
     {
         this(metadata, paths.toArray(new DataDirectory[paths.size()]));
     }
@@ -194,35 +193,29 @@ public class Directories
      *
      * @param metadata metadata of ColumnFamily
      */
-    public Directories(final CFMetaData metadata, DataDirectory[] paths)
+    public Directories(final TableMetadata metadata, DataDirectory[] paths)
     {
         this.metadata = metadata;
         this.paths = paths;
 
-        String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
-        int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
-        String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
-        String indexNameWithDot = idx >= 0 ? metadata.cfName.substring(idx) : null;
+        String tableId = metadata.id.toHexString();
+        int idx = metadata.name.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
+        String cfName = idx >= 0 ? metadata.name.substring(0, idx) : metadata.name;
+        String indexNameWithDot = idx >= 0 ? metadata.name.substring(idx) : null;
 
         this.dataPaths = new File[paths.length];
         // If upgraded from version less than 2.1, use existing directories
-        String oldSSTableRelativePath = join(metadata.ksName, cfName);
+        String oldSSTableRelativePath = join(metadata.keyspace, cfName);
         for (int i = 0; i < paths.length; ++i)
         {
             // check if old SSTable directory exists
             dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath);
         }
-        boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
-        {
-            public boolean apply(File file)
-            {
-                return file.exists();
-            }
-        });
+        boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), File::exists);
         if (!olderDirectoryExists)
         {
             // use 2.1+ style
-            String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId);
+            String newSSTableRelativePath = join(metadata.keyspace, cfName + '-' + tableId);
             for (int i = 0; i < paths.length; ++i)
                 dataPaths[i] = new File(paths[i].location, newSSTableRelativePath);
         }
@@ -261,7 +254,7 @@ public class Directories
                             return false;
 
                        Descriptor desc = SSTable.tryDescriptorFromFilename(file);
-                        return desc != null && desc.ksname.equals(metadata.ksName) && desc.cfname.equals(metadata.cfName);
+                        return desc != null && desc.ksname.equals(metadata.keyspace) && desc.cfname.equals(metadata.name);
 
                     }
                 });
@@ -760,7 +753,7 @@ public class Directories
                             return false;
 
                        // we are only interested in the SSTable files that belong to the specific ColumnFamily
-                        if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName))
+                        if (!pair.left.ksname.equals(metadata.keyspace) || !pair.left.cfname.equals(metadata.name))
                             return false;
 
                         Set<Component> previous = components.get(pair.left);
@@ -1027,8 +1020,8 @@ public class Directories
             File file = path.toFile();
             Descriptor desc = SSTable.tryDescriptorFromFilename(file);
             return desc != null
-                && desc.ksname.equals(metadata.ksName)
-                && desc.cfname.equals(metadata.cfName)
+                && desc.ksname.equals(metadata.keyspace)
+                && desc.cfname.equals(metadata.name)
                 && !toSkip.contains(file);
         }
     }
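
The Directories constructor above now derives the on-disk table directory suffix from TableMetadata.id.toHexString() instead of the hex-encoded cfId, producing <keyspace>/<table>-<tableIdHex> for 2.1+ layouts. A small illustration of that path composition with a hypothetical helper (not the Directories API itself; the hex id below is a made-up example):

    import java.io.File;

    // Illustration only: composes the 2.1+ style table directory,
    // <data_dir>/<keyspace>/<table>-<tableIdHex>, the same shape the constructor
    // above builds via join(metadata.keyspace, cfName + '-' + tableId).
    final class TableDirectorySketch
    {
        static File tableDirectory(File dataRoot, String keyspace, String table, String tableIdHex)
        {
            return new File(new File(dataRoot, keyspace), table + '-' + tableIdHex);
        }

        public static void main(String[] args)
        {
            File dir = tableDirectory(new File("/var/lib/cassandra/data"), "ks1", "users",
                                      "5a1c1b706d9b11e7abc4cec278b6b50a"); // example id only
            System.out.println(dir); // /var/lib/cassandra/data/ks1/users-5a1c1b706d9b11e7abc4cec278b6b50a
        }
    }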

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/EmptyIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyIterators.java b/src/java/org/apache/cassandra/db/EmptyIterators.java
index 24c923f..04ff31b 100644
--- a/src/java/org/apache/cassandra/db/EmptyIterators.java
+++ b/src/java/org/apache/cassandra/db/EmptyIterators.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db;
 
 import java.util.NoSuchElementException;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.partitions.BasePartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -52,14 +52,14 @@ public class EmptyIterators
 
     private static class EmptyUnfilteredPartitionIterator extends EmptyBasePartitionIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
     {
-        final CFMetaData metadata;
+        final TableMetadata metadata;
 
-        public EmptyUnfilteredPartitionIterator(CFMetaData metadata)
+        public EmptyUnfilteredPartitionIterator(TableMetadata metadata)
         {
             this.metadata = metadata;
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
             return metadata;
         }
@@ -76,13 +76,13 @@ public class EmptyIterators
 
     private static class EmptyBaseRowIterator<U extends Unfiltered> implements BaseRowIterator<U>
     {
-        final PartitionColumns columns;
-        final CFMetaData metadata;
+        final RegularAndStaticColumns columns;
+        final TableMetadata metadata;
         final DecoratedKey partitionKey;
         final boolean isReverseOrder;
         final Row staticRow;
 
-        EmptyBaseRowIterator(PartitionColumns columns, CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow)
+        EmptyBaseRowIterator(RegularAndStaticColumns columns, TableMetadata metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow)
         {
             this.columns = columns;
             this.metadata = metadata;
@@ -91,7 +91,7 @@ public class EmptyIterators
             this.staticRow = staticRow;
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
             return metadata;
         }
@@ -101,7 +101,7 @@ public class EmptyIterators
             return isReverseOrder;
         }
 
-        public PartitionColumns columns()
+        public RegularAndStaticColumns columns()
         {
             return columns;
         }
@@ -139,7 +139,7 @@ public class EmptyIterators
     private static class EmptyUnfilteredRowIterator extends EmptyBaseRowIterator<Unfiltered> implements UnfilteredRowIterator
     {
         final DeletionTime partitionLevelDeletion;
-        public EmptyUnfilteredRowIterator(PartitionColumns columns, CFMetaData metadata, DecoratedKey partitionKey,
+        public EmptyUnfilteredRowIterator(RegularAndStaticColumns columns, TableMetadata metadata, DecoratedKey partitionKey,
                                          boolean isReverseOrder, Row staticRow, DeletionTime partitionLevelDeletion)
         {
             super(columns, metadata, partitionKey, isReverseOrder, staticRow);
@@ -164,13 +164,13 @@ public class EmptyIterators
 
     private static class EmptyRowIterator extends EmptyBaseRowIterator<Row> implements RowIterator
     {
-        public EmptyRowIterator(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow)
+        public EmptyRowIterator(TableMetadata metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow)
         {
-            super(PartitionColumns.NONE, metadata, partitionKey, isReverseOrder, staticRow);
+            super(RegularAndStaticColumns.NONE, metadata, partitionKey, isReverseOrder, staticRow);
         }
     }
 
-    public static UnfilteredPartitionIterator unfilteredPartition(CFMetaData metadata)
+    public static UnfilteredPartitionIterator unfilteredPartition(TableMetadata metadata)
     {
         return new EmptyUnfilteredPartitionIterator(metadata);
     }
@@ -181,11 +181,11 @@ public class EmptyIterators
     }
 
     // this method is the only one that can return a non-empty iterator, but it still has no rows, so it seems cleanest to keep it here
-    public static UnfilteredRowIterator unfilteredRow(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow, DeletionTime partitionDeletion)
+    public static UnfilteredRowIterator unfilteredRow(TableMetadata metadata, DecoratedKey partitionKey, boolean isReverseOrder, Row staticRow, DeletionTime partitionDeletion)
     {
-        PartitionColumns columns = PartitionColumns.NONE;
+        RegularAndStaticColumns columns = RegularAndStaticColumns.NONE;
         if (!staticRow.isEmpty())
-            columns = new PartitionColumns(Columns.from(staticRow.columns()), Columns.NONE);
+            columns = new RegularAndStaticColumns(Columns.from(staticRow.columns()), Columns.NONE);
         else
             staticRow = Rows.EMPTY_STATIC_ROW;
 
@@ -195,12 +195,12 @@ public class EmptyIterators
         return new EmptyUnfilteredRowIterator(columns, metadata, partitionKey, isReverseOrder, staticRow, partitionDeletion);
     }
 
-    public static UnfilteredRowIterator unfilteredRow(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder)
+    public static UnfilteredRowIterator unfilteredRow(TableMetadata metadata, DecoratedKey partitionKey, boolean isReverseOrder)
     {
-        return new EmptyUnfilteredRowIterator(PartitionColumns.NONE, metadata, partitionKey, isReverseOrder, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE);
+        return new EmptyUnfilteredRowIterator(RegularAndStaticColumns.NONE, metadata, partitionKey, isReverseOrder, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE);
     }
 
-    public static RowIterator row(CFMetaData metadata, DecoratedKey partitionKey, boolean isReverseOrder)
+    public static RowIterator row(TableMetadata metadata, DecoratedKey partitionKey, boolean isReverseOrder)
     {
         return new EmptyRowIterator(metadata, partitionKey, isReverseOrder, Rows.EMPTY_STATIC_ROW);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 0ac89f7..3d4b1b2 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -18,15 +18,15 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
-import java.util.UUID;
 
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableId;
 
 public interface IMutation
 {
     public void apply();
     public String getKeyspaceName();
-    public Collection<UUID> getColumnFamilyIds();
+    public Collection<TableId> getTableIds();
     public DecoratedKey key();
     public long getTimeout();
     public String toString(boolean shallow);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index b477fa3..f0fd5aa 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -46,6 +46,11 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.metrics.KeyspaceMetrics;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
@@ -79,7 +84,8 @@ public class Keyspace
     public static final OpOrder writeOrder = new OpOrder();
 
     /* ColumnFamilyStore per column family */
-    private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TableId, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
+
     private volatile AbstractReplicationStrategy replicationStrategy;
     public final ViewManager viewManager;
 
@@ -152,9 +158,14 @@ public class Keyspace
         }
     }
 
-    public static ColumnFamilyStore openAndGetStore(CFMetaData cfm)
+    public static ColumnFamilyStore openAndGetStore(TableMetadataRef tableRef)
+    {
+        return open(tableRef.keyspace).getColumnFamilyStore(tableRef.id);
+    }
+
+    public static ColumnFamilyStore openAndGetStore(TableMetadata table)
     {
-        return open(cfm.ksName).getColumnFamilyStore(cfm.cfId);
+        return open(table.keyspace).getColumnFamilyStore(table.id);
     }
 
     /**
@@ -191,13 +202,13 @@ public class Keyspace
 
     public ColumnFamilyStore getColumnFamilyStore(String cfName)
     {
-        UUID id = Schema.instance.getId(getName(), cfName);
-        if (id == null)
+        TableMetadata table = Schema.instance.getTableMetadata(getName(), cfName);
+        if (table == null)
             throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", getName(), cfName));
-        return getColumnFamilyStore(id);
+        return getColumnFamilyStore(table.id);
     }
 
-    public ColumnFamilyStore getColumnFamilyStore(UUID id)
+    public ColumnFamilyStore getColumnFamilyStore(TableId id)
     {
         ColumnFamilyStore cfs = columnFamilyStores.get(id);
         if (cfs == null)
@@ -205,7 +216,7 @@ public class Keyspace
         return cfs;
     }
 
-    public boolean hasColumnFamilyStore(UUID id)
+    public boolean hasColumnFamilyStore(TableId id)
     {
         return columnFamilyStores.containsKey(id);
     }
@@ -310,16 +321,16 @@ public class Keyspace
 
     private Keyspace(String keyspaceName, boolean loadSSTables)
     {
-        metadata = Schema.instance.getKSMetaData(keyspaceName);
+        metadata = Schema.instance.getKeyspaceMetadata(keyspaceName);
         assert metadata != null : "Unknown keyspace " + keyspaceName;
         createReplicationStrategy(metadata);
 
         this.metric = new KeyspaceMetrics(this);
         this.viewManager = new ViewManager(this);
-        for (CFMetaData cfm : metadata.tablesAndViews())
+        for (TableMetadata cfm : metadata.tablesAndViews())
         {
-            logger.trace("Initializing {}.{}", getName(), cfm.cfName);
-            initCf(cfm, loadSSTables);
+            logger.trace("Initializing {}.{}", getName(), cfm.name);
+            initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables);
         }
         this.viewManager.reload();
     }
@@ -347,10 +358,10 @@ public class Keyspace
     }
 
     // best invoked on the compaction mananger.
-    public void dropCf(UUID cfId)
+    public void dropCf(TableId tableId)
     {
-        assert columnFamilyStores.containsKey(cfId);
-        ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
+        assert columnFamilyStores.containsKey(tableId);
+        ColumnFamilyStore cfs = columnFamilyStores.remove(tableId);
         if (cfs == null)
             return;
 
@@ -376,17 +387,17 @@ public class Keyspace
      */
     public void initCfCustom(ColumnFamilyStore newCfs)
     {
-        ColumnFamilyStore cfs = columnFamilyStores.get(newCfs.metadata.cfId);
+        ColumnFamilyStore cfs = columnFamilyStores.get(newCfs.metadata.id);
 
         if (cfs == null)
         {
             // CFS being created for the first time, either on server startup or new CF being added.
             // We don't worry about races here; startup is safe, and adding multiple idential CFs
             // simultaneously is a "don't do that" scenario.
-            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(newCfs.metadata.cfId, newCfs);
+            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(newCfs.metadata.id, newCfs);
             // CFS mbean instantiation will error out before we hit this, but in case that changes...
             if (oldCfs != null)
-                throw new IllegalStateException("added multiple mappings for cf id " + newCfs.metadata.cfId);
+                throw new IllegalStateException("added multiple mappings for cf id " + newCfs.metadata.id);
         }
         else
         {
@@ -397,25 +408,25 @@ public class Keyspace
     /**
      * adds a cf to internal structures, ends up creating disk files).
      */
-    public void initCf(CFMetaData metadata, boolean loadSSTables)
+    public void initCf(TableMetadataRef metadata, boolean loadSSTables)
     {
-        ColumnFamilyStore cfs = columnFamilyStores.get(metadata.cfId);
+        ColumnFamilyStore cfs = columnFamilyStores.get(metadata.id);
 
         if (cfs == null)
         {
             // CFS being created for the first time, either on server startup or new CF being added.
             // We don't worry about races here; startup is safe, and adding multiple idential CFs
             // simultaneously is a "don't do that" scenario.
-            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(metadata.cfId, ColumnFamilyStore.createColumnFamilyStore(this, metadata, loadSSTables));
+            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(metadata.id, ColumnFamilyStore.createColumnFamilyStore(this, metadata, loadSSTables));
             // CFS mbean instantiation will error out before we hit this, but in case that changes...
             if (oldCfs != null)
-                throw new IllegalStateException("added multiple mappings for cf id " + metadata.cfId);
+                throw new IllegalStateException("added multiple mappings for cf id " + metadata.id);
         }
         else
         {
             // re-initializing an existing CF.  This will happen if you cleared the schema
             // on this node and it's getting repopulated from the rest of the cluster.
-            assert cfs.name.equals(metadata.cfName);
+            assert cfs.name.equals(metadata.name);
             cfs.reload();
         }
     }
@@ -490,14 +501,14 @@ public class Keyspace
             mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
 
             // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
-            Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds();
-            Iterator<UUID> idIterator = columnFamilyIds.iterator();
+            Collection<TableId> tableIds = mutation.getTableIds();
+            Iterator<TableId> idIterator = tableIds.iterator();
 
-            locks = new Lock[columnFamilyIds.size()];
-            for (int i = 0; i < columnFamilyIds.size(); i++)
+            locks = new Lock[tableIds.size()];
+            for (int i = 0; i < tableIds.size(); i++)
             {
-                UUID cfid = idIterator.next();
-                int lockKey = Objects.hash(mutation.key().getKey(), cfid);
+                TableId tableId = idIterator.next();
+                int lockKey = Objects.hash(mutation.key().getKey(), tableId);
                 while (true)
                 {
                     Lock lock = null;
@@ -515,7 +526,7 @@ public class Keyspace
                             for (int j = 0; j < i; j++)
                                 locks[j].unlock();
 
-                            logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
+                            logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(tableId).name);
                             Tracing.trace("Could not acquire MV lock");
                             if (future != null)
                             {
@@ -569,8 +580,8 @@ public class Keyspace
             // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
             if (isDroppable)
             {
-                for(UUID cfid : columnFamilyIds)
-                    columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
+                for(TableId tableId : tableIds)
+                    columnFamilyStores.get(tableId).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
             }
         }
         int nowInSec = FBUtilities.nowInSeconds();
@@ -586,10 +597,10 @@ public class Keyspace
 
             for (PartitionUpdate upd : mutation.getPartitionUpdates())
             {
-                ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId);
+                ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id);
                 if (cfs == null)
                 {
-                    logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().cfId, upd.metadata().ksName, upd.metadata().cfName);
+                    logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name);
                     continue;
                 }
                 AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);
@@ -599,18 +610,18 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations 
from base table replica");
-                        
viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, 
writeCommitLog, baseComplete);
+                        
viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, 
writeCommitLog, baseComplete);
                     }
                     catch (Throwable t)
                     {
                         JVMStabilityInspector.inspectThrowable(t);
-                        logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s.%s",
-                                     upd.metadata().ksName, upd.metadata().cfName), t);
+                        logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s",
+                                                   upd.metadata().toString()), t);
                         throw t;
                     }
                 }
 
-                Tracing.trace("Adding to {} memtable", upd.metadata().cfName);
+                Tracing.trace("Adding to {} memtable", upd.metadata().name);
                 UpdateTransaction indexTransaction = updateIndexes
                                                      ? cfs.indexManager.newUpdateTransaction(upd, opGroup, nowInSec)
                                                      : UpdateTransaction.NO_OP;
@@ -683,7 +694,7 @@ public class Keyspace
                 Index index = baseCfs.indexManager.getIndexByName(indexName);
                 if (index == null)
                    throw new IllegalArgumentException(String.format("Invalid index specified: %s/%s.",
-                                                                     baseCfs.metadata.cfName,
+                                                                     baseCfs.metadata.name,
                                                                      indexName));
 
                 if (index.getBackingTable().isPresent())
@@ -706,7 +717,7 @@ public class Keyspace
         Set<ColumnFamilyStore> stores = new HashSet<>();
         for (ColumnFamilyStore indexCfs : baseCfs.indexManager.getAllIndexColumnFamilyStores())
         {
-            logger.info("adding secondary index table {} to operation", indexCfs.metadata.cfName);
+            logger.info("adding secondary index table {} to operation", indexCfs.metadata.name);
             stores.add(indexCfs);
         }
         return stores;
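
In the write path above, Keyspace acquires one materialized-view lock per affected table before applying a mutation, and the lock is chosen by hashing the partition key together with the table id (Objects.hash(mutation.key().getKey(), tableId)), using tryLock() so a contended write can back off. A minimal, self-contained sketch of that lock-striping idea with plain JDK locks (illustrative only, not the real Keyspace code):

    import java.util.Objects;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative lock striping keyed by (partition key, table id), as in the hunk above.
    final class ViewLockStripesSketch
    {
        private final Lock[] stripes;

        ViewLockStripesSketch(int stripeCount)
        {
            stripes = new Lock[stripeCount];
            for (int i = 0; i < stripeCount; i++)
                stripes[i] = new ReentrantLock();
        }

        Lock lockFor(Object partitionKey, Object tableId)
        {
            // Same key derivation as the diff: hash the partition key with the table id.
            int lockKey = Objects.hash(partitionKey, tableId);
            return stripes[Math.floorMod(lockKey, stripes.length)];
        }

        boolean tryApply(Object partitionKey, Object tableId, Runnable write)
        {
            Lock lock = lockFor(partitionKey, tableId);
            if (!lock.tryLock())   // non-blocking, mirroring the tryLock() used above
                return false;      // caller can retry or defer, as Keyspace does
            try
            {
                write.run();
                return true;
            }
            finally
            {
                lock.unlock();
            }
        }
    }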

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e2b0fc6..01e741d 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,14 +29,13 @@ import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -145,19 +144,19 @@ public class Memtable implements Comparable<Memtable>
         this.cfs = cfs;
         this.commitLogLowerBound = commitLogLowerBound;
         this.allocator = MEMORY_POOL.newAllocator();
-        this.initialComparator = cfs.metadata.comparator;
+        this.initialComparator = cfs.metadata().comparator;
         this.cfs.scheduleFlush();
-        this.columnsCollector = new ColumnsCollector(cfs.metadata.partitionColumns());
+        this.columnsCollector = new ColumnsCollector(cfs.metadata().regularAndStaticColumns());
     }
 
     // ONLY to be used for testing, to create a mock Memtable
     @VisibleForTesting
-    public Memtable(CFMetaData metadata)
+    public Memtable(TableMetadata metadata)
     {
         this.initialComparator = metadata.comparator;
         this.cfs = null;
         this.allocator = null;
-        this.columnsCollector = new ColumnsCollector(metadata.partitionColumns());
+        this.columnsCollector = new ColumnsCollector(metadata.regularAndStaticColumns());
     }
 
     public MemtableAllocator getAllocator()
@@ -249,7 +248,7 @@ public class Memtable implements Comparable<Memtable>
      */
     public boolean isExpired()
     {
-        int period = cfs.metadata.params.memtableFlushPeriodInMs;
+        int period = cfs.metadata().params.memtableFlushPeriodInMs;
         return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
     }
 
@@ -500,17 +499,17 @@ public class Memtable implements Comparable<Memtable>
 
         public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
                                                     Descriptor descriptor,
-                                                    PartitionColumns columns,
+                                                    RegularAndStaticColumns columns,
                                                     EncodingStats stats)
         {
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata().comparator)
                     .commitLogIntervals(new IntervalSet<>(commitLogLowerBound.get(), commitLogUpperBound.get()));
 
             return cfs.createSSTableMultiWriter(descriptor,
                                                 toFlush.size(),
                                                 ActiveRepairService.UNREPAIRED_SSTABLE,
                                                 sstableMetadataCollector,
-                                                new SerializationHeader(true, cfs.metadata, columns, stats), txn);
+                                                new SerializationHeader(true, cfs.metadata(), columns, stats), txn);
         }
 
         @Override
@@ -564,9 +563,9 @@ public class Memtable implements Comparable<Memtable>
             return minLocalDeletionTime;
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
-            return cfs.metadata;
+            return cfs.metadata();
         }
 
         public boolean hasNext()
@@ -588,25 +587,25 @@ public class Memtable implements Comparable<Memtable>
 
     private static class ColumnsCollector
     {
-        private final HashMap<ColumnDefinition, AtomicBoolean> predefined = new HashMap<>();
-        private final ConcurrentSkipListSet<ColumnDefinition> extra = new ConcurrentSkipListSet<>();
-        ColumnsCollector(PartitionColumns columns)
+        private final HashMap<ColumnMetadata, AtomicBoolean> predefined = new HashMap<>();
+        private final ConcurrentSkipListSet<ColumnMetadata> extra = new ConcurrentSkipListSet<>();
+        ColumnsCollector(RegularAndStaticColumns columns)
         {
-            for (ColumnDefinition def : columns.statics)
+            for (ColumnMetadata def : columns.statics)
                 predefined.put(def, new AtomicBoolean());
-            for (ColumnDefinition def : columns.regulars)
+            for (ColumnMetadata def : columns.regulars)
                 predefined.put(def, new AtomicBoolean());
         }
 
-        public void update(PartitionColumns columns)
+        public void update(RegularAndStaticColumns columns)
         {
-            for (ColumnDefinition s : columns.statics)
+            for (ColumnMetadata s : columns.statics)
                 update(s);
-            for (ColumnDefinition r : columns.regulars)
+            for (ColumnMetadata r : columns.regulars)
                 update(r);
         }
 
-        private void update(ColumnDefinition definition)
+        private void update(ColumnMetadata definition)
         {
             AtomicBoolean present = predefined.get(definition);
             if (present != null)
@@ -620,10 +619,10 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        public PartitionColumns get()
+        public RegularAndStaticColumns get()
         {
-            PartitionColumns.Builder builder = PartitionColumns.builder();
-            for (Map.Entry<ColumnDefinition, AtomicBoolean> e : predefined.entrySet())
+            RegularAndStaticColumns.Builder builder = RegularAndStaticColumns.builder();
+            for (Map.Entry<ColumnMetadata, AtomicBoolean> e : predefined.entrySet())
                 if (e.getValue().get())
                     builder.add(e.getKey());
             return builder.addAll(extra).build();
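
The ColumnsCollector changes above keep the same two-tier structure: columns known when the memtable is created get a pre-registered AtomicBoolean that is simply flipped on update, while columns that appear later (for example after a schema change) overflow into a ConcurrentSkipListSet. A generic sketch of that pattern under those assumptions (hypothetical SeenColumnsSketch, not the Memtable code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the collector pattern: columns known up front get a cheap "seen" flag,
    // anything else falls into a concurrent sorted overflow set.
    final class SeenColumnsSketch<T extends Comparable<T>>
    {
        private final Map<T, AtomicBoolean> predefined = new HashMap<>();
        private final ConcurrentSkipListSet<T> extra = new ConcurrentSkipListSet<>();

        SeenColumnsSketch(Iterable<T> knownColumns)
        {
            for (T c : knownColumns)
                predefined.put(c, new AtomicBoolean());
        }

        void update(T column)
        {
            AtomicBoolean present = predefined.get(column);
            if (present != null)
                present.set(true);   // lock-free mark for a pre-registered column
            else
                extra.add(column);   // column introduced after construction
        }

        List<T> seen()
        {
            List<T> result = new ArrayList<>();
            for (Map.Entry<T, AtomicBoolean> e : predefined.entrySet())
                if (e.getValue().get())
                    result.add(e.getKey());
            result.addAll(extra);
            return result;
        }
    }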

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
deleted file mode 100644
index 3666b27..0000000
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Collection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.service.MigrationManager;
-
-/**
- * Sends it's current schema state in form of mutations in reply to the remote node's request.
- * Such a request is made when one of the nodes, by means of Gossip, detects schema disagreement in the ring.
- */
-public class MigrationRequestVerbHandler implements IVerbHandler
-{
-    private static final Logger logger = LoggerFactory.getLogger(MigrationRequestVerbHandler.class);
-
-    public void doVerb(MessageIn message, int id)
-    {
-        logger.trace("Received migration request from {}.", message.from);
-        MessageOut<Collection<Mutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
-                                                                     SchemaKeyspace.convertSchemaToMutations(),
-                                                                     MigrationManager.MigrationsSerializer.instance);
-        MessagingService.instance().sendReply(response, id, message.from);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/MultiCBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java
index ae8c26c..c4cff02 100644
--- a/src/java/org/apache/cassandra/db/MultiCBuilder.java
+++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.NavigableSet;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
@@ -177,7 +177,7 @@ public abstract class MultiCBuilder
     public abstract NavigableSet<ClusteringBound> buildBoundForSlice(boolean isStart,
                                                                 boolean isInclusive,
                                                                 boolean isOtherBoundInclusive,
-                                                                 List<ColumnDefinition> columnDefs);
+                                                                 List<ColumnMetadata> columnDefs);
 
     /**
      * Builds the <code>ClusteringBound</code>s
@@ -266,7 +266,7 @@ public abstract class MultiCBuilder
         public NavigableSet<ClusteringBound> buildBoundForSlice(boolean isStart,
                                                                boolean isInclusive,
                                                                boolean isOtherBoundInclusive,
-                                                                List<ColumnDefinition> columnDefs)
+                                                                List<ColumnMetadata> columnDefs)
         {
             return buildBound(isStart, columnDefs.get(0).isReversedType() ? isOtherBoundInclusive : isInclusive);
         }
@@ -421,7 +421,7 @@ public abstract class MultiCBuilder
         public NavigableSet<ClusteringBound> buildBoundForSlice(boolean isStart,
                                                            boolean isInclusive,
                                                            boolean isOtherBoundInclusive,
-                                                            List<ColumnDefinition> columnDefs)
+                                                            List<ColumnMetadata> columnDefs)
         {
             built = true;
 
@@ -454,7 +454,7 @@ public abstract class MultiCBuilder
                // For example: if we have clustering_0 DESC and clustering_1 ASC a slice like (clustering_0, clustering_1) > (1, 2)
                // will produce 2 slices: [BOTTOM, 1) and (1.2, 1]
                // So, the END bound will return 2 bounds with the same values 1
-                ColumnDefinition lastColumn = columnDefs.get(columnDefs.size() - 1);
+                ColumnMetadata lastColumn = columnDefs.get(columnDefs.size() - 1);
                if (elements.size() <= lastColumn.position() && i < m - 1 && elements.equals(elementsList.get(i + 1)))
                 {
                     set.add(builder.buildBoundWith(elements, isStart, false));
@@ -463,7 +463,7 @@ public abstract class MultiCBuilder
                 }
 
                 // Handle the normal bounds
-                ColumnDefinition column = columnDefs.get(elements.size() - 1 - offset);
+                ColumnMetadata column = columnDefs.get(elements.size() - 1 - offset);
                set.add(builder.buildBoundWith(elements, isStart, column.isReversedType() ? isOtherBoundInclusive : isInclusive));
             }
             return set.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index f7ba43b..062e1fe 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -18,18 +18,13 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.cassandra.config.CFMetaData;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -37,6 +32,9 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 // TODO convert this to a Builder pattern instead of encouraging M.add directly,
@@ -54,7 +52,7 @@ public class Mutation implements IMutation
 
     private final DecoratedKey key;
     // map of column family id to mutations for that column family.
-    private final Map<UUID, PartitionUpdate> modifications;
+    private final Map<TableId, PartitionUpdate> modifications;
 
     // Time at which this mutation was instantiated
     public final long createdAt = System.currentTimeMillis();
@@ -70,10 +68,10 @@ public class Mutation implements IMutation
 
     public Mutation(PartitionUpdate update)
     {
-        this(update.metadata().ksName, update.partitionKey(), Collections.singletonMap(update.metadata().cfId, update));
+        this(update.metadata().keyspace, update.partitionKey(), Collections.singletonMap(update.metadata().id, update));
     }
 
-    protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications)
+    protected Mutation(String keyspaceName, DecoratedKey key, Map<TableId, PartitionUpdate> modifications)
     {
         this.keyspaceName = keyspaceName;
         this.key = key;
@@ -87,13 +85,14 @@ public class Mutation implements IMutation
         return new Mutation(keyspaceName, key, new HashMap<>(modifications));
     }
 
-    public Mutation without(Set<UUID> cfIds)
+    public Mutation without(Set<TableId> tableIds)
     {
-        if (cfIds.isEmpty())
+        if (tableIds.isEmpty())
             return this;
 
         Mutation copy = copy();
-        copy.modifications.keySet().removeAll(cfIds);
+
+        copy.modifications.keySet().removeAll(tableIds);
 
         copy.cdcEnabled = false;
         for (PartitionUpdate pu : modifications.values())
@@ -102,9 +101,9 @@ public class Mutation implements IMutation
         return copy;
     }
 
-    public Mutation without(UUID cfId)
+    public Mutation without(TableId tableId)
     {
-        return without(Collections.singleton(cfId));
+        return without(Collections.singleton(tableId));
     }
 
     public String getKeyspaceName()
@@ -112,7 +111,7 @@ public class Mutation implements IMutation
         return keyspaceName;
     }
 
-    public Collection<UUID> getColumnFamilyIds()
+    public Collection<TableId> getTableIds()
     {
         return modifications.keySet();
     }
@@ -127,9 +126,9 @@ public class Mutation implements IMutation
         return modifications.values();
     }
 
-    public PartitionUpdate getPartitionUpdate(UUID cfId)
+    public PartitionUpdate getPartitionUpdate(TableMetadata table)
     {
-        return modifications.get(cfId);
+        return table == null ? null : modifications.get(table.id);
     }
 
     /**
@@ -147,16 +146,16 @@ public class Mutation implements IMutation
 
         cdcEnabled |= update.metadata().params.cdc;
 
-        PartitionUpdate prev = modifications.put(update.metadata().cfId, update);
+        PartitionUpdate prev = modifications.put(update.metadata().id, update);
         if (prev != null)
             // developer error
-            throw new IllegalArgumentException("Table " + update.metadata().cfName + " already has modifications in this mutation: " + prev);
+            throw new IllegalArgumentException("Table " + update.metadata().name + " already has modifications in this mutation: " + prev);
         return this;
     }
 
-    public PartitionUpdate get(CFMetaData cfm)
+    public PartitionUpdate get(TableMetadata metadata)
     {
-        return modifications.get(cfm.cfId);
+        return modifications.get(metadata.id);
     }
 
     public boolean isEmpty()
@@ -182,7 +181,7 @@ public class Mutation implements IMutation
         if (mutations.size() == 1)
             return mutations.get(0);
 
-        Set<UUID> updatedTables = new HashSet<>();
+        Set<TableId> updatedTables = new HashSet<>();
         String ks = null;
         DecoratedKey key = null;
         for (Mutation mutation : mutations)
@@ -197,8 +196,8 @@ public class Mutation implements IMutation
         }
 
         List<PartitionUpdate> updates = new ArrayList<>(mutations.size());
-        Map<UUID, PartitionUpdate> modifications = new HashMap<>(updatedTables.size());
-        for (UUID table : updatedTables)
+        Map<TableId, PartitionUpdate> modifications = new HashMap<>(updatedTables.size());
+        for (TableId table : updatedTables)
         {
             for (Mutation mutation : mutations)
             {
@@ -288,10 +287,10 @@ public class Mutation implements IMutation
         if (shallow)
         {
             List<String> cfnames = new ArrayList<>(modifications.size());
-            for (UUID cfid : modifications.keySet())
+            for (TableId tableId : modifications.keySet())
             {
-                CFMetaData cfm = Schema.instance.getCFMetaData(cfid);
-                cfnames.add(cfm == null ? "-dropped-" : cfm.cfName);
+                TableMetadata cfm = Schema.instance.getTableMetadata(tableId);
+                cfnames.add(cfm == null ? "-dropped-" : cfm.name);
             }
             buff.append(StringUtils.join(cfnames, ", "));
         }
@@ -349,7 +348,7 @@ public class Mutation implements IMutation
          * @return a builder for the partition identified by {@code metadata} (and the partition key for which this is a
          * mutation of).
          */
-        public PartitionUpdate.SimpleBuilder update(CFMetaData metadata);
+        public PartitionUpdate.SimpleBuilder update(TableMetadata metadata);
 
         /**
          * Adds an update for table identified by the provided name and return a builder for that partition.
@@ -377,7 +376,7 @@ public class Mutation implements IMutation
             out.writeUnsignedVInt(size);
 
             assert size > 0;
-            for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
+            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
                 PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
         }
 
@@ -390,17 +389,17 @@ public class Mutation implements IMutation
             if (size == 1)
                 return new Mutation(update);
 
-            Map<UUID, PartitionUpdate> modifications = new HashMap<>(size);
+            Map<TableId, PartitionUpdate> modifications = new HashMap<>(size);
             DecoratedKey dk = update.partitionKey();
 
-            modifications.put(update.metadata().cfId, update);
+            modifications.put(update.metadata().id, update);
             for (int i = 1; i < size; ++i)
             {
                 update = PartitionUpdate.serializer.deserialize(in, version, flag);
-                modifications.put(update.metadata().cfId, update);
+                modifications.put(update.metadata().id, update);
             }
 
-            return new Mutation(update.metadata().ksName, dk, modifications);
+            return new Mutation(update.metadata().keyspace, dk, modifications);
         }
 
         public Mutation deserialize(DataInputPlus in, int version) throws IOException
@@ -411,7 +410,7 @@ public class Mutation implements IMutation
         public long serializedSize(Mutation mutation, int version)
         {
             int size = TypeSizes.sizeofUnsignedVInt(mutation.modifications.size());
-            for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
+            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
                 size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version);
 
             return size;
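
Mutation now keys its per-partition modifications by TableId rather than UUID: add() rejects a second update for the same table, getTableIds() exposes the key set, and the serializer simply walks the map. A small generic sketch of that keyed-map shape (placeholder type parameters, not Cassandra's classes):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    // Placeholder type parameters; only the keyed-map shape is taken from the diff above.
    final class KeyedMutationSketch<TableId, Update>
    {
        private final Map<TableId, Update> modifications = new HashMap<>();

        KeyedMutationSketch<TableId, Update> add(TableId tableId, Update update)
        {
            Update prev = modifications.put(tableId, update);
            if (prev != null) // at most one update per table, as Mutation.add enforces
                throw new IllegalArgumentException("Table " + tableId + " already has modifications in this mutation: " + prev);
            return this;
        }

        Collection<TableId> tableIds()        // analogous to Mutation.getTableIds()
        {
            return modifications.keySet();
        }

        Update get(TableId tableId)           // analogous to Mutation.get(TableMetadata)
        {
            return modifications.get(tableId);
        }
    }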

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
deleted file mode 100644
index bf4ac43..0000000
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.*;
-
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.utils.btree.BTreeSet;
-
-import static java.util.Comparator.naturalOrder;
-
-/**
- * Columns (or a subset of the columns) that a partition contains.
- * This mainly groups both static and regular columns for convenience.
- */
-public class PartitionColumns implements Iterable<ColumnDefinition>
-{
-    public static PartitionColumns NONE = new PartitionColumns(Columns.NONE, Columns.NONE);
-
-    public final Columns statics;
-    public final Columns regulars;
-
-    public PartitionColumns(Columns statics, Columns regulars)
-    {
-        assert statics != null && regulars != null;
-        this.statics = statics;
-        this.regulars = regulars;
-    }
-
-    public static PartitionColumns of(ColumnDefinition column)
-    {
-        return new PartitionColumns(column.isStatic() ? Columns.of(column) : Columns.NONE,
-                                    column.isStatic() ? Columns.NONE : Columns.of(column));
-    }
-
-    public PartitionColumns without(ColumnDefinition column)
-    {
-        return new PartitionColumns(column.isStatic() ? statics.without(column) : statics,
-                                    column.isStatic() ? regulars : regulars.without(column));
-    }
-
-    public PartitionColumns withoutStatics()
-    {
-        return statics.isEmpty() ? this : new PartitionColumns(Columns.NONE, regulars);
-    }
-
-    public PartitionColumns mergeTo(PartitionColumns that)
-    {
-        if (this == that)
-            return this;
-        Columns statics = this.statics.mergeTo(that.statics);
-        Columns regulars = this.regulars.mergeTo(that.regulars);
-        if (statics == this.statics && regulars == this.regulars)
-            return this;
-        if (statics == that.statics && regulars == that.regulars)
-            return that;
-        return new PartitionColumns(statics, regulars);
-    }
-
-    public boolean isEmpty()
-    {
-        return statics.isEmpty() && regulars.isEmpty();
-    }
-
-    public Columns columns(boolean isStatic)
-    {
-        return isStatic ? statics : regulars;
-    }
-
-    public boolean contains(ColumnDefinition column)
-    {
-        return column.isStatic() ? statics.contains(column) : regulars.contains(column);
-    }
-
-    public boolean includes(PartitionColumns columns)
-    {
-        return statics.containsAll(columns.statics) && regulars.containsAll(columns.regulars);
-    }
-
-    public Iterator<ColumnDefinition> iterator()
-    {
-        return Iterators.concat(statics.iterator(), regulars.iterator());
-    }
-
-    public Iterator<ColumnDefinition> selectOrderIterator()
-    {
-        return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator());
-    }
-
-    /** * Returns the total number of static and regular columns. */
-    public int size()
-    {
-        return regulars.size() + statics.size();
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[").append(statics).append(" | ").append(regulars).append("]");
-        return sb.toString();
-    }
-
-    @Override
-    public boolean equals(Object other)
-    {
-        if (!(other instanceof PartitionColumns))
-            return false;
-
-        PartitionColumns that = (PartitionColumns)other;
-        return this.statics.equals(that.statics)
-            && this.regulars.equals(that.regulars);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(statics, regulars);
-    }
-
-    public static Builder builder()
-    {
-        return new Builder();
-    }
-
-    public static class Builder
-    {
-        // Note that we do want to use sorted sets because we want the column definitions to be compared
-        // through compareTo, not equals. The former basically check it's the same column name, while the latter
-        // check it's the same object, including the same type.
-        private BTreeSet.Builder<ColumnDefinition> regularColumns;
-        private BTreeSet.Builder<ColumnDefinition> staticColumns;
-
-        public Builder add(ColumnDefinition c)
-        {
-            if (c.isStatic())
-            {
-                if (staticColumns == null)
-                    staticColumns = BTreeSet.builder(naturalOrder());
-                staticColumns.add(c);
-            }
-            else
-            {
-                assert c.isRegular();
-                if (regularColumns == null)
-                    regularColumns = BTreeSet.builder(naturalOrder());
-                regularColumns.add(c);
-            }
-            return this;
-        }
-
-        public Builder addAll(Iterable<ColumnDefinition> columns)
-        {
-            for (ColumnDefinition c : columns)
-                add(c);
-            return this;
-        }
-
-        public Builder addAll(PartitionColumns columns)
-        {
-            if (regularColumns == null && !columns.regulars.isEmpty())
-                regularColumns = BTreeSet.builder(naturalOrder());
-
-            for (ColumnDefinition c : columns.regulars)
-                regularColumns.add(c);
-
-            if (staticColumns == null && !columns.statics.isEmpty())
-                staticColumns = BTreeSet.builder(naturalOrder());
-
-            for (ColumnDefinition c : columns.statics)
-                staticColumns.add(c);
-
-            return this;
-        }
-
-        public PartitionColumns build()
-        {
-            return new PartitionColumns(staticColumns == null ? Columns.NONE : Columns.from(staticColumns.build()),
-                                        regularColumns == null ? Columns.NONE : Columns.from(regularColumns.build()));
-        }
-    }
-}
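
For orientation only (not part of this commit): the class deleted above is reintroduced later in this patch as RegularAndStaticColumns, with ColumnMetadata taking the place of ColumnDefinition. A minimal sketch of how a call site might migrate; someColumnMetadata is a hypothetical column, not taken from this diff:

    // before this patch: PartitionColumns columns = PartitionColumns.of(someColumnDefinition);
    RegularAndStaticColumns columns = RegularAndStaticColumns.of(someColumnMetadata);
    boolean present = columns.contains(someColumnMetadata); // dispatches to statics or regulars by column kind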

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index df4d63c..a47302b 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -24,7 +24,7 @@ import java.util.Optional;
 
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.lifecycle.View;
@@ -60,7 +60,7 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     public PartitionRangeReadCommand(boolean isDigest,
                                      int digestVersion,
-                                     CFMetaData metadata,
+                                     TableMetadata metadata,
                                      int nowInSec,
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
@@ -73,7 +73,7 @@ public class PartitionRangeReadCommand extends ReadCommand
         this.index = index;
     }
 
-    public PartitionRangeReadCommand(CFMetaData metadata,
+    public PartitionRangeReadCommand(TableMetadata metadata,
                                      int nowInSec,
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
@@ -92,7 +92,7 @@ public class PartitionRangeReadCommand extends ReadCommand
      *
      * @return a newly created read command that queries everything in the table.
      */
-    public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
+    public static PartitionRangeReadCommand allDataRead(TableMetadata metadata, int nowInSec)
     {
         return new PartitionRangeReadCommand(metadata,
                                              nowInSec,
@@ -165,7 +165,7 @@ public class PartitionRangeReadCommand extends ReadCommand
         if (!dataRange().contains(key))
             return false;
 
-        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
+        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
     }
 
     public boolean selectsClustering(DecoratedKey key, Clustering clustering)
@@ -196,7 +196,7 @@ public class PartitionRangeReadCommand extends ReadCommand
     protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
     {
         ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
-        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
+        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));
 
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
         final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
@@ -303,7 +303,7 @@ public class PartitionRangeReadCommand extends ReadCommand
      */
     public PartitionIterator postReconciliationProcessing(PartitionIterator result)
     {
-        ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
+        ColumnFamilyStore cfs = Keyspace.open(metadata().keyspace).getColumnFamilyStore(metadata().name);
         Index index = getIndex(cfs);
         return index == null ? result : index.postProcessorFor(this).apply(result, this);
     }
@@ -311,9 +311,8 @@ public class PartitionRangeReadCommand extends ReadCommand
     @Override
     public String toString()
     {
-        return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)",
-                             metadata().ksName,
-                             metadata().cfName,
+        return String.format("Read(%s columns=%s rowfilter=%s limits=%s %s)",
+                             metadata().toString(),
                              columnFilter(),
                              rowFilter(),
                              limits(),
@@ -332,7 +331,7 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, metadata);
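
As a rough sketch (not part of the patch) of the accessor migration visible in this file: CFMetaData's ksName, cfName and getKeyValidator() give way to TableMetadata's keyspace, name and partitionKeyType. Assuming an arbitrary ReadCommand named command, and AbstractType<?> as the partition key's type (as its use with getString above suggests):

    TableMetadata table = command.metadata();
    String keyspaceName = table.keyspace;              // was metadata().ksName
    String tableName = table.name;                     // was metadata().cfName
    AbstractType<?> keyType = table.partitionKeyType;  // was metadata().getKeyValidator()
    ColumnFamilyStore cfs = Keyspace.open(keyspaceName).getColumnFamilyStore(tableName);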

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 51f8188..b8856a3 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,16 +18,13 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.function.Predicate;
 
-import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.db.monitoring.MonitorableImpl;
@@ -35,7 +32,6 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.StoppingTransformation;
 import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -43,14 +39,15 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.schema.UnknownIndexException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.exceptions.UnknownIndexException;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * General interface for storage-engine read commands (common to both range and
@@ -65,7 +62,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 
     private final Kind kind;
-    private final CFMetaData metadata;
+    private final TableMetadata metadata;
     private final int nowInSec;
 
     private final ColumnFilter columnFilter;
@@ -88,7 +85,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
 
     protected static abstract class SelectionDeserializer
     {
-        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
+        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
     }
 
     protected enum Kind
@@ -107,7 +104,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
     protected ReadCommand(Kind kind,
                           boolean isDigestQuery,
                           int digestVersion,
-                          CFMetaData metadata,
+                          TableMetadata metadata,
                           int nowInSec,
                           ColumnFilter columnFilter,
                           RowFilter rowFilter,
@@ -139,7 +136,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
      *
      * @return the metadata for the table queried.
      */
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         return metadata;
     }
@@ -349,7 +346,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
                 throw new IndexNotAvailableException(index);
 
             searcher = index.searcherFor(this);
-            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
+            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
         }
 
         UnfilteredPartitionIterator resultIterator = searcher == null
@@ -403,7 +400,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
             private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
             private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 
-            private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+            private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().keyspace);
 
             private int liveRows = 0;
             private int tombstones = 0;
@@ -524,7 +521,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
 
         private void maybeDelayForTesting()
         {
-            if (!metadata.ksName.startsWith("system"))
+            if (!metadata.keyspace.startsWith("system"))
                 FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
         }
     }
@@ -574,7 +571,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
     {
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT ").append(columnFilter());
-        sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
+        sb.append(" FROM ").append(metadata().keyspace).append('.').append(metadata.name);
         appendCQLWhereClause(sb);
 
         if (limits() != DataLimits.NONE)
@@ -626,7 +623,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
             out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(command.index.isPresent()));
             if (command.isDigestQuery())
                 out.writeUnsignedVInt(command.digestVersion());
-            CFMetaData.serializer.serialize(command.metadata(), out, version);
+            command.metadata.id.serialize(out);
             out.writeInt(command.nowInSec());
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
@@ -652,11 +649,11 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
 
             boolean hasIndex = hasIndex(flags);
             int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
-            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
-            DataLimits limits = DataLimits.serializer.deserialize(in, version,  metadata.comparator);
+            DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator);
             Optional<IndexMetadata> index = hasIndex
                                           ? deserializeIndexMetadata(in, version, metadata)
                                           : Optional.empty();
@@ -664,11 +661,11 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
             return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index);
         }
 
-        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException
         {
             try
             {
-                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+                return Optional.of(IndexMetadata.serializer.deserialize(in, version, metadata));
             }
             catch (UnknownIndexException e)
             {
@@ -676,7 +673,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
                             "If an index was just created, this is likely due to the schema not " +
                             "being fully propagated. Local read will proceed without using the " +
                             "index. Please wait for schema agreement after index creation.",
-                            cfm.ksName, cfm.cfName, e.indexId);
+                            metadata.keyspace, metadata.name, e.indexId);
                 return Optional.empty();
             }
         }
@@ -684,14 +681,14 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
         public long serializedSize(ReadCommand command, int version)
         {
             return 2 // kind + flags
-                 + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
-                 + CFMetaData.serializer.serializedSize(command.metadata(), version)
-                 + TypeSizes.sizeof(command.nowInSec())
-                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
-                 + RowFilter.serializer.serializedSize(command.rowFilter(), version)
-                 + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator)
-                 + command.selectionSerializedSize(version)
-                 + command.indexSerializedSize(version);
+                   + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
+                   + command.metadata.id.serializedSize()
+                   + TypeSizes.sizeof(command.nowInSec())
+                   + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+                   + RowFilter.serializer.serializedSize(command.rowFilter(), version)
+                   + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator)
+                   + command.selectionSerializedSize(version)
+                   + command.indexSerializedSize(version);
         }
     }
 }
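
In outline, the serializer above no longer embeds the full table definition; it writes only the TableId and resolves it against the local schema when deserializing. A condensed sketch using just the calls shown in this hunk:

    // writing side: identify the table by id only
    command.metadata.id.serialize(out);
    // reading side: resolve the id against the local schema (fails if the table is unknown)
    TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in));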

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ReadExecutionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java 
b/src/java/org/apache/cassandra/db/ReadExecutionController.java
index 56bb0d3..9114212 100644
--- a/src/java/org/apache/cassandra/db/ReadExecutionController.java
+++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.db;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -25,13 +25,13 @@ public class ReadExecutionController implements 
AutoCloseable
 {
     // For every reads
     private final OpOrder.Group baseOp;
-    private final CFMetaData baseMetadata; // kept to sanity check that we have take the op order on the right table
+    private final TableMetadata baseMetadata; // kept to sanity check that we have take the op order on the right table
 
     // For index reads
     private final ReadExecutionController indexController;
     private final OpOrder.Group writeOp;
 
-    private ReadExecutionController(OpOrder.Group baseOp, CFMetaData baseMetadata, ReadExecutionController indexController, OpOrder.Group writeOp)
+    private ReadExecutionController(OpOrder.Group baseOp, TableMetadata baseMetadata, ReadExecutionController indexController, OpOrder.Group writeOp)
     {
         // We can have baseOp == null, but only when empty() is called, in which case the controller will never really be used
         // (which validForReadOn should ensure). But if it's not null, we should have the proper metadata too.
@@ -54,7 +54,7 @@ public class ReadExecutionController implements AutoCloseable
 
     public boolean validForReadOn(ColumnFamilyStore cfs)
     {
-        return baseOp != null && cfs.metadata.cfId.equals(baseMetadata.cfId);
+        return baseOp != null && cfs.metadata.id.equals(baseMetadata.id);
     }
 
     public static ReadExecutionController empty()
@@ -79,7 +79,7 @@ public class ReadExecutionController implements AutoCloseable
 
         if (indexCfs == null)
         {
-            return new ReadExecutionController(baseCfs.readOrdering.start(), baseCfs.metadata, null, null);
+            return new ReadExecutionController(baseCfs.readOrdering.start(), baseCfs.metadata(), null, null);
         }
         else
         {
@@ -89,11 +89,11 @@ public class ReadExecutionController implements 
AutoCloseable
             try
             {
                 baseOp = baseCfs.readOrdering.start();
-                indexController = new ReadExecutionController(indexCfs.readOrdering.start(), indexCfs.metadata, null, null);
+                indexController = new ReadExecutionController(indexCfs.readOrdering.start(), indexCfs.metadata(), null, null);
                 // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
                 // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
                 writeOp = Keyspace.writeOrder.start();
-                return new ReadExecutionController(baseOp, baseCfs.metadata, indexController, writeOp);
+                return new ReadExecutionController(baseOp, baseCfs.metadata(), indexController, writeOp);
             }
             catch (RuntimeException e)
             {
@@ -120,7 +120,7 @@ public class ReadExecutionController implements 
AutoCloseable
         return index == null ? null : index.getBackingTable().orElse(null);
     }
 
-    public CFMetaData metaData()
+    public TableMetadata metadata()
     {
         return baseMetadata;
     }
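
For illustration only: after the rename above, callers obtain the table through metadata() (previously metaData()) and compare tables by TableId rather than cfId. A minimal sketch, where controller and cfs are hypothetical stand-ins:

    // was: cfs.metadata.cfId.equals(controller.metaData().cfId)
    boolean sameTable = cfs.metadata.id.equals(controller.metadata().id);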

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java 
b/src/java/org/apache/cassandra/db/ReadQuery.java
index 806cfac..338ab1e 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -53,7 +53,7 @@ public interface ReadQuery
 
         public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
         {
-            return EmptyIterators.unfilteredPartition(executionController.metaData());
+            return EmptyIterators.unfilteredPartition(executionController.metadata());
         }
 
         public DataLimits limits()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java 
b/src/java/org/apache/cassandra/db/ReadResponse.java
index 7cf04a4..52d30c2 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -24,13 +24,13 @@ import java.security.MessageDigest;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java 
b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
new file mode 100644
index 0000000..fab7730
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+import static java.util.Comparator.naturalOrder;
+
+/**
+ * Columns (or a subset of the columns) that a partition contains.
+ * This mainly groups both static and regular columns for convenience.
+ */
+public class RegularAndStaticColumns implements Iterable<ColumnMetadata>
+{
+    public static RegularAndStaticColumns NONE = new RegularAndStaticColumns(Columns.NONE, Columns.NONE);
+
+    public final Columns statics;
+    public final Columns regulars;
+
+    public RegularAndStaticColumns(Columns statics, Columns regulars)
+    {
+        assert statics != null && regulars != null;
+        this.statics = statics;
+        this.regulars = regulars;
+    }
+
+    public static RegularAndStaticColumns of(ColumnMetadata column)
+    {
+        return new RegularAndStaticColumns(column.isStatic() ? Columns.of(column) : Columns.NONE,
+                                           column.isStatic() ? Columns.NONE : Columns.of(column));
+    }
+
+    public RegularAndStaticColumns without(ColumnMetadata column)
+    {
+        return new RegularAndStaticColumns(column.isStatic() ? statics.without(column) : statics,
+                                           column.isStatic() ? regulars : regulars.without(column));
+    }
+
+    public RegularAndStaticColumns mergeTo(RegularAndStaticColumns that)
+    {
+        if (this == that)
+            return this;
+        Columns statics = this.statics.mergeTo(that.statics);
+        Columns regulars = this.regulars.mergeTo(that.regulars);
+        if (statics == this.statics && regulars == this.regulars)
+            return this;
+        if (statics == that.statics && regulars == that.regulars)
+            return that;
+        return new RegularAndStaticColumns(statics, regulars);
+    }
+
+    public boolean isEmpty()
+    {
+        return statics.isEmpty() && regulars.isEmpty();
+    }
+
+    public Columns columns(boolean isStatic)
+    {
+        return isStatic ? statics : regulars;
+    }
+
+    public boolean contains(ColumnMetadata column)
+    {
+        return column.isStatic() ? statics.contains(column) : regulars.contains(column);
+    }
+
+    public boolean includes(RegularAndStaticColumns columns)
+    {
+        return statics.containsAll(columns.statics) && regulars.containsAll(columns.regulars);
+    }
+
+    public Iterator<ColumnMetadata> iterator()
+    {
+        return Iterators.concat(statics.iterator(), regulars.iterator());
+    }
+
+    public Iterator<ColumnMetadata> selectOrderIterator()
+    {
+        return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator());
+    }
+
+    /** * Returns the total number of static and regular columns. */
+    public int size()
+    {
+        return regulars.size() + statics.size();
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[").append(statics).append(" | ").append(regulars).append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof RegularAndStaticColumns))
+            return false;
+
+        RegularAndStaticColumns that = (RegularAndStaticColumns)other;
+        return this.statics.equals(that.statics)
+            && this.regulars.equals(that.regulars);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(statics, regulars);
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static class Builder
+    {
+        // Note that we do want to use sorted sets because we want the column definitions to be compared
+        // through compareTo, not equals. The former basically check it's the same column name, while the latter
+        // check it's the same object, including the same type.
+        private BTreeSet.Builder<ColumnMetadata> regularColumns;
+        private BTreeSet.Builder<ColumnMetadata> staticColumns;
+
+        public Builder add(ColumnMetadata c)
+        {
+            if (c.isStatic())
+            {
+                if (staticColumns == null)
+                    staticColumns = BTreeSet.builder(naturalOrder());
+                staticColumns.add(c);
+            }
+            else
+            {
+                assert c.isRegular();
+                if (regularColumns == null)
+                    regularColumns = BTreeSet.builder(naturalOrder());
+                regularColumns.add(c);
+            }
+            return this;
+        }
+
+        public Builder addAll(Iterable<ColumnMetadata> columns)
+        {
+            for (ColumnMetadata c : columns)
+                add(c);
+            return this;
+        }
+
+        public Builder addAll(RegularAndStaticColumns columns)
+        {
+            if (regularColumns == null && !columns.regulars.isEmpty())
+                regularColumns = BTreeSet.builder(naturalOrder());
+
+            for (ColumnMetadata c : columns.regulars)
+                regularColumns.add(c);
+
+            if (staticColumns == null && !columns.statics.isEmpty())
+                staticColumns = BTreeSet.builder(naturalOrder());
+
+            for (ColumnMetadata c : columns.statics)
+                staticColumns.add(c);
+
+            return this;
+        }
+
+        public RegularAndStaticColumns build()
+        {
+            return new RegularAndStaticColumns(staticColumns  == null ? Columns.NONE : Columns.from(staticColumns.build()),
+                                               regularColumns == null ? Columns.NONE : Columns.from(regularColumns.build()));
+        }
+    }
+}
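
For illustration only (not part of this commit), a minimal usage sketch of the builder defined above; staticCol and regularCol stand for arbitrary ColumnMetadata instances, one static and one regular:

    RegularAndStaticColumns columns = RegularAndStaticColumns.builder()
                                                             .add(staticCol)   // collected into the statics set
                                                             .add(regularCol)  // collected into the regulars set
                                                             .build();
    assert columns.contains(staticCol) && columns.contains(regularCol) && columns.size() == 2;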
