http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 5b63ba6..e826dd8 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -38,34 +38,34 @@ import com.google.common.io.ByteStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.*;
 
+import static java.lang.String.format;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 
@@ -102,187 +102,203 @@ public final class SystemKeyspace
     public static final String BUILT_VIEWS = "built_views";
     public static final String PREPARED_STATEMENTS = "prepared_statements";
 
-    public static final CFMetaData Batches =
-        compile(BATCHES,
-                "batches awaiting replay",
-                "CREATE TABLE %s ("
-                + "id timeuuid,"
-                + "mutations list<blob>,"
-                + "version int,"
-                + "PRIMARY KEY ((id)))")
-                .copy(new LocalPartitioner(TimeUUIDType.instance))
-                .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
-                .gcGraceSeconds(0);
-
-    private static final CFMetaData Paxos =
-        compile(PAXOS,
-                "in-progress paxos proposals",
-                "CREATE TABLE %s ("
-                + "row_key blob,"
-                + "cf_id UUID,"
-                + "in_progress_ballot timeuuid,"
-                + "most_recent_commit blob,"
-                + "most_recent_commit_at timeuuid,"
-                + "most_recent_commit_version int,"
-                + "proposal blob,"
-                + "proposal_ballot timeuuid,"
-                + "proposal_version int,"
-                + "PRIMARY KEY ((row_key), cf_id))")
-                .compaction(CompactionParams.lcs(emptyMap()));
-
-    private static final CFMetaData BuiltIndexes =
-        compile(BUILT_INDEXES,
-                "built column indexes",
-                "CREATE TABLE \"%s\" ("
-                + "table_name text," // table_name here is the name of the 
keyspace - don't be fooled
-                + "index_name text,"
-                + "PRIMARY KEY ((table_name), index_name)) "
-                + "WITH COMPACT STORAGE");
-
-    private static final CFMetaData Local =
-        compile(LOCAL,
-                "information about the local node",
-                "CREATE TABLE %s ("
-                + "key text,"
-                + "bootstrapped text,"
-                + "broadcast_address inet,"
-                + "cluster_name text,"
-                + "cql_version text,"
-                + "data_center text,"
-                + "gossip_generation int,"
-                + "host_id uuid,"
-                + "listen_address inet,"
-                + "native_protocol_version text,"
-                + "partitioner text,"
-                + "rack text,"
-                + "release_version text,"
-                + "rpc_address inet,"
-                + "schema_version uuid,"
-                + "tokens set<varchar>,"
-                + "truncated_at map<uuid, blob>,"
-                + "PRIMARY KEY ((key)))"
-                ).recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance);
-
-    private static final CFMetaData Peers =
-        compile(PEERS,
-                "information about known peers in the cluster",
-                "CREATE TABLE %s ("
-                + "peer inet,"
-                + "data_center text,"
-                + "host_id uuid,"
-                + "preferred_ip inet,"
-                + "rack text,"
-                + "release_version text,"
-                + "rpc_address inet,"
-                + "schema_version uuid,"
-                + "tokens set<varchar>,"
-                + "PRIMARY KEY ((peer)))");
-
-    private static final CFMetaData PeerEvents =
-        compile(PEER_EVENTS,
-                "events related to peers",
-                "CREATE TABLE %s ("
-                + "peer inet,"
-                + "hints_dropped map<uuid, int>,"
-                + "PRIMARY KEY ((peer)))");
-
-    private static final CFMetaData RangeXfers =
-        compile(RANGE_XFERS,
-                "ranges requested for transfer",
-                "CREATE TABLE %s ("
-                + "token_bytes blob,"
-                + "requested_at timestamp,"
-                + "PRIMARY KEY ((token_bytes)))");
-
-    private static final CFMetaData CompactionHistory =
-        compile(COMPACTION_HISTORY,
-                "week-long compaction history",
-                "CREATE TABLE %s ("
-                + "id uuid,"
-                + "bytes_in bigint,"
-                + "bytes_out bigint,"
-                + "columnfamily_name text,"
-                + "compacted_at timestamp,"
-                + "keyspace_name text,"
-                + "rows_merged map<int, bigint>,"
-                + "PRIMARY KEY ((id)))")
-                .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7));
-
-    private static final CFMetaData SSTableActivity =
-        compile(SSTABLE_ACTIVITY,
-                "historic sstable read rates",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "columnfamily_name text,"
-                + "generation int,"
-                + "rate_120m double,"
-                + "rate_15m double,"
-                + "PRIMARY KEY ((keyspace_name, columnfamily_name, 
generation)))");
-
-    private static final CFMetaData SizeEstimates =
-        compile(SIZE_ESTIMATES,
-                "per-table primary range size estimates",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "table_name text,"
-                + "range_start text,"
-                + "range_end text,"
-                + "mean_partition_size bigint,"
-                + "partitions_count bigint,"
-                + "PRIMARY KEY ((keyspace_name), table_name, range_start, 
range_end))")
-                .gcGraceSeconds(0);
-
-    private static final CFMetaData AvailableRanges =
-        compile(AVAILABLE_RANGES,
-                "available keyspace/ranges during bootstrap/replace that are 
ready to be served",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "ranges set<blob>,"
-                + "PRIMARY KEY ((keyspace_name)))");
-
-    private static final CFMetaData TransferredRanges =
-        compile(TRANSFERRED_RANGES,
-                "record of transferred ranges for streaming operation",
-                "CREATE TABLE %s ("
-                + "operation text,"
-                + "peer inet,"
-                + "keyspace_name text,"
-                + "ranges set<blob>,"
-                + "PRIMARY KEY ((operation, keyspace_name), peer))");
-
-    private static final CFMetaData ViewsBuildsInProgress =
-        compile(VIEWS_BUILDS_IN_PROGRESS,
-                "views builds current progress",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "view_name text,"
-                + "last_token varchar,"
-                + "generation_number int,"
-                + "PRIMARY KEY ((keyspace_name), view_name))");
-
-    private static final CFMetaData BuiltViews =
-        compile(BUILT_VIEWS,
-                "built views",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "view_name text,"
-                + "status_replicated boolean,"
-                + "PRIMARY KEY ((keyspace_name), view_name))");
-
-    private static final CFMetaData PreparedStatements =
-        compile(PREPARED_STATEMENTS,
-                "prepared statements",
-                "CREATE TABLE %s ("
-                + "prepared_id blob,"
-                + "logged_keyspace text,"
-                + "query_string text,"
-                + "PRIMARY KEY ((prepared_id)))");
-
-
-    private static CFMetaData compile(String name, String description, String schema)
-    {
-        return CFMetaData.compile(String.format(schema, name), SchemaConstants.SYSTEM_KEYSPACE_NAME)
-                         .comment(description);
+    public static final TableMetadata Batches =
+        parse(BATCHES,
+              "batches awaiting replay",
+              "CREATE TABLE %s ("
+              + "id timeuuid,"
+              + "mutations list<blob>,"
+              + "version int,"
+              + "PRIMARY KEY ((id)))")
+              .partitioner(new LocalPartitioner(TimeUUIDType.instance))
+              .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
+              .build();
+
+    private static final TableMetadata Paxos =
+        parse(PAXOS,
+              "in-progress paxos proposals",
+              "CREATE TABLE %s ("
+              + "row_key blob,"
+              + "cf_id UUID,"
+              + "in_progress_ballot timeuuid,"
+              + "most_recent_commit blob,"
+              + "most_recent_commit_at timeuuid,"
+              + "most_recent_commit_version int,"
+              + "proposal blob,"
+              + "proposal_ballot timeuuid,"
+              + "proposal_version int,"
+              + "PRIMARY KEY ((row_key), cf_id))")
+              .compaction(CompactionParams.lcs(emptyMap()))
+              .build();
+
+    private static final TableMetadata BuiltIndexes =
+        parse(BUILT_INDEXES,
+              "built column indexes",
+              "CREATE TABLE \"%s\" ("
+              + "table_name text," // table_name here is the name of the 
keyspace - don't be fooled
+              + "index_name text,"
+              + "PRIMARY KEY ((table_name), index_name)) "
+              + "WITH COMPACT STORAGE")
+              .build();
+
+    private static final TableMetadata Local =
+        parse(LOCAL,
+              "information about the local node",
+              "CREATE TABLE %s ("
+              + "key text,"
+              + "bootstrapped text,"
+              + "broadcast_address inet,"
+              + "cluster_name text,"
+              + "cql_version text,"
+              + "data_center text,"
+              + "gossip_generation int,"
+              + "host_id uuid,"
+              + "listen_address inet,"
+              + "native_protocol_version text,"
+              + "partitioner text,"
+              + "rack text,"
+              + "release_version text,"
+              + "rpc_address inet,"
+              + "schema_version uuid,"
+              + "tokens set<varchar>,"
+              + "truncated_at map<uuid, blob>,"
+              + "PRIMARY KEY ((key)))")
+              .recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance)
+              .build();
+
+    private static final TableMetadata Peers =
+        parse(PEERS,
+              "information about known peers in the cluster",
+              "CREATE TABLE %s ("
+              + "peer inet,"
+              + "data_center text,"
+              + "host_id uuid,"
+              + "preferred_ip inet,"
+              + "rack text,"
+              + "release_version text,"
+              + "rpc_address inet,"
+              + "schema_version uuid,"
+              + "tokens set<varchar>,"
+              + "PRIMARY KEY ((peer)))")
+              .build();
+
+    private static final TableMetadata PeerEvents =
+        parse(PEER_EVENTS,
+              "events related to peers",
+              "CREATE TABLE %s ("
+              + "peer inet,"
+              + "hints_dropped map<uuid, int>,"
+              + "PRIMARY KEY ((peer)))")
+              .build();
+
+    private static final TableMetadata RangeXfers =
+        parse(RANGE_XFERS,
+              "ranges requested for transfer",
+              "CREATE TABLE %s ("
+              + "token_bytes blob,"
+              + "requested_at timestamp,"
+              + "PRIMARY KEY ((token_bytes)))")
+              .build();
+
+    private static final TableMetadata CompactionHistory =
+        parse(COMPACTION_HISTORY,
+              "week-long compaction history",
+              "CREATE TABLE %s ("
+              + "id uuid,"
+              + "bytes_in bigint,"
+              + "bytes_out bigint,"
+              + "columnfamily_name text,"
+              + "compacted_at timestamp,"
+              + "keyspace_name text,"
+              + "rows_merged map<int, bigint>,"
+              + "PRIMARY KEY ((id)))")
+              .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
+              .build();
+
+    private static final TableMetadata SSTableActivity =
+        parse(SSTABLE_ACTIVITY,
+              "historic sstable read rates",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "columnfamily_name text,"
+              + "generation int,"
+              + "rate_120m double,"
+              + "rate_15m double,"
+              + "PRIMARY KEY ((keyspace_name, columnfamily_name, 
generation)))")
+              .build();
+
+    private static final TableMetadata SizeEstimates =
+        parse(SIZE_ESTIMATES,
+              "per-table primary range size estimates",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "table_name text,"
+              + "range_start text,"
+              + "range_end text,"
+              + "mean_partition_size bigint,"
+              + "partitions_count bigint,"
+              + "PRIMARY KEY ((keyspace_name), table_name, range_start, 
range_end))")
+              .build();
+
+    private static final TableMetadata AvailableRanges =
+        parse(AVAILABLE_RANGES,
+              "available keyspace/ranges during bootstrap/replace that are 
ready to be served",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "ranges set<blob>,"
+              + "PRIMARY KEY ((keyspace_name)))")
+              .build();
+
+    private static final TableMetadata TransferredRanges =
+        parse(TRANSFERRED_RANGES,
+              "record of transferred ranges for streaming operation",
+              "CREATE TABLE %s ("
+              + "operation text,"
+              + "peer inet,"
+              + "keyspace_name text,"
+              + "ranges set<blob>,"
+              + "PRIMARY KEY ((operation, keyspace_name), peer))")
+              .build();
+
+    private static final TableMetadata ViewsBuildsInProgress =
+        parse(VIEWS_BUILDS_IN_PROGRESS,
+              "views builds current progress",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "view_name text,"
+              + "last_token varchar,"
+              + "generation_number int,"
+              + "PRIMARY KEY ((keyspace_name), view_name))")
+              .build();
+
+    private static final TableMetadata BuiltViews =
+        parse(BUILT_VIEWS,
+              "built views",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "view_name text,"
+              + "status_replicated boolean,"
+              + "PRIMARY KEY ((keyspace_name), view_name))")
+              .build();
+
+    private static final TableMetadata PreparedStatements =
+        parse(PREPARED_STATEMENTS,
+              "prepared statements",
+              "CREATE TABLE %s ("
+              + "prepared_id blob,"
+              + "logged_keyspace text,"
+              + "query_string text,"
+              + "PRIMARY KEY ((prepared_id)))")
+              .build();
+
+    private static TableMetadata.Builder parse(String table, String description, String cql)
+    {
+        return CreateTableStatement.parse(format(cql, table), SchemaConstants.SYSTEM_KEYSPACE_NAME)
+                                   .id(TableId.forSystemTable(SchemaConstants.SYSTEM_KEYSPACE_NAME, table))
+                                   .dcLocalReadRepairChance(0.0)
+                                   .gcGraceSeconds(0)
+                                   .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1))
+                                   .comment(description);
     }
 
     public static KeyspaceMetadata metadata()
@@ -321,7 +337,7 @@ public final class SystemKeyspace
                         .build();
     }
 
-    private static volatile Map<UUID, Pair<CommitLogPosition, Long>> 
truncationRecords;
+    private static volatile Map<TableId, Pair<CommitLogPosition, Long>> 
truncationRecords;
 
     public enum BootstrapState
     {
@@ -352,7 +368,7 @@ public final class SystemKeyspace
                      "listen_address" +
                      ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        executeOnceInternal(String.format(req, LOCAL),
+        executeOnceInternal(format(req, LOCAL),
                             LOCAL,
                             DatabaseDescriptor.getClusterName(),
                             FBUtilities.getReleaseVersionString(),
@@ -377,7 +393,7 @@ public final class SystemKeyspace
         if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
             return;
         String req = "INSERT INTO system.%s (id, keyspace_name, 
columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, 
?, ?, ?, ?, ?, ?)";
-        executeInternal(String.format(req, COMPACTION_HISTORY),
+        executeInternal(format(req, COMPACTION_HISTORY),
                         UUIDGen.getTimeUUID(),
                         ksname,
                         cfname,
@@ -389,21 +405,21 @@ public final class SystemKeyspace
 
     public static TabularData getCompactionHistory() throws OpenDataException
     {
-        UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
+        UntypedResultSet queryResultSet = executeInternal(format("SELECT * from system.%s", COMPACTION_HISTORY));
         return CompactionHistoryTabularData.from(queryResultSet);
     }
 
     public static boolean isViewBuilt(String keyspaceName, String viewName)
     {
         String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? 
AND view_name=?";
-        UntypedResultSet result = executeInternal(String.format(req, 
SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName);
+        UntypedResultSet result = executeInternal(format(req, 
SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName);
         return !result.isEmpty();
     }
 
     public static boolean isViewStatusReplicated(String keyspaceName, String viewName)
     {
         String req = "SELECT status_replicated FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?";
-        UntypedResultSet result = executeInternal(String.format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName);
+        UntypedResultSet result = executeInternal(format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName);
 
         if (result.isEmpty())
             return false;
@@ -417,7 +433,7 @@ public final class SystemKeyspace
             return;
 
         String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name, 
status_replicated) VALUES (?, ?, ?)";
-        executeInternal(String.format(req, 
SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName, 
replicated);
+        executeInternal(format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, 
BUILT_VIEWS), keyspaceName, viewName, replicated);
         forceBlockingFlush(BUILT_VIEWS);
     }
 
@@ -434,7 +450,7 @@ public final class SystemKeyspace
 
     public static void beginViewBuild(String ksname, String viewName, int generationNumber)
     {
-        executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS),
+        executeInternal(format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS),
                         ksname,
                         viewName,
                         generationNumber);
@@ -461,13 +477,13 @@ public final class SystemKeyspace
     {
         String req = "INSERT INTO system.%s (keyspace_name, view_name, 
last_token) VALUES (?, ?, ?)";
         Token.TokenFactory factory = 
ViewsBuildsInProgress.partitioner.getTokenFactory();
-        executeInternal(String.format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, 
viewName, factory.toString(token));
+        executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, 
viewName, factory.toString(token));
     }
 
     public static Pair<Integer, Token> getViewBuildStatus(String ksname, 
String viewName)
     {
         String req = "SELECT generation_number, last_token FROM system.%s 
WHERE keyspace_name = ? AND view_name = ?";
-        UntypedResultSet queryResultSet = executeInternal(String.format(req, 
VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+        UntypedResultSet queryResultSet = executeInternal(format(req, 
VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
         if (queryResultSet == null || queryResultSet.isEmpty())
             return null;
 
@@ -489,7 +505,7 @@ public final class SystemKeyspace
     public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
-        executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
+        executeInternal(format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
         truncationRecords = null;
         forceBlockingFlush(LOCAL);
     }
@@ -497,14 +513,14 @@ public final class SystemKeyspace
     /**
      * This method is used to remove information about truncation time for specified column family
      */
-    public static synchronized void removeTruncationRecord(UUID cfId)
+    public static synchronized void removeTruncationRecord(TableId id)
     {
-        Pair<CommitLogPosition, Long> truncationRecord = 
getTruncationRecord(cfId);
+        Pair<CommitLogPosition, Long> truncationRecord = 
getTruncationRecord(id);
         if (truncationRecord == null)
             return;
 
         String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
-        executeInternal(String.format(req, LOCAL, LOCAL), cfId);
+        executeInternal(format(req, LOCAL, LOCAL), id.asUUID());
         truncationRecords = null;
         forceBlockingFlush(LOCAL);
     }
@@ -515,7 +531,7 @@ public final class SystemKeyspace
         {
             CommitLogPosition.serializer.serialize(position, out);
             out.writeLong(truncatedAt);
-            return singletonMap(cfs.metadata.cfId, out.asNewBuffer());
+            return singletonMap(cfs.metadata.id.asUUID(), out.asNewBuffer());
         }
         catch (IOException e)
         {
@@ -523,36 +539,36 @@ public final class SystemKeyspace
         }
     }
 
-    public static CommitLogPosition getTruncatedPosition(UUID cfId)
+    public static CommitLogPosition getTruncatedPosition(TableId id)
     {
-        Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId);
+        Pair<CommitLogPosition, Long> record = getTruncationRecord(id);
         return record == null ? null : record.left;
     }
 
-    public static long getTruncatedAt(UUID cfId)
+    public static long getTruncatedAt(TableId id)
     {
-        Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId);
+        Pair<CommitLogPosition, Long> record = getTruncationRecord(id);
         return record == null ? Long.MIN_VALUE : record.right;
     }
 
-    private static synchronized Pair<CommitLogPosition, Long> 
getTruncationRecord(UUID cfId)
+    private static synchronized Pair<CommitLogPosition, Long> 
getTruncationRecord(TableId id)
     {
         if (truncationRecords == null)
             truncationRecords = readTruncationRecords();
-        return truncationRecords.get(cfId);
+        return truncationRecords.get(id);
     }
 
-    private static Map<UUID, Pair<CommitLogPosition, Long>> 
readTruncationRecords()
+    private static Map<TableId, Pair<CommitLogPosition, Long>> 
readTruncationRecords()
     {
-        UntypedResultSet rows = executeInternal(String.format("SELECT 
truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));
+        UntypedResultSet rows = executeInternal(format("SELECT truncated_at 
FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));
 
-        Map<UUID, Pair<CommitLogPosition, Long>> records = new HashMap<>();
+        Map<TableId, Pair<CommitLogPosition, Long>> records = new HashMap<>();
 
         if (!rows.isEmpty() && rows.one().has("truncated_at"))
         {
             Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", 
UUIDType.instance, BytesType.instance);
             for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
-                records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
+                records.put(TableId.fromUUID(entry.getKey()), truncationRecordFromBlob(entry.getValue()));
         }
 
         return records;
@@ -579,7 +595,7 @@ public final class SystemKeyspace
             return;
 
         String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
-        executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
+        executeInternal(format(req, PEERS), ep, tokensAsSet(tokens));
     }
 
     public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
@@ -588,7 +604,7 @@ public final class SystemKeyspace
             return;
 
         String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, 
?)";
-        executeInternal(String.format(req, PEERS), ep, preferred_ip);
+        executeInternal(format(req, PEERS), ep, preferred_ip);
         forceBlockingFlush(PEERS);
     }
 
@@ -598,20 +614,20 @@ public final class SystemKeyspace
             return;
 
         String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
-        executeInternal(String.format(req, PEERS, columnName), ep, value);
+        executeInternal(format(req, PEERS, columnName), ep, value);
     }
 
     public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
     {
         // with 30 day TTL
         String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? 
] = ? WHERE peer = ?";
-        executeInternal(String.format(req, PEER_EVENTS), timePeriod, value, 
ep);
+        executeInternal(format(req, PEER_EVENTS), timePeriod, value, ep);
     }
 
     public static synchronized void updateSchemaVersion(UUID version)
     {
         String req = "INSERT INTO system.%s (key, schema_version) VALUES 
('%s', ?)";
-        executeInternal(String.format(req, LOCAL, LOCAL), version);
+        executeInternal(format(req, LOCAL, LOCAL), version);
     }
 
     private static Set<String> tokensAsSet(Collection<Token> tokens)
@@ -640,7 +656,7 @@ public final class SystemKeyspace
     public static synchronized void removeEndpoint(InetAddress ep)
     {
         String req = "DELETE FROM system.%s WHERE peer = ?";
-        executeInternal(String.format(req, PEERS), ep);
+        executeInternal(format(req, PEERS), ep);
         forceBlockingFlush(PEERS);
     }
 
@@ -656,7 +672,7 @@ public final class SystemKeyspace
             return;
 
         String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)";
-        executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens));
+        executeInternal(format(req, LOCAL, LOCAL), tokensAsSet(tokens));
         forceBlockingFlush(LOCAL);
     }
 
@@ -710,7 +726,7 @@ public final class SystemKeyspace
     public static InetAddress getPreferredIP(InetAddress ep)
     {
         String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
-        UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
+        UntypedResultSet result = executeInternal(format(req, PEERS), ep);
         if (!result.isEmpty() && result.one().has("preferred_ip"))
             return result.one().getInetAddress("preferred_ip");
         return ep;
@@ -752,7 +768,7 @@ public final class SystemKeyspace
                 return new CassandraVersion(FBUtilities.getReleaseVersionString());
             }
             String req = "SELECT release_version FROM system.%s WHERE peer=?";
-            UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
+            UntypedResultSet result = executeInternal(format(req, PEERS), ep);
             if (result != null && result.one().has("release_version"))
             {
                 return new CassandraVersion(result.one().getString("release_version"));
@@ -791,7 +807,7 @@ public final class SystemKeyspace
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL);
 
         String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
+        UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL));
 
         if (result.isEmpty() || !result.one().has("cluster_name"))
         {
@@ -811,7 +827,7 @@ public final class SystemKeyspace
     public static Collection<Token> getSavedTokens()
     {
         String req = "SELECT tokens FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
+        UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL));
         return result.isEmpty() || !result.one().has("tokens")
              ? Collections.<Token>emptyList()
              : deserializeTokens(result.one().getSet("tokens", 
UTF8Type.instance));
@@ -820,7 +836,7 @@ public final class SystemKeyspace
     public static int incrementAndGetGeneration()
     {
         String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
+        UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL));
 
         int generation;
         if (result.isEmpty() || !result.one().has("gossip_generation"))
@@ -848,7 +864,7 @@ public final class SystemKeyspace
         }
 
         req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', 
?)";
-        executeInternal(String.format(req, LOCAL, LOCAL), generation);
+        executeInternal(format(req, LOCAL, LOCAL), generation);
         forceBlockingFlush(LOCAL);
 
         return generation;
@@ -857,7 +873,7 @@ public final class SystemKeyspace
     public static BootstrapState getBootstrapState()
     {
         String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
+        UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL));
 
         if (result.isEmpty() || !result.one().has("bootstrapped"))
             return BootstrapState.NEEDS_BOOTSTRAP;
@@ -886,14 +902,14 @@ public final class SystemKeyspace
             return;
 
         String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', 
?)";
-        executeInternal(String.format(req, LOCAL, LOCAL), state.name());
+        executeInternal(format(req, LOCAL, LOCAL), state.name());
         forceBlockingFlush(LOCAL);
     }
 
     public static boolean isIndexBuilt(String keyspaceName, String indexName)
     {
         String req = "SELECT index_name FROM %s.\"%s\" WHERE table_name=? AND 
index_name=?";
-        UntypedResultSet result = executeInternal(String.format(req, 
SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, indexName);
+        UntypedResultSet result = executeInternal(format(req, 
SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, indexName);
         return !result.isEmpty();
     }
 
@@ -915,7 +931,7 @@ public final class SystemKeyspace
     {
         List<String> names = new ArrayList<>(indexNames);
         String req = "SELECT index_name from %s.\"%s\" WHERE table_name=? AND 
index_name IN ?";
-        UntypedResultSet results = executeInternal(String.format(req, 
SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, names);
+        UntypedResultSet results = executeInternal(format(req, 
SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, names);
         return StreamSupport.stream(results.spliterator(), false)
                             .map(r -> r.getString("index_name"))
                             .collect(Collectors.toList());
@@ -928,7 +944,7 @@ public final class SystemKeyspace
     public static UUID getLocalHostId()
     {
         String req = "SELECT host_id FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
+        UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL));
 
         // Look up the Host UUID (return it if found)
         if (!result.isEmpty() && result.one().has("host_id"))
@@ -946,7 +962,7 @@ public final class SystemKeyspace
     public static UUID setLocalHostId(UUID hostId)
     {
         String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
-        executeInternal(String.format(req, LOCAL, LOCAL), hostId);
+        executeInternal(format(req, LOCAL, LOCAL), hostId);
         return hostId;
     }
 
@@ -956,7 +972,7 @@ public final class SystemKeyspace
     public static String getRack()
     {
         String req = "SELECT rack FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
+        UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL));
 
         // Look up the Rack (return it if found)
         if (!result.isEmpty() && result.one().has("rack"))
@@ -971,7 +987,7 @@ public final class SystemKeyspace
     public static String getDatacenter()
     {
         String req = "SELECT data_center FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
+        UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL));
 
         // Look up the Data center (return it if found)
         if (!result.isEmpty() && result.one().has("data_center"))
@@ -980,16 +996,16 @@ public final class SystemKeyspace
         return null;
     }
 
-    public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec)
+    public static PaxosState loadPaxosState(DecoratedKey key, TableMetadata metadata, int nowInSec)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, System.nanoTime(), String.format(req, PAXOS), key.getKey(), metadata.cfId);
+        UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, System.nanoTime(), format(req, PAXOS), key.getKey(), metadata.id.asUUID());
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
 
         Commit promised = row.has("in_progress_ballot")
-                        ? new Commit(row.getUUID("in_progress_ballot"), new 
PartitionUpdate(metadata, key, metadata.partitionColumns(), 1))
+                        ? new Commit(row.getUUID("in_progress_ballot"), new 
PartitionUpdate(metadata, key, metadata.regularAndStaticColumns(), 1))
                         : Commit.emptyCommit(key, metadata);
         // either we have both a recently accepted ballot and update or we have neither
         Commit accepted = row.has("proposal_version") && row.has("proposal")
@@ -1007,27 +1023,27 @@ public final class SystemKeyspace
     public static void savePaxosPromise(Commit promise)
     {
         String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET 
in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
-        executeInternal(String.format(req, PAXOS),
+        executeInternal(format(req, PAXOS),
                         UUIDGen.microsTimestamp(promise.ballot),
                         paxosTtlSec(promise.update.metadata()),
                         promise.ballot,
                         promise.update.partitionKey().getKey(),
-                        promise.update.metadata().cfId);
+                        promise.update.metadata().id.asUUID());
     }
 
     public static void savePaxosProposal(Commit proposal)
     {
-        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND 
TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key 
= ? AND cf_id = ?", PAXOS),
+        executeInternal(format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? 
SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? 
AND cf_id = ?", PAXOS),
                         UUIDGen.microsTimestamp(proposal.ballot),
                         paxosTtlSec(proposal.update.metadata()),
                         proposal.ballot,
                         PartitionUpdate.toBytes(proposal.update, MessagingService.current_version),
                         MessagingService.current_version,
                         proposal.update.partitionKey().getKey(),
-                        proposal.update.metadata().cfId);
+                        proposal.update.metadata().id.asUUID());
     }
 
-    public static int paxosTtlSec(CFMetaData metadata)
+    public static int paxosTtlSec(TableMetadata metadata)
     {
         // keep paxos state around for at least 3h
         return Math.max(3 * 3600, metadata.params.gcGraceSeconds);
@@ -1038,14 +1054,14 @@ public final class SystemKeyspace
         // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old)
         // even though that's really just an optimization  since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
         String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?";
-        executeInternal(String.format(cql, PAXOS),
+        executeInternal(format(cql, PAXOS),
                         UUIDGen.microsTimestamp(commit.ballot),
                         paxosTtlSec(commit.update.metadata()),
                         commit.ballot,
                         PartitionUpdate.toBytes(commit.update, MessagingService.current_version),
                         MessagingService.current_version,
                         commit.update.partitionKey().getKey(),
-                        commit.update.metadata().cfId);
+                        commit.update.metadata().id.asUUID());
     }
 
     /**
@@ -1058,7 +1074,7 @@ public final class SystemKeyspace
     public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
     {
         String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?";
-        UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
+        UntypedResultSet results = executeInternal(format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
 
         if (results.isEmpty())
             return new RestorableMeter();
@@ -1076,7 +1092,7 @@ public final class SystemKeyspace
     {
         // Store values with a one-day TTL to handle corner cases where cleanup might not occur
         String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000";
-        executeInternal(String.format(cql, SSTABLE_ACTIVITY),
+        executeInternal(format(cql, SSTABLE_ACTIVITY),
                         keyspace,
                         table,
                         generation,
@@ -1090,7 +1106,7 @@ public final class SystemKeyspace
     public static void clearSSTableReadMeter(String keyspace, String table, int generation)
     {
         String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?";
-        executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
+        executeInternal(format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
     }
 
     /**
@@ -1099,7 +1115,7 @@ public final class SystemKeyspace
     public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
     {
         long timestamp = FBUtilities.timestampMicros();
-        PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.partitionColumns(), estimates.size());
+        PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
         Mutation mutation = new Mutation(update);
 
         // delete all previous values with a single range tombstone.
@@ -1126,7 +1142,7 @@ public final class SystemKeyspace
      */
     public static void clearSizeEstimates(String keyspace, String table)
     {
-        String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ? 
AND table_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+        String cql = format("DELETE FROM %s WHERE keyspace_name = ? AND 
table_name = ?", SizeEstimates.toString());
         executeInternal(cql, keyspace, table);
     }
 
@@ -1138,14 +1154,14 @@ public final class SystemKeyspace
         {
             rangesToUpdate.add(rangeToBytes(range));
         }
-        executeInternal(String.format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace);
+        executeInternal(format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace);
     }
 
     public static synchronized Set<Range<Token>> getAvailableRanges(String 
keyspace, IPartitioner partitioner)
     {
         Set<Range<Token>> result = new HashSet<>();
         String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
-        UntypedResultSet rs = executeInternal(String.format(query, AVAILABLE_RANGES), keyspace);
+        UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES), keyspace);
         for (UntypedResultSet.Row row : rs)
         {
             Set<ByteBuffer> rawRanges = row.getSet("ranges", 
BytesType.instance);
@@ -1174,14 +1190,14 @@ public final class SystemKeyspace
         {
             rangesToUpdate.add(rangeToBytes(range));
         }
-        executeInternal(String.format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace);
+        executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace);
     }
 
     public static synchronized Map<InetAddress, Set<Range<Token>>> 
getTransferredRanges(String description, String keyspace, IPartitioner 
partitioner)
     {
         Map<InetAddress, Set<Range<Token>>> result = new HashMap<>();
         String query = "SELECT * FROM system.%s WHERE operation = ? AND 
keyspace_name = ?";
-        UntypedResultSet rs = executeInternal(String.format(query, TRANSFERRED_RANGES), description, keyspace);
+        UntypedResultSet rs = executeInternal(format(query, TRANSFERRED_RANGES), description, keyspace);
         for (UntypedResultSet.Row row : rs)
         {
             InetAddress peer = row.getInetAddress("peer");
@@ -1213,9 +1229,9 @@ public final class SystemKeyspace
 
         {
             logger.info("Detected version upgrade from {} to {}, snapshotting 
system keyspace", previous, next);
-            String snapshotName = Keyspace.getTimestampedSnapshotName(String.format("upgrade-%s-%s",
-                                                                                    previous,
-                                                                                    next));
+            String snapshotName = Keyspace.getTimestampedSnapshotName(format("upgrade-%s-%s",
+                                                                             previous,
+                                                                             next));
             Keyspace systemKs = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
             systemKs.snapshot(snapshotName, null);
             return true;
@@ -1238,7 +1254,7 @@ public final class SystemKeyspace
     private static String getPreviousVersionString()
     {
         String req = "SELECT release_version FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, SystemKeyspace.LOCAL, SystemKeyspace.LOCAL));
+        UntypedResultSet result = executeInternal(format(req, SystemKeyspace.LOCAL, SystemKeyspace.LOCAL));
         if (result.isEmpty() || !result.one().has("release_version"))
         {
             // it isn't inconceivable that one might try to upgrade a node straight from <= 1.1 to whatever
@@ -1298,24 +1314,21 @@ public final class SystemKeyspace
 
     public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql)
     {
-        executeInternal(String.format("INSERT INTO %s.%s"
-                                      + " (logged_keyspace, prepared_id, 
query_string) VALUES (?, ?, ?)",
-                                      SchemaConstants.SYSTEM_KEYSPACE_NAME, 
PREPARED_STATEMENTS),
+        executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, 
query_string) VALUES (?, ?, ?)",
+                               PreparedStatements.toString()),
                         loggedKeyspace, key.byteBuffer(), cql);
         logger.debug("stored prepared statement for logged keyspace '{}': 
'{}'", loggedKeyspace, cql);
     }
 
     public static void removePreparedStatement(MD5Digest key)
     {
-        executeInternal(String.format("DELETE FROM %s.%s"
-                                      + " WHERE prepared_id = ?",
-                                      SchemaConstants.SYSTEM_KEYSPACE_NAME, 
PREPARED_STATEMENTS),
+        executeInternal(format("DELETE FROM %s WHERE prepared_id = ?", 
PreparedStatements.toString()),
                         key.byteBuffer());
     }
 
     public static List<Pair<String, String>> loadPreparedStatements()
     {
-        String query = String.format("SELECT logged_keyspace, query_string 
FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
+        String query = format("SELECT logged_keyspace, query_string FROM %s", 
PreparedStatements.toString());
         UntypedResultSet resultSet = executeOnceInternal(query);
         List<Pair<String, String>> r = new ArrayList<>();
         for (UntypedResultSet.Row row : resultSet)

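[Note: the hunk above converges on one pattern: CFMetaData.compile(...) plus mutating setters is replaced by a fluent TableMetadata.Builder that is only materialized by build(). A minimal self-contained sketch of that builder shape follows; the class and method names are simplified stand-ins for illustration, not the real org.apache.cassandra.schema API.]

    import java.util.Objects;

    // Toy stand-ins for TableMetadata / TableMetadata.Builder; illustration only.
    final class TableMetadataSketch
    {
        final String keyspace;
        final String table;
        final String comment;
        final int gcGraceSeconds;

        private TableMetadataSketch(Builder b)
        {
            keyspace = b.keyspace;
            table = b.table;
            comment = b.comment;
            gcGraceSeconds = b.gcGraceSeconds;
        }

        // In the real code, CreateTableStatement.parse(cql, keyspace) seeds the
        // builder from a CQL string; the toy version takes identifiers directly.
        static Builder parse(String table, String keyspace)
        {
            return new Builder(Objects.requireNonNull(table), Objects.requireNonNull(keyspace));
        }

        static final class Builder
        {
            private final String keyspace;
            private final String table;
            private String comment = "";
            private int gcGraceSeconds = 864000;

            private Builder(String table, String keyspace)
            {
                this.table = table;
                this.keyspace = keyspace;
            }

            Builder comment(String c) { comment = c; return this; }
            Builder gcGraceSeconds(int s) { gcGraceSeconds = s; return this; }

            // Every chained definition ends with build(), yielding an immutable object.
            TableMetadataSketch build() { return new TableMetadataSketch(this); }
        }

        public static void main(String[] args)
        {
            TableMetadataSketch batches = parse("batches", "system")
                                          .comment("batches awaiting replay")
                                          .gcGraceSeconds(0)
                                          .build();
            System.out.println(batches.keyspace + '.' + batches.table + ": " + batches.comment);
        }
    }

[The payoff is immutability: once build() runs, readers can hold the metadata object without observing a concurrent schema mutation, the same motive behind the AbstractSSTableIterator change later in this commit.]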
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index c32a642..d361bac 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -18,19 +18,13 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.io.IOError;
-import java.util.*;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.PeekingIterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
+
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.net.MessagingService;
 
 /**
  * Helper class to deserialize Unfiltered object from disk efficiently.
@@ -43,7 +37,7 @@ public class UnfilteredDeserializer
 {
     private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class);
 
-    protected final CFMetaData metadata;
+    protected final TableMetadata metadata;
     protected final DataInputPlus in;
     protected final SerializationHelper helper;
 
@@ -57,7 +51,7 @@ public class UnfilteredDeserializer
 
     private final Row.Builder builder;
 
-    private UnfilteredDeserializer(CFMetaData metadata,
+    private UnfilteredDeserializer(TableMetadata metadata,
                                    DataInputPlus in,
                                    SerializationHeader header,
                                    SerializationHelper helper)
@@ -70,7 +64,7 @@ public class UnfilteredDeserializer
         this.builder = BTreeRow.sortedBuilder();
     }
 
-    public static UnfilteredDeserializer create(CFMetaData metadata,
+    public static UnfilteredDeserializer create(TableMetadata metadata,
                                                 DataInputPlus in,
                                                 SerializationHeader header,
                                                 SerializationHelper helper)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/UnknownColumnException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java
deleted file mode 100644
index 55dc453..0000000
--- a/src/java/org/apache/cassandra/db/UnknownColumnException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * Exception thrown when we read a column internally that is unknown. Note that
- * this is an internal exception and is not meant to be user facing.
- */
-public class UnknownColumnException extends Exception
-{
-    public final ByteBuffer columnName;
-
-    public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
-    {
-        super(String.format("Unknown column %s in table %s.%s", 
stringify(columnName), metadata.ksName, metadata.cfName));
-        this.columnName = columnName;
-    }
-
-    private static String stringify(ByteBuffer name)
-    {
-        try
-        {
-            return UTF8Type.instance.getString(name);
-        }
-        catch (Exception e)
-        {
-            return ByteBufferUtil.bytesToHex(name);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java b/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java
deleted file mode 100644
index c43b50a..0000000
--- a/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.util.UUID;
-
-
-public class UnknownColumnFamilyException extends IOException
-{
-    public final UUID cfId;
-
-    public UnknownColumnFamilyException(String msg, UUID cfId)
-    {
-        super(msg);
-        this.cfId = cfId;
-    }
-}

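[Note: both deleted exception classes exposed a bare UUID cfId; elsewhere in this commit that raw UUID becomes the dedicated TableId type (see TableId.fromUUID(...) and id.asUUID() in the SystemKeyspace hunk). A minimal sketch of such a typed wrapper follows, assuming only the fromUUID/asUUID round-trip visible in the diff; everything else is illustrative.]

    import java.util.UUID;

    // Toy wrapper in the spirit of TableId; only fromUUID/asUUID mirror the diff.
    final class TableIdSketch
    {
        private final UUID id;

        private TableIdSketch(UUID id) { this.id = id; }

        static TableIdSketch fromUUID(UUID id) { return new TableIdSketch(id); }

        UUID asUUID() { return id; }

        @Override public boolean equals(Object o)
        {
            return o instanceof TableIdSketch && id.equals(((TableIdSketch) o).id);
        }

        @Override public int hashCode() { return id.hashCode(); }

        @Override public String toString() { return id.toString(); }

        public static void main(String[] args)
        {
            UUID raw = UUID.randomUUID();
            TableIdSketch tableId = TableIdSketch.fromUUID(raw);
            // UUID-keyed storage such as the truncated_at map stays compatible,
            // while method signatures gain a distinct, self-documenting type.
            System.out.println(tableId.asUUID().equals(raw)); // true
        }
    }

[The type distinction is what lets signatures like removeTruncationRecord(TableId id) reject an arbitrary UUID at compile time.]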
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 71f8c43..3936ce4 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -22,7 +22,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
@@ -37,6 +37,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
 {
     protected final SSTableReader sstable;
+    // We could use sstable.metadata(), but that can change during execution so it's good hygiene to grab an immutable instance
+    protected final TableMetadata metadata;
+
     protected final DecoratedKey key;
     protected final DeletionTime partitionLevelDeletion;
     protected final ColumnFilter columns;
@@ -62,11 +65,12 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
                                       FileHandle ifile)
     {
         this.sstable = sstable;
+        this.metadata = sstable.metadata();
         this.ifile = ifile;
         this.key = key;
         this.columns = columnFilter;
         this.slices = slices;
-        this.helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
+        this.helper = new SerializationHelper(metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
 
         if (indexEntry == null)
         {
@@ -178,12 +182,12 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
                                : createReaderInternal(indexEntry, file, shouldCloseFile);
     };
 
-    public CFMetaData metadata()
+    public TableMetadata metadata()
     {
-        return sstable.metadata;
+        return metadata;
     }
 
-    public PartitionColumns columns()
+    public RegularAndStaticColumns columns()
     {
         return columns.fetchedColumns();
     }
@@ -306,7 +310,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
         private void createDeserializer()
         {
             assert file != null && deserializer == null;
-            deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper);
+            deserializer = UnfilteredDeserializer.create(metadata, file, sstable.header, helper);
         }
 
         protected void seekToPosition(long position) throws IOException
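
The new metadata field pins one immutable TableMetadata instance for the whole read, since repeated sstable.metadata() calls could observe a schema change mid-iteration. A standalone sketch of that capture-once idiom (Reader, MetadataSource and Metadata are hypothetical stand-ins, not Cassandra types):

    // Capture-once idiom: read the mutable source a single time in the
    // constructor so every later access sees one consistent snapshot.
    final class Reader
    {
        interface MetadataSource { Metadata metadata(); } // may return a newer instance on each call
        record Metadata(String name) {}

        private final Metadata metadata; // immutable snapshot taken at construction

        Reader(MetadataSource source)
        {
            this.metadata = source.metadata();
        }

        Metadata metadata()
        {
            return metadata; // never re-reads the source
        }
    }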

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index 5d9ca37..3e6cc27 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -199,7 +199,7 @@ public class SSTableIterator extends AbstractSSTableIterator
         private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
         {
             super(file, shouldCloseFile);
-            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false, ifile);
+            this.indexState = new IndexState(this, metadata.comparator, indexEntry, false, ifile);
             this.lastBlockIdx = indexState.blocksCount(); // if we never call setForSlice, that's where we want to stop
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 4de234c..6443a01 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.columniterator;
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
@@ -28,6 +27,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.btree.BTree;
 
 /**
@@ -88,7 +88,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         protected ReusablePartitionData createBuffer(int blocksCount)
         {
             int estimatedRowCount = 16;
-            int columnCount = metadata().partitionColumns().regulars.size();
+            int columnCount = metadata().regularColumns().size();
             if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
             {
                 estimatedRowCount = 1;
@@ -222,7 +222,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
         {
             super(file, shouldCloseFile);
-            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, true, ifile);
+            this.indexState = new IndexState(this, metadata.comparator, indexEntry, true, ifile);
         }
 
         @Override
@@ -321,18 +321,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
 
     private class ReusablePartitionData
     {
-        private final CFMetaData metadata;
+        private final TableMetadata metadata;
         private final DecoratedKey partitionKey;
-        private final PartitionColumns columns;
+        private final RegularAndStaticColumns columns;
 
         private MutableDeletionInfo.Builder deletionBuilder;
         private MutableDeletionInfo deletionInfo;
         private BTree.Builder<Row> rowBuilder;
         private ImmutableBTreePartition built;
 
-        private ReusablePartitionData(CFMetaData metadata,
+        private ReusablePartitionData(TableMetadata metadata,
                                       DecoratedKey partitionKey,
-                                      PartitionColumns columns,
+                                      RegularAndStaticColumns columns,
                                       int initialRowCapacity)
         {
             this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 0ab941b..7e94911 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -32,8 +32,10 @@ import org.slf4j.LoggerFactory;
 import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
@@ -272,7 +274,7 @@ public abstract class AbstractCommitLogSegmentManager
      *
      * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
      */
-    void forceRecycleAll(Iterable<UUID> droppedCfs)
+    void forceRecycleAll(Iterable<TableId> droppedTables)
     {
         List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
         CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
@@ -292,8 +294,8 @@ public abstract class AbstractCommitLogSegmentManager
             future.get();
 
             for (CommitLogSegment segment : activeSegments)
-                for (UUID cfId : droppedCfs)
-                    segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+                for (TableId tableId : droppedTables)
+                    segment.markClean(tableId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 
             // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
             // if the previous active segment was the only one to recycle (since an active segment isn't
@@ -367,27 +369,26 @@ public abstract class AbstractCommitLogSegmentManager
         final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
 
         // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
-        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+        final Map<TableId, ListenableFuture<?>> flushes = new LinkedHashMap<>();
 
         for (CommitLogSegment segment : segments)
         {
-            for (UUID dirtyCFId : segment.getDirtyCFIDs())
+            for (TableId dirtyTableId : segment.getDirtyTableIds())
             {
-                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
-                if (pair == null)
+                TableMetadata metadata = Schema.instance.getTableMetadata(dirtyTableId);
+                if (metadata == null)
                 {
                     // even though we remove the schema entry before a final flush when dropping a CF,
                     // it's still possible for a writer to race and finish his append after the flush.
-                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
-                    segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyTableId);
+                    segment.markClean(dirtyTableId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
                 }
-                else if (!flushes.containsKey(dirtyCFId))
+                else if (!flushes.containsKey(dirtyTableId))
                 {
-                    String keyspace = pair.left;
-                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+                    final ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(dirtyTableId);
                     // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
                     // no deadlock possibility since switchLock removal
-                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+                    flushes.put(dirtyTableId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
                 }
             }
         }
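
The flush loop above queues at most one flush per dirty table by keying the futures map on TableId, while tables that have vanished from the schema are simply marked clean. A self-contained model of that dedup pattern (TableId and the flush action are simplified stand-ins for the real types):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    final class FlushDedup
    {
        record TableId(String name) {}

        // One flush future per table, however many segments list it as dirty.
        static Map<TableId, CompletableFuture<Void>> queueFlushes(List<List<TableId>> dirtyTablesPerSegment)
        {
            Map<TableId, CompletableFuture<Void>> flushes = new LinkedHashMap<>();
            for (List<TableId> dirtyTables : dirtyTablesPerSegment)
                for (TableId id : dirtyTables)
                    flushes.computeIfAbsent(id, t -> CompletableFuture.runAsync(() -> { /* flush table t */ }));
            return flushes;
        }
    }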

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 750fabc..e93a131 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -205,9 +206,9 @@ public class CommitLog implements CommitLogMBean
     /**
      * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
      */
-    public void forceRecycleAllSegments(Iterable<UUID> droppedCfs)
+    public void forceRecycleAllSegments(Iterable<TableId> droppedTables)
     {
-        segmentManager.forceRecycleAll(droppedCfs);
+        segmentManager.forceRecycleAll(droppedTables);
     }
 
     /**
@@ -215,7 +216,7 @@ public class CommitLog implements CommitLogMBean
      */
     public void forceRecycleAllSegments()
     {
-        segmentManager.forceRecycleAll(Collections.<UUID>emptyList());
+        segmentManager.forceRecycleAll(Collections.emptyList());
     }
 
     /**
@@ -295,13 +296,13 @@ public class CommitLog implements CommitLogMBean
     * Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position
     * given. Discards any commit log segments that are no longer used.
     *
-     * @param cfId    the column family ID that was flushed
+     * @param id         the table that was flushed
     * @param lowerBound the lowest covered replay position of the flush
     * @param upperBound the highest covered replay position of the flush
     */
-    public void discardCompletedSegments(final UUID cfId, final CommitLogPosition lowerBound, final CommitLogPosition upperBound)
+    public void discardCompletedSegments(final TableId id, final CommitLogPosition lowerBound, final CommitLogPosition upperBound)
     {
-        logger.trace("discard completed log segments for {}-{}, table {}", 
lowerBound, upperBound, cfId);
+        logger.trace("discard completed log segments for {}-{}, table {}", 
lowerBound, upperBound, id);
 
         // Go thru the active segment files, which are ordered oldest to newest, marking the
         // flushed CF as clean, until we reach the segment file containing the CommitLogPosition passed
@@ -310,7 +311,7 @@ public class CommitLog implements CommitLogMBean
         for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();)
         {
             CommitLogSegment segment = iter.next();
-            segment.markClean(cfId, lowerBound, upperBound);
+            segment.markClean(id, lowerBound, upperBound);
 
             if (segment.isUnused())
             {
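
discardCompletedSegments() walks the active segments oldest to newest, marks the flushed table clean in each, and frees any segment left with no dirty tables. A simplified sketch of that sweep, ignoring the position bounds for brevity (Segment is a hypothetical stand-in):

    import java.util.Iterator;
    import java.util.List;

    final class SegmentSweep
    {
        interface Segment
        {
            void markClean(String tableId); // simplified signature
            boolean isUnused();             // true when no dirty tables remain
        }

        static void discardCompleted(List<Segment> activeOldestFirst, String flushedTable)
        {
            for (Iterator<Segment> iter = activeOldestFirst.iterator(); iter.hasNext();)
            {
                Segment segment = iter.next();
                segment.markClean(flushedTable);
                if (segment.isUnused())
                    iter.remove(); // nothing un-flushed left; safe to recycle
            }
        }
    }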

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index d25609a..1da0cee 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -29,15 +29,16 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
 import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -51,7 +52,7 @@ public class CommitLogReader
     @VisibleForTesting
     public static final int ALL_MUTATIONS = -1;
     private final CRC32 checksum;
-    private final Map<UUID, AtomicInteger> invalidMutations;
+    private final Map<TableId, AtomicInteger> invalidMutations;
 
     private byte[] buffer;
 
@@ -62,7 +63,7 @@ public class CommitLogReader
         buffer = new byte[4096];
     }
 
-    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
+    public Set<Map.Entry<TableId, AtomicInteger>> getInvalidMutations()
     {
         return invalidMutations.entrySet();
     }
@@ -367,15 +368,15 @@ public class CommitLogReader
             for (PartitionUpdate upd : mutation.getPartitionUpdates())
                 upd.validate();
         }
-        catch (UnknownColumnFamilyException ex)
+        catch (UnknownTableException ex)
         {
-            if (ex.cfId == null)
+            if (ex.id == null)
                 return;
-            AtomicInteger i = invalidMutations.get(ex.cfId);
+            AtomicInteger i = invalidMutations.get(ex.id);
             if (i == null)
             {
                 i = new AtomicInteger(1);
-                invalidMutations.put(ex.cfId, i);
+                invalidMutations.put(ex.id, i);
             }
             else
                 i.incrementAndGet();
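
The counter update above (get, then put-or-increment) still works after the TableId switch, but the same bookkeeping collapses to a single computeIfAbsent on current JDKs. A standalone equivalent, with TableId modeled as a plain String:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    final class InvalidMutationCounter
    {
        private final Map<String, AtomicInteger> invalidMutations = new HashMap<>();

        void record(String tableId)
        {
            // 1 on first sighting, incremented thereafter -- the same net
            // effect as the explicit branch in the hunk above
            invalidMutations.computeIfAbsent(tableId, k -> new AtomicInteger()).incrementAndGet();
        }

        int count(String tableId)
        {
            AtomicInteger i = invalidMutations.get(tableId);
            return i == null ? 0 : i.get();
        }
    }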

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 4d2971f..961107c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -21,15 +21,12 @@ package org.apache.cassandra.db.commitlog;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.*;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
@@ -37,10 +34,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
@@ -64,7 +62,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
     private final Queue<Future<Integer>> futures;
 
     private final AtomicInteger replayedCount;
-    private final Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted;
+    private final Map<TableId, IntervalSet<CommitLogPosition>> cfPersisted;
     private final CommitLogPosition globalPosition;
 
     // Used to throttle speed of replay of mutations if we pass the max outstanding count
@@ -78,11 +76,11 @@ public class CommitLogReplayer implements CommitLogReadHandler
 
     CommitLogReplayer(CommitLog commitLog,
                       CommitLogPosition globalPosition,
-                      Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted,
+                      Map<TableId, IntervalSet<CommitLogPosition>> cfPersisted,
                       ReplayFilter replayFilter)
     {
-        this.keyspacesReplayed = new NonBlockingHashSet<Keyspace>();
-        this.futures = new ArrayDeque<Future<Integer>>();
+        this.keyspacesReplayed = new NonBlockingHashSet<>();
+        this.futures = new ArrayDeque<>();
         // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
         this.cfPersisted = cfPersisted;
@@ -95,35 +93,35 @@ public class CommitLogReplayer implements CommitLogReadHandler
     public static CommitLogReplayer construct(CommitLog commitLog)
     {
         // compute per-CF and global replay intervals
-        Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted = new 
HashMap<>();
+        Map<TableId, IntervalSet<CommitLogPosition>> cfPersisted = new 
HashMap<>();
         ReplayFilter replayFilter = ReplayFilter.create();
 
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // but, if we've truncated the cf in question, then we need to start replay after the truncation
-            CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
+            CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
             if (truncatedAt != null)
             {
                 // Point in time restore is taken to mean that the tables need to be replayed even if they were
                 // deleted at a later point in time. Any truncation record after that point must thus be cleared prior
                 // to replay (CASSANDRA-9195).
                 long restoreTime = commitLog.archiver.restorePointInTime;
-                long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
+                long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.id);
                 if (truncatedTime > restoreTime)
                 {
                     if (replayFilter.includes(cfs.metadata))
                     {
                         logger.info("Restore point in time is before latest 
truncation of table {}.{}. Clearing truncation record.",
-                                    cfs.metadata.ksName,
-                                    cfs.metadata.cfName);
-                        
SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId);
+                                    cfs.metadata.keyspace,
+                                    cfs.metadata.name);
+                        SystemKeyspace.removeTruncationRecord(cfs.metadata.id);
                         truncatedAt = null;
                     }
                 }
             }
 
             IntervalSet<CommitLogPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
-            cfPersisted.put(cfs.metadata.cfId, filter);
+            cfPersisted.put(cfs.metadata.id, filter);
         }
         CommitLogPosition globalPosition = firstNotCovered(cfPersisted.values());
         logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
@@ -146,7 +144,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
      */
     public int blockForWrites()
     {
-        for (Map.Entry<UUID, AtomicInteger> entry : 
commitLogReader.getInvalidMutations())
+        for (Map.Entry<TableId, AtomicInteger> entry : 
commitLogReader.getInvalidMutations())
             logger.warn("Skipped {} mutations from unknown (probably removed) 
CF with id {}", entry.getValue(), entry.getKey());
 
         // wait for all the writes to finish on the mutation stage
@@ -192,7 +190,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
             {
                 public void runMayThrow()
                 {
-                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+                    if (Schema.instance.getKeyspaceMetadata(mutation.getKeyspaceName()) == null)
                         return;
                     if (commitLogReplayer.pointInTimeExceeded(mutation))
                         return;
@@ -207,12 +205,12 @@ public class CommitLogReplayer implements CommitLogReadHandler
                     Mutation newMutation = null;
                     for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
                     {
-                        if (Schema.instance.getCF(update.metadata().cfId) == null)
+                        if (Schema.instance.getTableMetadata(update.metadata().id) == null)
                             continue; // dropped
 
                         // replay if current segment is newer than last flushed one or,
                         // if it is the last known segment, if we are after the commit log segment position
-                        if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation)))
+                        if (commitLogReplayer.shouldReplay(update.metadata().id, new CommitLogPosition(segmentId, entryLocation)))
                         {
                             if (newMutation == null)
                                 newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
@@ -272,7 +270,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
     {
         public abstract Iterable<PartitionUpdate> filter(Mutation mutation);
 
-        public abstract boolean includes(CFMetaData metadata);
+        public abstract boolean includes(TableMetadataRef metadata);
 
         public static ReplayFilter create()
         {
@@ -307,7 +305,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
             return mutation.getPartitionUpdates();
         }
 
-        public boolean includes(CFMetaData metadata)
+        public boolean includes(TableMetadataRef metadata)
         {
             return true;
         }
@@ -332,14 +330,14 @@ public class CommitLogReplayer implements CommitLogReadHandler
             {
                 public boolean apply(PartitionUpdate upd)
                 {
-                    return cfNames.contains(upd.metadata().cfName);
+                    return cfNames.contains(upd.metadata().name);
                 }
             });
         }
 
-        public boolean includes(CFMetaData metadata)
+        public boolean includes(TableMetadataRef metadata)
         {
-            return toReplay.containsEntry(metadata.ksName, metadata.cfName);
+            return toReplay.containsEntry(metadata.keyspace, metadata.name);
         }
     }
 
@@ -349,9 +347,9 @@ public class CommitLogReplayer implements CommitLogReadHandler
      *
      * @return true iff replay is necessary
      */
-    private boolean shouldReplay(UUID cfId, CommitLogPosition position)
+    private boolean shouldReplay(TableId tableId, CommitLogPosition position)
     {
-        return !cfPersisted.get(cfId).contains(position);
+        return !cfPersisted.get(tableId).contains(position);
     }
 
     protected boolean pointInTimeExceeded(Mutation fm)
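
shouldReplay() above answers one question per update: does this commit-log position fall outside the intervals already persisted to sstables for that table? A simplified model with a single persisted interval per table (Cassandra's IntervalSet can hold several; the types here are stand-ins):

    import java.util.HashMap;
    import java.util.Map;

    final class ReplayDecision
    {
        record Interval(long low, long high)
        {
            boolean contains(long position) { return low <= position && position <= high; }
        }

        private final Map<String, Interval> persisted = new HashMap<>();

        void markPersisted(String tableId, long low, long high)
        {
            persisted.put(tableId, new Interval(low, high));
        }

        // Replay anything not already covered by a persisted interval.
        boolean shouldReplay(String tableId, long position)
        {
            Interval i = persisted.get(tableId);
            return i == null || !i.contains(position);
        }
    }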

Reply via email to