http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java 
b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
index 0958113..269fc95 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Memtable;
@@ -57,7 +57,7 @@ public class ColumnIndex
 
     private final AbstractType<?> keyValidator;
 
-    private final ColumnDefinition column;
+    private final ColumnMetadata column;
     private final Optional<IndexMetadata> config;
 
     private final AtomicReference<IndexMemtable> memtable;
@@ -70,7 +70,7 @@ public class ColumnIndex
 
     private final boolean isTokenized;
 
-    public ColumnIndex(AbstractType<?> keyValidator, ColumnDefinition column, 
IndexMetadata metadata)
+    public ColumnIndex(AbstractType<?> keyValidator, ColumnMetadata column, 
IndexMetadata metadata)
     {
         this.keyValidator = keyValidator;
         this.column = column;
@@ -147,7 +147,7 @@ public class ColumnIndex
         tracker.update(oldSSTables, newSSTables);
     }
 
-    public ColumnDefinition getDefinition()
+    public ColumnMetadata getDefinition()
     {
         return column;
     }
@@ -229,7 +229,7 @@ public class ColumnIndex
 
     }
 
-    public static ByteBuffer getValueOf(ColumnDefinition column, Row row, int 
nowInSecs)
+    public static ByteBuffer getValueOf(ColumnMetadata column, Row row, int 
nowInSecs)
     {
         if (row == null)
             return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/conf/IndexMode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/IndexMode.java 
b/src/java/org/apache/cassandra/index/sasi/conf/IndexMode.java
index c66dd02..d319636 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/IndexMode.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/IndexMode.java
@@ -22,7 +22,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
 import org.apache.cassandra.index.sasi.analyzer.NoOpAnalyzer;
 import org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer;
@@ -110,12 +110,12 @@ public class IndexMode
         }
     }
 
-    public static IndexMode getMode(ColumnDefinition column, 
Optional<IndexMetadata> config) throws ConfigurationException
+    public static IndexMode getMode(ColumnMetadata column, 
Optional<IndexMetadata> config) throws ConfigurationException
     {
         return getMode(column, config.isPresent() ? config.get().options : 
null);
     }
 
-    public static IndexMode getMode(ColumnDefinition column, Map<String, 
String> indexOptions) throws ConfigurationException
+    public static IndexMode getMode(ColumnMetadata column, Map<String, String> 
indexOptions) throws ConfigurationException
     {
         if (indexOptions == null || indexOptions.isEmpty())
             return IndexMode.NOT_INDEXED;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index 9fa4e87..20f6292 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -27,7 +27,7 @@ import java.util.concurrent.*;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.rows.Row;
@@ -79,10 +79,10 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
     private final OperationType source;
 
     private final AbstractType<?> keyValidator;
-    private final Map<ColumnDefinition, ColumnIndex> supportedIndexes;
+    private final Map<ColumnMetadata, ColumnIndex> supportedIndexes;
 
     @VisibleForTesting
-    protected final Map<ColumnDefinition, Index> indexes;
+    protected final Map<ColumnMetadata, Index> indexes;
 
     private DecoratedKey currentKey;
     private long currentKeyPosition;
@@ -91,7 +91,7 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
     public PerSSTableIndexWriter(AbstractType<?> keyValidator,
                                  Descriptor descriptor,
                                  OperationType source,
-                                 Map<ColumnDefinition, ColumnIndex> 
supportedIndexes)
+                                 Map<ColumnMetadata, ColumnIndex> 
supportedIndexes)
     {
         this.keyValidator = keyValidator;
         this.descriptor = descriptor;
@@ -155,7 +155,7 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
         }
     }
 
-    public Index getIndex(ColumnDefinition columnDef)
+    public Index getIndex(ColumnMetadata columnDef)
     {
         return indexes.get(columnDef);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java 
b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
index ca60ac5..21cb7ce 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
@@ -22,8 +22,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentSkipListSet;
 
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 import org.apache.cassandra.index.sasi.disk.Token;
@@ -106,9 +106,9 @@ public class TrieMemIndex extends MemIndex
     {
         public static final SizeEstimatingNodeFactory NODE_FACTORY = new 
SizeEstimatingNodeFactory();
 
-        protected final ColumnDefinition definition;
+        protected final ColumnMetadata definition;
 
-        public ConcurrentTrie(ColumnDefinition column)
+        public ConcurrentTrie(ColumnMetadata column)
         {
             definition = column;
         }
@@ -162,7 +162,7 @@ public class TrieMemIndex extends MemIndex
     {
         private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> 
trie;
 
-        private ConcurrentPrefixTrie(ColumnDefinition column)
+        private ConcurrentPrefixTrie(ColumnMetadata column)
         {
             super(column);
             trie = new ConcurrentRadixTree<>(NODE_FACTORY);
@@ -200,7 +200,7 @@ public class TrieMemIndex extends MemIndex
     {
         private final 
ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
 
-        private ConcurrentSuffixTrie(ColumnDefinition column)
+        private ConcurrentSuffixTrie(ColumnMetadata column)
         {
             super(column);
             trie = new ConcurrentSuffixTree<>(NODE_FACTORY);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/Expression.java 
b/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
index 93f1938..4f9d6b0 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
@@ -115,7 +115,7 @@ public class Expression
     @VisibleForTesting
     public Expression(String name, AbstractType<?> validator)
     {
-        this(null, new ColumnIndex(UTF8Type.instance, 
ColumnDefinition.regularDef("sasi", "internal", name, validator), null));
+        this(null, new ColumnIndex(UTF8Type.instance, 
ColumnMetadata.regularColumn("sasi", "internal", name, validator), null));
     }
 
     public Expression setLower(Bound newLower)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/Operation.java 
b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
index 7c744e1..148bc1f 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.ColumnDefinition.Kind;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.ColumnMetadata.Kind;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.rows.Row;
@@ -37,6 +37,7 @@ import 
org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
+
 import org.apache.cassandra.utils.FBUtilities;
 
 @SuppressWarnings("resource")
@@ -65,14 +66,14 @@ public class Operation extends RangeIterator<Long, Token>
     private final QueryController controller;
 
     protected final OperationType op;
-    protected final ListMultimap<ColumnDefinition, Expression> expressions;
+    protected final ListMultimap<ColumnMetadata, Expression> expressions;
     protected final RangeIterator<Long, Token> range;
 
     protected Operation left, right;
 
     private Operation(OperationType operation,
                       QueryController controller,
-                      ListMultimap<ColumnDefinition, Expression> expressions,
+                      ListMultimap<ColumnMetadata, Expression> expressions,
                       RangeIterator<Long, Token> range,
                       Operation left, Operation right)
     {
@@ -207,7 +208,7 @@ public class Operation extends RangeIterator<Long, Token>
         boolean result = false;
         int idx = 0;
 
-        for (ColumnDefinition column : expressions.keySet())
+        for (ColumnMetadata column : expressions.keySet())
         {
             if (column.kind == Kind.PARTITION_KEY)
                 continue;
@@ -262,11 +263,11 @@ public class Operation extends RangeIterator<Long, Token>
     }
 
     @VisibleForTesting
-    protected static ListMultimap<ColumnDefinition, Expression> 
analyzeGroup(QueryController controller,
-                                                                             
OperationType op,
-                                                                             
List<RowFilter.Expression> expressions)
+    protected static ListMultimap<ColumnMetadata, Expression> 
analyzeGroup(QueryController controller,
+                                                                           
OperationType op,
+                                                                           
List<RowFilter.Expression> expressions)
     {
-        ListMultimap<ColumnDefinition, Expression> analyzed = 
ArrayListMultimap.create();
+        ListMultimap<ColumnMetadata, Expression> analyzed = 
ArrayListMultimap.create();
 
         // sort all of the expressions in the operation by name and priority 
of the logical operator
         // this gives us an efficient way to handle inequality and combining 
into ranges without extra processing
@@ -429,7 +430,7 @@ public class Operation extends RangeIterator<Long, Token>
         {
             if (!expressions.isEmpty())
             {
-                ListMultimap<ColumnDefinition, Expression> analyzedExpressions 
= analyzeGroup(controller, op, expressions);
+                ListMultimap<ColumnMetadata, Expression> analyzedExpressions = 
analyzeGroup(controller, op, expressions);
                 RangeIterator.Builder<Long, Token> range = 
controller.getIndexes(op, analyzedExpressions.values());
 
                 Operation rightOp = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java 
b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index 336a740..d2f990e 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -22,10 +22,15 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.sasi.SASIIndex;
@@ -39,9 +44,9 @@ import 
org.apache.cassandra.index.sasi.plan.Operation.OperationType;
 import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.Pair;
 
 public class QueryController
@@ -63,7 +68,7 @@ public class QueryController
         this.executionStart = System.nanoTime();
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
         return command.metadata();
     }
@@ -80,7 +85,7 @@ public class QueryController
 
     public AbstractType<?> getKeyValidator()
     {
-        return cfs.metadata.getKeyValidator();
+        return cfs.metadata().partitionKeyType;
     }
 
     public ColumnIndex getIndex(RowFilter.Expression expression)
@@ -96,7 +101,7 @@ public class QueryController
             throw new NullPointerException();
         try
         {
-            SinglePartitionReadCommand partition = 
SinglePartitionReadCommand.create(cfs.metadata,
+            SinglePartitionReadCommand partition = 
SinglePartitionReadCommand.create(cfs.metadata(),
                                                                                
      command.nowInSec(),
                                                                                
      command.columnFilter(),
                                                                                
      command.rowFilter().withoutExpressions(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java 
b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
index 8a25f79..5e78684 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi.plan;
 
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
@@ -28,6 +27,7 @@ import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
 import org.apache.cassandra.exceptions.RequestTimeoutException;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.AbstractIterator;
 
 public class QueryPlan
@@ -153,7 +153,7 @@ public class QueryPlan
             }
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
             return controller.metadata();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java 
b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 7efca63..b1e15ed 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -27,13 +27,12 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Base class for the sstable writers used by CQLSSTableWriter.
@@ -41,13 +40,13 @@ import org.apache.cassandra.utils.Pair;
 abstract class AbstractSSTableSimpleWriter implements Closeable
 {
     protected final File directory;
-    protected final CFMetaData metadata;
-    protected final PartitionColumns columns;
+    protected final TableMetadataRef metadata;
+    protected final RegularAndStaticColumns columns;
     protected SSTableFormat.Type formatType = SSTableFormat.Type.current();
     protected static AtomicInteger generation = new AtomicInteger(0);
     protected boolean makeRangeAware = false;
 
-    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, 
PartitionColumns columns)
+    protected AbstractSSTableSimpleWriter(File directory, TableMetadataRef 
metadata, RegularAndStaticColumns columns)
     {
         this.metadata = metadata;
         this.directory = directory;
@@ -67,13 +66,13 @@ abstract class AbstractSSTableSimpleWriter implements 
Closeable
 
     protected SSTableTxnWriter createWriter()
     {
-        SerializationHeader header = new SerializationHeader(true, metadata, 
columns, EncodingStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(true, 
metadata.get(), columns, EncodingStats.NO_STATS);
 
         if (makeRangeAware)
             return SSTableTxnWriter.createRangeAware(metadata, 0,  
ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
 
         return SSTableTxnWriter.create(metadata,
-                                       createDescriptor(directory, 
metadata.ksName, metadata.cfName, formatType),
+                                       createDescriptor(directory, 
metadata.keyspace, metadata.name, formatType),
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        0,
@@ -117,7 +116,7 @@ abstract class AbstractSSTableSimpleWriter implements 
Closeable
 
     PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
     {
-        return getUpdateFor(metadata.decorateKey(key));
+        return getUpdateFor(metadata.get().partitioner.decorateKey(key));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index a195235..d026a03 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -31,9 +31,9 @@ import java.util.stream.Collectors;
 
 import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.TypeCodec;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -55,6 +55,7 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -248,7 +249,7 @@ public class CQLSSTableWriter implements Closeable
         long now = System.currentTimeMillis() * 1000;
         // Note that we asks indexes to not validate values (the last 'false' 
arg below) because that triggers a 'Keyspace.open'
         // and that forces a lot of initialization that we don't want.
-        UpdateParameters params = new UpdateParameters(insert.cfm,
+        UpdateParameters params = new UpdateParameters(insert.metadata,
                                                        insert.updatedColumns(),
                                                        options,
                                                        
insert.getTimestamp(now, options),
@@ -309,7 +310,7 @@ public class CQLSSTableWriter implements Closeable
      */
     public com.datastax.driver.core.UserType getUDType(String dataType)
     {
-        KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(insert.keyspace());
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(insert.keyspace());
         UserType userType = 
ksm.types.getNullable(ByteBufferUtil.bytes(dataType));
         return (com.datastax.driver.core.UserType) 
UDHelper.driverType(userType);
     }
@@ -509,16 +510,16 @@ public class CQLSSTableWriter implements Closeable
             {
                 String keyspace = schemaStatement.keyspace();
 
-                if (Schema.instance.getKSMetaData(keyspace) == null)
+                if (Schema.instance.getKeyspaceMetadata(keyspace) == null)
                     Schema.instance.load(KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1)));
 
                 createTypes(keyspace);
-                CFMetaData cfMetaData = createTable(keyspace);
+                TableMetadataRef tableMetadata = 
TableMetadataRef.forOfflineTools(createTable(keyspace));
                 Pair<UpdateStatement, List<ColumnSpecification>> 
preparedInsert = prepareInsert();
 
                 AbstractSSTableSimpleWriter writer = sorted
-                                                     ? new 
SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns())
-                                                     : new 
SSTableSimpleUnsortedWriter(directory, cfMetaData, 
preparedInsert.left.updatedColumns(), bufferSizeInMB);
+                                                     ? new 
SSTableSimpleWriter(directory, tableMetadata, 
preparedInsert.left.updatedColumns())
+                                                     : new 
SSTableSimpleUnsortedWriter(directory, tableMetadata, 
preparedInsert.left.updatedColumns(), bufferSizeInMB);
 
                 if (formatType != null)
                     writer.setSSTableFormatType(formatType);
@@ -529,39 +530,38 @@ public class CQLSSTableWriter implements Closeable
 
         private void createTypes(String keyspace)
         {
-            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+            KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspace);
             Types.RawBuilder builder = Types.rawBuilder(keyspace);
             for (CreateTypeStatement st : typeStatements)
                 st.addToRawBuilder(builder);
 
             ksm = ksm.withSwapped(builder.build());
-            Schema.instance.setKeyspaceMetadata(ksm);
+            Schema.instance.load(ksm);
         }
         /**
          * Creates the table according to schema statement
          *
          * @param keyspace name of the keyspace where table should be created
          */
-        private CFMetaData createTable(String keyspace)
+        private TableMetadata createTable(String keyspace)
         {
-            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+            KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspace);
 
-            CFMetaData cfMetaData = 
ksm.tables.getNullable(schemaStatement.columnFamily());
-            if (cfMetaData == null)
-            {
-                CreateTableStatement statement = (CreateTableStatement) 
schemaStatement.prepare(ksm.types).statement;
-                statement.validate(ClientState.forInternalCalls());
+            TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.columnFamily());
 
-                cfMetaData = statement.getCFMetaData();
+            if (tableMetadata != null)
+                return tableMetadata;
 
-                Schema.instance.load(cfMetaData);
-                
Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)));
-            }
+            CreateTableStatement statement = (CreateTableStatement) 
schemaStatement.prepare(ksm.types).statement;
+            statement.validate(ClientState.forInternalCalls());
 
+            TableMetadata.Builder builder = statement.builder();
             if (partitioner != null)
-                return cfMetaData.copy(partitioner);
-            else
-                return cfMetaData;
+                builder.partitioner(partitioner);
+            TableMetadata metadata = builder.build();
+
+            Schema.instance.load(ksm.withSwapped(ksm.tables.with(metadata)));
+            return metadata;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 95ade16..b8d236a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -195,10 +196,10 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
      * compacting.
      */
     @SuppressWarnings("resource")
-    private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> 
getCompactingAndNonCompactingSSTables()
+    private Pair<List<SSTableReader>, Map<TableId, LifecycleTransaction>> 
getCompactingAndNonCompactingSSTables()
     {
         List<SSTableReader> allCompacting = new ArrayList<>();
-        Map<UUID, LifecycleTransaction> allNonCompacting = new HashMap<>();
+        Map<TableId, LifecycleTransaction> allNonCompacting = new HashMap<>();
         for (Keyspace ks : Keyspace.all())
         {
             for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
@@ -213,7 +214,7 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
                 }
                 while (null == (txn = 
cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN)));
 
-                allNonCompacting.put(cfStore.metadata.cfId, txn);
+                allNonCompacting.put(cfStore.metadata.id, txn);
                 allCompacting.addAll(Sets.difference(allSSTables, 
nonCompacting));
             }
         }
@@ -222,7 +223,7 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
 
     public void redistributeSummaries() throws IOException
     {
-        Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> 
compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
+        Pair<List<SSTableReader>, Map<TableId, LifecycleTransaction>> 
compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
         try
         {
             redistributeSummaries(new 
IndexSummaryRedistribution(compactingAndNonCompacting.left,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index fc326dc..47c3fca 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +39,7 @@ import 
org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -57,12 +57,12 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
     static final double DOWNSAMPLE_THESHOLD = 0.75;
 
     private final List<SSTableReader> compacting;
-    private final Map<UUID, LifecycleTransaction> transactions;
+    private final Map<TableId, LifecycleTransaction> transactions;
     private final long memoryPoolBytes;
     private final UUID compactionId;
     private volatile long remainingSpace;
 
-    public IndexSummaryRedistribution(List<SSTableReader> compacting, 
Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
+    public IndexSummaryRedistribution(List<SSTableReader> compacting, 
Map<TableId, LifecycleTransaction> transactions, long memoryPoolBytes)
     {
         this.compacting = compacting;
         this.transactions = transactions;
@@ -127,7 +127,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
     }
 
     private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> 
sstables,
-                                                     Map<UUID, 
LifecycleTransaction> transactions,
+                                                     Map<TableId, 
LifecycleTransaction> transactions,
                                                      double totalReadsPerSec, 
long memoryPoolCapacity) throws IOException
     {
         List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 
4);
@@ -144,8 +144,8 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
             if (isStopRequested())
                 throw new CompactionInterruptedException(getCompactionInfo());
 
-            int minIndexInterval = sstable.metadata.params.minIndexInterval;
-            int maxIndexInterval = sstable.metadata.params.maxIndexInterval;
+            int minIndexInterval = sstable.metadata().params.minIndexInterval;
+            int maxIndexInterval = sstable.metadata().params.maxIndexInterval;
 
             double readsPerSec = sstable.getReadMeter() == null ? 0.0 : 
sstable.getReadMeter().fifteenMinuteRate();
             long idealSpace = Math.round(remainingSpace * (readsPerSec / 
totalReadsPerSec));
@@ -217,7 +217,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
                 logger.trace("SSTable {} is within thresholds of ideal 
sampling", sstable);
                 remainingSpace -= sstable.getIndexSummaryOffHeapSize();
                 newSSTables.add(sstable);
-                transactions.get(sstable.metadata.cfId).cancel(sstable);
+                transactions.get(sstable.metadata().id).cancel(sstable);
             }
             totalReadsPerSec -= readsPerSec;
         }
@@ -228,7 +228,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
             toDownsample = result.right;
             newSSTables.addAll(result.left);
             for (SSTableReader sstable : result.left)
-                transactions.get(sstable.metadata.cfId).cancel(sstable);
+                transactions.get(sstable.metadata().id).cancel(sstable);
         }
 
         // downsample first, then upsample
@@ -244,10 +244,10 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
             logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} 
of the original number of entries",
                          sstable, sstable.getIndexSummarySamplingLevel(), 
Downsampling.BASE_SAMPLING_LEVEL,
                          entry.newSamplingLevel, 
Downsampling.BASE_SAMPLING_LEVEL);
-            ColumnFamilyStore cfs = 
Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
+            ColumnFamilyStore cfs = 
Keyspace.open(sstable.metadata().keyspace).getColumnFamilyStore(sstable.metadata().id);
             SSTableReader replacement = 
sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
             newSSTables.add(replacement);
-            transactions.get(sstable.metadata.cfId).update(replacement, true);
+            transactions.get(sstable.metadata().id).update(replacement, true);
         }
 
         return newSSTables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index d51e97b..091e969 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -20,14 +20,13 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.cassandra.utils.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CloseableIterator;
 
@@ -86,7 +85,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
 
     private long keyPosition;
 
-    public KeyIterator(Descriptor desc, CFMetaData metadata)
+    public KeyIterator(Descriptor desc, TableMetadata metadata)
     {
         this.desc = desc;
         in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index 6f395f8..e64d95d 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -39,7 +39,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
     {
         iters = new ArrayList<>(sstables.size());
         for (SSTableReader sstable : sstables)
-            iters.add(new KeyIterator(sstable.descriptor, sstable.metadata));
+            iters.add(new KeyIterator(sstable.descriptor, sstable.metadata()));
     }
 
     private void maybeInit()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 8556cfa..57aeff6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -30,7 +30,6 @@ import com.google.common.io.Files;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -39,9 +38,11 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DiskOptimizationStrategy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 
 /**
  * This class is built on top of the SequenceFile. It stores
@@ -63,21 +64,20 @@ public abstract class SSTable
 
     public final Descriptor descriptor;
     protected final Set<Component> components;
-    public final CFMetaData metadata;
     public final boolean compression;
 
     public DecoratedKey first;
     public DecoratedKey last;
 
     protected final DiskOptimizationStrategy optimizationStrategy;
+    protected final TableMetadataRef metadata;
 
-    protected SSTable(Descriptor descriptor, Set<Component> components, 
CFMetaData metadata, DiskOptimizationStrategy optimizationStrategy)
+    protected SSTable(Descriptor descriptor, Set<Component> components, 
TableMetadataRef metadata, DiskOptimizationStrategy optimizationStrategy)
     {
         // In almost all cases, metadata shouldn't be null, but allowing null 
allows to create a mostly functional SSTable without
         // full schema definition. SSTableLoader use that ability
         assert descriptor != null;
         assert components != null;
-        assert metadata != null;
 
         this.descriptor = descriptor;
         Set<Component> dataComponents = new HashSet<>(components);
@@ -118,9 +118,14 @@ public abstract class SSTable
         return true;
     }
 
+    public TableMetadata metadata()
+    {
+        return metadata.get();
+    }
+
     public IPartitioner getPartitioner()
     {
-        return metadata.partitioner;
+        return metadata().partitioner;
     }
 
     public DecoratedKey decorateKey(ByteBuffer key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 2a79f88..3ade9ff 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.*;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -54,8 +54,8 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         try
         {
             DeletionTime partitionLevelDeletion = 
DeletionTime.serializer.deserialize(file);
-            SerializationHelper helper = new 
SerializationHelper(sstable.metadata, 
sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL);
-            SSTableSimpleIterator iterator = 
SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, 
partitionLevelDeletion);
+            SerializationHelper helper = new 
SerializationHelper(sstable.metadata(), 
sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL);
+            SSTableSimpleIterator iterator = 
SSTableSimpleIterator.create(sstable.metadata(), file, sstable.header, helper, 
partitionLevelDeletion);
             return new SSTableIdentityIterator(sstable, key, 
partitionLevelDeletion, file.getPath(), iterator);
         }
         catch (IOException e)
@@ -72,10 +72,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
             dfile.seek(indexEntry.position);
             ByteBufferUtil.skipShortLength(dfile); // Skip partition key
             DeletionTime partitionLevelDeletion = 
DeletionTime.serializer.deserialize(dfile);
-            SerializationHelper helper = new 
SerializationHelper(sstable.metadata, 
sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL);
+            SerializationHelper helper = new 
SerializationHelper(sstable.metadata(), 
sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL);
             SSTableSimpleIterator iterator = tombstoneOnly
-                    ? 
SSTableSimpleIterator.createTombstoneOnly(sstable.metadata, dfile, 
sstable.header, helper, partitionLevelDeletion)
-                    : SSTableSimpleIterator.create(sstable.metadata, dfile, 
sstable.header, helper, partitionLevelDeletion);
+                    ? 
SSTableSimpleIterator.createTombstoneOnly(sstable.metadata(), dfile, 
sstable.header, helper, partitionLevelDeletion)
+                    : SSTableSimpleIterator.create(sstable.metadata(), dfile, 
sstable.header, helper, partitionLevelDeletion);
             return new SSTableIdentityIterator(sstable, key, 
partitionLevelDeletion, dfile.getPath(), iterator);
         }
         catch (IOException e)
@@ -85,14 +85,14 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         }
     }
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
-        return sstable.metadata;
+        return iterator.metadata;
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
-        return metadata().partitionColumns();
+        return metadata().regularAndStaticColumns();
     }
 
     public boolean isReverseOrder()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index e00de4a..47b37ef 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -25,7 +25,7 @@ import java.util.*;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
@@ -96,7 +96,7 @@ public class SSTableLoader implements StreamEventHandler
                                               return false;
                                           }
 
-                                          CFMetaData metadata = 
client.getTableMetadata(desc.cfname);
+                                          TableMetadataRef metadata = 
client.getTableMetadata(desc.cfname);
                                           if (metadata == null)
                                           {
                                               
outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't 
exist", name, keyspace, desc.cfname));
@@ -272,9 +272,9 @@ public class SSTableLoader implements StreamEventHandler
          * Validate that {@code keyspace} is an existing keyspace and {@code
          * cfName} one of its existing column family.
          */
-        public abstract CFMetaData getTableMetadata(String tableName);
+        public abstract TableMetadataRef getTableMetadata(String tableName);
 
-        public void setTableMetadata(CFMetaData cfm)
+        public void setTableMetadata(TableMetadataRef cfm)
         {
             throw new RuntimeException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
index b92bc78..1be79ab 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
@@ -19,10 +19,10 @@
 package org.apache.cassandra.io.sstable;
 
 import java.util.Collection;
-import java.util.UUID;
 
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
@@ -44,7 +44,7 @@ public interface SSTableMultiWriter extends Transactional
 
     String getFilename();
     long getFilePointer();
-    UUID getCfId();
+    TableId getTableId();
 
     static void abortOrDie(SSTableMultiWriter writer)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index ad0f3c9..c3c7472 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -21,17 +21,12 @@ import java.io.IOException;
 import java.io.IOError;
 import java.util.Iterator;
 
-import org.apache.cassandra.io.util.RewindableDataInput;
-import org.apache.cassandra.utils.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.AbstractIterator;
 
 /**
  * Utility class to handle deserializing atom from sstables.
@@ -41,23 +36,23 @@ import org.apache.cassandra.net.MessagingService;
  */
 public abstract class SSTableSimpleIterator extends 
AbstractIterator<Unfiltered> implements Iterator<Unfiltered>
 {
-    protected final CFMetaData metadata;
+    final TableMetadata metadata;
     protected final DataInputPlus in;
     protected final SerializationHelper helper;
 
-    private SSTableSimpleIterator(CFMetaData metadata, DataInputPlus in, 
SerializationHelper helper)
+    private SSTableSimpleIterator(TableMetadata metadata, DataInputPlus in, 
SerializationHelper helper)
     {
         this.metadata = metadata;
         this.in = in;
         this.helper = helper;
     }
 
-    public static SSTableSimpleIterator create(CFMetaData metadata, 
DataInputPlus in, SerializationHeader header, SerializationHelper helper, 
DeletionTime partitionDeletion)
+    public static SSTableSimpleIterator create(TableMetadata metadata, 
DataInputPlus in, SerializationHeader header, SerializationHelper helper, 
DeletionTime partitionDeletion)
     {
         return new CurrentFormatIterator(metadata, in, header, helper);
     }
 
-    public static SSTableSimpleIterator createTombstoneOnly(CFMetaData 
metadata, DataInputPlus in, SerializationHeader header, SerializationHelper 
helper, DeletionTime partitionDeletion)
+    public static SSTableSimpleIterator createTombstoneOnly(TableMetadata 
metadata, DataInputPlus in, SerializationHeader header, SerializationHelper 
helper, DeletionTime partitionDeletion)
     {
         return new CurrentFormatTombstoneIterator(metadata, in, header, 
helper);
     }
@@ -70,7 +65,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
 
         private final Row.Builder builder;
 
-        private CurrentFormatIterator(CFMetaData metadata, DataInputPlus in, 
SerializationHeader header, SerializationHelper helper)
+        private CurrentFormatIterator(TableMetadata metadata, DataInputPlus 
in, SerializationHeader header, SerializationHelper helper)
         {
             super(metadata, in, helper);
             this.header = header;
@@ -100,7 +95,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
     {
         private final SerializationHeader header;
 
-        private CurrentFormatTombstoneIterator(CFMetaData metadata, 
DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+        private CurrentFormatTombstoneIterator(TableMetadata metadata, 
DataInputPlus in, SerializationHeader header, SerializationHelper helper)
         {
             super(metadata, in, helper);
             this.header = header;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 23e18b5..afb4461 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -28,12 +28,12 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Throwables;
 
 import io.netty.util.concurrent.FastThreadLocalThread;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -60,11 +60,11 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final BlockingQueue<Buffer> writeQueue = new 
SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
-    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, 
PartitionColumns columns, long bufferSizeInMB)
+    SSTableSimpleUnsortedWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns, long bufferSizeInMB)
     {
         super(directory, metadata, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.header = new SerializationHeader(true, metadata, columns, 
EncodingStats.NO_STATS);
+        this.header = new SerializationHeader(true, metadata.get(), columns, 
EncodingStats.NO_STATS);
         diskWriter.start();
     }
 
@@ -110,7 +110,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
     private PartitionUpdate createPartitionUpdate(DecoratedKey key)
     {
-        return new PartitionUpdate(metadata, key, columns, 4)
+        return new PartitionUpdate(metadata.get(), key, columns, 4)
         {
             @Override
             public void add(Row row)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 7fbd79d..a663051 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -22,9 +22,9 @@ import java.io.IOException;
 
 import com.google.common.base.Throwables;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableMetadataRef;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -43,7 +43,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, CFMetaData metadata, 
PartitionColumns columns)
+    protected SSTableSimpleWriter(File directory, TableMetadataRef metadata, 
RegularAndStaticColumns columns)
     {
         super(directory, metadata, columns);
     }
@@ -67,7 +67,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
             if (update != null)
                 writePartition(update);
             currentKey = key;
-            update = new PartitionUpdate(metadata, currentKey, columns, 4);
+            update = new PartitionUpdate(metadata.get(), currentKey, columns, 
4);
         }
 
         assert update != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 323b1bd..fcf7cd6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.io.sstable;
 import java.io.IOException;
 import java.util.Collection;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SerializationHeader;
@@ -33,6 +32,8 @@ import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 /**
@@ -107,7 +108,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
 
 
     @SuppressWarnings("resource") // log and writer closed during doPostCleanup
-    public static SSTableTxnWriter createRangeAware(CFMetaData cfm,
+    public static SSTableTxnWriter createRangeAware(TableMetadataRef metadata,
                                                     long keyCount,
                                                     long repairedAt,
                                                     SSTableFormat.Type type,
@@ -115,7 +116,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
                                                     SerializationHeader header)
     {
 
-        ColumnFamilyStore cfs = 
Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
+        ColumnFamilyStore cfs = 
Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
         LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.WRITE);
         SSTableMultiWriter writer;
         try
@@ -133,7 +134,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     }
 
     @SuppressWarnings("resource") // log and writer closed during doPostCleanup
-    public static SSTableTxnWriter create(CFMetaData cfm,
+    public static SSTableTxnWriter create(TableMetadataRef metadata,
                                           Descriptor descriptor,
                                           long keyCount,
                                           long repairedAt,
@@ -143,8 +144,8 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     {
         // if the column family store does not exist, we create a new default 
SSTableMultiWriter to use:
         LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.WRITE);
-        MetadataCollector collector = new 
MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
-        SSTableMultiWriter writer = 
SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, 
collector, header, indexes, txn);
+        MetadataCollector collector = new 
MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
+        SSTableMultiWriter writer = 
SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, metadata, 
collector, header, indexes, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 2217ae2..2d7d967 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -15,14 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.io.sstable;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.UUID;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -31,6 +28,8 @@ import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
 
 public class SimpleSSTableMultiWriter implements SSTableMultiWriter
 {
@@ -80,9 +79,9 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
         return writer.getFilePointer();
     }
 
-    public UUID getCfId()
+    public TableId getTableId()
     {
-        return writer.metadata.cfId;
+        return writer.metadata().id;
     }
 
     public Throwable commit(Throwable accumulate)
@@ -110,13 +109,13 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
     public static SSTableMultiWriter create(Descriptor descriptor,
                                             long keyCount,
                                             long repairedAt,
-                                            CFMetaData cfm,
+                                            TableMetadataRef metadata,
                                             MetadataCollector 
metadataCollector,
                                             SerializationHeader header,
                                             Collection<Index> indexes,
                                             LifecycleTransaction txn)
     {
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, 
repairedAt, cfm, metadataCollector, header, indexes, txn);
+        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, 
repairedAt, metadata, metadataCollector, header, indexes, txn);
         return new SimpleSSTableMultiWriter(writer, txn);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 89c064b..766a930 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -159,9 +159,9 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
     }
 
     @Override
-    public UUID getCfId()
+    public TableId getTableId()
     {
-        return currentWriter.getCfId();
+        return currentWriter.getTableId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 29e29ef..38a7f57 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -18,7 +18,8 @@
 package org.apache.cassandra.io.sstable.format;
 
 import com.google.common.base.CharMatcher;
-import org.apache.cassandra.config.CFMetaData;
+
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
@@ -37,7 +38,7 @@ public interface SSTableFormat
     SSTableWriter.Factory getWriterFactory();
     SSTableReader.Factory getReaderFactory();
 
-    RowIndexEntry.IndexSerializer<?> getIndexSerializer(CFMetaData cfm, Version version, SerializationHeader header);
+    RowIndexEntry.IndexSerializer<?> getIndexSerializer(TableMetadata metadata, Version version, SerializationHeader header);
 
     public static enum Type
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index d446475..87e12eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.primitives.Longs;
@@ -44,11 +43,8 @@ import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.EncodingStats;
@@ -56,7 +52,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
@@ -65,7 +60,11 @@ import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.schema.CachingParams;
-import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.*;
@@ -233,8 +232,8 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
     protected final AtomicLong keyCacheHit = new AtomicLong(0);
     protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 
-    private final InstanceTidier tidy = new InstanceTidier(descriptor, 
metadata);
-    private final Ref<SSTableReader> selfRef = new Ref<>(this, tidy);
+    private final InstanceTidier tidy;
+    private final Ref<SSTableReader> selfRef;
 
     private RestorableMeter readMeter;
 
@@ -355,32 +354,28 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
     public static SSTableReader open(Descriptor descriptor) throws IOException
     {
-        CFMetaData metadata;
+        TableMetadataRef metadata;
         if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
         {
             int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
-            String parentName = descriptor.cfname.substring(0, i);
             String indexName = descriptor.cfname.substring(i + 1);
-            CFMetaData parent = 
Schema.instance.getCFMetaData(descriptor.ksname, parentName);
-            IndexMetadata def = parent.getIndexes()
-                                      .get(indexName)
-                                      .orElseThrow(() -> new AssertionError(
-                                                                           
"Could not find index metadata for index cf " + i));
-            metadata = CassandraIndex.indexCfsMetadata(parent, def);
+            metadata = Schema.instance.getIndexTableMetadataRef(descriptor.ksname, indexName);
+            if (metadata == null)
+                throw new AssertionError("Could not find index metadata for index cf " + i);
         }
         else
         {
-            metadata = Schema.instance.getCFMetaData(descriptor.ksname, 
descriptor.cfname);
+            metadata = Schema.instance.getTableMetadataRef(descriptor.ksname, 
descriptor.cfname);
         }
         return open(descriptor, metadata);
     }
 
-    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
+    public static SSTableReader open(Descriptor desc, TableMetadataRef metadata) throws IOException
     {
         return open(desc, componentsFor(desc), metadata);
     }
 
-    public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, CFMetaData metadata) throws IOException
+    public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, TableMetadataRef metadata) throws IOException
     {
         return open(descriptor, components, metadata, true, true);
     }
@@ -392,7 +387,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
     }
 
     // use only for offline or "Standalone" operations
-    public static SSTableReader openNoValidation(Descriptor descriptor, 
CFMetaData metadata) throws IOException
+    public static SSTableReader openNoValidation(Descriptor descriptor, 
TableMetadataRef metadata) throws IOException
     {
         return open(descriptor, componentsFor(descriptor), metadata, false, 
false); // do not track hotness
     }
@@ -406,7 +401,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
      * @return opened SSTableReader
      * @throws IOException
      */
-    public static SSTableReader openForBatch(Descriptor descriptor, 
Set<Component> components, CFMetaData metadata) throws IOException
+    public static SSTableReader openForBatch(Descriptor descriptor, 
Set<Component> components, TableMetadataRef metadata) throws IOException
     {
         // Minimum components without which we can't do anything
         assert components.contains(Component.DATA) : "Data component is 
missing for sstable " + descriptor;
@@ -422,7 +417,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable 
or no stats available.
         // In that case, we skip the check.
-        String partitionerName = 
metadata.partitioner.getClass().getCanonicalName();
+        String partitionerName = 
metadata.get().partitioner.getClass().getCanonicalName();
         if (validationMetadata != null && 
!partitionerName.equals(validationMetadata.partitioner))
         {
             logger.error("Cannot open {}; partitioner {} does not match system 
partitioner {}.  Note that the default partitioner starting with Cassandra 1.2 
is Murmur3Partitioner, so you will need to edit that to match your old 
partitioner if upgrading.",
@@ -438,7 +433,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
                                              System.currentTimeMillis(),
                                              statsMetadata,
                                              OpenReason.NORMAL,
-                                             header == null? null : 
header.toHeader(metadata));
+                                             header.toHeader(metadata.get()));
 
         try(FileHandle.Builder ibuilder = new 
FileHandle.Builder(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))
                                                      
.mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
@@ -461,10 +456,10 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
     }
 
     public static SSTableReader open(Descriptor descriptor,
-                                      Set<Component> components,
-                                      CFMetaData metadata,
-                                      boolean validate,
-                                      boolean trackHotness) throws IOException
+                                     Set<Component> components,
+                                     TableMetadataRef metadata,
+                                     boolean validate,
+                                     boolean trackHotness) throws IOException
     {
         // Minimum components without which we can't do anything
         assert components.contains(Component.DATA) : "Data component is 
missing for sstable " + descriptor;
@@ -483,7 +478,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable 
or no stats available.
         // In that case, we skip the check.
-        String partitionerName = 
metadata.partitioner.getClass().getCanonicalName();
+        String partitionerName = 
metadata.get().partitioner.getClass().getCanonicalName();
         if (validationMetadata != null && 
!partitionerName.equals(validationMetadata.partitioner))
         {
             logger.error("Cannot open {}; partitioner {} does not match system 
partitioner {}.  Note that the default partitioner starting with Cassandra 1.2 
is Murmur3Partitioner, so you will need to edit that to match your old 
partitioner if upgrading.",
@@ -499,7 +494,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
                                              System.currentTimeMillis(),
                                              statsMetadata,
                                              OpenReason.NORMAL,
-                                             header == null ? null : 
header.toHeader(metadata));
+                                             header.toHeader(metadata.get()));
 
         try
         {
@@ -533,7 +528,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
     }
 
     public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, 
Set<Component>>> entries,
-                                                    final CFMetaData metadata)
+                                                    final TableMetadataRef 
metadata)
     {
         final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 
@@ -591,7 +586,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
      */
     public static SSTableReader internalOpen(Descriptor desc,
                                       Set<Component> components,
-                                      CFMetaData metadata,
+                                      TableMetadataRef metadata,
                                       FileHandle ifile,
                                       FileHandle dfile,
                                       IndexSummary isummary,
@@ -616,12 +611,12 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
 
     private static SSTableReader internalOpen(final Descriptor descriptor,
-                                            Set<Component> components,
-                                            CFMetaData metadata,
-                                            Long maxDataAge,
-                                            StatsMetadata sstableMetadata,
-                                            OpenReason openReason,
-                                            SerializationHeader header)
+                                              Set<Component> components,
+                                              TableMetadataRef metadata,
+                                              Long maxDataAge,
+                                              StatsMetadata sstableMetadata,
+                                              OpenReason openReason,
+                                              SerializationHeader header)
     {
         Factory readerFactory = descriptor.getFormat().getReaderFactory();
 
@@ -630,7 +625,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
     protected SSTableReader(final Descriptor desc,
                             Set<Component> components,
-                            CFMetaData metadata,
+                            TableMetadataRef metadata,
                             long maxDataAge,
                             StatsMetadata sstableMetadata,
                             OpenReason openReason,
@@ -641,7 +636,9 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         this.header = header;
         this.maxDataAge = maxDataAge;
         this.openReason = openReason;
-        this.rowIndexEntrySerializer = 
descriptor.version.getSSTableFormat().getIndexSerializer(metadata, 
desc.version, header);
+        this.rowIndexEntrySerializer = 
descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), 
desc.version, header);
+        tidy = new InstanceTidier(descriptor, metadata.id);
+        selfRef = new Ref<>(this, tidy);
     }
 
     public static long getTotalBytes(Iterable<SSTableReader> sstables)
@@ -682,7 +679,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         // e.g. by BulkLoader, which does not initialize the cache.  As a 
kludge, we set up the cache
         // here when we know we're being wired into the rest of the server 
infrastructure.
         keyCache = CacheService.instance.keyCache;
-        final ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+        final ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(metadata().id);
         if (cfs != null)
             setCrcCheckChance(cfs.getCrcCheckChance());
     }
@@ -694,7 +691,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
     private void load(ValidationMetadata validation) throws IOException
     {
-        if (metadata.params.bloomFilterFpChance == 1.0)
+        if (metadata().params.bloomFilterFpChance == 1.0)
         {
             // bf is disabled.
             load(false, true);
@@ -711,7 +708,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
             // bf is enabled, but filter component is missing.
             load(true, true);
         }
-        else if (validation.bloomFilterFPChance != 
metadata.params.bloomFilterFpChance)
+        else if (validation.bloomFilterFPChance != 
metadata().params.bloomFilterFpChance)
         {
             // bf fp chance in sstable metadata and it has changed since 
compaction.
             load(true, true);
@@ -819,9 +816,9 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
                     : estimateRowsFromIndex(primaryIndex); // statistics is 
supposed to be optional
 
             if (recreateBloomFilter)
-                bf = FilterFactory.getFilter(estimatedKeys, 
metadata.params.bloomFilterFpChance, true);
+                bf = FilterFactory.getFilter(estimatedKeys, 
metadata().params.bloomFilterFpChance, true);
 
-            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : 
new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, 
samplingLevel))
+            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : 
new IndexSummaryBuilder(estimatedKeys, metadata().params.minIndexInterval, 
samplingLevel))
             {
                 long indexPosition;
 
@@ -871,6 +868,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         DataInputStream iStream = null;
         try
         {
+            TableMetadata metadata = metadata();
             iStream = new DataInputStream(new FileInputStream(summariesFile));
             indexSummary = IndexSummary.serializer.deserialize(
                     iStream, getPartitioner(),
@@ -1100,8 +1098,8 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         {
             assert openReason != OpenReason.EARLY;
 
-            int minIndexInterval = metadata.params.minIndexInterval;
-            int maxIndexInterval = metadata.params.maxIndexInterval;
+            int minIndexInterval = metadata().params.minIndexInterval;
+            int maxIndexInterval = metadata().params.maxIndexInterval;
             double effectiveInterval = 
indexSummary.getEffectiveIndexInterval();
 
             IndexSummary newSummary;
@@ -1146,7 +1144,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         try
         {
             long indexSize = primaryIndex.length();
-            try (IndexSummaryBuilder summaryBuilder = new 
IndexSummaryBuilder(estimatedKeys(), metadata.params.minIndexInterval, 
newSamplingLevel))
+            try (IndexSummaryBuilder summaryBuilder = new 
IndexSummaryBuilder(estimatedKeys(), metadata().params.minIndexInterval, 
newSamplingLevel))
             {
                 long indexPosition;
                 while ((indexPosition = primaryIndex.getFilePointer()) != 
indexSize)
@@ -1467,28 +1465,24 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
     public KeyCacheKey getCacheKey(DecoratedKey key)
     {
-        return new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
+        return new KeyCacheKey(metadata(), descriptor, key.getKey());
     }
 
     public void cacheKey(DecoratedKey key, RowIndexEntry info)
     {
-        CachingParams caching = metadata.params.caching;
+        CachingParams caching = metadata().params.caching;
 
         if (!caching.cacheKeys() || keyCache == null || keyCache.getCapacity() 
== 0)
             return;
 
-        KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, 
descriptor, key.getKey());
+        KeyCacheKey cacheKey = new KeyCacheKey(metadata(), descriptor, 
key.getKey());
         logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
         keyCache.put(cacheKey, info);
     }
 
     public RowIndexEntry getCachedPosition(DecoratedKey key, boolean 
updateStats)
     {
-        if (keyCacheEnabled())
-        {
-            return getCachedPosition(getCacheKey(key), updateStats);
-        }
-        return null;
+        return getCachedPosition(new KeyCacheKey(metadata(), descriptor, 
key.getKey()), updateStats);
     }
 
     protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean 
updateStats)
@@ -1516,7 +1510,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
     private boolean keyCacheEnabled()
     {
-        return keyCache != null && keyCache.getCapacity() > 0 && 
metadata.params.caching.cacheKeys();
+        return keyCache != null && keyCache.getCapacity() > 0 && 
metadata().params.caching.cacheKeys();
     }
 
     /**
@@ -2003,7 +1997,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
     private int compare(List<ByteBuffer> values1, List<ByteBuffer> values2)
     {
-        ClusteringComparator comparator = metadata.comparator;
+        ClusteringComparator comparator = metadata().comparator;
         for (int i = 0; i < Math.min(values1.size(), values2.size()); i++)
         {
             int cmp = comparator.subtype(i).compare(values1.get(i), 
values2.get(i));
@@ -2070,7 +2064,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
     private static final class InstanceTidier implements Tidy
     {
         private final Descriptor descriptor;
-        private final CFMetaData metadata;
+        private final TableId tableId;
         private IFilter bf;
         private IndexSummary summary;
 
@@ -2100,10 +2094,10 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
                 global.ensureReadMeter();
         }
 
-        InstanceTidier(Descriptor descriptor, CFMetaData metadata)
+        InstanceTidier(Descriptor descriptor, TableId tableId)
         {
             this.descriptor = descriptor;
-            this.metadata = metadata;
+            this.tableId = tableId;
         }
 
         public void tidy()
@@ -2115,7 +2109,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
             if (!setup)
                 return;
 
-            final ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+            final ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(tableId);
             final OpOrder.Barrier barrier;
             if (cfs != null)
             {
@@ -2285,7 +2279,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
     {
         public abstract SSTableReader open(final Descriptor descriptor,
                                            Set<Component> components,
-                                           CFMetaData metadata,
+                                           TableMetadataRef metadata,
                                            Long maxDataAge,
                                            StatsMetadata sstableMetadata,
                                            OpenReason openReason,

Reply via email to