http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index e64ec75..4cb7470 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -32,23 +32,25 @@ import javax.management.ObjectName;
 
 import com.google.common.util.concurrent.Futures;
 
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.partitions.CachedBTreePartition;
 import org.apache.cassandra.db.partitions.CachedPartition;
-import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -265,13 +267,13 @@ public class CacheService implements CacheServiceMBean
         keyCache.clear();
     }
 
-    public void invalidateKeyCacheForCf(Pair<String, String> ksAndCFName)
+    public void invalidateKeyCacheForCf(TableMetadata tableMetadata)
     {
         Iterator<KeyCacheKey> keyCacheIterator = keyCache.keyIterator();
         while (keyCacheIterator.hasNext())
         {
             KeyCacheKey key = keyCacheIterator.next();
-            if (key.ksAndCFName.equals(ksAndCFName))
+            if (key.sameTable(tableMetadata))
                 keyCacheIterator.remove();
         }
     }
@@ -281,24 +283,24 @@ public class CacheService implements CacheServiceMBean
         rowCache.clear();
     }
 
-    public void invalidateRowCacheForCf(Pair<String, String> ksAndCFName)
+    public void invalidateRowCacheForCf(TableMetadata tableMetadata)
     {
         Iterator<RowCacheKey> rowCacheIterator = rowCache.keyIterator();
         while (rowCacheIterator.hasNext())
         {
-            RowCacheKey rowCacheKey = rowCacheIterator.next();
-            if (rowCacheKey.ksAndCFName.equals(ksAndCFName))
+            RowCacheKey key = rowCacheIterator.next();
+            if (key.sameTable(tableMetadata))
                 rowCacheIterator.remove();
         }
     }
 
-    public void invalidateCounterCacheForCf(Pair<String, String> ksAndCFName)
+    public void invalidateCounterCacheForCf(TableMetadata tableMetadata)
     {
         Iterator<CounterCacheKey> counterCacheIterator = counterCache.keyIterator();
         while (counterCacheIterator.hasNext())
         {
-            CounterCacheKey counterCacheKey = counterCacheIterator.next();
-            if (counterCacheKey.ksAndCFName.equals(ksAndCFName))
+            CounterCacheKey key = counterCacheIterator.next();
+            if (key.sameTable(tableMetadata))
                 counterCacheIterator.remove();
         }
     }
@@ -353,8 +355,10 @@ public class CacheService implements CacheServiceMBean
     {
         public void serialize(CounterCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
-            assert(cfs.metadata.isCounter());
-            out.write(cfs.metadata.ksAndCFBytes);
+            assert(cfs.metadata().isCounter());
+            TableMetadata tableMetadata = cfs.metadata();
+            tableMetadata.id.serialize(out);
+            out.writeUTF(tableMetadata.indexName().orElse(""));
             key.write(out);
         }
 
@@ -362,8 +366,10 @@ public class CacheService implements CacheServiceMBean
         {
             //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
             //parameter so they aren't deserialized here, even though they are serialized by this serializer
-            final CounterCacheKey cacheKey = 
CounterCacheKey.read(cfs.metadata.ksAndCFName, in);
-            if (cfs == null || !cfs.metadata.isCounter() || 
!cfs.isCounterCacheEnabled())
+            if (cfs == null)
+                return null;
+            final CounterCacheKey cacheKey = CounterCacheKey.read(cfs.metadata(), in);
+            if (!cfs.metadata().isCounter() || !cfs.isCounterCacheEnabled())
                 return null;
 
             return StageManager.getStage(Stage.READ).submit(new 
Callable<Pair<CounterCacheKey, ClockAndCount>>()
@@ -384,7 +390,9 @@ public class CacheService implements CacheServiceMBean
         public void serialize(RowCacheKey key, DataOutputPlus out, 
ColumnFamilyStore cfs) throws IOException
         {
             assert(!cfs.isIndex());//Shouldn't have row cache entries for 
indexes
-            out.write(cfs.metadata.ksAndCFBytes);
+            TableMetadata tableMetadata = cfs.metadata();
+            tableMetadata.id.serialize(out);
+            out.writeUTF(tableMetadata.indexName().orElse(""));
             ByteBufferUtil.writeWithLength(key.key, out);
         }
 
@@ -395,7 +403,7 @@ public class CacheService implements CacheServiceMBean
             final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
             if (cfs == null  || !cfs.isRowCacheEnabled())
                 return null;
-            final int rowsToCache = 
cfs.metadata.params.caching.rowsPerPartitionToCache();
+            final int rowsToCache = 
cfs.metadata().params.caching.rowsPerPartitionToCache();
             assert(!cfs.isIndex());//Shouldn't have row cache entries for 
indexes
 
             return StageManager.getStage(Stage.READ).submit(new 
Callable<Pair<RowCacheKey, IRowCacheEntry>>()
@@ -404,11 +412,11 @@ public class CacheService implements CacheServiceMBean
                 {
                     DecoratedKey key = cfs.decorateKey(buffer);
                     int nowInSec = FBUtilities.nowInSeconds();
-                    SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key);
+                    SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.fullPartitionRead(cfs.metadata(), nowInSec, key);
                     try (ReadExecutionController controller = 
cmd.executionController(); UnfilteredRowIterator iter = 
cmd.queryMemtableAndDisk(cfs, controller))
                     {
                         CachedPartition toCache = 
CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, 
nowInSec), nowInSec);
-                        return Pair.create(new 
RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache);
+                        return Pair.create(new RowCacheKey(cfs.metadata(), 
key), toCache);
                     }
                 }
             });
@@ -423,13 +431,15 @@ public class CacheService implements CacheServiceMBean
             if (entry == null)
                 return;
 
-            out.write(cfs.metadata.ksAndCFBytes);
+            TableMetadata tableMetadata = cfs.metadata();
+            tableMetadata.id.serialize(out);
+            out.writeUTF(tableMetadata.indexName().orElse(""));
             ByteBufferUtil.writeWithLength(key.key, out);
             out.writeInt(key.desc.generation);
             out.writeBoolean(true);
 
-            SerializationHeader header = new SerializationHeader(false, 
cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
-            key.desc.getFormat().getIndexSerializer(cfs.metadata, 
key.desc.version, header).serializeForCache(entry, out);
+            SerializationHeader header = new SerializationHeader(false, 
cfs.metadata(), cfs.metadata().regularAndStaticColumns(), 
EncodingStats.NO_STATS);
+            key.desc.getFormat().getIndexSerializer(cfs.metadata(), 
key.desc.version, header).serializeForCache(entry, out);
         }
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
@@ -455,11 +465,11 @@ public class CacheService implements CacheServiceMBean
                 RowIndexEntry.Serializer.skipForCache(input);
                 return null;
             }
-            RowIndexEntry.IndexSerializer<?> indexSerializer = 
reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
+            RowIndexEntry.IndexSerializer<?> indexSerializer = 
reader.descriptor.getFormat().getIndexSerializer(reader.metadata(),
                                                                                
                                 reader.descriptor.version,
                                                                                
                                 reader.header);
             RowIndexEntry<?> entry = 
indexSerializer.deserializeForCache(input);
-            return Futures.immediateFuture(Pair.create(new 
KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
+            return Futures.immediateFuture(Pair.create(new 
KeyCacheKey(cfs.metadata(), reader.descriptor, key), entry));
         }
 
         private SSTableReader findDesc(int generation, Iterable<SSTableReader> 
collection)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index a026d6e..2beb039 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -46,10 +46,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.functions.ThreadAwareSecurityManager;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.*;
@@ -244,7 +245,7 @@ public class CassandraDaemon
             if (keyspaceName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME))
                 continue;
 
-            for (CFMetaData cfm : 
Schema.instance.getTablesAndViews(keyspaceName))
+            for (TableMetadata cfm : 
Schema.instance.getTablesAndViews(keyspaceName))
             {
                 try
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 52e71ae..80cf810 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -28,13 +28,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.*;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.Validation;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.AuthenticationException;
@@ -255,7 +255,7 @@ public class ClientState
     {
         // Skip keyspace validation for non-authenticated users. Apparently, some client libraries
         // call set_keyspace() before calling login(), and we have to handle that.
-        if (user != null && Schema.instance.getKSMetaData(ks) == null)
+        if (user != null && Schema.instance.getKeyspaceMetadata(ks) == null)
             throw new InvalidRequestException("Keyspace '" + ks + "' does not exist");
         keyspace = ks;
     }
@@ -290,14 +290,20 @@ public class ClientState
     public void hasColumnFamilyAccess(String keyspace, String columnFamily, Permission perm)
     throws UnauthorizedException, InvalidRequestException
     {
-        Validation.validateColumnFamily(keyspace, columnFamily);
+        Schema.instance.validateTable(keyspace, columnFamily);
         hasAccess(keyspace, perm, DataResource.table(keyspace, columnFamily));
     }
 
-    public void hasColumnFamilyAccess(CFMetaData cfm, Permission perm)
+    public void hasColumnFamilyAccess(TableMetadataRef tableRef, Permission perm)
     throws UnauthorizedException, InvalidRequestException
     {
-        hasAccess(cfm.ksName, perm, cfm.resource);
+        hasColumnFamilyAccess(tableRef.get(), perm);
+    }
+
+    public void hasColumnFamilyAccess(TableMetadata table, Permission perm)
+    throws UnauthorizedException, InvalidRequestException
+    {
+        hasAccess(table.keyspace, perm, table.resource);
     }
 
     private void hasAccess(String keyspace, Permission perm, DataResource resource)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 991c768..cc7ab2d 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -25,8 +25,8 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
@@ -122,7 +122,7 @@ public class DataResolver extends ResponseResolver
             return new MergeListener(partitionKey, columns(versions), 
isReversed(versions));
         }
 
-        private PartitionColumns columns(List<UnfilteredRowIterator> versions)
+        private RegularAndStaticColumns columns(List<UnfilteredRowIterator> 
versions)
         {
             Columns statics = Columns.NONE;
             Columns regulars = Columns.NONE;
@@ -131,11 +131,11 @@ public class DataResolver extends ResponseResolver
                 if (iter == null)
                     continue;
 
-                PartitionColumns cols = iter.columns();
+                RegularAndStaticColumns cols = iter.columns();
                 statics = statics.mergeTo(cols.statics);
                 regulars = regulars.mergeTo(cols.regulars);
             }
-            return new PartitionColumns(statics, regulars);
+            return new RegularAndStaticColumns(statics, regulars);
         }
 
         private boolean isReversed(List<UnfilteredRowIterator> versions)
@@ -175,7 +175,7 @@ public class DataResolver extends ResponseResolver
         private class MergeListener implements 
UnfilteredRowIterators.MergeListener
         {
             private final DecoratedKey partitionKey;
-            private final PartitionColumns columns;
+            private final RegularAndStaticColumns columns;
             private final boolean isReversed;
             private final PartitionUpdate[] repairs = new 
PartitionUpdate[sources.length];
 
@@ -191,7 +191,7 @@ public class DataResolver extends ResponseResolver
             // For each source, record if there is an open range to send as 
repair, and from where.
             private final ClusteringBound[] markerToRepair = new 
ClusteringBound[sources.length];
 
-            public MergeListener(DecoratedKey partitionKey, PartitionColumns 
columns, boolean isReversed)
+            public MergeListener(DecoratedKey partitionKey, 
RegularAndStaticColumns columns, boolean isReversed)
             {
                 this.partitionKey = partitionKey;
                 this.columns = columns;
@@ -211,7 +211,7 @@ public class DataResolver extends ResponseResolver
                             currentRow(i, clustering).addRowDeletion(merged);
                     }
 
-                    public void onComplexDeletion(int i, Clustering 
clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                    public void onComplexDeletion(int i, Clustering 
clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
                     {
                         if (merged != null && !merged.equals(original))
                             currentRow(i, 
clustering).addComplexDeletion(column, merged);
@@ -231,7 +231,7 @@ public class DataResolver extends ResponseResolver
                         // semantic (making sure we can always distinguish 
between a row that doesn't exist from one that do exist but has
                         /// no value for the column requested by the user) and 
so it won't be unexpected by the user that those columns are
                         // not repaired.
-                        ColumnDefinition column = cell.column();
+                        ColumnMetadata column = cell.column();
                         ColumnFilter filter = command.columnFilter();
                         return column.isComplex() ? 
filter.fetchedCellIsQueried(column, cell.path()) : 
filter.fetchedColumnIsQueried(column);
                     }
@@ -422,12 +422,12 @@ public class DataResolver extends ResponseResolver
 
         private class ShortReadRowProtection extends Transformation implements 
MoreRows<UnfilteredRowIterator>
         {
-            final CFMetaData metadata;
+            final TableMetadata metadata;
             final DecoratedKey partitionKey;
             Clustering lastClustering;
             int lastCount = 0;
 
-            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey 
partitionKey)
+            private ShortReadRowProtection(TableMetadata metadata, 
DecoratedKey partitionKey)
             {
                 this.metadata = metadata;
                 this.partitionKey = partitionKey;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java
deleted file mode 100644
index 19d2592..0000000
--- a/src/java/org/apache/cassandra/service/MigrationListener.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.List;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-
-public abstract class MigrationListener
-{
-    public void onCreateKeyspace(String ksName)
-    {
-    }
-
-    public void onCreateColumnFamily(String ksName, String cfName)
-    {
-    }
-
-    public void onCreateView(String ksName, String viewName)
-    {
-        onCreateColumnFamily(ksName, viewName);
-    }
-
-    public void onCreateUserType(String ksName, String typeName)
-    {
-    }
-
-    public void onCreateFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
-    {
-    }
-
-    public void onCreateAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
-    {
-    }
-
-    public void onUpdateKeyspace(String ksName)
-    {
-    }
-
-    // the boolean flag indicates whether the change that triggered this event 
may have a substantive
-    // impact on statements using the column family.
-    public void onUpdateColumnFamily(String ksName, String cfName, boolean 
affectsStatements)
-    {
-    }
-
-    public void onUpdateView(String ksName, String viewName, boolean 
columnsDidChange)
-    {
-        onUpdateColumnFamily(ksName, viewName, columnsDidChange);
-    }
-
-    public void onUpdateUserType(String ksName, String typeName)
-    {
-    }
-
-    public void onUpdateFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
-    {
-    }
-
-    public void onUpdateAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
-    {
-    }
-
-    public void onDropKeyspace(String ksName)
-    {
-    }
-
-    public void onDropColumnFamily(String ksName, String cfName)
-    {
-    }
-
-    public void onDropView(String ksName, String viewName)
-    {
-        onDropColumnFamily(ksName, viewName);
-    }
-
-    public void onDropUserType(String ksName, String typeName)
-    {
-    }
-
-    public void onDropFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
-    {
-    }
-
-    public void onDropAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
-    {
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
deleted file mode 100644
index f9e4aff..0000000
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ /dev/null
@@ -1,626 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.*;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
-import org.apache.cassandra.config.ViewDefinition;
-import org.apache.cassandra.cql3.functions.UDAggregate;
-import org.apache.cassandra.cql3.functions.UDFunction;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.exceptions.AlreadyExistsException;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.*;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class MigrationManager
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(MigrationManager.class);
-
-    public static final MigrationManager instance = new MigrationManager();
-
-    private static final RuntimeMXBean runtimeMXBean = 
ManagementFactory.getRuntimeMXBean();
-
-    public static final int MIGRATION_DELAY_IN_MS = 60000;
-
-    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = 
Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", 
"1"));
-
-    private final List<MigrationListener> listeners = new 
CopyOnWriteArrayList<>();
-
-    private MigrationManager() {}
-
-    public void register(MigrationListener listener)
-    {
-        listeners.add(listener);
-    }
-
-    public void unregister(MigrationListener listener)
-    {
-        listeners.remove(listener);
-    }
-
-    public static void scheduleSchemaPull(InetAddress endpoint, EndpointState 
state)
-    {
-        VersionedValue value = 
state.getApplicationState(ApplicationState.SCHEMA);
-
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != 
null)
-            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
-    }
-
-    /**
-     * If versions differ this node sends request with local migration list to 
the endpoint
-     * and expecting to receive a list of migrations to apply locally.
-     */
-    private static void maybeScheduleSchemaPull(final UUID theirVersion, final 
InetAddress endpoint)
-    {
-        if ((Schema.instance.getVersion() != null && 
Schema.instance.getVersion().equals(theirVersion)) || 
!shouldPullSchemaFrom(endpoint))
-        {
-            logger.debug("Not pulling schema because versions match or 
shouldPullSchemaFrom returned false");
-            return;
-        }
-
-        if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) 
|| runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
-        {
-            // If we think we may be bootstrapping or have recently started, 
submit MigrationTask immediately
-            logger.debug("Submitting migration task for {}", endpoint);
-            submitMigrationTask(endpoint);
-        }
-        else
-        {
-            // Include a delay to make sure we have a chance to apply any 
changes being
-            // pushed out simultaneously. See CASSANDRA-5025
-            Runnable runnable = () ->
-            {
-                // grab the latest version of the schema since it may have 
changed again since the initial scheduling
-                EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-                if (epState == null)
-                {
-                    logger.debug("epState vanished for {}, not submitting 
migration task", endpoint);
-                    return;
-                }
-                VersionedValue value = 
epState.getApplicationState(ApplicationState.SCHEMA);
-                UUID currentVersion = UUID.fromString(value.value);
-                if (Schema.instance.getVersion().equals(currentVersion))
-                {
-                    logger.debug("not submitting migration task for {} because 
our versions match", endpoint);
-                    return;
-                }
-                logger.debug("submitting migration task for {}", endpoint);
-                submitMigrationTask(endpoint);
-            };
-            ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 
MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private static Future<?> submitMigrationTask(InetAddress endpoint)
-    {
-        /*
-         * Do not de-ref the future because that causes distributed deadlock 
(CASSANDRA-3832) because we are
-         * running in the gossip stage.
-         */
-        return StageManager.getStage(Stage.MIGRATION).submit(new 
MigrationTask(endpoint));
-    }
-
-    public static boolean shouldPullSchemaFrom(InetAddress endpoint)
-    {
-        /*
-         * Don't request schema from nodes with a differnt or unknonw major 
version (may have incompatible schema)
-         * Don't request schema from fat clients
-         */
-        return MessagingService.instance().knowsVersion(endpoint)
-                && MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version
-                && !Gossiper.instance.isGossipOnlyMember(endpoint);
-    }
-
-    public static boolean isReadyForBootstrap()
-    {
-        return MigrationTask.getInflightTasks().isEmpty();
-    }
-
-    public static void waitUntilReadyForBootstrap()
-    {
-        CountDownLatch completionLatch;
-        while ((completionLatch = MigrationTask.getInflightTasks().poll()) != 
null)
-        {
-            try
-            {
-                if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, 
TimeUnit.SECONDS))
-                    logger.error("Migration task failed to complete");
-            }
-            catch (InterruptedException e)
-            {
-                Thread.currentThread().interrupt();
-                logger.error("Migration task was interrupted");
-            }
-        }
-    }
-
-    public void notifyCreateKeyspace(KeyspaceMetadata ksm)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onCreateKeyspace(ksm.name);
-    }
-
-    public void notifyCreateColumnFamily(CFMetaData cfm)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
-    }
-
-    public void notifyCreateView(ViewDefinition view)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onCreateView(view.ksName, view.viewName);
-    }
-
-    public void notifyCreateUserType(UserType ut)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
-    }
-
-    public void notifyCreateFunction(UDFunction udf)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onCreateFunction(udf.name().keyspace, udf.name().name, 
udf.argTypes());
-    }
-
-    public void notifyCreateAggregate(UDAggregate udf)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onCreateAggregate(udf.name().keyspace, udf.name().name, 
udf.argTypes());
-    }
-
-    public void notifyUpdateKeyspace(KeyspaceMetadata ksm)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onUpdateKeyspace(ksm.name);
-    }
-
-    public void notifyUpdateColumnFamily(CFMetaData cfm, boolean 
columnsDidChange)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName, 
columnsDidChange);
-    }
-
-    public void notifyUpdateView(ViewDefinition view, boolean columnsDidChange)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onUpdateView(view.ksName, view.viewName, 
columnsDidChange);
-    }
-
-    public void notifyUpdateUserType(UserType ut)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
-
-        // FIXME: remove when we get rid of AbstractType in metadata. Doesn't 
really belong anywhere.
-        Schema.instance.getKSMetaData(ut.keyspace).functions.udfs().forEach(f 
-> f.userTypeUpdated(ut.keyspace, ut.getNameAsString()));
-    }
-
-    public void notifyUpdateFunction(UDFunction udf)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onUpdateFunction(udf.name().keyspace, udf.name().name, 
udf.argTypes());
-    }
-
-    public void notifyUpdateAggregate(UDAggregate udf)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onUpdateAggregate(udf.name().keyspace, udf.name().name, 
udf.argTypes());
-    }
-
-    public void notifyDropKeyspace(KeyspaceMetadata ksm)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onDropKeyspace(ksm.name);
-    }
-
-    public void notifyDropColumnFamily(CFMetaData cfm)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
-    }
-
-    public void notifyDropView(ViewDefinition view)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onDropView(view.ksName, view.viewName);
-    }
-
-    public void notifyDropUserType(UserType ut)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onDropUserType(ut.keyspace, ut.getNameAsString());
-    }
-
-    public void notifyDropFunction(UDFunction udf)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onDropFunction(udf.name().keyspace, udf.name().name, 
udf.argTypes());
-    }
-
-    public void notifyDropAggregate(UDAggregate udf)
-    {
-        for (MigrationListener listener : listeners)
-            listener.onDropAggregate(udf.name().keyspace, udf.name().name, 
udf.argTypes());
-    }
-
-    public static void announceNewKeyspace(KeyspaceMetadata ksm) throws 
ConfigurationException
-    {
-        announceNewKeyspace(ksm, false);
-    }
-
-    public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
-    {
-        announceNewKeyspace(ksm, FBUtilities.timestampMicros(), 
announceLocally);
-    }
-
-    public static void announceNewKeyspace(KeyspaceMetadata ksm, long 
timestamp, boolean announceLocally) throws ConfigurationException
-    {
-        ksm.validate();
-
-        if (Schema.instance.getKSMetaData(ksm.name) != null)
-            throw new AlreadyExistsException(ksm.name);
-
-        logger.info("Create new Keyspace: {}", ksm);
-        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), 
announceLocally);
-    }
-
-    public static void announceNewColumnFamily(CFMetaData cfm) throws 
ConfigurationException
-    {
-        announceNewColumnFamily(cfm, false);
-    }
-
-    public static void announceNewColumnFamily(CFMetaData cfm, boolean 
announceLocally) throws ConfigurationException
-    {
-        announceNewColumnFamily(cfm, announceLocally, true);
-    }
-
-    /**
-     * Announces the table even if the definition is already know locally.
-     * This should generally be avoided but is used internally when we want to 
force the most up to date version of
-     * a system table schema (Note that we don't know if the schema we force 
_is_ the most recent version or not, we
-     * just rely on idempotency to basically ignore that announce if it's not. 
That's why we can't use announceUpdateColumnFamily,
-     * it would for instance delete new columns if this is not called with the 
most up-to-date version)
-     *
-     * Note that this is only safe for system tables where we know the cfId is 
fixed and will be the same whatever version
-     * of the definition is used.
-     */
-    public static void forceAnnounceNewColumnFamily(CFMetaData cfm) throws 
ConfigurationException
-    {
-        announceNewColumnFamily(cfm, false, false);
-    }
-
-    private static void announceNewColumnFamily(CFMetaData cfm, boolean 
announceLocally, boolean throwOnDuplicate) throws ConfigurationException
-    {
-        cfm.validate();
-
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(cfm.ksName);
-        if (ksm == null)
-            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName));
-        // If we have a table or a view which has the same name, we can't add 
a new one
-        else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.cfName) != 
null)
-            throw new AlreadyExistsException(cfm.ksName, cfm.cfName);
-
-        logger.info("Create new table: {}", cfm);
-        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceNewView(ViewDefinition view, boolean 
announceLocally) throws ConfigurationException
-    {
-        view.metadata.validate();
-
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName);
-        if (ksm == null)
-            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", view.viewName, view.ksName));
-        else if (ksm.getTableOrViewNullable(view.viewName) != null)
-            throw new AlreadyExistsException(view.ksName, view.viewName);
-
-        logger.info("Create new view: {}", view);
-        announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceNewType(UserType newType, boolean 
announceLocally)
-    {
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(newType.keyspace);
-        announce(SchemaKeyspace.makeCreateTypeMutation(ksm, newType, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceNewFunction(UDFunction udf, boolean 
announceLocally)
-    {
-        logger.info("Create scalar function '{}'", udf.name());
-        KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(udf.name().keyspace);
-        announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceNewAggregate(UDAggregate udf, boolean 
announceLocally)
-    {
-        logger.info("Create aggregate function '{}'", udf.name());
-        KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(udf.name().keyspace);
-        announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws 
ConfigurationException
-    {
-        announceKeyspaceUpdate(ksm, false);
-    }
-
-    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
-    {
-        ksm.validate();
-
-        KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(ksm.name);
-        if (oldKsm == null)
-            throw new ConfigurationException(String.format("Cannot update non 
existing keyspace '%s'.", ksm.name));
-
-        logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, 
ksm);
-        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceColumnFamilyUpdate(CFMetaData cfm) throws 
ConfigurationException
-    {
-        announceColumnFamilyUpdate(cfm, false);
-    }
-
-    public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean 
announceLocally) throws ConfigurationException
-    {
-        cfm.validate();
-
-        CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, 
cfm.cfName);
-        if (oldCfm == null)
-            throw new ConfigurationException(String.format("Cannot update non 
existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName));
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(cfm.ksName);
-
-        oldCfm.validateCompatibility(cfm);
-
-        logger.info("Update table '{}/{}' From {} To {}", cfm.ksName, 
cfm.cfName, oldCfm, cfm);
-        announce(SchemaKeyspace.makeUpdateTableMutation(ksm, oldCfm, cfm, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceViewUpdate(ViewDefinition view, boolean 
announceLocally) throws ConfigurationException
-    {
-        view.metadata.validate();
-
-        ViewDefinition oldView = Schema.instance.getView(view.ksName, 
view.viewName);
-        if (oldView == null)
-            throw new ConfigurationException(String.format("Cannot update non 
existing materialized view '%s' in keyspace '%s'.", view.viewName, 
view.ksName));
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName);
-
-        oldView.metadata.validateCompatibility(view.metadata);
-
-        logger.info("Update view '{}/{}' From {} To {}", view.ksName, 
view.viewName, oldView, view);
-        announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceTypeUpdate(UserType updatedType, boolean 
announceLocally)
-    {
-        logger.info("Update type '{}.{}' to {}", updatedType.keyspace, 
updatedType.getNameAsString(), updatedType);
-        announceNewType(updatedType, announceLocally);
-    }
-
-    public static void announceKeyspaceDrop(String ksName) throws 
ConfigurationException
-    {
-        announceKeyspaceDrop(ksName, false);
-    }
-
-    public static void announceKeyspaceDrop(String ksName, boolean 
announceLocally) throws ConfigurationException
-    {
-        KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(ksName);
-        if (oldKsm == null)
-            throw new ConfigurationException(String.format("Cannot drop non 
existing keyspace '%s'.", ksName));
-
-        logger.info("Drop Keyspace '{}'", oldKsm.name);
-        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceColumnFamilyDrop(String ksName, String cfName) 
throws ConfigurationException
-    {
-        announceColumnFamilyDrop(ksName, cfName, false);
-    }
-
-    public static void announceColumnFamilyDrop(String ksName, String cfName, 
boolean announceLocally) throws ConfigurationException
-    {
-        CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName);
-        if (oldCfm == null)
-            throw new ConfigurationException(String.format("Cannot drop non 
existing table '%s' in keyspace '%s'.", cfName, ksName));
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
-
-        logger.info("Drop table '{}/{}'", oldCfm.ksName, oldCfm.cfName);
-        announce(SchemaKeyspace.makeDropTableMutation(ksm, oldCfm, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceViewDrop(String ksName, String viewName, 
boolean announceLocally) throws ConfigurationException
-    {
-        ViewDefinition view = Schema.instance.getView(ksName, viewName);
-        if (view == null)
-            throw new ConfigurationException(String.format("Cannot drop non 
existing materialized view '%s' in keyspace '%s'.", viewName, ksName));
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
-
-        logger.info("Drop table '{}/{}'", view.ksName, view.viewName);
-        announce(SchemaKeyspace.makeDropViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceTypeDrop(UserType droppedType)
-    {
-        announceTypeDrop(droppedType, false);
-    }
-
-    public static void announceTypeDrop(UserType droppedType, boolean 
announceLocally)
-    {
-        KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(droppedType.keyspace);
-        announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceFunctionDrop(UDFunction udf, boolean 
announceLocally)
-    {
-        logger.info("Drop scalar function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
-        KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(udf.name().keyspace);
-        announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceAggregateDrop(UDAggregate udf, boolean 
announceLocally)
-    {
-        logger.info("Drop aggregate function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
-        KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(udf.name().keyspace);
-        announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    /**
-     * actively announce a new version to active hosts via rpc
-     * @param schema The schema mutation to be applied
-     */
-    private static void announce(Mutation.SimpleBuilder schema, boolean 
announceLocally)
-    {
-        List<Mutation> mutations = Collections.singletonList(schema.build());
-
-        if (announceLocally)
-            SchemaKeyspace.mergeSchema(mutations);
-        else
-            FBUtilities.waitOnFuture(announce(mutations));
-    }
-
-    private static void pushSchemaMutation(InetAddress endpoint, 
Collection<Mutation> schema)
-    {
-        MessageOut<Collection<Mutation>> msg = new 
MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
-                                                                schema,
-                                                                
MigrationsSerializer.instance);
-        MessagingService.instance().sendOneWay(msg, endpoint);
-    }
-
-    // Returns a future on the local application of the schema
-    private static Future<?> announce(final Collection<Mutation> schema)
-    {
-        Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new 
WrappedRunnable()
-        {
-            protected void runMayThrow() throws ConfigurationException
-            {
-                SchemaKeyspace.mergeSchemaAndAnnounceVersion(schema);
-            }
-        });
-
-        for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
-        {
-            // only push schema to nodes with known and equal versions
-            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                    MessagingService.instance().knowsVersion(endpoint) &&
-                    MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version)
-                pushSchemaMutation(endpoint, schema);
-        }
-
-        return f;
-    }
-
-    /**
-     * Announce my version passively over gossip.
-     * Used to notify nodes as they arrive in the cluster.
-     *
-     * @param version The schema version to announce
-     */
-    public static void passiveAnnounce(UUID version)
-    {
-        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(version));
-        logger.debug("Gossiping my schema version {}", version);
-    }
-
-    /**
-     * Clear all locally stored schema information and reset schema to initial 
state.
-     * Called by user (via JMX) who wants to get rid of schema disagreement.
-     */
-    public static void resetLocalSchema()
-    {
-        logger.info("Starting local schema reset...");
-
-        logger.debug("Truncating schema tables...");
-
-        SchemaKeyspace.truncate();
-
-        logger.debug("Clearing local schema keyspace definitions...");
-
-        Schema.instance.clear();
-
-        Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
-        liveEndpoints.remove(FBUtilities.getBroadcastAddress());
-
-        // force migration if there are nodes around
-        for (InetAddress node : liveEndpoints)
-        {
-            if (shouldPullSchemaFrom(node))
-            {
-                logger.debug("Requesting schema from {}", node);
-                FBUtilities.waitOnFuture(submitMigrationTask(node));
-                break;
-            }
-        }
-
-        logger.info("Local schema reset is complete.");
-    }
-
-    public static class MigrationsSerializer implements 
IVersionedSerializer<Collection<Mutation>>
-    {
-        public static MigrationsSerializer instance = new 
MigrationsSerializer();
-
-        public void serialize(Collection<Mutation> schema, DataOutputPlus out, 
int version) throws IOException
-        {
-            out.writeInt(schema.size());
-            for (Mutation mutation : schema)
-                Mutation.serializer.serialize(mutation, out, version);
-        }
-
-        public Collection<Mutation> deserialize(DataInputPlus in, int version) 
throws IOException
-        {
-            int count = in.readInt();
-            Collection<Mutation> schema = new ArrayList<>(count);
-
-            for (int i = 0; i < count; i++)
-                schema.add(Mutation.serializer.deserialize(in, version));
-
-            return schema;
-        }
-
-        public long serializedSize(Collection<Mutation> schema, int version)
-        {
-            int size = TypeSizes.sizeof(schema.size());
-            for (Mutation mutation : schema)
-                size += Mutation.serializer.serializedSize(mutation, version);
-            return size;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java 
b/src/java/org/apache/cassandra/service/MigrationTask.java
deleted file mode 100644
index 052b89e..0000000
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-
-class MigrationTask extends WrappedRunnable
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(MigrationTask.class);
-
-    private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = 
new ConcurrentLinkedQueue<>();
-
-    private static final Set<BootstrapState> monitoringBootstrapStates = 
EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
-
-    private final InetAddress endpoint;
-
-    MigrationTask(InetAddress endpoint)
-    {
-        this.endpoint = endpoint;
-    }
-
-    public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
-    {
-        return inflightTasks;
-    }
-
-    public void runMayThrow() throws Exception
-    {
-        // There is a chance that quite some time could have passed between 
now and the MM#maybeScheduleSchemaPull(),
-        // potentially enough for the endpoint node to restart - which is an 
issue if it does restart upgraded, with
-        // a higher major.
-        if (!MigrationManager.shouldPullSchemaFrom(endpoint))
-        {
-            logger.info("Skipped sending a migration request: node {} has a 
higher major version now.", endpoint);
-            return;
-        }
-
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.debug("Can't send schema pull request: node {} is down.", 
endpoint);
-            return;
-        }
-
-        MessageOut message = new 
MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, 
MigrationManager.MigrationsSerializer.instance);
-
-        final CountDownLatch completionLatch = new CountDownLatch(1);
-
-        IAsyncCallback<Collection<Mutation>> cb = new 
IAsyncCallback<Collection<Mutation>>()
-        {
-            @Override
-            public void response(MessageIn<Collection<Mutation>> message)
-            {
-                try
-                {
-                    
SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
-                }
-                catch (ConfigurationException e)
-                {
-                    logger.error("Configuration exception merging remote 
schema", e);
-                }
-                finally
-                {
-                    completionLatch.countDown();
-                }
-            }
-
-            public boolean isLatencyForSnitch()
-            {
-                return false;
-            }
-        };
-
-        // Only save the latches if we need bootstrap or are bootstrapping
-        if 
(monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
-            inflightTasks.offer(completionLatch);
-
-        MessagingService.instance().sendRR(message, endpoint, cb);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java 
b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 5b1aa0d..297774a 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.service;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java 
b/src/java/org/apache/cassandra/service/ReadCallback.java
index 440e35a..d63e860 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -77,9 +77,9 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
     {
         this(resolver,
              consistencyLevel,
-             
consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)),
+             
consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)),
              command,
-             Keyspace.open(command.metadata().ksName),
+             Keyspace.open(command.metadata().keyspace),
              filteredEndpoints,
              queryStartNanoTime);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/ReadRepairDecision.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadRepairDecision.java 
b/src/java/org/apache/cassandra/service/ReadRepairDecision.java
new file mode 100644
index 0000000..8d2ced7
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ReadRepairDecision.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+public enum ReadRepairDecision
+{
+    NONE, GLOBAL, DC_LOCAL;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java 
b/src/java/org/apache/cassandra/service/StartupChecks.java
index 75f7788..767bfc6 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -29,10 +29,11 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SystemKeyspace;
@@ -319,7 +320,7 @@ public class StartupChecks
             // we do a one-off scrub of the system keyspace first; we can't 
load the list of the rest of the keyspaces,
             // until system keyspace is opened.
 
-            for (CFMetaData cfm : 
Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME))
+            for (TableMetadata cfm : 
Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME))
                 ColumnFamilyStore.scrubDataDirectories(cfm);
 
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index d5a9f47..0585717 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -44,10 +44,10 @@ import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
@@ -236,7 +236,7 @@ public class StorageProxy implements StorageProxyMBean
             consistencyForPaxos.validateForCas();
             consistencyForCommit.validateForCasCommit(keyspaceName);
 
-            CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
cfName);
+            TableMetadata metadata = 
Schema.instance.getTableMetadata(keyspaceName, cfName);
 
             long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
             while (System.nanoTime() - queryStartNanoTime < timeout)
@@ -348,11 +348,11 @@ public class StorageProxy implements StorageProxyMBean
         };
     }
 
-    private static Pair<List<InetAddress>, Integer> 
getPaxosParticipants(CFMetaData cfm, DecoratedKey key, ConsistencyLevel 
consistencyForPaxos) throws UnavailableException
+    private static Pair<List<InetAddress>, Integer> 
getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel 
consistencyForPaxos) throws UnavailableException
     {
         Token tk = key.getToken();
-        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(cfm.ksName, tk);
-        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, cfm.ksName);
+        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
+        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
metadata.keyspace);
         if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
         {
             // Restrict naturalEndpoints and pendingEndpoints to node in the 
local DC only
@@ -387,7 +387,7 @@ public class StorageProxy implements StorageProxyMBean
      */
     private static Pair<UUID, Integer> beginAndRepairPaxos(long 
queryStartNanoTime,
                                                            DecoratedKey key,
-                                                           CFMetaData metadata,
+                                                           TableMetadata 
metadata,
                                                            List<InetAddress> 
liveEndpoints,
                                                            int 
requiredParticipants,
                                                            ConsistencyLevel 
consistencyForPaxos,
@@ -482,7 +482,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         recordCasContention(contentions);
-        throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
+        throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace)));
     }
 
     /**
@@ -528,7 +528,7 @@ public class StorageProxy implements StorageProxyMBean
     private static void commitPaxos(Commit proposal, ConsistencyLevel 
consistencyLevel, boolean shouldHint, long queryStartNanoTime) throws 
WriteTimeoutException
     {
         boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
-        Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
+        Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace);
 
         Token tk = proposal.update.partitionKey().getToken();
         List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
@@ -1513,7 +1513,7 @@ public class StorageProxy implements StorageProxyMBean
     private static boolean systemKeyspaceQuery(List<? extends ReadCommand> 
cmds)
     {
         for (ReadCommand cmd : cmds)
-            if (!SchemaConstants.isSystemKeyspace(cmd.metadata().ksName))
+            if (!SchemaConstants.isSystemKeyspace(cmd.metadata().keyspace))
                 return false;
         return true;
     }
@@ -1566,7 +1566,7 @@ public class StorageProxy implements StorageProxyMBean
 
         long start = System.nanoTime();
         SinglePartitionReadCommand command = group.commands.get(0);
-        CFMetaData metadata = command.metadata();
+        TableMetadata metadata = command.metadata();
         DecoratedKey key = command.partitionKey();
 
         PartitionIterator result = null;
@@ -1590,7 +1590,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             catch (WriteTimeoutException e)
             {
-                throw new ReadTimeoutException(consistencyLevel, 0, 
consistencyLevel.blockFor(Keyspace.open(metadata.ksName)), false);
+                throw new ReadTimeoutException(consistencyLevel, 0, 
consistencyLevel.blockFor(Keyspace.open(metadata.keyspace)), false);
             }
             catch (WriteFailureException e)
             {
@@ -1626,7 +1626,7 @@ public class StorageProxy implements StorageProxyMBean
             readMetrics.addNano(latency);
             casReadMetrics.addNano(latency);
             readMetricsMap.get(consistencyLevel).addNano(latency);
-            Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
+            Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
         }
 
         return result;
@@ -1764,7 +1764,7 @@ public class StorageProxy implements StorageProxyMBean
                 ReadRepairMetrics.repairedBlocking.mark();
 
                 // Do a full data read to resolve the correct response (and 
repair node that need be)
-                Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+                Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
                 DataResolver resolver = new DataResolver(keyspace, command, 
ConsistencyLevel.ALL, executor.handler.endpoints.size(), queryStartNanoTime);
                 repairHandler = new ReadCallback(resolver,
                                                  ConsistencyLevel.ALL,
@@ -1805,7 +1805,7 @@ public class StorageProxy implements StorageProxyMBean
                     logger.trace("Timed out waiting on digest mismatch repair 
requests");
                 // the caught exception here will have CL.ALL from the repair 
command,
                 // not whatever CL the initial command was at (CASSANDRA-7947)
-                int blockFor = 
consistency.blockFor(Keyspace.open(command.metadata().ksName));
+                int blockFor = 
consistency.blockFor(Keyspace.open(command.metadata().keyspace));
                 throw new ReadTimeoutException(consistency, blockFor-1, 
blockFor, true);
             }
         }
@@ -1903,7 +1903,7 @@ public class StorageProxy implements StorageProxyMBean
      */
     private static float estimateResultsPerRange(PartitionRangeReadCommand 
command, Keyspace keyspace)
     {
-        ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().cfId);
+        ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().id);
         Index index = command.getIndex(cfs);
         float maxExpectedResults = index == null
                                  ? command.limits().estimateTotalResults(cfs)
@@ -2223,7 +2223,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         Tracing.trace("Computing ranges to query");
 
-        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
         RangeIterator ranges = new RangeIterator(command, keyspace, 
consistencyLevel);
 
         // our estimate of how many result rows there will be per-range
@@ -2274,7 +2274,7 @@ public class StorageProxy implements StorageProxyMBean
                 return false;
             }
         };
-        // an empty message acts as a request to the SchemaCheckVerbHandler.
+        // an empty message acts as a request to the SchemaVersionVerbHandler.
         MessageOut message = new 
MessageOut(MessagingService.Verb.SCHEMA_CHECK);
         for (InetAddress endpoint : liveHosts)
             MessagingService.instance().sendRR(message, endpoint, cb);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 6cb67fc..62f4871 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -52,7 +52,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 import ch.qos.logback.core.hook.DelayingShutdownHook;
 import org.apache.cassandra.auth.AuthKeyspace;
-import org.apache.cassandra.auth.AuthMigrationListener;
+import org.apache.cassandra.auth.AuthSchemaChangeListener;
 import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
 import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
 import org.apache.cassandra.batchlog.BatchlogManager;
@@ -60,11 +60,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
-import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -85,6 +81,15 @@ import org.apache.cassandra.repair.*;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SchemaPullVerbHandler;
+import org.apache.cassandra.schema.SchemaPushVerbHandler;
+import org.apache.cassandra.schema.SchemaVersionVerbHandler;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
@@ -285,9 +290,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
 
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.DEFINITIONS_UPDATE, new SchemaPushVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SCHEMA_CHECK, new SchemaVersionVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new SchemaPullVerbHandler());
 
         
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT,
 new SnapshotVerbHandler());
         
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, 
new EchoVerbHandler());
@@ -1045,7 +1050,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         DatabaseDescriptor.getRoleManager().setup();
         DatabaseDescriptor.getAuthenticator().setup();
         DatabaseDescriptor.getAuthorizer().setup();
-        MigrationManager.instance.register(new AuthMigrationListener());
+        Schema.instance.registerListener(new AuthSchemaChangeListener());
         authSetupComplete = true;
     }
 
@@ -1079,23 +1084,23 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         // (#8162 being an example), so even if the table definition exists, 
we still need to force the "current"
         // version of the schema, the one the node will be expecting.
 
-        KeyspaceMetadata defined = 
Schema.instance.getKSMetaData(expected.name);
+        KeyspaceMetadata defined = 
Schema.instance.getKeyspaceMetadata(expected.name);
         // If the keyspace doesn't exist, create it
         if (defined == null)
         {
             maybeAddKeyspace(expected);
-            defined = Schema.instance.getKSMetaData(expected.name);
+            defined = Schema.instance.getKeyspaceMetadata(expected.name);
         }
 
         // While the keyspace exists, it might miss table or have outdated one
         // There is also the potential for a race, as schema migrations add 
the bare
         // keyspace into Schema.instance before adding its tables, so double 
check that
         // all the expected tables are present
-        for (CFMetaData expectedTable : expected.tables)
+        for (TableMetadata expectedTable : expected.tables)
         {
-            CFMetaData definedTable = 
defined.tables.get(expectedTable.cfName).orElse(null);
+            TableMetadata definedTable = 
defined.tables.get(expectedTable.name).orElse(null);
             if (definedTable == null || !definedTable.equals(expectedTable))
-                MigrationManager.forceAnnounceNewColumnFamily(expectedTable);
+                MigrationManager.forceAnnounceNewTable(expectedTable);
         }
     }
 
@@ -1474,8 +1479,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     private void markViewsAsBuilt() {
         for (String keyspace : Schema.instance.getUserKeyspaces())
         {
-            for (ViewDefinition view: 
Schema.instance.getKSMetaData(keyspace).views)
-                SystemKeyspace.finishViewBuildStatus(view.ksName, 
view.viewName);
+            for (ViewMetadata view: 
Schema.instance.getKeyspaceMetadata(keyspace).views)
+                SystemKeyspace.finishViewBuildStatus(view.keyspace, view.name);
         }
     }
 
@@ -3616,15 +3621,15 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      */
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, String 
cf, String key)
     {
-        KeyspaceMetadata ksMetaData = 
Schema.instance.getKSMetaData(keyspaceName);
+        KeyspaceMetadata ksMetaData = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
         if (ksMetaData == null)
             throw new IllegalArgumentException("Unknown keyspace '" + 
keyspaceName + "'");
 
-        CFMetaData cfMetaData = ksMetaData.getTableOrViewNullable(cf);
-        if (cfMetaData == null)
+        TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf);
+        if (metadata == null)
             throw new IllegalArgumentException("Unknown table '" + cf + "' in 
keyspace '" + keyspaceName + "'");
 
-        return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(cfMetaData.getKeyValidator().fromString(key)));
+        return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
     }
 
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, 
ByteBuffer key)
@@ -3760,7 +3765,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             Token token = tokens.get(index);
             Range<Token> range = new Range<>(prevToken, token);
             // always return an estimate > 0 (see CASSANDRA-7322)
-            splits.add(Pair.create(range, Math.max(cfs.metadata.params.minIndexInterval, cfs.estimatedKeysForRange(range))));
+            splits.add(Pair.create(range, Math.max(cfs.metadata().params.minIndexInterval, cfs.estimatedKeysForRange(range))));
             prevToken = token;
         }
         return splits;
@@ -4709,7 +4714,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public List<String> getNonSystemKeyspaces()
     {
-        return 
Collections.unmodifiableList(Schema.instance.getNonSystemKeyspaces());
+        return Schema.instance.getNonSystemKeyspaces();
     }
 
     public List<String> getNonLocalStrategyKeyspaces()
@@ -4991,9 +4996,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 }
             }
 
-            public CFMetaData getTableMetadata(String tableName)
+            public TableMetadataRef getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, tableName);
+                return Schema.instance.getTableMetadataRef(keyspace, 
tableName);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java 
b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 8a04108..0682b15 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.service.pager;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.ProtocolVersion;
 
@@ -77,10 +77,10 @@ abstract class AbstractQueryPager implements QueryPager
         return 
Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController),
 pager);
     }
 
-    public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int 
pageSize, ReadExecutionController executionController)
+    public UnfilteredPartitionIterator fetchPageUnfiltered(TableMetadata 
metadata, int pageSize, ReadExecutionController executionController)
     {
         if (isExhausted())
-            return EmptyIterators.unfilteredPartition(cfm);
+            return EmptyIterators.unfilteredPartition(metadata);
 
         pageSize = Math.min(pageSize, remaining);
         UnfilteredPager pager = new 
UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java 
b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
index f9a8cda..5ac01b2 100644
--- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.service.pager;
 import java.nio.ByteBuffer;
 import java.util.NoSuchElementException;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.aggregation.GroupingState;
 import org.apache.cassandra.db.filter.DataLimits;
@@ -319,7 +319,7 @@ public final class AggregationQueryPager implements 
QueryPager
                 this.rowIterator = delegate;
             }
 
-            public CFMetaData metadata()
+            public TableMetadata metadata()
             {
                 return rowIterator.metadata();
             }
@@ -329,7 +329,7 @@ public final class AggregationQueryPager implements 
QueryPager
                 return rowIterator.isReverseOrder();
             }
 
-            public PartitionColumns columns()
+            public RegularAndStaticColumns columns()
             {
                 return rowIterator.columns();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java 
b/src/java/org/apache/cassandra/service/pager/PagingState.java
index bcf3979..f036f96 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.TypeSizes;
@@ -189,7 +189,7 @@ public class PagingState
             this.protocolVersion = protocolVersion;
         }
 
-        private static List<AbstractType<?>> makeClusteringTypes(CFMetaData 
metadata)
+        private static List<AbstractType<?>> makeClusteringTypes(TableMetadata 
metadata)
         {
             // This is the types that will be used when serializing the 
clustering in the paging state. We can't really use the actual clustering
             // types however because we can't guarantee that there won't be a 
schema change between when we send the paging state and get it back,
@@ -203,7 +203,7 @@ public class PagingState
             return l;
         }
 
-        public static RowMark create(CFMetaData metadata, Row row, 
ProtocolVersion protocolVersion)
+        public static RowMark create(TableMetadata metadata, Row row, 
ProtocolVersion protocolVersion)
         {
             ByteBuffer mark;
             if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3))
@@ -234,7 +234,7 @@ public class PagingState
             return new RowMark(mark, protocolVersion);
         }
 
-        public Clustering clustering(CFMetaData metadata)
+        public Clustering clustering(TableMetadata metadata)
         {
             if (mark == null)
                 return null;
@@ -245,7 +245,7 @@ public class PagingState
         }
 
         // Old (pre-3.0) encoding of cells. We need that for the protocol v3 
as that is how things where encoded
-        private static ByteBuffer encodeCellName(CFMetaData metadata, 
Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement)
+        private static ByteBuffer encodeCellName(TableMetadata metadata, 
Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement)
         {
             boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
 
@@ -302,7 +302,7 @@ public class PagingState
             return CompositeType.build(isStatic, values);
         }
 
-        private static Clustering decodeClustering(CFMetaData metadata, 
ByteBuffer value)
+        private static Clustering decodeClustering(TableMetadata metadata, 
ByteBuffer value)
         {
             int csize = metadata.comparator.size();
             if (csize == 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java 
b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 68547be..4f09f97 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -42,7 +42,7 @@ public class PartitionRangeQueryPager extends 
AbstractQueryPager
 
         if (state != null)
         {
-            lastReturnedKey = command.metadata().decorateKey(state.partitionKey);
+            lastReturnedKey = command.metadata().partitioner.decorateKey(state.partitionKey);
             lastReturnedRow = state.rowMark;
             restoreState(lastReturnedKey, state.remaining, 
state.remainingInPartition);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java 
b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 3b0364c..6e23c8b 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -27,7 +27,7 @@ import java.util.UUID;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -55,7 +55,7 @@ public class Commit
         this.update = update;
     }
 
-    public static Commit newPrepare(DecoratedKey key, CFMetaData metadata, 
UUID ballot)
+    public static Commit newPrepare(DecoratedKey key, TableMetadata metadata, 
UUID ballot)
     {
         return new Commit(ballot, PartitionUpdate.emptyUpdate(metadata, key));
     }
@@ -66,7 +66,7 @@ public class Commit
         return new Commit(ballot, update);
     }
 
-    public static Commit emptyCommit(DecoratedKey key, CFMetaData metadata)
+    public static Commit emptyCommit(DecoratedKey key, TableMetadata metadata)
     {
         return new Commit(UUIDGen.minTimeUUID(0), 
PartitionUpdate.emptyUpdate(metadata, key));
     }

Reply via email to