http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java 
b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 340e237..0c67b82 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -24,10 +24,12 @@ import java.util.*;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -235,8 +237,24 @@ public interface ClusteringPrefix extends 
IMeasurableMemory, Clusterable
      * @param metadata the metadata for the table the clustering prefix is of.
      * @return a human-readable string representation of this prefix.
      */
-    public String toString(CFMetaData metadata);
+    public String toString(TableMetadata metadata);
 
+    /*
+     * TODO: we should stop using Clustering for partition keys. Maybe we can add
+     * a few methods to DecoratedKey so we don't have to (note that while using a Clustering
+     * allows using buildBound(), it's actually used for partition keys only when every restriction
+     * is an equality, so we could easily create a specific method for keys for that).
+     */
+    default ByteBuffer serializeAsPartitionKey()
+    {
+        if (size() == 1)
+            return get(0);
+
+        ByteBuffer[] values = new ByteBuffer[size()];
+        for (int i = 0; i < size(); i++)
+            values[i] = get(i);
+        return CompositeType.build(values);
+    }
     /**
      * The values of this prefix as an array.
      * <p>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f712935..4aa2d3e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.*;
@@ -65,7 +67,6 @@ import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.*;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -142,6 +143,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                                                                                
           "internal");
 
     private static final ExecutorService [] perDiskflushExecutors = new 
ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
+
     static
     {
         for (int i = 0; i < 
DatabaseDescriptor.getAllDataFileLocations().length; i++)
@@ -208,7 +210,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public final Keyspace keyspace;
     public final String name;
-    public final CFMetaData metadata;
+    public final TableMetadataRef metadata;
     private final String mbeanName;
     @Deprecated
     private final String oldMBeanName;
@@ -261,15 +263,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         // only update these runtime-modifiable settings if they have not been 
modified.
         if (!minCompactionThreshold.isModified())
             for (ColumnFamilyStore cfs : concatWithIndexes())
-                cfs.minCompactionThreshold = new 
DefaultValue(metadata.params.compaction.minCompactionThreshold());
+                cfs.minCompactionThreshold = new 
DefaultValue(metadata().params.compaction.minCompactionThreshold());
         if (!maxCompactionThreshold.isModified())
             for (ColumnFamilyStore cfs : concatWithIndexes())
-                cfs.maxCompactionThreshold = new 
DefaultValue(metadata.params.compaction.maxCompactionThreshold());
+                cfs.maxCompactionThreshold = new 
DefaultValue(metadata().params.compaction.maxCompactionThreshold());
         if (!crcCheckChance.isModified())
             for (ColumnFamilyStore cfs : concatWithIndexes())
-                cfs.crcCheckChance = new 
DefaultValue(metadata.params.crcCheckChance);
+                cfs.crcCheckChance = new 
DefaultValue(metadata().params.crcCheckChance);
 
-        compactionStrategyManager.maybeReload(metadata);
+        compactionStrategyManager.maybeReload(metadata());
         directories = compactionStrategyManager.getDirectories();
 
         scheduleFlush();
@@ -278,13 +280,13 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         // If the CF comparator has changed, we need to change the memtable,
         // because the old one still aliases the previous comparator.
-        if (data.getView().getCurrentMemtable().initialComparator != 
metadata.comparator)
+        if (data.getView().getCurrentMemtable().initialComparator != 
metadata().comparator)
             switchMemtable();
     }
 
     void scheduleFlush()
     {
-        int period = metadata.params.memtableFlushPeriodInMs;
+        int period = metadata().params.memtableFlushPeriodInMs;
         if (period > 0)
         {
             logger.trace("scheduling flush in {} ms", period);
@@ -329,9 +331,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         };
     }
 
-    public void setCompactionParametersJson(String options)
+    public Map<String, String> getCompactionParameters()
     {
-        setCompactionParameters(FBUtilities.fromJsonMap(options));
+        return compactionStrategyManager.getCompactionParams().asMap();
     }
 
     public String getCompactionParametersJson()
@@ -355,22 +357,28 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
-    public Map<String, String> getCompactionParameters()
+    public void setCompactionParametersJson(String options)
     {
-        return compactionStrategyManager.getCompactionParams().asMap();
+        setCompactionParameters(FBUtilities.fromJsonMap(options));
     }
 
     public Map<String,String> getCompressionParameters()
     {
-        return metadata.params.compression.asMap();
+        return metadata().params.compression.asMap();
+    }
+
+    public String getCompressionParametersJson()
+    {
+        return FBUtilities.json(getCompressionParameters());
     }
 
     public void setCompressionParameters(Map<String,String> opts)
     {
         try
         {
-            metadata.compression(CompressionParams.fromMap(opts));
-            metadata.params.compression.validate();
+            CompressionParams params = CompressionParams.fromMap(opts);
+            params.validate();
+            throw new UnsupportedOperationException(); // TODO FIXME 
CASSANDRA-12949
         }
         catch (ConfigurationException e)
         {
@@ -378,15 +386,20 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
+    public void setCompressionParametersJson(String options)
+    {
+        setCompressionParameters(FBUtilities.fromJsonMap(options));
+    }
+
     @VisibleForTesting
     public ColumnFamilyStore(Keyspace keyspace,
-                              String columnFamilyName,
-                              int generation,
-                              CFMetaData metadata,
-                              Directories directories,
-                              boolean loadSSTables,
-                              boolean registerBookeeping,
-                              boolean offline)
+                             String columnFamilyName,
+                             int generation,
+                             TableMetadataRef metadata,
+                             Directories directories,
+                             boolean loadSSTables,
+                             boolean registerBookeeping,
+                             boolean offline)
     {
         assert directories != null;
         assert metadata != null : "null metadata for " + keyspace + ":" + 
columnFamilyName;
@@ -394,11 +407,11 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         this.keyspace = keyspace;
         this.metadata = metadata;
         name = columnFamilyName;
-        minCompactionThreshold = new 
DefaultValue<>(metadata.params.compaction.minCompactionThreshold());
-        maxCompactionThreshold = new 
DefaultValue<>(metadata.params.compaction.maxCompactionThreshold());
-        crcCheckChance = new DefaultValue<>(metadata.params.crcCheckChance);
+        minCompactionThreshold = new 
DefaultValue<>(metadata.get().params.compaction.minCompactionThreshold());
+        maxCompactionThreshold = new 
DefaultValue<>(metadata.get().params.compaction.maxCompactionThreshold());
+        crcCheckChance = new 
DefaultValue<>(metadata.get().params.crcCheckChance);
         indexManager = new SecondaryIndexManager(this);
-        viewManager = keyspace.viewManager.forTable(metadata);
+        viewManager = keyspace.viewManager.forTable(metadata.id);
         metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -426,7 +439,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         if (offline)
             this.directories = directories;
         else
-            this.directories = new Directories(metadata, 
Directories.dataDirectories);
+            this.directories = new Directories(metadata.get(), 
Directories.dataDirectories);
 
 
         // compaction strategy should be created after the CFS has been 
prepared
@@ -442,7 +455,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
 
         // create the private ColumnFamilyStores for the secondary column 
indexes
-        for (IndexMetadata info : metadata.getIndexes())
+        for (IndexMetadata info : metadata.get().indexes)
             indexManager.addIndex(info);
 
         if (registerBookeeping)
@@ -467,12 +480,12 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             {
                 throw new RuntimeException(e);
             }
-            logger.trace("retryPolicy for {} is {}", name, 
this.metadata.params.speculativeRetry);
+            logger.trace("retryPolicy for {} is {}", name, 
this.metadata.get().params.speculativeRetry);
             latencyCalculator = 
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
             {
                 public void run()
                 {
-                    SpeculativeRetryParam retryPolicy = 
ColumnFamilyStore.this.metadata.params.speculativeRetry;
+                    SpeculativeRetryParam retryPolicy = 
ColumnFamilyStore.this.metadata.get().params.speculativeRetry;
                     switch (retryPolicy.kind())
                     {
                         case PERCENTILE:
@@ -497,6 +510,11 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
+    public TableMetadata metadata()
+    {
+        return metadata.get();
+    }
+
     public Directories getDirectories()
     {
         return directories;
@@ -504,7 +522,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, 
LifecycleTransaction txn)
     {
-        MetadataCollector collector = new 
MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
+        MetadataCollector collector = new 
MetadataCollector(metadata().comparator).sstableLevel(sstableLevel);
         return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
collector, header, txn);
     }
 
@@ -545,7 +563,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         latencyCalculator.cancel(false);
         compactionStrategyManager.shutdown();
-        SystemKeyspace.removeTruncationRecord(metadata.cfId);
+        SystemKeyspace.removeTruncationRecord(metadata.id);
 
         data.dropSSTables();
         LifecycleTransaction.waitForDeletions();
@@ -578,24 +596,24 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     }
 
 
-    public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, 
CFMetaData metadata, boolean loadSSTables)
+    public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, 
TableMetadataRef metadata, boolean loadSSTables)
     {
-        return createColumnFamilyStore(keyspace, metadata.cfName, metadata, 
loadSSTables);
+        return createColumnFamilyStore(keyspace, metadata.name, metadata, 
loadSSTables);
     }
 
     public static synchronized ColumnFamilyStore 
createColumnFamilyStore(Keyspace keyspace,
                                                                          
String columnFamily,
-                                                                         
CFMetaData metadata,
+                                                                         
TableMetadataRef metadata,
                                                                          
boolean loadSSTables)
     {
-        Directories directories = new Directories(metadata, 
initialDirectories);
+        Directories directories = new Directories(metadata.get(), 
initialDirectories);
         return createColumnFamilyStore(keyspace, columnFamily, metadata, 
directories, loadSSTables, true, false);
     }
 
     /** This is only directly used by offline tools */
     public static synchronized ColumnFamilyStore 
createColumnFamilyStore(Keyspace keyspace,
                                                                          
String columnFamily,
-                                                                         
CFMetaData metadata,
+                                                                         
TableMetadataRef metadata,
                                                                          
Directories directories,
                                                                          
boolean loadSSTables,
                                                                          
boolean registerBookkeeping,
@@ -622,7 +640,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      * Removes unnecessary files from the cf directory at startup: these 
include temp files, orphans, zero-length files
      * and compacted sstables. Files that cannot be recognized will be ignored.
      */
-    public static void  scrubDataDirectories(CFMetaData metadata) throws 
StartupException
+    public static void  scrubDataDirectories(TableMetadata metadata) throws 
StartupException
     {
         Directories directories = new Directories(metadata, 
initialDirectories);
         Set<File> cleanedDirectories = new HashSet<>();
@@ -632,15 +650,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         directories.removeTemporaryDirectories();
 
-        logger.trace("Removing temporary or obsoleted files from unfinished 
operations for table {}", metadata.cfName);
+        logger.trace("Removing temporary or obsoleted files from unfinished 
operations for table {}", metadata.name);
         if (!LifecycleTransaction.removeUnfinishedLeftovers(metadata))
             throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
-                                       String.format("Cannot remove temporary 
or obsoleted files for %s.%s due to a problem with transaction " +
+                                       String.format("Cannot remove temporary 
or obsoleted files for %s due to a problem with transaction " +
                                                      "log files. Please check 
records with problems in the log messages above and fix them. " +
                                                      "Refer to the 3.0 
upgrading instructions in NEWS.txt " +
-                                                     "for a description of 
transaction log files.", metadata.ksName, metadata.cfName));
+                                                     "for a description of 
transaction log files.", metadata.toString()));
 
-        logger.trace("Further extra check for orphan sstable files for {}", 
metadata.cfName);
+        logger.trace("Further extra check for orphan sstable files for {}", 
metadata.name);
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : 
directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
@@ -670,7 +688,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
 
         // cleanup incomplete saved caches
-        Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + "-" + 
metadata.cfName + "-(Key|Row)Cache.*\\.tmp$");
+        Pattern tmpCacheFilePattern = Pattern.compile(metadata.keyspace + "-" 
+ metadata.name + "-(Key|Row)Cache.*\\.tmp$");
         File dir = new File(DatabaseDescriptor.getSavedCachesLocation());
 
         if (dir.exists())
@@ -683,10 +701,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
 
         // also clean out any index leftovers.
-        for (IndexMetadata index : metadata.getIndexes())
+        for (IndexMetadata index : metadata.indexes)
             if (!index.isCustom())
             {
-                CFMetaData indexMetadata = 
CassandraIndex.indexCfsMetadata(metadata, index);
+                TableMetadata indexMetadata = 
CassandraIndex.indexCfsMetadata(metadata, index);
                 scrubDataDirectories(indexMetadata);
             }
     }
@@ -790,7 +808,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public void rebuildSecondaryIndex(String idxName)
     {
-        rebuildSecondaryIndex(keyspace.getName(), metadata.cfName, idxName);
+        rebuildSecondaryIndex(keyspace.getName(), metadata.name, idxName);
     }
 
     public static void rebuildSecondaryIndex(String ksName, String cfName, 
String... idxNames)
@@ -807,6 +825,20 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
+    public AbstractCompactionStrategy 
createCompactionStrategyInstance(CompactionParams compactionParams)
+    {
+        try
+        {
+            Constructor<? extends AbstractCompactionStrategy> constructor =
+                
compactionParams.klass().getConstructor(ColumnFamilyStore.class, Map.class);
+            return constructor.newInstance(this, compactionParams.options());
+        }
+        catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Deprecated
     public String getColumnFamilyName()
     {
@@ -994,7 +1026,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             {
                 Memtable memtable = memtables.get(0);
                 commitLogUpperBound = memtable.getCommitLogUpperBound();
-                CommitLog.instance.discardCompletedSegments(metadata.cfId, 
memtable.getCommitLogLowerBound(), commitLogUpperBound);
+                CommitLog.instance.discardCompletedSegments(metadata.id, 
memtable.getCommitLogLowerBound(), commitLogUpperBound);
             }
 
             metric.pendingFlushes.dec();
@@ -1314,7 +1346,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             DecoratedKey key = update.partitionKey();
             invalidateCachedPartition(key);
             metric.samplers.get(Sampler.WRITES).addSample(key.getKey(), 
key.hashCode(), 1);
-            StorageHook.instance.reportWrite(metadata.cfId, update);
+            StorageHook.instance.reportWrite(metadata.id, update);
             metric.writeLatency.addNano(System.nanoTime() - start);
             // CASSANDRA-11117 - certain resolution paths on memtable put can 
result in very
             // large time deltas, either through a variety of sentinel 
timestamps (used for empty values, ensuring
@@ -1607,7 +1639,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
          // what we're caching. When doing that, we should be careful about 
expiring cells: we should count
         // something expired that wasn't when the partition was cached, or we 
could decide that the whole
         // partition is cached when it's not. This is why we use 
CachedPartition#cachedLiveRows.
-        if (cached.cachedLiveRows() < 
metadata.params.caching.rowsPerPartitionToCache())
+        if (cached.cachedLiveRows() < 
metadata().params.caching.rowsPerPartitionToCache())
             return true;
 
         // If the whole partition isn't cached, then we must guarantee that 
the filter cannot select data that
@@ -1619,7 +1651,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public int gcBefore(int nowInSec)
     {
-        return nowInSec - metadata.params.gcGraceSeconds;
+        return nowInSec - metadata().params.gcGraceSeconds;
     }
 
     @SuppressWarnings("resource")
@@ -1664,7 +1696,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public List<String> getSSTablesForKey(String key, boolean hexFormat)
     {
-        ByteBuffer keyBuffer = hexFormat ? ByteBufferUtil.hexToBytes(key) : 
metadata.getKeyValidator().fromString(key);
+        ByteBuffer keyBuffer = hexFormat ? ByteBufferUtil.hexToBytes(key) : 
metadata().partitionKeyType.fromString(key);
         DecoratedKey dk = decorateKey(keyBuffer);
         try (OpOrder.Group op = readOrdering.start())
         {
@@ -1699,7 +1731,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                     ByteBufferUtil.bytesToHex(key), // raw
                     counter.getCount(),  // count
                     counter.getError(),  // error
-                    metadata.getKeyValidator().getString(key) })); // string
+                    metadata().partitionKeyType.getString(key) })); // string
         }
         return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new 
Object[]{
                 samplerResults.cardinality, result});
@@ -1724,18 +1756,18 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         {
             RowCacheKey key = keyIter.next();
             DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key));
-            if (key.ksAndCFName.equals(metadata.ksAndCFName) && 
!Range.isInRanges(dk.getToken(), ranges))
+            if (key.sameTable(metadata()) && !Range.isInRanges(dk.getToken(), 
ranges))
                 invalidateCachedPartition(dk);
         }
 
-        if (metadata.isCounter())
+        if (metadata().isCounter())
         {
             for (Iterator<CounterCacheKey> keyIter = 
CacheService.instance.counterCache.keyIterator();
                  keyIter.hasNext(); )
             {
                 CounterCacheKey key = keyIter.next();
                 DecoratedKey dk = decorateKey(key.partitionKey());
-                if (key.ksAndCFName.equals(metadata.ksAndCFName) && 
!Range.isInRanges(dk.getToken(), ranges))
+                if (key.sameTable(metadata()) && 
!Range.isInRanges(dk.getToken(), ranges))
                     CacheService.instance.counterCache.remove(key);
             }
         }
@@ -1743,7 +1775,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public ClusteringComparator getComparator()
     {
-        return metadata.comparator;
+        return metadata().comparator;
     }
 
     public void snapshotWithoutFlush(String snapshotName)
@@ -1774,7 +1806,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 }
 
                 writeSnapshotManifest(filesJSONArr, snapshotName);
-                if 
(!SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && 
!SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName))
+                if 
(!SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(metadata.keyspace) && 
!SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.keyspace))
                     writeSnapshotSchema(snapshotName);
             }
         }
@@ -1816,7 +1848,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
             try (PrintStream out = new PrintStream(schemaFile))
             {
-                for (String s: 
ColumnFamilyStoreCQLHelper.dumpReCreateStatements(metadata))
+                for (String s: 
ColumnFamilyStoreCQLHelper.dumpReCreateStatements(metadata()))
                     out.println(s);
             }
         }
@@ -1974,16 +2006,16 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     {
         if (!isRowCacheEnabled())
             return null;
-        IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new 
RowCacheKey(metadata.ksAndCFName, key));
+        IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new 
RowCacheKey(metadata(), key));
         return cached == null || cached instanceof RowCacheSentinel ? null : 
(CachedPartition)cached;
     }
 
     private void invalidateCaches()
     {
-        CacheService.instance.invalidateKeyCacheForCf(metadata.ksAndCFName);
-        CacheService.instance.invalidateRowCacheForCf(metadata.ksAndCFName);
-        if (metadata.isCounter())
-            
CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName);
+        CacheService.instance.invalidateKeyCacheForCf(metadata());
+        CacheService.instance.invalidateRowCacheForCf(metadata());
+        if (metadata().isCounter())
+            CacheService.instance.invalidateCounterCacheForCf(metadata());
     }
 
     public int invalidateRowCache(Collection<Bounds<Token>> boundsToInvalidate)
@@ -1994,7 +2026,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         {
             RowCacheKey key = keyIter.next();
             DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key));
-            if (key.ksAndCFName.equals(metadata.ksAndCFName) && 
Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+            if (key.sameTable(metadata()) && Bounds.isInBounds(dk.getToken(), 
boundsToInvalidate))
             {
                 invalidateCachedPartition(dk);
                 invalidatedKeys++;
@@ -2011,7 +2043,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         {
             CounterCacheKey key = keyIter.next();
             DecoratedKey dk = decorateKey(key.partitionKey());
-            if (key.ksAndCFName.equals(metadata.ksAndCFName) && 
Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+            if (key.sameTable(metadata()) && Bounds.isInBounds(dk.getToken(), 
boundsToInvalidate))
             {
                 CacheService.instance.counterCache.remove(key);
                 invalidatedKeys++;
@@ -2025,7 +2057,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      */
     public boolean containsCachedParition(DecoratedKey key)
     {
-        return CacheService.instance.rowCache.getCapacity() != 0 && 
CacheService.instance.rowCache.containsKey(new 
RowCacheKey(metadata.ksAndCFName, key));
+        return CacheService.instance.rowCache.getCapacity() != 0 && 
CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata(), key));
     }
 
     public void invalidateCachedPartition(RowCacheKey key)
@@ -2038,21 +2070,21 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         if (!isRowCacheEnabled())
             return;
 
-        invalidateCachedPartition(new RowCacheKey(metadata.ksAndCFName, key));
+        invalidateCachedPartition(new RowCacheKey(metadata(), key));
     }
 
-    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering 
clustering, ColumnDefinition column, CellPath path)
+    public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering 
clustering, ColumnMetadata column, CellPath path)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter 
cache disabled.
             return null;
-        return 
CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName,
 partitionKey, clustering, column, path));
+        return 
CacheService.instance.counterCache.get(CounterCacheKey.create(metadata(), 
partitionKey, clustering, column, path));
     }
 
-    public void putCachedCounter(ByteBuffer partitionKey, Clustering 
clustering, ColumnDefinition column, CellPath path, ClockAndCount clockAndCount)
+    public void putCachedCounter(ByteBuffer partitionKey, Clustering 
clustering, ColumnMetadata column, CellPath path, ClockAndCount clockAndCount)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter 
cache disabled.
             return;
-        
CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName,
 partitionKey, clustering, column, path), clockAndCount);
+        
CacheService.instance.counterCache.put(CounterCacheKey.create(metadata(), 
partitionKey, clustering, column, path), clockAndCount);
     }
 
     public void forceMajorCompaction() throws InterruptedException, 
ExecutionException
@@ -2219,7 +2251,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         // and so we only run one major compaction at a time
         synchronized (this)
         {
-            logger.trace("Cancelling in-progress compactions for {}", 
metadata.cfName);
+            logger.trace("Cancelling in-progress compactions for {}", 
metadata.name);
 
             Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
                                                                ? 
Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
@@ -2238,7 +2270,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 {
                     if (!cfs.getTracker().getCompacting().isEmpty())
                     {
-                        logger.warn("Unable to cancel in-progress compactions 
for {}.  Perhaps there is an unusually large row in progress somewhere, or the 
system is simply overloaded.", metadata.cfName);
+                        logger.warn("Unable to cancel in-progress compactions 
for {}.  Perhaps there is an unusually large row in progress somewhere, or the 
system is simply overloaded.", metadata.name);
                         return null;
                     }
                 }
@@ -2439,18 +2471,18 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public IPartitioner getPartitioner()
     {
-        return metadata.partitioner;
+        return metadata().partitioner;
     }
 
     public DecoratedKey decorateKey(ByteBuffer key)
     {
-        return metadata.decorateKey(key);
+        return getPartitioner().decorateKey(key);
     }
 
     /** true if this CFS contains secondary index data */
     public boolean isIndex()
     {
-        return metadata.isIndex();
+        return metadata().isIndex();
     }
 
     public Iterable<ColumnFamilyStore> concatWithIndexes()
@@ -2521,19 +2553,19 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     public boolean isRowCacheEnabled()
     {
 
-        boolean retval = metadata.params.caching.cacheRows() && 
CacheService.instance.rowCache.getCapacity() > 0;
+        boolean retval = metadata().params.caching.cacheRows() && 
CacheService.instance.rowCache.getCapacity() > 0;
         assert(!retval || !isIndex());
         return retval;
     }
 
     public boolean isCounterCacheEnabled()
     {
-        return metadata.isCounter() && 
CacheService.instance.counterCache.getCapacity() > 0;
+        return metadata().isCounter() && 
CacheService.instance.counterCache.getCapacity() > 0;
     }
 
     public boolean isKeyCacheEnabled()
     {
-        return metadata.params.caching.cacheKeys() && 
CacheService.instance.keyCache.getCapacity() > 0;
+        return metadata().params.caching.cacheKeys() && 
CacheService.instance.keyCache.getCapacity() > 0;
     }
 
     /**
@@ -2568,7 +2600,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         for (SSTableReader sstable : getSSTables(SSTableSet.LIVE))
         {
-            allDroppable += sstable.getDroppableTombstonesBefore(localTime - 
sstable.metadata.params.gcGraceSeconds);
+            allDroppable += sstable.getDroppableTombstonesBefore(localTime - 
metadata().params.gcGraceSeconds);
             allColumns += sstable.getEstimatedColumnCount().mean() * 
sstable.getEstimatedColumnCount().count();
         }
         return allColumns > 0 ? allDroppable / allColumns : 0;
@@ -2586,20 +2618,22 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     }
 
     /**
-     * Returns a ColumnFamilyStore by cfId if it exists, null otherwise
+     * Returns a ColumnFamilyStore by id if it exists, null otherwise
      * Differently from others, this method does not throw exception if the 
table does not exist.
      */
-    public static ColumnFamilyStore getIfExists(UUID cfId)
+    public static ColumnFamilyStore getIfExists(TableId id)
     {
-        Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        TableMetadata metadata = Schema.instance.getTableMetadata(id);
+        if (metadata == null)
             return null;
 
-        Keyspace keyspace = Keyspace.open(kscf.left);
+        Keyspace keyspace = Keyspace.open(metadata.keyspace);
         if (keyspace == null)
             return null;
 
-        return keyspace.getColumnFamilyStore(cfId);
+        return keyspace.hasColumnFamilyStore(id)
+             ? keyspace.getColumnFamilyStore(id)
+             : null;
     }
 
     /**
@@ -2615,10 +2649,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         if (keyspace == null)
             return null;
 
-        UUID id = Schema.instance.getId(ksName, cfName);
-        if (id == null)
+        TableMetadata table = Schema.instance.getTableMetadata(ksName, cfName);
+        if (table == null)
             return null;
 
-        return keyspace.getColumnFamilyStore(id);
+        return keyspace.getColumnFamilyStore(table.id);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java
index 54c8117..d79fa0f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreCQLHelper.java
@@ -26,25 +26,23 @@ import java.util.function.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.utils.*;
 
 /**
- * Helper methods to represent CFMetadata and related objects in CQL format
+ * Helper methods to represent TableMetadata and related objects in CQL format
  */
 public class ColumnFamilyStoreCQLHelper
 {
-    public static List<String> dumpReCreateStatements(CFMetaData metadata)
+    public static List<String> dumpReCreateStatements(TableMetadata metadata)
     {
         List<String> l = new ArrayList<>();
         // Types come first, as table can't be created without them
         l.addAll(ColumnFamilyStoreCQLHelper.getUserTypesAsCQL(metadata));
         // Record re-create schema statements
-        l.add(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(metadata, true));
+        l.add(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(metadata, 
true));
         // Dropped columns (and re-additions)
         l.addAll(ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(metadata));
         // Indexes applied as last, since otherwise they may interfere with 
column drops / re-additions
@@ -52,35 +50,35 @@ public class ColumnFamilyStoreCQLHelper
         return l;
     }
 
-    private static List<ColumnDefinition> getClusteringColumns(CFMetaData 
metadata)
+    private static List<ColumnMetadata> getClusteringColumns(TableMetadata 
metadata)
     {
-        List<ColumnDefinition> cds = new 
ArrayList<>(metadata.clusteringColumns().size());
+        List<ColumnMetadata> cds = new 
ArrayList<>(metadata.clusteringColumns().size());
 
         if (!metadata.isStaticCompactTable())
-            for (ColumnDefinition cd : metadata.clusteringColumns())
+            for (ColumnMetadata cd : metadata.clusteringColumns())
                 cds.add(cd);
 
         return cds;
     }
 
-    private static List<ColumnDefinition> getPartitionColumns(CFMetaData 
metadata)
+    private static List<ColumnMetadata> getPartitionColumns(TableMetadata 
metadata)
     {
-        List<ColumnDefinition> cds = new 
ArrayList<>(metadata.partitionColumns().size());
+        List<ColumnMetadata> cds = new 
ArrayList<>(metadata.regularAndStaticColumns().size());
 
-        for (ColumnDefinition cd : metadata.partitionColumns().statics)
+        for (ColumnMetadata cd : metadata.staticColumns())
             cds.add(cd);
 
         if (metadata.isDense())
         {
             // remove an empty type
-            for (ColumnDefinition cd : 
metadata.partitionColumns().withoutStatics())
+            for (ColumnMetadata cd : metadata.regularColumns())
                 if (!cd.type.equals(EmptyType.instance))
                     cds.add(cd);
         }
         // "regular" columns are not exposed for static compact tables
         else if (!metadata.isStaticCompactTable())
         {
-            for (ColumnDefinition cd : 
metadata.partitionColumns().withoutStatics())
+            for (ColumnMetadata cd : metadata.regularColumns())
                 cds.add(cd);
         }
 
@@ -91,28 +89,27 @@ public class ColumnFamilyStoreCQLHelper
      * Build a CQL String representation of Column Family Metadata
      */
     @VisibleForTesting
-    public static String getCFMetadataAsCQL(CFMetaData metadata, boolean 
includeDroppedColumns)
+    public static String getTableMetadataAsCQL(TableMetadata metadata, boolean 
includeDroppedColumns)
     {
         StringBuilder sb = new StringBuilder();
         if (!isCqlCompatible(metadata))
         {
-            sb.append(String.format("/*\nWarning: Table %s.%s omitted because 
it has constructs not compatible with CQL (was created via legacy API).\n",
-                                    metadata.ksName,
-                                    metadata.cfName));
+            sb.append(String.format("/*\nWarning: Table %s omitted because it 
has constructs not compatible with CQL (was created via legacy API).\n",
+                                    metadata.toString()));
             sb.append("\nApproximate structure, for reference:");
             sb.append("\n(this should not be used to reproduce this 
schema)\n\n");
         }
 
         sb.append("CREATE TABLE IF NOT EXISTS ");
-        
sb.append(quoteIdentifier(metadata.ksName)).append('.').append(quoteIdentifier(metadata.cfName)).append("
 (");
+        sb.append(metadata.toString()).append(" (");
 
-        List<ColumnDefinition> partitionKeyColumns = 
metadata.partitionKeyColumns();
-        List<ColumnDefinition> clusteringColumns = 
getClusteringColumns(metadata);
-        List<ColumnDefinition> partitionColumns = 
getPartitionColumns(metadata);
+        List<ColumnMetadata> partitionKeyColumns = 
metadata.partitionKeyColumns();
+        List<ColumnMetadata> clusteringColumns = 
getClusteringColumns(metadata);
+        List<ColumnMetadata> partitionColumns = getPartitionColumns(metadata);
 
         Consumer<StringBuilder> cdCommaAppender = commaAppender("\n\t");
         sb.append("\n\t");
-        for (ColumnDefinition cfd: partitionKeyColumns)
+        for (ColumnMetadata cfd: partitionKeyColumns)
         {
             cdCommaAppender.accept(sb);
             sb.append(toCQL(cfd));
@@ -120,13 +117,13 @@ public class ColumnFamilyStoreCQLHelper
                 sb.append(" PRIMARY KEY");
         }
 
-        for (ColumnDefinition cfd: clusteringColumns)
+        for (ColumnMetadata cfd: clusteringColumns)
         {
             cdCommaAppender.accept(sb);
             sb.append(toCQL(cfd));
         }
 
-        for (ColumnDefinition cfd: partitionColumns)
+        for (ColumnMetadata cfd: partitionColumns)
         {
             cdCommaAppender.accept(sb);
             sb.append(toCQL(cfd, metadata.isStaticCompactTable()));
@@ -134,16 +131,16 @@ public class ColumnFamilyStoreCQLHelper
 
         if (includeDroppedColumns)
         {
-            for (Map.Entry<ByteBuffer, CFMetaData.DroppedColumn> entry: 
metadata.getDroppedColumns().entrySet())
+            for (Map.Entry<ByteBuffer, DroppedColumn> entry: 
metadata.droppedColumns.entrySet())
             {
-                if (metadata.getColumnDefinition(entry.getKey()) != null)
+                if (metadata.getColumn(entry.getKey()) != null)
                     continue;
 
-                CFMetaData.DroppedColumn droppedColumn = entry.getValue();
+                DroppedColumn droppedColumn = entry.getValue();
                 cdCommaAppender.accept(sb);
-                sb.append(quoteIdentifier(droppedColumn.name));
+                sb.append(droppedColumn.column.name.toCQLString());
                 sb.append(' ');
-                sb.append(droppedColumn.type.asCQL3Type().toString());
+                sb.append(droppedColumn.column.type.asCQL3Type().toString());
             }
         }
 
@@ -154,27 +151,27 @@ public class ColumnFamilyStoreCQLHelper
             {
                 sb.append("(");
                 Consumer<StringBuilder> pkCommaAppender = commaAppender(" ");
-                for (ColumnDefinition cfd : partitionKeyColumns)
+                for (ColumnMetadata cfd : partitionKeyColumns)
                 {
                     pkCommaAppender.accept(sb);
-                    sb.append(quoteIdentifier(cfd.name.toString()));
+                    sb.append(cfd.name.toCQLString());
                 }
                 sb.append(")");
             }
             else
             {
-                
sb.append(quoteIdentifier(partitionKeyColumns.get(0).name.toString()));
+                sb.append(partitionKeyColumns.get(0).name.toCQLString());
             }
 
-            for (ColumnDefinition cfd : metadata.clusteringColumns())
-                sb.append(", ").append(quoteIdentifier(cfd.name.toString()));
+            for (ColumnMetadata cfd : metadata.clusteringColumns())
+                sb.append(", ").append(cfd.name.toCQLString());
 
             sb.append(')');
         }
         sb.append(")\n\t");
         sb.append("WITH ");
 
-        sb.append("ID = ").append(metadata.cfId).append("\n\tAND ");
+        sb.append("ID = ").append(metadata.id).append("\n\tAND ");
 
         if (metadata.isCompactTable())
             sb.append("COMPACT STORAGE\n\tAND ");
@@ -184,10 +181,10 @@ public class ColumnFamilyStoreCQLHelper
             sb.append("CLUSTERING ORDER BY (");
 
             Consumer<StringBuilder> cOrderCommaAppender = commaAppender(" ");
-            for (ColumnDefinition cd : clusteringColumns)
+            for (ColumnMetadata cd : clusteringColumns)
             {
                 cOrderCommaAppender.accept(sb);
-                sb.append(quoteIdentifier(cd.name.toString())).append(' 
').append(cd.clusteringOrder().toString());
+                sb.append(cd.name.toCQLString()).append(' 
').append(cd.clusteringOrder().toString());
             }
             sb.append(")\n\tAND ");
         }
@@ -209,11 +206,11 @@ public class ColumnFamilyStoreCQLHelper
      * to the outermost.
      */
     @VisibleForTesting
-    public static List<String> getUserTypesAsCQL(CFMetaData metadata)
+    public static List<String> getUserTypesAsCQL(TableMetadata metadata)
     {
         List<AbstractType> types = new ArrayList<>();
         Set<AbstractType> typeSet = new HashSet<>();
-        for (ColumnDefinition cd: 
Iterables.concat(metadata.partitionKeyColumns(), metadata.clusteringColumns(), 
metadata.partitionColumns()))
+        for (ColumnMetadata cd: 
Iterables.concat(metadata.partitionKeyColumns(), metadata.clusteringColumns(), 
metadata.regularAndStaticColumns()))
         {
             AbstractType type = cd.type;
             if (type.isUDT())
@@ -232,16 +229,16 @@ public class ColumnFamilyStoreCQLHelper
      * If the column was dropped once, but is now re-created `ADD` will be 
appended accordingly.
      */
     @VisibleForTesting
-    public static List<String> getDroppedColumnsAsCQL(CFMetaData metadata)
+    public static List<String> getDroppedColumnsAsCQL(TableMetadata metadata)
     {
         List<String> droppedColumns = new ArrayList<>();
 
-        for (Map.Entry<ByteBuffer, CFMetaData.DroppedColumn> entry: 
metadata.getDroppedColumns().entrySet())
+        for (Map.Entry<ByteBuffer, DroppedColumn> entry: 
metadata.droppedColumns.entrySet())
         {
-            CFMetaData.DroppedColumn column = entry.getValue();
-            droppedColumns.add(toCQLDrop(metadata.ksName, metadata.cfName, 
column));
-            if (metadata.getColumnDefinition(entry.getKey()) != null)
-                droppedColumns.add(toCQLAdd(metadata.ksName, metadata.cfName, 
metadata.getColumnDefinition(entry.getKey())));
+            DroppedColumn column = entry.getValue();
+            droppedColumns.add(toCQLDrop(metadata, column));
+            if (metadata.getColumn(entry.getKey()) != null)
+                droppedColumns.add(toCQLAdd(metadata, 
metadata.getColumn(entry.getKey())));
         }
 
         return droppedColumns;
@@ -251,15 +248,15 @@ public class ColumnFamilyStoreCQLHelper
      * Build a CQL String representation of Indexes on columns in the given 
Column Family
      */
     @VisibleForTesting
-    public static List<String> getIndexesAsCQL(CFMetaData metadata)
+    public static List<String> getIndexesAsCQL(TableMetadata metadata)
     {
         List<String> indexes = new ArrayList<>();
-        for (IndexMetadata indexMetadata: metadata.getIndexes())
-            indexes.add(toCQL(metadata.ksName, metadata.cfName, 
indexMetadata));
+        for (IndexMetadata indexMetadata: metadata.indexes)
+            indexes.add(toCQL(metadata, indexMetadata));
         return indexes;
     }
 
-    private static String toCQL(String keyspace, String cf, IndexMetadata 
indexMetadata)
+    private static String toCQL(TableMetadata baseTable, IndexMetadata 
indexMetadata)
     {
         if (indexMetadata.isCustom())
         {
@@ -269,29 +266,25 @@ public class ColumnFamilyStoreCQLHelper
                     options.put(k, v);
             });
 
-            return String.format("CREATE CUSTOM INDEX %s ON %s.%s (%s) USING 
'%s'%s;",
-                                 quoteIdentifier(indexMetadata.name),
-                                 quoteIdentifier(keyspace),
-                                 quoteIdentifier(cf),
+            return String.format("CREATE CUSTOM INDEX %s ON %s (%s) USING 
'%s'%s;",
+                                 indexMetadata.toCQLString(),
+                                 baseTable.toString(),
                                  
indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME),
                                  
indexMetadata.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME),
                                  options.isEmpty() ? "" : " WITH OPTIONS " + 
toCQL(options));
         }
         else
         {
-            return String.format("CREATE INDEX %s ON %s.%s (%s);",
-                                 quoteIdentifier(indexMetadata.name),
-                                 quoteIdentifier(keyspace),
-                                 quoteIdentifier(cf),
+            return String.format("CREATE INDEX %s ON %s (%s);",
+                                 indexMetadata.toCQLString(),
+                                 baseTable.toString(),
                                  
indexMetadata.options.get(IndexTarget.TARGET_OPTION_NAME));
         }
     }
     private static String toCQL(UserType userType)
     {
         StringBuilder sb = new StringBuilder();
-        sb.append(String.format("CREATE TYPE %s.%s(",
-                                quoteIdentifier(userType.keyspace),
-                                quoteIdentifier(userType.getNameAsString())));
+        sb.append("CREATE TYPE ").append(userType.toCQLString()).append(" (");
 
         Consumer<StringBuilder> commaAppender = commaAppender(" ");
         for (int i = 0; i < userType.size(); i++)
@@ -356,35 +349,33 @@ public class ColumnFamilyStoreCQLHelper
         return builder.toString();
     }
 
-    private static String toCQL(ColumnDefinition cd)
+    private static String toCQL(ColumnMetadata cd)
     {
         return toCQL(cd, false);
     }
 
-    private static String toCQL(ColumnDefinition cd, boolean 
isStaticCompactTable)
+    private static String toCQL(ColumnMetadata cd, boolean 
isStaticCompactTable)
     {
         return String.format("%s %s%s",
-                             quoteIdentifier(cd.name.toString()),
+                             cd.name.toCQLString(),
                              cd.type.asCQL3Type().toString(),
                              cd.isStatic() && !isStaticCompactTable ? " 
static" : "");
     }
 
-    private static String toCQLAdd(String keyspace, String cf, 
ColumnDefinition cd)
+    private static String toCQLAdd(TableMetadata table, ColumnMetadata cd)
     {
-        return String.format("ALTER TABLE %s.%s ADD %s %s%s;",
-                             quoteIdentifier(keyspace),
-                             quoteIdentifier(cf),
-                             quoteIdentifier(cd.name.toString()),
+        return String.format("ALTER TABLE %s ADD %s %s%s;",
+                             table.toString(),
+                             cd.name.toCQLString(),
                              cd.type.asCQL3Type().toString(),
                              cd.isStatic() ? " static" : "");
     }
 
-    private static String toCQLDrop(String keyspace, String cf, 
CFMetaData.DroppedColumn droppedColumn)
+    private static String toCQLDrop(TableMetadata table, DroppedColumn 
droppedColumn)
     {
-        return String.format("ALTER TABLE %s.%s DROP %s USING TIMESTAMP %s;",
-                             quoteIdentifier(keyspace),
-                             quoteIdentifier(cf),
-                             quoteIdentifier(droppedColumn.name),
+        return String.format("ALTER TABLE %s DROP %s USING TIMESTAMP %s;",
+                             table.toString(),
+                             droppedColumn.column.name.toCQLString(),
                              droppedColumn.droppedTime);
     }
 
@@ -419,21 +410,16 @@ public class ColumnFamilyStoreCQLHelper
         };
     }
 
-    private static String quoteIdentifier(String id)
-    {
-        return ColumnIdentifier.maybeQuote(id);
-    }
-
     /**
      * Whether or not the given metadata is compatible / representable with 
CQL Language
      */
-    public static boolean isCqlCompatible(CFMetaData metaData)
+    public static boolean isCqlCompatible(TableMetadata metaData)
     {
         if (metaData.isSuper())
             return false;
 
         if (metaData.isCompactTable()
-            && metaData.partitionColumns().withoutStatics().size() > 1
+            && metaData.regularColumns().size() > 1
             && metaData.clusteringColumns().size() >= 1)
             return false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index d788e2e..e361ffe 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -102,11 +102,14 @@ public interface ColumnFamilyStoreMBean
      */
     public Map<String,String> getCompressionParameters();
 
+    public String getCompressionParametersJson();
+
     /**
-     * Set the compression parameters
+     * Set the compression parameters locally for this node
      * @param opts map of string names to values
      */
     public void setCompressionParameters(Map<String,String> opts);
+    public void setCompressionParametersJson(String options);
 
     /**
      * Set new crc check chance

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java 
b/src/java/org/apache/cassandra/db/Columns.java
index e9e3abf..f38856f 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -28,8 +28,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 
 import net.nicoulaj.compilecommand.annotations.DontInline;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -46,19 +46,19 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
  * An immutable and sorted list of (non-PK) columns for a given table.
  * <p>
  * Note that in practice, it will either store only static columns, or only 
regular ones. When
- * we need both type of columns, we use a {@link PartitionColumns} object.
+ * we need both type of columns, we use a {@link RegularAndStaticColumns} 
object.
  */
-public class Columns extends AbstractCollection<ColumnDefinition> implements 
Collection<ColumnDefinition>
+public class Columns extends AbstractCollection<ColumnMetadata> implements 
Collection<ColumnMetadata>
 {
     public static final Serializer serializer = new Serializer();
     public static final Columns NONE = new Columns(BTree.empty(), 0);
-    public static final ColumnDefinition FIRST_COMPLEX =
-        new ColumnDefinition("",
-                             "",
-                             
ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER, 
UTF8Type.instance),
-                             SetType.getInstance(UTF8Type.instance, true),
-                             ColumnDefinition.NO_POSITION,
-                             ColumnDefinition.Kind.REGULAR);
+    private static final ColumnMetadata FIRST_COMPLEX =
+        new ColumnMetadata("",
+                           "",
+                           
ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER, 
UTF8Type.instance),
+                           SetType.getInstance(UTF8Type.instance, true),
+                           ColumnMetadata.NO_POSITION,
+                           ColumnMetadata.Kind.REGULAR);
 
     private final Object[] columns;
     private final int complexIdx; // Index of the first complex column
@@ -82,7 +82,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return the newly created {@code Columns} containing only {@code c}.
      */
-    public static Columns of(ColumnDefinition c)
+    public static Columns of(ColumnMetadata c)
     {
         return new Columns(BTree.singleton(c), c.isComplex() ? 0 : 1);
     }
@@ -93,9 +93,9 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      * @param s the set from which to create the new {@code Columns}.
      * @return the newly created {@code Columns} containing the columns from 
{@code s}.
      */
-    public static Columns from(Collection<ColumnDefinition> s)
+    public static Columns from(Collection<ColumnMetadata> s)
     {
-        Object[] tree = 
BTree.<ColumnDefinition>builder(Comparator.naturalOrder()).addAll(s).build();
+        Object[] tree = 
BTree.<ColumnMetadata>builder(Comparator.naturalOrder()).addAll(s).build();
         return new Columns(tree, findFirstComplexIdx(tree));
     }
 
@@ -103,7 +103,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
     {
         // have fast path for common no-complex case
         int size = BTree.size(tree);
-        if (!BTree.isEmpty(tree) && BTree.<ColumnDefinition>findByIndex(tree, 
size - 1).isSimple())
+        if (!BTree.isEmpty(tree) && BTree.<ColumnMetadata>findByIndex(tree, 
size - 1).isSimple())
             return size;
         return BTree.ceilIndex(tree, Comparator.naturalOrder(), FIRST_COMPLEX);
     }
@@ -176,7 +176,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return the {@code i}th simple column in this object.
      */
-    public ColumnDefinition getSimple(int i)
+    public ColumnMetadata getSimple(int i)
     {
         return BTree.findByIndex(columns, i);
     }
@@ -189,7 +189,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return the {@code i}th complex column in this object.
      */
-    public ColumnDefinition getComplex(int i)
+    public ColumnMetadata getComplex(int i)
     {
         return BTree.findByIndex(columns, complexIdx + i);
     }
@@ -203,7 +203,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      * @return the index for simple column {@code c} if it is contains in this
      * object
      */
-    public int simpleIdx(ColumnDefinition c)
+    public int simpleIdx(ColumnMetadata c)
     {
         return BTree.findIndex(columns, Comparator.naturalOrder(), c);
     }
@@ -217,7 +217,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      * @return the index for complex column {@code c} if it is contains in this
      * object
      */
-    public int complexIdx(ColumnDefinition c)
+    public int complexIdx(ColumnMetadata c)
     {
         return BTree.findIndex(columns, Comparator.naturalOrder(), c) - 
complexIdx;
     }
@@ -229,7 +229,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return whether {@code c} is contained by this object.
      */
-    public boolean contains(ColumnDefinition c)
+    public boolean contains(ColumnMetadata c)
     {
         return BTree.findIndex(columns, Comparator.naturalOrder(), c) >= 0;
     }
@@ -251,8 +251,8 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
         if (this == NONE)
             return other;
 
-        Object[] tree = BTree.<ColumnDefinition>merge(this.columns, 
other.columns, Comparator.naturalOrder(),
-                                                      UpdateFunction.noOp());
+        Object[] tree = BTree.<ColumnMetadata>merge(this.columns, 
other.columns, Comparator.naturalOrder(),
+                                                    UpdateFunction.noOp());
         if (tree == this.columns)
             return this;
         if (tree == other.columns)
@@ -275,9 +275,9 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
         if (other.size() > this.size())
             return false;
 
-        BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = 
BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
+        BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iter = 
BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
         for (Object def : other)
-            if (iter.next((ColumnDefinition) def) == null)
+            if (iter.next((ColumnMetadata) def) == null)
                 return false;
         return true;
     }
@@ -287,7 +287,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return an iterator over the simple columns of this object.
      */
-    public Iterator<ColumnDefinition> simpleColumns()
+    public Iterator<ColumnMetadata> simpleColumns()
     {
         return BTree.iterator(columns, 0, complexIdx - 1, BTree.Dir.ASC);
     }
@@ -297,7 +297,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return an iterator over the complex columns of this object.
      */
-    public Iterator<ColumnDefinition> complexColumns()
+    public Iterator<ColumnMetadata> complexColumns()
     {
         return BTree.iterator(columns, complexIdx, BTree.size(columns) - 1, 
BTree.Dir.ASC);
     }
@@ -307,9 +307,9 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return an iterator over all the columns of this object.
      */
-    public BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iterator()
+    public BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iterator()
     {
-        return BTree.<ColumnDefinition, ColumnDefinition>slice(columns, 
Comparator.naturalOrder(), BTree.Dir.ASC);
+        return BTree.<ColumnMetadata, ColumnMetadata>slice(columns, 
Comparator.naturalOrder(), BTree.Dir.ASC);
     }
 
     /**
@@ -319,11 +319,11 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return an iterator returning columns in alphabetical order.
      */
-    public Iterator<ColumnDefinition> selectOrderIterator()
+    public Iterator<ColumnMetadata> selectOrderIterator()
     {
         // In wildcard selection, we want to return all columns in 
alphabetical order,
         // irregarding of whether they are complex or not
-        return Iterators.<ColumnDefinition>
+        return Iterators.<ColumnMetadata>
                          mergeSorted(ImmutableList.of(simpleColumns(), 
complexColumns()),
                                      (s, c) ->
                                      {
@@ -340,12 +340,12 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      * @return newly allocated columns containing all the columns of {@code 
this} expect
      * for {@code column}.
      */
-    public Columns without(ColumnDefinition column)
+    public Columns without(ColumnMetadata column)
     {
         if (!contains(column))
             return this;
 
-        Object[] newColumns = BTreeRemoval.<ColumnDefinition>remove(columns, 
Comparator.naturalOrder(), column);
+        Object[] newColumns = BTreeRemoval.<ColumnMetadata>remove(columns, 
Comparator.naturalOrder(), column);
         return new Columns(newColumns);
     }
 
@@ -355,15 +355,15 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      *
      * @return a predicate to test the inclusion of sorted columns in this 
object.
      */
-    public Predicate<ColumnDefinition> inOrderInclusionTester()
+    public Predicate<ColumnMetadata> inOrderInclusionTester()
     {
-        SearchIterator<ColumnDefinition, ColumnDefinition> iter = 
BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
+        SearchIterator<ColumnMetadata, ColumnMetadata> iter = 
BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
         return column -> iter.next(column) != null;
     }
 
     public void digest(MessageDigest digest)
     {
-        for (ColumnDefinition c : this)
+        for (ColumnMetadata c : this)
             digest.update(c.name.bytes.duplicate());
     }
 
@@ -372,7 +372,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
      * @param function
      * @param reversed
      */
-    public void apply(Consumer<ColumnDefinition> function, boolean reversed)
+    public void apply(Consumer<ColumnMetadata> function, boolean reversed)
     {
         BTree.apply(columns, function, reversed);
     }
@@ -400,7 +400,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
     {
         StringBuilder sb = new StringBuilder("[");
         boolean first = true;
-        for (ColumnDefinition def : this)
+        for (ColumnMetadata def : this)
         {
             if (first) first = false; else sb.append(" ");
             sb.append(def.name);
@@ -413,33 +413,33 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
         public void serialize(Columns columns, DataOutputPlus out) throws 
IOException
         {
             out.writeUnsignedVInt(columns.size());
-            for (ColumnDefinition column : columns)
+            for (ColumnMetadata column : columns)
                 ByteBufferUtil.writeWithVIntLength(column.name.bytes, out);
         }
 
         public long serializedSize(Columns columns)
         {
             long size = TypeSizes.sizeofUnsignedVInt(columns.size());
-            for (ColumnDefinition column : columns)
+            for (ColumnMetadata column : columns)
                 size += 
ByteBufferUtil.serializedSizeWithVIntLength(column.name.bytes);
             return size;
         }
 
-        public Columns deserialize(DataInputPlus in, CFMetaData metadata) 
throws IOException
+        public Columns deserialize(DataInputPlus in, TableMetadata metadata) 
throws IOException
         {
             int length = (int)in.readUnsignedVInt();
-            BTree.Builder<ColumnDefinition> builder = 
BTree.builder(Comparator.naturalOrder());
+            BTree.Builder<ColumnMetadata> builder = 
BTree.builder(Comparator.naturalOrder());
             builder.auto(false);
             for (int i = 0; i < length; i++)
             {
                 ByteBuffer name = ByteBufferUtil.readWithVIntLength(in);
-                ColumnDefinition column = metadata.getColumnDefinition(name);
+                ColumnMetadata column = metadata.getColumn(name);
                 if (column == null)
                 {
                     // If we don't find the definition, it could be we have 
data for a dropped column, and we shouldn't
-                    // fail deserialization because of that. So we grab a 
"fake" ColumnDefinition that ensure proper
+                    // fail deserialization because of that. So we grab a 
"fake" ColumnMetadata that ensures proper
                    // deserialization. The column will be ignored later on 
anyway.
-                    column = metadata.getDroppedColumnDefinition(name);
+                    column = metadata.getDroppedColumn(name);
                     if (column == null)
                         throw new RuntimeException("Unknown column " + 
UTF8Type.instance.getString(name) + " during deserialization");
                 }
@@ -452,7 +452,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
          * If both ends have a pre-shared superset of the columns we are 
serializing, we can send them much
          * more efficiently. Both ends must provide the identically same set 
of columns.
          */
-        public void serializeSubset(Collection<ColumnDefinition> columns, 
Columns superset, DataOutputPlus out) throws IOException
+        public void serializeSubset(Collection<ColumnMetadata> columns, 
Columns superset, DataOutputPlus out) throws IOException
         {
             /**
              * We weight this towards small sets, and sets where the majority 
of items are present, since
@@ -482,7 +482,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
             }
         }
 
-        public long serializedSubsetSize(Collection<ColumnDefinition> columns, 
Columns superset)
+        public long serializedSubsetSize(Collection<ColumnMetadata> columns, 
Columns superset)
         {
             int columnCount = columns.size();
             int supersetCount = superset.size();
@@ -513,9 +513,9 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
             }
             else
             {
-                BTree.Builder<ColumnDefinition> builder = 
BTree.builder(Comparator.naturalOrder());
+                BTree.Builder<ColumnMetadata> builder = 
BTree.builder(Comparator.naturalOrder());
                 int firstComplexIdx = 0;
-                for (ColumnDefinition column : superset)
+                for (ColumnMetadata column : superset)
                 {
                     if ((encoded & 1) == 0)
                     {
@@ -531,13 +531,13 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
 
         // encodes a 1 bit for every *missing* column, on the assumption 
presence is more common,
         // and because this is consistent with encoding 0 to represent all 
present
-        private static long encodeBitmap(Collection<ColumnDefinition> columns, 
Columns superset, int supersetCount)
+        private static long encodeBitmap(Collection<ColumnMetadata> columns, 
Columns superset, int supersetCount)
         {
             long bitmap = 0L;
-            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = 
superset.iterator();
+            BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iter = 
superset.iterator();
             // the index we would encounter next if all columns are present
             int expectIndex = 0;
-            for (ColumnDefinition column : columns)
+            for (ColumnMetadata column : columns)
             {
                 if (iter.next(column) == null)
                     throw new IllegalStateException(columns + " is not a 
subset of " + superset);
@@ -556,15 +556,15 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
         }
 
         @DontInline
-        private void serializeLargeSubset(Collection<ColumnDefinition> 
columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus 
out) throws IOException
+        private void serializeLargeSubset(Collection<ColumnMetadata> columns, 
int columnCount, Columns superset, int supersetCount, DataOutputPlus out) 
throws IOException
         {
             // write flag indicating we're in lengthy mode
             out.writeUnsignedVInt(supersetCount - columnCount);
-            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = 
superset.iterator();
+            BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iter = 
superset.iterator();
             if (columnCount < supersetCount / 2)
             {
                 // write present columns
-                for (ColumnDefinition column : columns)
+                for (ColumnMetadata column : columns)
                 {
                     if (iter.next(column) == null)
                         throw new IllegalStateException();
@@ -575,7 +575,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
             {
                 // write missing columns
                 int prev = -1;
-                for (ColumnDefinition column : columns)
+                for (ColumnMetadata column : columns)
                 {
                     if (iter.next(column) == null)
                         throw new IllegalStateException();
@@ -594,7 +594,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
             int supersetCount = superset.size();
             int columnCount = supersetCount - delta;
 
-            BTree.Builder<ColumnDefinition> builder = 
BTree.builder(Comparator.naturalOrder());
+            BTree.Builder<ColumnMetadata> builder = 
BTree.builder(Comparator.naturalOrder());
             if (columnCount < supersetCount / 2)
             {
                 for (int i = 0 ; i < columnCount ; i++)
@@ -605,7 +605,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
             }
             else
             {
-                Iterator<ColumnDefinition> iter = superset.iterator();
+                Iterator<ColumnMetadata> iter = superset.iterator();
                 int idx = 0;
                 int skipped = 0;
                 while (true)
@@ -613,7 +613,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
                     int nextMissingIndex = skipped < delta ? 
(int)in.readUnsignedVInt() : supersetCount;
                     while (idx < nextMissingIndex)
                     {
-                        ColumnDefinition def = iter.next();
+                        ColumnMetadata def = iter.next();
                         builder.add(def);
                         idx++;
                     }
@@ -628,15 +628,15 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
         }
 
         @DontInline
-        private int serializeLargeSubsetSize(Collection<ColumnDefinition> 
columns, int columnCount, Columns superset, int supersetCount)
+        private int serializeLargeSubsetSize(Collection<ColumnMetadata> 
columns, int columnCount, Columns superset, int supersetCount)
         {
             // write flag indicating we're in lengthy mode
             int size = TypeSizes.sizeofUnsignedVInt(supersetCount - 
columnCount);
-            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = 
superset.iterator();
+            BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iter = 
superset.iterator();
             if (columnCount < supersetCount / 2)
             {
                 // write present columns
-                for (ColumnDefinition column : columns)
+                for (ColumnMetadata column : columns)
                 {
                     if (iter.next(column) == null)
                         throw new IllegalStateException();
@@ -647,7 +647,7 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
             {
                 // write missing columns
                 int prev = -1;
-                for (ColumnDefinition column : columns)
+                for (ColumnMetadata column : columns)
                 {
                     if (iter.next(column) == null)
                         throw new IllegalStateException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/CompactTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CompactTables.java 
b/src/java/org/apache/cassandra/db/CompactTables.java
index 31e482c..29993a2 100644
--- a/src/java/org/apache/cassandra/db/CompactTables.java
+++ b/src/java/org/apache/cassandra/db/CompactTables.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -79,11 +79,11 @@ public abstract class CompactTables
 
     private CompactTables() {}
 
-    public static ColumnDefinition getCompactValueColumn(PartitionColumns 
columns, boolean isSuper)
+    public static ColumnMetadata getCompactValueColumn(RegularAndStaticColumns 
columns, boolean isSuper)
     {
         if (isSuper)
         {
-            for (ColumnDefinition column : columns.regulars)
+            for (ColumnMetadata column : columns.regulars)
                 if (column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN))
                     return column;
             throw new AssertionError("Invalid super column table definition, 
no 'dynamic' map column");
@@ -92,14 +92,14 @@ public abstract class CompactTables
         return columns.regulars.getSimple(0);
     }
 
-    public static boolean hasEmptyCompactValue(CFMetaData metadata)
+    public static boolean hasEmptyCompactValue(TableMetadata metadata)
     {
-        return metadata.compactValueColumn().type instanceof EmptyType;
+        return metadata.compactValueColumn.type instanceof EmptyType;
     }
 
-    public static boolean isSuperColumnMapColumn(ColumnDefinition column)
+    public static boolean isSuperColumnMapColumn(ColumnMetadata column)
     {
-        return column.kind == ColumnDefinition.Kind.REGULAR && 
column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN);
+        return column.kind == ColumnMetadata.Kind.REGULAR && 
column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN);
     }
 
     public static DefaultNames defaultNameGenerator(Set<String> usedNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index ab4243f..2214d8d 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -28,9 +28,9 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ReadRepairDecision;
+import org.apache.cassandra.service.ReadRepairDecision;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -381,10 +381,10 @@ public enum ConsistencyLevel
         return this == SERIAL || this == LOCAL_SERIAL;
     }
 
-    public void validateCounterForWrite(CFMetaData metadata) throws 
InvalidRequestException
+    public void validateCounterForWrite(TableMetadata metadata) throws 
InvalidRequestException
     {
         if (this == ConsistencyLevel.ANY)
-            throw new InvalidRequestException("Consistency level ANY is not 
yet supported for counter table " + metadata.cfName);
+            throw new InvalidRequestException("Consistency level ANY is not 
yet supported for counter table " + metadata.name);
 
         if (isSerialConsistency())
             throw new InvalidRequestException("Counter operations are 
inherently non-serializable");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java 
b/src/java/org/apache/cassandra/db/CounterMutation.java
index 4e4a30d..0f1ad06 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
@@ -65,9 +66,9 @@ public class CounterMutation implements IMutation
         return mutation.getKeyspaceName();
     }
 
-    public Collection<UUID> getColumnFamilyIds()
+    public Collection<TableId> getTableIds()
     {
-        return mutation.getColumnFamilyIds();
+        return mutation.getTableIds();
     }
 
     public Collection<PartitionUpdate> getPartitionUpdates()
@@ -175,7 +176,7 @@ public class CounterMutation implements IMutation
                         {
                             public Object apply(final ColumnData data)
                             {
-                                return 
Objects.hashCode(update.metadata().cfId, key(), row.clustering(), 
data.column());
+                                return Objects.hashCode(update.metadata().id, 
key(), row.clustering(), data.column());
                             }
                         }));
                     }
@@ -186,7 +187,7 @@ public class CounterMutation implements IMutation
 
     private PartitionUpdate processModifications(PartitionUpdate changes)
     {
-        ColumnFamilyStore cfs = 
Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changes.metadata().cfId);
+        ColumnFamilyStore cfs = 
Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changes.metadata().id);
 
         List<PartitionUpdate.CounterMark> marks = 
changes.collectCounterMarks();
 
@@ -239,7 +240,7 @@ public class CounterMutation implements IMutation
     private void 
updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> marks, 
ColumnFamilyStore cfs)
     {
         ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
-        BTreeSet.Builder<Clustering> names = 
BTreeSet.builder(cfs.metadata.comparator);
+        BTreeSet.Builder<Clustering> names = 
BTreeSet.builder(cfs.metadata().comparator);
         for (PartitionUpdate.CounterMark mark : marks)
         {
             if (mark.clustering() != Clustering.STATIC_CLUSTERING)
@@ -252,7 +253,7 @@ public class CounterMutation implements IMutation
 
         int nowInSec = FBUtilities.nowInSeconds();
         ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(names.build(), false);
-        SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key(), 
builder.build(), filter);
+        SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(cfs.metadata(), nowInSec, key(), 
builder.build(), filter);
         PeekingIterator<PartitionUpdate.CounterMark> markIter = 
Iterators.peekingIterator(marks.iterator());
         try (ReadExecutionController controller = cmd.executionController();
              RowIterator partition = 
UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, controller), 
nowInSec))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java 
b/src/java/org/apache/cassandra/db/DataRange.java
index ffe041e..2983a57 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -236,12 +236,12 @@ public class DataRange
         return new DataRange(range, clusteringIndexFilter);
     }
 
-    public String toString(CFMetaData metadata)
+    public String toString(TableMetadata metadata)
     {
-        return String.format("range=%s pfilter=%s", 
keyRange.getString(metadata.getKeyValidator()), 
clusteringIndexFilter.toString(metadata));
+        return String.format("range=%s pfilter=%s", 
keyRange.getString(metadata.partitionKeyType), 
clusteringIndexFilter.toString(metadata));
     }
 
-    public String toCQLString(CFMetaData metadata)
+    public String toCQLString(TableMetadata metadata)
     {
         if (isUnrestricted())
             return "UNRESTRICTED";
@@ -269,15 +269,15 @@ public class DataRange
         return sb.toString();
     }
 
-    private void appendClause(PartitionPosition pos, StringBuilder sb, 
CFMetaData metadata, boolean isStart, boolean isInclusive)
+    private void appendClause(PartitionPosition pos, StringBuilder sb, 
TableMetadata metadata, boolean isStart, boolean isInclusive)
     {
         sb.append("token(");
-        
sb.append(ColumnDefinition.toCQLString(metadata.partitionKeyColumns()));
+        sb.append(ColumnMetadata.toCQLString(metadata.partitionKeyColumns()));
         sb.append(") ").append(getOperator(isStart, isInclusive)).append(" ");
         if (pos instanceof DecoratedKey)
         {
             sb.append("token(");
-            appendKeyString(sb, metadata.getKeyValidator(), 
((DecoratedKey)pos).getKey());
+            appendKeyString(sb, metadata.partitionKeyType, 
((DecoratedKey)pos).getKey());
             sb.append(")");
         }
         else
@@ -380,10 +380,10 @@ public class DataRange
         }
 
         @Override
-        public String toString(CFMetaData metadata)
+        public String toString(TableMetadata metadata)
         {
             return String.format("range=%s (paging) pfilter=%s lastReturned=%s 
(%s)",
-                                 
keyRange.getString(metadata.getKeyValidator()),
+                                 keyRange.getString(metadata.partitionKeyType),
                                  clusteringIndexFilter.toString(metadata),
                                  lastReturned.toString(metadata),
                                  inclusive ? "included" : "excluded");
@@ -392,7 +392,7 @@ public class DataRange
 
     public static class Serializer
     {
-        public void serialize(DataRange range, DataOutputPlus out, int 
version, CFMetaData metadata) throws IOException
+        public void serialize(DataRange range, DataOutputPlus out, int 
version, TableMetadata metadata) throws IOException
         {
             AbstractBounds.rowPositionSerializer.serialize(range.keyRange, 
out, version);
             
ClusteringIndexFilter.serializer.serialize(range.clusteringIndexFilter, out, 
version);
@@ -405,7 +405,7 @@ public class DataRange
             }
         }
 
-        public DataRange deserialize(DataInputPlus in, int version, CFMetaData 
metadata) throws IOException
+        public DataRange deserialize(DataInputPlus in, int version, 
TableMetadata metadata) throws IOException
         {
             AbstractBounds<PartitionPosition> range = 
AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, 
version);
             ClusteringIndexFilter filter = 
ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
@@ -422,7 +422,7 @@ public class DataRange
             }
         }
 
-        public long serializedSize(DataRange range, int version, CFMetaData 
metadata)
+        public long serializedSize(DataRange range, int version, TableMetadata 
metadata)
         {
             long size = 
AbstractBounds.rowPositionSerializer.serializedSize(range.keyRange, version)
                       + 
ClusteringIndexFilter.serializer.serializedSize(range.clusteringIndexFilter, 
version)

Reply via email to