http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java 
b/src/java/org/apache/cassandra/config/Schema.java
deleted file mode 100644
index c6fc2a8..0000000
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ /dev/null
@@ -1,776 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.config;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.index.Index;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.schema.*;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ConcurrentBiMap;
-import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-public class Schema
-{
-    private static final Logger logger = LoggerFactory.getLogger(Schema.class);
-
-    public static final Schema instance = new Schema();
-
-    /* metadata map for faster keyspace lookup */
-    private final Map<String, KeyspaceMetadata> keyspaces = new 
NonBlockingHashMap<>();
-
-    /* Keyspace objects, one per keyspace. Only one instance should ever exist 
for any given keyspace. */
-    private final Map<String, Keyspace> keyspaceInstances = new 
NonBlockingHashMap<>();
-
-    /* metadata map for faster ColumnFamily lookup */
-    private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new 
ConcurrentBiMap<>();
-
-    private volatile UUID version;
-
-    /**
-     * Initialize empty schema object and load the hardcoded system tables
-     */
-    public Schema()
-    {
-        if (DatabaseDescriptor.isDaemonInitialized() || 
DatabaseDescriptor.isToolInitialized())
-        {
-            load(SchemaKeyspace.metadata());
-            load(SystemKeyspace.metadata());
-        }
-    }
-
-    /**
-     * load keyspace (keyspace) definitions, but do not initialize the 
keyspace instances.
-     * Schema version may be updated as the result.
-     */
-    public Schema loadFromDisk()
-    {
-        return loadFromDisk(true);
-    }
-
-    /**
-     * Load schema definitions from disk.
-     *
-     * @param updateVersion true if schema version needs to be updated
-     */
-    public Schema loadFromDisk(boolean updateVersion)
-    {
-        load(SchemaKeyspace.fetchNonSystemKeyspaces());
-        if (updateVersion)
-            updateVersion();
-        return this;
-    }
-
-    /**
-     * Load up non-system keyspaces
-     *
-     * @param keyspaceDefs The non-system keyspace definitions
-     *
-     * @return self to support chaining calls
-     */
-    public Schema load(Iterable<KeyspaceMetadata> keyspaceDefs)
-    {
-        keyspaceDefs.forEach(this::load);
-        return this;
-    }
-
-    /**
-     * Load specific keyspace into Schema
-     *
-     * @param keyspaceDef The keyspace to load up
-     *
-     * @return self to support chaining calls
-     */
-    public Schema load(KeyspaceMetadata keyspaceDef)
-    {
-        keyspaceDef.tables.forEach(this::load);
-        keyspaceDef.views.forEach(this::load);
-        setKeyspaceMetadata(keyspaceDef);
-        return this;
-    }
-
-    /**
-     * Get keyspace instance by name
-     *
-     * @param keyspaceName The name of the keyspace
-     *
-     * @return Keyspace object or null if keyspace was not found
-     */
-    public Keyspace getKeyspaceInstance(String keyspaceName)
-    {
-        return keyspaceInstances.get(keyspaceName);
-    }
-
-    /**
-     * Retrieve a CFS by name even if that CFS is an index
-     *
-     * An index is identified by looking for '.' in the CF name and separating 
to find the base table
-     * containing the index
-     * @param ksNameAndCFName
-     * @return The named CFS or null if the keyspace, base table, or index 
don't exist
-     */
-    public ColumnFamilyStore getColumnFamilyStoreIncludingIndexes(Pair<String, 
String> ksNameAndCFName)
-    {
-        String ksName = ksNameAndCFName.left;
-        String cfName = ksNameAndCFName.right;
-        Pair<String, String> baseTable;
-
-        /*
-         * Split does special case a one character regex, and it looks like it 
can detect
-         * if you use two characters to escape '.', but it still allocates a 
useless array.
-         */
-        int indexOfSeparator = cfName.indexOf('.');
-        if (indexOfSeparator > -1)
-            baseTable = Pair.create(ksName, cfName.substring(0, 
indexOfSeparator));
-        else
-            baseTable = ksNameAndCFName;
-
-        UUID cfId = cfIdMap.get(baseTable);
-        if (cfId == null)
-            return null;
-
-        Keyspace ks = keyspaceInstances.get(ksName);
-        if (ks == null)
-            return null;
-
-        ColumnFamilyStore baseCFS = ks.getColumnFamilyStore(cfId);
-
-        //Not an index
-        if (indexOfSeparator == -1)
-            return baseCFS;
-
-        if (baseCFS == null)
-            return null;
-
-        Index index = 
baseCFS.indexManager.getIndexByName(cfName.substring(indexOfSeparator + 1, 
cfName.length()));
-        if (index == null)
-            return null;
-
-        //Shouldn't ask for a backing table if there is none so just throw?
-        //Or should it return null?
-        return index.getBackingTable().get();
-    }
-
-    public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId)
-    {
-        Pair<String, String> pair = cfIdMap.inverse().get(cfId);
-        if (pair == null)
-            return null;
-        Keyspace instance = getKeyspaceInstance(pair.left);
-        if (instance == null)
-            return null;
-        return instance.getColumnFamilyStore(cfId);
-    }
-
-    /**
-     * Store given Keyspace instance to the schema
-     *
-     * @param keyspace The Keyspace instance to store
-     *
-     * @throws IllegalArgumentException if Keyspace is already stored
-     */
-    public void storeKeyspaceInstance(Keyspace keyspace)
-    {
-        if (keyspaceInstances.containsKey(keyspace.getName()))
-            throw new IllegalArgumentException(String.format("Keyspace %s was 
already initialized.", keyspace.getName()));
-
-        keyspaceInstances.put(keyspace.getName(), keyspace);
-    }
-
-    /**
-     * Remove keyspace from schema
-     *
-     * @param keyspaceName The name of the keyspace to remove
-     *
-     * @return removed keyspace instance or null if it wasn't found
-     */
-    public Keyspace removeKeyspaceInstance(String keyspaceName)
-    {
-        return keyspaceInstances.remove(keyspaceName);
-    }
-
-    /**
-     * Remove keyspace definition from system
-     *
-     * @param ksm The keyspace definition to remove
-     */
-    public void clearKeyspaceMetadata(KeyspaceMetadata ksm)
-    {
-        keyspaces.remove(ksm.name);
-    }
-
-    /**
-     * Given a keyspace name and column family name, get the column family
-     * meta data. If the keyspace name or column family name is not valid
-     * this function returns null.
-     *
-     * @param keyspaceName The keyspace name
-     * @param cfName The ColumnFamily name
-     *
-     * @return ColumnFamily Metadata object or null if it wasn't found
-     */
-    public CFMetaData getCFMetaData(String keyspaceName, String cfName)
-    {
-        assert keyspaceName != null;
-
-        KeyspaceMetadata ksm = keyspaces.get(keyspaceName);
-        return ksm == null
-             ? null
-             : ksm.getTableOrViewNullable(cfName);
-    }
-
-    /**
-     * Get ColumnFamily metadata by its identifier
-     *
-     * @param cfId The ColumnFamily identifier
-     *
-     * @return metadata about ColumnFamily
-     */
-    public CFMetaData getCFMetaData(UUID cfId)
-    {
-        Pair<String,String> cf = getCF(cfId);
-        return (cf == null) ? null : getCFMetaData(cf.left, cf.right);
-    }
-
-    public CFMetaData getCFMetaData(Descriptor descriptor)
-    {
-        return getCFMetaData(descriptor.ksname, descriptor.cfname);
-    }
-
-    public int getNumberOfTables()
-    {
-        return cfIdMap.size();
-    }
-
-    public ViewDefinition getView(String keyspaceName, String viewName)
-    {
-        assert keyspaceName != null;
-        KeyspaceMetadata ksm = keyspaces.get(keyspaceName);
-        return (ksm == null) ? null : ksm.views.getNullable(viewName);
-    }
-
-    /**
-     * Get metadata about keyspace by its name
-     *
-     * @param keyspaceName The name of the keyspace
-     *
-     * @return The keyspace metadata or null if it wasn't found
-     */
-    public KeyspaceMetadata getKSMetaData(String keyspaceName)
-    {
-        assert keyspaceName != null;
-        return keyspaces.get(keyspaceName);
-    }
-
-    private Set<String> getNonSystemKeyspacesSet()
-    {
-        return Sets.difference(keyspaces.keySet(), 
SchemaConstants.SYSTEM_KEYSPACE_NAMES);
-    }
-
-    /**
-     * @return collection of the non-system keyspaces (note that this count as 
system only the
-     * non replicated keyspaces, so keyspace like system_traces which are 
replicated are actually
-     * returned. See getUserKeyspace() below if you don't want those)
-     */
-    public List<String> getNonSystemKeyspaces()
-    {
-        return ImmutableList.copyOf(getNonSystemKeyspacesSet());
-    }
-
-    /**
-     * @return a collection of keyspaces that do not use LocalStrategy for 
replication
-     */
-    public List<String> getNonLocalStrategyKeyspaces()
-    {
-        return keyspaces.values().stream()
-                .filter(keyspace -> keyspace.params.replication.klass != 
LocalStrategy.class)
-                .map(keyspace -> keyspace.name)
-                .collect(Collectors.toList());
-    }
-
-    /**
-     * @return collection of the user defined keyspaces
-     */
-    public List<String> getUserKeyspaces()
-    {
-        return 
ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), 
SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
-    }
-
-    /**
-     * Get metadata about keyspace inner ColumnFamilies
-     *
-     * @param keyspaceName The name of the keyspace
-     *
-     * @return metadata about ColumnFamilies the belong to the given keyspace
-     */
-    public Iterable<CFMetaData> getTablesAndViews(String keyspaceName)
-    {
-        assert keyspaceName != null;
-        KeyspaceMetadata ksm = keyspaces.get(keyspaceName);
-        assert ksm != null;
-        return ksm.tablesAndViews();
-    }
-
-    /**
-     * @return collection of the all keyspace names registered in the system 
(system and non-system)
-     */
-    public Set<String> getKeyspaces()
-    {
-        return keyspaces.keySet();
-    }
-
-    public Keyspaces getKeyspaces(Set<String> includedKeyspaceNames)
-    {
-        Keyspaces.Builder builder = Keyspaces.builder();
-        keyspaces.values()
-                 .stream()
-                 .filter(k -> includedKeyspaceNames.contains(k.name))
-                 .forEach(builder::add);
-        return builder.build();
-    }
-
-    /**
-     * Update (or insert) new keyspace definition
-     *
-     * @param ksm The metadata about keyspace
-     */
-    public void setKeyspaceMetadata(KeyspaceMetadata ksm)
-    {
-        assert ksm != null;
-
-        keyspaces.put(ksm.name, ksm);
-        Keyspace keyspace = getKeyspaceInstance(ksm.name);
-        if (keyspace != null)
-            keyspace.setMetadata(ksm);
-    }
-
-    /* ColumnFamily query/control methods */
-
-    /**
-     * @param cfId The identifier of the ColumnFamily to lookup
-     * @return The (ksname,cfname) pair for the given id, or null if it has 
been dropped.
-     */
-    public Pair<String,String> getCF(UUID cfId)
-    {
-        return cfIdMap.inverse().get(cfId);
-    }
-
-    /**
-     * @param ksAndCFName The identifier of the ColumnFamily to lookup
-     * @return true if the KS and CF pair is a known one, false otherwise.
-     */
-    public boolean hasCF(Pair<String, String> ksAndCFName)
-    {
-        return cfIdMap.containsKey(ksAndCFName);
-    }
-
-    /**
-     * Lookup keyspace/ColumnFamily identifier
-     *
-     * @param ksName The keyspace name
-     * @param cfName The ColumnFamily name
-     *
-     * @return The id for the given (ksname,cfname) pair, or null if it has 
been dropped.
-     */
-    public UUID getId(String ksName, String cfName)
-    {
-        return cfIdMap.get(Pair.create(ksName, cfName));
-    }
-
-    /**
-     * Load individual ColumnFamily Definition to the schema
-     * (to make ColumnFamily lookup faster)
-     *
-     * @param cfm The ColumnFamily definition to load
-     */
-    public void load(CFMetaData cfm)
-    {
-        Pair<String, String> key = Pair.create(cfm.ksName, cfm.cfName);
-
-        if (cfIdMap.containsKey(key))
-            throw new RuntimeException(String.format("Attempting to load 
already loaded table %s.%s", cfm.ksName, cfm.cfName));
-
-        logger.debug("Adding {} to cfIdMap", cfm);
-        cfIdMap.put(key, cfm.cfId);
-    }
-
-    /**
-     * Load individual View Definition to the schema
-     * (to make View lookup faster)
-     *
-     * @param view The View definition to load
-     */
-    public void load(ViewDefinition view)
-    {
-        CFMetaData cfm = view.metadata;
-        Pair<String, String> key = Pair.create(cfm.ksName, cfm.cfName);
-
-        if (cfIdMap.containsKey(key))
-            throw new RuntimeException(String.format("Attempting to load 
already loaded view %s.%s", cfm.ksName, cfm.cfName));
-
-        logger.debug("Adding {} to cfIdMap", cfm);
-        cfIdMap.put(key, cfm.cfId);
-    }
-
-    /**
-     * Used for ColumnFamily data eviction out from the schema
-     *
-     * @param cfm The ColumnFamily Definition to evict
-     */
-    public void unload(CFMetaData cfm)
-    {
-        cfIdMap.remove(Pair.create(cfm.ksName, cfm.cfName));
-    }
-
-    /**
-     * Used for View eviction from the schema
-     *
-     * @param view The view definition to evict
-     */
-    private void unload(ViewDefinition view)
-    {
-        cfIdMap.remove(Pair.create(view.ksName, view.viewName));
-    }
-
-    /* Function helpers */
-
-    /**
-     * Get all function overloads with the specified name
-     *
-     * @param name fully qualified function name
-     * @return an empty list if the keyspace or the function name are not 
found;
-     *         a non-empty collection of {@link Function} otherwise
-     */
-    public Collection<Function> getFunctions(FunctionName name)
-    {
-        if (!name.hasKeyspace())
-            throw new IllegalArgumentException(String.format("Function name 
must be fully quallified: got %s", name));
-
-        KeyspaceMetadata ksm = getKSMetaData(name.keyspace);
-        return ksm == null
-             ? Collections.emptyList()
-             : ksm.functions.get(name);
-    }
-
-    /**
-     * Find the function with the specified name
-     *
-     * @param name fully qualified function name
-     * @param argTypes function argument types
-     * @return an empty {@link Optional} if the keyspace or the function name 
are not found;
-     *         a non-empty optional of {@link Function} otherwise
-     */
-    public Optional<Function> findFunction(FunctionName name, 
List<AbstractType<?>> argTypes)
-    {
-        if (!name.hasKeyspace())
-            throw new IllegalArgumentException(String.format("Function name 
must be fully quallified: got %s", name));
-
-        KeyspaceMetadata ksm = getKSMetaData(name.keyspace);
-        return ksm == null
-             ? Optional.empty()
-             : ksm.functions.find(name, argTypes);
-    }
-
-    /* Version control */
-
-    /**
-     * @return current schema version
-     */
-    public UUID getVersion()
-    {
-        return version;
-    }
-
-    /**
-     * Read schema from system keyspace and calculate MD5 digest of every row, 
resulting digest
-     * will be converted into UUID which would act as content-based version of 
the schema.
-     */
-    public void updateVersion()
-    {
-        version = SchemaKeyspace.calculateSchemaDigest();
-        SystemKeyspace.updateSchemaVersion(version);
-    }
-
-    /*
-     * Like updateVersion, but also announces via gossip
-     */
-    public void updateVersionAndAnnounce()
-    {
-        updateVersion();
-        MigrationManager.passiveAnnounce(version);
-    }
-
-    /**
-     * Clear all KS/CF metadata and reset version.
-     */
-    public synchronized void clear()
-    {
-        for (String keyspaceName : getNonSystemKeyspaces())
-        {
-            KeyspaceMetadata ksm = getKSMetaData(keyspaceName);
-            ksm.tables.forEach(this::unload);
-            ksm.views.forEach(this::unload);
-            clearKeyspaceMetadata(ksm);
-        }
-
-        updateVersionAndAnnounce();
-    }
-
-    public void addKeyspace(KeyspaceMetadata ksm)
-    {
-        assert getKSMetaData(ksm.name) == null;
-        load(ksm);
-
-        Keyspace.open(ksm.name);
-        MigrationManager.instance.notifyCreateKeyspace(ksm);
-    }
-
-    public void updateKeyspace(String ksName, KeyspaceParams newParams)
-    {
-        KeyspaceMetadata ksm = update(ksName, ks -> ks.withSwapped(newParams));
-        MigrationManager.instance.notifyUpdateKeyspace(ksm);
-    }
-
-    public void dropKeyspace(String ksName)
-    {
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
-        String snapshotName = 
Keyspace.getTimestampedSnapshotNameWithPrefix(ksName, 
ColumnFamilyStore.SNAPSHOT_DROP_PREFIX);
-
-        
CompactionManager.instance.interruptCompactionFor(ksm.tablesAndViews(), true);
-
-        Keyspace keyspace = Keyspace.open(ksm.name);
-
-        // remove all cfs from the keyspace instance.
-        List<UUID> droppedCfs = new ArrayList<>();
-        for (CFMetaData cfm : ksm.tablesAndViews())
-        {
-            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
-
-            unload(cfm);
-
-            if (DatabaseDescriptor.isAutoSnapshot())
-                cfs.snapshot(snapshotName);
-            Keyspace.open(ksm.name).dropCf(cfm.cfId);
-
-            droppedCfs.add(cfm.cfId);
-        }
-
-        // remove the keyspace from the static instances.
-        Keyspace.clear(ksm.name);
-        clearKeyspaceMetadata(ksm);
-
-        Keyspace.writeOrder.awaitNewBarrier();
-
-        // force a new segment in the CL
-        CommitLog.instance.forceRecycleAllSegments(droppedCfs);
-
-        MigrationManager.instance.notifyDropKeyspace(ksm);
-    }
-
-    public void addTable(CFMetaData cfm)
-    {
-        assert getCFMetaData(cfm.ksName, cfm.cfName) == null;
-
-        // Make sure the keyspace is initialized
-        // and init the new CF before switching the KSM to the new one
-        // to avoid races as in CASSANDRA-10761
-        Keyspace.open(cfm.ksName).initCf(cfm, true);
-        // Update the keyspaces map with the updated metadata
-        update(cfm.ksName, ks -> ks.withSwapped(ks.tables.with(cfm)));
-        // Update the table ID <-> table name map (cfIdMap)
-        load(cfm);
-        MigrationManager.instance.notifyCreateColumnFamily(cfm);
-    }
-
-    public void updateTable(CFMetaData table)
-    {
-        CFMetaData current = getCFMetaData(table.ksName, table.cfName);
-        assert current != null;
-        boolean changeAffectsStatements = current.apply(table);
-
-        Keyspace keyspace = Keyspace.open(current.ksName);
-        keyspace.getColumnFamilyStore(current.cfName).reload();
-        MigrationManager.instance.notifyUpdateColumnFamily(current, 
changeAffectsStatements);
-    }
-
-    public void dropTable(String ksName, String tableName)
-    {
-        KeyspaceMetadata oldKsm = getKSMetaData(ksName);
-        assert oldKsm != null;
-        ColumnFamilyStore cfs = 
Keyspace.open(ksName).getColumnFamilyStore(tableName);
-        assert cfs != null;
-
-        // make sure all the indexes are dropped, or else.
-        cfs.indexManager.markAllIndexesRemoved();
-
-        // reinitialize the keyspace.
-        CFMetaData cfm = oldKsm.tables.get(tableName).get();
-        KeyspaceMetadata newKsm = 
oldKsm.withSwapped(oldKsm.tables.without(tableName));
-
-        unload(cfm);
-        setKeyspaceMetadata(newKsm);
-
-        
CompactionManager.instance.interruptCompactionFor(Collections.singleton(cfm), 
true);
-
-        if (DatabaseDescriptor.isAutoSnapshot())
-            
cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, 
ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
-        Keyspace.open(ksName).dropCf(cfm.cfId);
-        MigrationManager.instance.notifyDropColumnFamily(cfm);
-
-        
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
-    }
-
-    public void addView(ViewDefinition view)
-    {
-        assert getCFMetaData(view.ksName, view.viewName) == null;
-
-        Keyspace keyspace = Keyspace.open(view.ksName);
-
-        // Make sure the keyspace is initialized and initialize the table.
-        keyspace.initCf(view.metadata, true);
-        // Update the keyspaces map with the updated metadata
-        update(view.ksName, ks -> ks.withSwapped(ks.views.with(view)));
-        // Update the table ID <-> table name map (cfIdMap)
-        load(view);
-
-        keyspace.viewManager.reload();
-        MigrationManager.instance.notifyCreateView(view);
-    }
-
-    public void updateView(ViewDefinition view)
-    {
-        ViewDefinition current = 
getKSMetaData(view.ksName).views.get(view.viewName).get();
-        boolean changeAffectsStatements = 
current.metadata.apply(view.metadata);
-
-        Keyspace keyspace = Keyspace.open(current.ksName);
-        keyspace.getColumnFamilyStore(current.viewName).reload();
-        Keyspace.open(current.ksName).viewManager.update(current.viewName);
-        MigrationManager.instance.notifyUpdateView(current, 
changeAffectsStatements);
-    }
-
-    public void dropView(String ksName, String viewName)
-    {
-        KeyspaceMetadata oldKsm = getKSMetaData(ksName);
-        assert oldKsm != null;
-        ColumnFamilyStore cfs = 
Keyspace.open(ksName).getColumnFamilyStore(viewName);
-        assert cfs != null;
-
-        // make sure all the indexes are dropped, or else.
-        cfs.indexManager.markAllIndexesRemoved();
-
-        // reinitialize the keyspace.
-        ViewDefinition view = oldKsm.views.get(viewName).get();
-        KeyspaceMetadata newKsm = 
oldKsm.withSwapped(oldKsm.views.without(viewName));
-
-        unload(view);
-        setKeyspaceMetadata(newKsm);
-
-        
CompactionManager.instance.interruptCompactionFor(Collections.singleton(view.metadata),
 true);
-
-        if (DatabaseDescriptor.isAutoSnapshot())
-            cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
-        Keyspace.open(ksName).dropCf(view.metadata.cfId);
-        Keyspace.open(ksName).viewManager.reload();
-        MigrationManager.instance.notifyDropView(view);
-
-        
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(view.metadata.cfId));
-    }
-
-    public void addType(UserType ut)
-    {
-        update(ut.keyspace, ks -> ks.withSwapped(ks.types.with(ut)));
-        MigrationManager.instance.notifyCreateUserType(ut);
-    }
-
-    public void updateType(UserType ut)
-    {
-        update(ut.keyspace, ks -> 
ks.withSwapped(ks.types.without(ut.name).with(ut)));
-        MigrationManager.instance.notifyUpdateUserType(ut);
-    }
-
-    public void dropType(UserType ut)
-    {
-        update(ut.keyspace, ks -> ks.withSwapped(ks.types.without(ut.name)));
-        MigrationManager.instance.notifyDropUserType(ut);
-    }
-
-    public void addFunction(UDFunction udf)
-    {
-        update(udf.name().keyspace, ks -> 
ks.withSwapped(ks.functions.with(udf)));
-        MigrationManager.instance.notifyCreateFunction(udf);
-    }
-
-    public void updateFunction(UDFunction udf)
-    {
-        update(udf.name().keyspace, ks -> 
ks.withSwapped(ks.functions.without(udf.name(), udf.argTypes()).with(udf)));
-        MigrationManager.instance.notifyUpdateFunction(udf);
-    }
-
-    public void dropFunction(UDFunction udf)
-    {
-        update(udf.name().keyspace, ks -> 
ks.withSwapped(ks.functions.without(udf.name(), udf.argTypes())));
-        MigrationManager.instance.notifyDropFunction(udf);
-    }
-
-    public void addAggregate(UDAggregate uda)
-    {
-        update(uda.name().keyspace, ks -> 
ks.withSwapped(ks.functions.with(uda)));
-        MigrationManager.instance.notifyCreateAggregate(uda);
-    }
-
-    public void updateAggregate(UDAggregate uda)
-    {
-        update(uda.name().keyspace, ks -> 
ks.withSwapped(ks.functions.without(uda.name(), uda.argTypes()).with(uda)));
-        MigrationManager.instance.notifyUpdateAggregate(uda);
-    }
-
-    public void dropAggregate(UDAggregate uda)
-    {
-        update(uda.name().keyspace, ks -> 
ks.withSwapped(ks.functions.without(uda.name(), uda.argTypes())));
-        MigrationManager.instance.notifyDropAggregate(uda);
-    }
-
-    private synchronized KeyspaceMetadata update(String keyspaceName, 
java.util.function.Function<KeyspaceMetadata, KeyspaceMetadata> transformation)
-    {
-        KeyspaceMetadata current = getKSMetaData(keyspaceName);
-        if (current == null)
-            throw new IllegalStateException(String.format("Keyspace %s doesn't 
exist", keyspaceName));
-
-        KeyspaceMetadata transformed = transformation.apply(current);
-        setKeyspaceMetadata(transformed);
-
-        return transformed;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/config/SchemaConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/SchemaConstants.java 
b/src/java/org/apache/cassandra/config/SchemaConstants.java
deleted file mode 100644
index 2416d6b..0000000
--- a/src/java/org/apache/cassandra/config/SchemaConstants.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.config;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.ImmutableSet;
-
-public final class SchemaConstants
-{
-    public static final String SYSTEM_KEYSPACE_NAME = "system";
-    public static final String SCHEMA_KEYSPACE_NAME = "system_schema";
-
-    public static final String TRACE_KEYSPACE_NAME = "system_traces";
-    public static final String AUTH_KEYSPACE_NAME = "system_auth";
-    public static final String DISTRIBUTED_KEYSPACE_NAME = 
"system_distributed";
-
-    /* system keyspace names (the ones with LocalStrategy replication 
strategy) */
-    public static final Set<String> SYSTEM_KEYSPACE_NAMES = 
ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME);
-
-    /* replicate system keyspace names (the ones with a "true" replication 
strategy) */
-    public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = 
ImmutableSet.of(TRACE_KEYSPACE_NAME,
-                                                                               
        AUTH_KEYSPACE_NAME,
-                                                                               
        DISTRIBUTED_KEYSPACE_NAME);
-    /**
-     * longest permissible KS or CF name.  Our main concern is that filename 
not be more than 255 characters;
-     * the filename will contain both the KS and CF names. Since 
non-schema-name components only take up
-     * ~64 characters, we could allow longer names than this, but on Windows, 
the entire path should be not greater than
-     * 255 characters, so a lower limit here helps avoid problems.  See 
CASSANDRA-4110.
-     */
-    public static final int NAME_LENGTH = 48;
-
-    // 59adb24e-f3cd-3e02-97f0-5b395827453f
-    public static final UUID emptyVersion;
-
-    static
-    {
-        try
-        {
-            emptyVersion = 
UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest());
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            throw new AssertionError();
-        }
-    }
-
-    /**
-     * @return whether or not the keyspace is a really system one (w/ 
LocalStrategy, unmodifiable, hardcoded)
-     */
-    public static boolean isSystemKeyspace(String keyspaceName)
-    {
-        return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/config/ViewDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ViewDefinition.java 
b/src/java/org/apache/cassandra/config/ViewDefinition.java
deleted file mode 100644
index 77cbcc9..0000000
--- a/src/java/org/apache/cassandra/config/ViewDefinition.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.config;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import org.antlr.runtime.*;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.view.View;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-
-public class ViewDefinition
-{
-    public final String ksName;
-    public final String viewName;
-    public final UUID baseTableId;
-    public final String baseTableName;
-    public final boolean includeAllColumns;
-    public final CFMetaData metadata;
-
-    public SelectStatement.RawStatement select;
-    public String whereClause;
-
-    public ViewDefinition(ViewDefinition def)
-    {
-        this(def.ksName, def.viewName, def.baseTableId, def.baseTableName, 
def.includeAllColumns, def.select, def.whereClause, def.metadata);
-    }
-
-    /**
-     * @param viewName          Name of the view
-     * @param baseTableId       Internal ID of the table which this view is 
based off of
-     * @param includeAllColumns Whether to include all columns or not
-     */
-    public ViewDefinition(String ksName, String viewName, UUID baseTableId, 
String baseTableName, boolean includeAllColumns, SelectStatement.RawStatement 
select, String whereClause, CFMetaData metadata)
-    {
-        this.ksName = ksName;
-        this.viewName = viewName;
-        this.baseTableId = baseTableId;
-        this.baseTableName = baseTableName;
-        this.includeAllColumns = includeAllColumns;
-        this.select = select;
-        this.whereClause = whereClause;
-        this.metadata = metadata;
-    }
-
-    /**
-     * @return true if the view specified by this definition will include the 
column, false otherwise
-     */
-    public boolean includes(ColumnIdentifier column)
-    {
-        return metadata.getColumnDefinition(column) != null;
-    }
-
-    public ViewDefinition copy()
-    {
-        return new ViewDefinition(ksName, viewName, baseTableId, 
baseTableName, includeAllColumns, select, whereClause, metadata.copy());
-    }
-
-    public CFMetaData baseTableMetadata()
-    {
-        return Schema.instance.getCFMetaData(baseTableId);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-            return true;
-
-        if (!(o instanceof ViewDefinition))
-            return false;
-
-        ViewDefinition other = (ViewDefinition) o;
-        return Objects.equals(ksName, other.ksName)
-               && Objects.equals(viewName, other.viewName)
-               && Objects.equals(baseTableId, other.baseTableId)
-               && Objects.equals(includeAllColumns, other.includeAllColumns)
-               && Objects.equals(whereClause, other.whereClause)
-               && Objects.equals(metadata, other.metadata);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return new HashCodeBuilder(29, 1597)
-               .append(ksName)
-               .append(viewName)
-               .append(baseTableId)
-               .append(includeAllColumns)
-               .append(whereClause)
-               .append(metadata)
-               .toHashCode();
-    }
-
-    @Override
-    public String toString()
-    {
-        return new ToStringBuilder(this)
-               .append("ksName", ksName)
-               .append("viewName", viewName)
-               .append("baseTableId", baseTableId)
-               .append("baseTableName", baseTableName)
-               .append("includeAllColumns", includeAllColumns)
-               .append("whereClause", whereClause)
-               .append("metadata", metadata)
-               .toString();
-    }
-
-    /**
-     * Replace the column 'from' with 'to' in this materialized view 
definition's partition,
-     * clustering, or included columns.
-     * @param from the existing column
-     * @param to the new column
-     */
-    public void renameColumn(ColumnIdentifier from, ColumnIdentifier to)
-    {
-        metadata.renameColumn(from, to);
-
-        // convert whereClause to Relations, rename ids in Relations, then 
convert back to whereClause
-        List<Relation> relations = whereClauseToRelations(whereClause);
-        ColumnDefinition.Raw fromRaw = 
ColumnDefinition.Raw.forQuoted(from.toString());
-        ColumnDefinition.Raw toRaw = 
ColumnDefinition.Raw.forQuoted(to.toString());
-        List<Relation> newRelations = relations.stream()
-                .map(r -> r.renameIdentifier(fromRaw, toRaw))
-                .collect(Collectors.toList());
-
-        this.whereClause = View.relationsToWhereClause(newRelations);
-        String rawSelect = View.buildSelectStatement(baseTableName, 
metadata.allColumns(), whereClause);
-        this.select = (SelectStatement.RawStatement) 
QueryProcessor.parseStatement(rawSelect);
-    }
-
-    private static List<Relation> whereClauseToRelations(String whereClause)
-    {
-        try
-        {
-            List<Relation> relations = 
CQLFragmentParser.parseAnyUnhandled(CqlParser::whereClause, 
whereClause).build().relations;
-
-            return relations;
-        }
-        catch (RecognitionException | SyntaxException exc)
-        {
-            throw new RuntimeException("Unexpected error parsing materialized 
view's where clause while handling column rename: ", exc);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java 
b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index 7e375bf..d387a25 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.CollectionType.Kind;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -524,7 +524,7 @@ public interface CQL3Type
 
         public CQL3Type prepare(String keyspace)
         {
-            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+            KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspace);
             if (ksm == null)
                 throw new ConfigurationException(String.format("Keyspace %s 
doesn't exist", keyspace));
             return prepare(keyspace, ksm.types);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java 
b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index 70f7590..7436c90 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -218,7 +218,6 @@ public class ColumnIdentifier implements IMeasurableMemory, 
Comparable<ColumnIde
         return ByteBufferUtil.compareUnsigned(this.bytes, that.bytes);
     }
 
-    @VisibleForTesting
     public static String maybeQuote(String text)
     {
         if (UNQUOTED_IDENTIFIER.matcher(text).matches() && 
!ReservedKeywords.isReserved(text))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java 
b/src/java/org/apache/cassandra/cql3/Constants.java
index 734d1d6..6dce3a3 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -424,7 +424,7 @@ public abstract class Constants
 
     public static class Setter extends Operation
     {
-        public Setter(ColumnDefinition column, Term t)
+        public Setter(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -441,7 +441,7 @@ public abstract class Constants
 
     public static class Adder extends Operation
     {
-        public Adder(ColumnDefinition column, Term t)
+        public Adder(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -461,7 +461,7 @@ public abstract class Constants
 
     public static class Substracter extends Operation
     {
-        public Substracter(ColumnDefinition column, Term t)
+        public Substracter(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -486,7 +486,7 @@ public abstract class Constants
     // duplicating this further
     public static class Deleter extends Operation
     {
-        public Deleter(ColumnDefinition column)
+        public Deleter(ColumnMetadata column)
         {
             super(column, null);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/Json.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Json.java 
b/src/java/org/apache/cassandra/cql3/Json.java
index 2e67a1e..f0aee63 100644
--- a/src/java/org/apache/cassandra/cql3/Json.java
+++ b/src/java/org/apache/cassandra/cql3/Json.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.cql3;
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -59,7 +59,7 @@ public class Json
 
     public interface Raw
     {
-        public Prepared prepareAndCollectMarkers(CFMetaData metadata, 
Collection<ColumnDefinition> receivers, VariableSpecifications boundNames);
+        public Prepared prepareAndCollectMarkers(TableMetadata metadata, 
Collection<ColumnMetadata> receivers, VariableSpecifications boundNames);
     }
 
     /**
@@ -75,7 +75,7 @@ public class Json
             this.text = text;
         }
 
-        public Prepared prepareAndCollectMarkers(CFMetaData metadata, 
Collection<ColumnDefinition> receivers, VariableSpecifications boundNames)
+        public Prepared prepareAndCollectMarkers(TableMetadata metadata, 
Collection<ColumnMetadata> receivers, VariableSpecifications boundNames)
         {
             return new PreparedLiteral(parseJson(text, receivers));
         }
@@ -94,15 +94,15 @@ public class Json
             this.bindIndex = bindIndex;
         }
 
-        public Prepared prepareAndCollectMarkers(CFMetaData metadata, 
Collection<ColumnDefinition> receivers, VariableSpecifications boundNames)
+        public Prepared prepareAndCollectMarkers(TableMetadata metadata, 
Collection<ColumnMetadata> receivers, VariableSpecifications boundNames)
         {
             boundNames.add(bindIndex, makeReceiver(metadata));
             return new PreparedMarker(bindIndex, receivers);
         }
 
-        private ColumnSpecification makeReceiver(CFMetaData metadata)
+        private ColumnSpecification makeReceiver(TableMetadata metadata)
         {
-            return new ColumnSpecification(metadata.ksName, metadata.cfName, 
JSON_COLUMN_ID, UTF8Type.instance);
+            return new ColumnSpecification(metadata.keyspace, metadata.name, 
JSON_COLUMN_ID, UTF8Type.instance);
         }
     }
 
@@ -111,7 +111,7 @@ public class Json
      */
     public static abstract class Prepared
     {
-        public abstract Term.Raw getRawTermForColumn(ColumnDefinition def, 
boolean defaultUnset);
+        public abstract Term.Raw getRawTermForColumn(ColumnMetadata def, 
boolean defaultUnset);
     }
 
     /**
@@ -126,7 +126,7 @@ public class Json
             this.columnMap = columnMap;
         }
 
-        public Term.Raw getRawTermForColumn(ColumnDefinition def, boolean 
defaultUnset)
+        public Term.Raw getRawTermForColumn(ColumnMetadata def, boolean 
defaultUnset)
         {
             Term value = columnMap.get(def.name);
             return value == null
@@ -141,15 +141,15 @@ public class Json
     private static class PreparedMarker extends Prepared
     {
         private final int bindIndex;
-        private final Collection<ColumnDefinition> columns;
+        private final Collection<ColumnMetadata> columns;
 
-        public PreparedMarker(int bindIndex, Collection<ColumnDefinition> 
columns)
+        public PreparedMarker(int bindIndex, Collection<ColumnMetadata> 
columns)
         {
             this.bindIndex = bindIndex;
             this.columns = columns;
         }
 
-        public RawDelayedColumnValue getRawTermForColumn(ColumnDefinition def, 
boolean defaultUnset)
+        public RawDelayedColumnValue getRawTermForColumn(ColumnMetadata def, 
boolean defaultUnset)
         {
             return new RawDelayedColumnValue(this, def, defaultUnset);
         }
@@ -199,10 +199,10 @@ public class Json
     private static class RawDelayedColumnValue extends Term.Raw
     {
         private final PreparedMarker marker;
-        private final ColumnDefinition column;
+        private final ColumnMetadata column;
         private final boolean defaultUnset;
 
-        public RawDelayedColumnValue(PreparedMarker prepared, ColumnDefinition 
column, boolean defaultUnset)
+        public RawDelayedColumnValue(PreparedMarker prepared, ColumnMetadata 
column, boolean defaultUnset)
         {
             this.marker = prepared;
             this.column = column;
@@ -238,10 +238,10 @@ public class Json
     private static class DelayedColumnValue extends Term.NonTerminal
     {
         private final PreparedMarker marker;
-        private final ColumnDefinition column;
+        private final ColumnMetadata column;
         private final boolean defaultUnset;
 
-        public DelayedColumnValue(PreparedMarker prepared, ColumnDefinition 
column, boolean defaultUnset)
+        public DelayedColumnValue(PreparedMarker prepared, ColumnMetadata 
column, boolean defaultUnset)
         {
             this.marker = prepared;
             this.column = column;
@@ -278,7 +278,7 @@ public class Json
     /**
      * Given a JSON string, return a map of columns to their values for the 
insert.
      */
-    public static Map<ColumnIdentifier, Term> parseJson(String jsonString, 
Collection<ColumnDefinition> expectedReceivers)
+    public static Map<ColumnIdentifier, Term> parseJson(String jsonString, 
Collection<ColumnMetadata> expectedReceivers)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java 
b/src/java/org/apache/cassandra/cql3/Lists.java
index eb4b685..48fe54f 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -347,7 +347,7 @@ public abstract class Lists
 
     public static class Setter extends Operation
     {
-        public Setter(ColumnDefinition column, Term t)
+        public Setter(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -365,7 +365,7 @@ public abstract class Lists
         }
     }
 
-    private static int existingSize(Row row, ColumnDefinition column)
+    private static int existingSize(Row row, ColumnMetadata column)
     {
         if (row == null)
             return 0;
@@ -378,7 +378,7 @@ public abstract class Lists
     {
         private final Term idx;
 
-        public SetterByIndex(ColumnDefinition column, Term idx, Term t)
+        public SetterByIndex(ColumnMetadata column, Term idx, Term t)
         {
             super(column, t);
             this.idx = idx;
@@ -428,7 +428,7 @@ public abstract class Lists
 
     public static class Appender extends Operation
     {
-        public Appender(ColumnDefinition column, Term t)
+        public Appender(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -440,7 +440,7 @@ public abstract class Lists
             doAppend(value, column, params);
         }
 
-        static void doAppend(Term.Terminal value, ColumnDefinition column, 
UpdateParameters params) throws InvalidRequestException
+        static void doAppend(Term.Terminal value, ColumnMetadata column, 
UpdateParameters params) throws InvalidRequestException
         {
             if (column.type.isMultiCell())
             {
@@ -468,7 +468,7 @@ public abstract class Lists
 
     public static class Prepender extends Operation
     {
-        public Prepender(ColumnDefinition column, Term t)
+        public Prepender(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -494,7 +494,7 @@ public abstract class Lists
 
     public static class Discarder extends Operation
     {
-        public Discarder(ColumnDefinition column, Term t)
+        public Discarder(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -532,7 +532,7 @@ public abstract class Lists
 
     public static class DiscarderByIndex extends Operation
     {
-        public DiscarderByIndex(ColumnDefinition column, Term idx)
+        public DiscarderByIndex(ColumnMetadata column, Term idx)
         {
             super(column, idx);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java 
b/src/java/org/apache/cassandra/cql3/Maps.java
index c98b829..e02169e 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.rows.*;
@@ -328,7 +328,7 @@ public abstract class Maps
 
     public static class Setter extends Operation
     {
-        public Setter(ColumnDefinition column, Term t)
+        public Setter(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -350,7 +350,7 @@ public abstract class Maps
     {
         private final Term k;
 
-        public SetterByKey(ColumnDefinition column, Term k, Term t)
+        public SetterByKey(ColumnMetadata column, Term k, Term t)
         {
             super(column, t);
             this.k = k;
@@ -388,7 +388,7 @@ public abstract class Maps
 
     public static class Putter extends Operation
     {
-        public Putter(ColumnDefinition column, Term t)
+        public Putter(ColumnMetadata column, Term t)
         {
             super(column, t);
         }
@@ -401,7 +401,7 @@ public abstract class Maps
                 doPut(value, column, params);
         }
 
-        static void doPut(Term.Terminal value, ColumnDefinition column, 
UpdateParameters params) throws InvalidRequestException
+        static void doPut(Term.Terminal value, ColumnMetadata column, 
UpdateParameters params) throws InvalidRequestException
         {
             if (column.type.isMultiCell())
             {
@@ -425,7 +425,7 @@ public abstract class Maps
 
     public static class DiscarderByKey extends Operation
     {
-        public DiscarderByKey(ColumnDefinition column, Term k)
+        public DiscarderByKey(ColumnMetadata column, Term k)
         {
             super(column, k);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java 
b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
index 4ddfabb..411af07 100644
--- a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.Term.MultiColumnRaw;
 import org.apache.cassandra.cql3.Term.Raw;
 import org.apache.cassandra.cql3.restrictions.MultiColumnRestriction;
@@ -46,7 +46,7 @@ import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidReq
  */
 public class MultiColumnRelation extends Relation
 {
-    private final List<ColumnDefinition.Raw> entities;
+    private final List<ColumnMetadata.Raw> entities;
 
     /** A Tuples.Literal or Tuples.Raw marker */
     private final Term.MultiColumnRaw valuesOrMarker;
@@ -56,7 +56,7 @@ public class MultiColumnRelation extends Relation
 
     private final Tuples.INRaw inMarker;
 
-    private MultiColumnRelation(List<ColumnDefinition.Raw> entities, Operator 
relationType, Term.MultiColumnRaw valuesOrMarker, List<? extends 
Term.MultiColumnRaw> inValues, Tuples.INRaw inMarker)
+    private MultiColumnRelation(List<ColumnMetadata.Raw> entities, Operator 
relationType, Term.MultiColumnRaw valuesOrMarker, List<? extends 
Term.MultiColumnRaw> inValues, Tuples.INRaw inMarker)
     {
         this.entities = entities;
         this.relationType = relationType;
@@ -76,7 +76,7 @@ public class MultiColumnRelation extends Relation
      * @param valuesOrMarker a Tuples.Literal instance or a Tuples.Raw marker
      * @return a new <code>MultiColumnRelation</code> instance
      */
-    public static MultiColumnRelation 
createNonInRelation(List<ColumnDefinition.Raw> entities, Operator relationType, 
Term.MultiColumnRaw valuesOrMarker)
+    public static MultiColumnRelation 
createNonInRelation(List<ColumnMetadata.Raw> entities, Operator relationType, 
Term.MultiColumnRaw valuesOrMarker)
     {
         assert relationType != Operator.IN;
         return new MultiColumnRelation(entities, relationType, valuesOrMarker, 
null, null);
@@ -89,7 +89,7 @@ public class MultiColumnRelation extends Relation
      * @param inValues a list of Tuples.Literal instances or a Tuples.Raw 
markers
      * @return a new <code>MultiColumnRelation</code> instance
      */
-    public static MultiColumnRelation 
createInRelation(List<ColumnDefinition.Raw> entities, List<? extends 
Term.MultiColumnRaw> inValues)
+    public static MultiColumnRelation 
createInRelation(List<ColumnMetadata.Raw> entities, List<? extends 
Term.MultiColumnRaw> inValues)
     {
         return new MultiColumnRelation(entities, Operator.IN, null, inValues, 
null);
     }
@@ -101,12 +101,12 @@ public class MultiColumnRelation extends Relation
      * @param inMarker a single IN marker
      * @return a new <code>MultiColumnRelation</code> instance
      */
-    public static MultiColumnRelation 
createSingleMarkerInRelation(List<ColumnDefinition.Raw> entities, Tuples.INRaw 
inMarker)
+    public static MultiColumnRelation 
createSingleMarkerInRelation(List<ColumnMetadata.Raw> entities, Tuples.INRaw 
inMarker)
     {
         return new MultiColumnRelation(entities, Operator.IN, null, null, 
inMarker);
     }
 
-    public List<ColumnDefinition.Raw> getEntities()
+    public List<ColumnMetadata.Raw> getEntities()
     {
         return entities;
     }
@@ -133,23 +133,21 @@ public class MultiColumnRelation extends Relation
     }
 
     @Override
-    protected Restriction newEQRestriction(CFMetaData cfm,
-                                           VariableSpecifications boundNames) 
throws InvalidRequestException
+    protected Restriction newEQRestriction(TableMetadata table, 
VariableSpecifications boundNames)
     {
-        List<ColumnDefinition> receivers = receivers(cfm);
-        Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames);
+        List<ColumnMetadata> receivers = receivers(table);
+        Term term = toTerm(receivers, getValue(), table.keyspace, boundNames);
         return new MultiColumnRestriction.EQRestriction(receivers, term);
     }
 
     @Override
-    protected Restriction newINRestriction(CFMetaData cfm,
-                                           VariableSpecifications boundNames) 
throws InvalidRequestException
+    protected Restriction newINRestriction(TableMetadata table, 
VariableSpecifications boundNames)
     {
-        List<ColumnDefinition> receivers = receivers(cfm);
-        List<Term> terms = toTerms(receivers, inValues, cfm.ksName, 
boundNames);
+        List<ColumnMetadata> receivers = receivers(table);
+        List<Term> terms = toTerms(receivers, inValues, table.keyspace, 
boundNames);
         if (terms == null)
         {
-            Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames);
+            Term term = toTerm(receivers, getValue(), table.keyspace, 
boundNames);
             return new 
MultiColumnRestriction.InRestrictionWithMarker(receivers, (AbstractMarker) 
term);
         }
 
@@ -160,34 +158,28 @@ public class MultiColumnRelation extends Relation
     }
 
     @Override
-    protected Restriction newSliceRestriction(CFMetaData cfm,
-                                              VariableSpecifications 
boundNames,
-                                              Bound bound,
-                                              boolean inclusive) throws 
InvalidRequestException
+    protected Restriction newSliceRestriction(TableMetadata table, 
VariableSpecifications boundNames, Bound bound, boolean inclusive)
     {
-        List<ColumnDefinition> receivers = receivers(cfm);
-        Term term = toTerm(receivers(cfm), getValue(), cfm.ksName, boundNames);
+        List<ColumnMetadata> receivers = receivers(table);
+        Term term = toTerm(receivers(table), getValue(), table.keyspace, 
boundNames);
         return new MultiColumnRestriction.SliceRestriction(receivers, bound, 
inclusive, term);
     }
 
     @Override
-    protected Restriction newContainsRestriction(CFMetaData cfm,
-                                                 VariableSpecifications 
boundNames,
-                                                 boolean isKey) throws 
InvalidRequestException
+    protected Restriction newContainsRestriction(TableMetadata table, 
VariableSpecifications boundNames, boolean isKey)
     {
         throw invalidRequest("%s cannot be used for multi-column relations", 
operator());
     }
 
     @Override
-    protected Restriction newIsNotRestriction(CFMetaData cfm,
-                                              VariableSpecifications 
boundNames) throws InvalidRequestException
+    protected Restriction newIsNotRestriction(TableMetadata table, 
VariableSpecifications boundNames)
     {
         // this is currently disallowed by the grammar
         throw new AssertionError(String.format("%s cannot be used for 
multi-column relations", operator()));
     }
 
     @Override
-    protected Restriction newLikeRestriction(CFMetaData cfm, 
VariableSpecifications boundNames, Operator operator) throws 
InvalidRequestException
+    protected Restriction newLikeRestriction(TableMetadata table, 
VariableSpecifications boundNames, Operator operator)
     {
         throw invalidRequest("%s cannot be used for multi-column relations", 
operator());
     }
@@ -203,13 +195,13 @@ public class MultiColumnRelation extends Relation
         return term;
     }
 
-    protected List<ColumnDefinition> receivers(CFMetaData cfm) throws 
InvalidRequestException
+    protected List<ColumnMetadata> receivers(TableMetadata table) throws 
InvalidRequestException
     {
-        List<ColumnDefinition> names = new ArrayList<>(getEntities().size());
+        List<ColumnMetadata> names = new ArrayList<>(getEntities().size());
         int previousPosition = -1;
-        for (ColumnDefinition.Raw raw : getEntities())
+        for (ColumnMetadata.Raw raw : getEntities())
         {
-            ColumnDefinition def = raw.prepare(cfm);
+            ColumnMetadata def = raw.prepare(table);
             checkTrue(def.isClusteringColumn(), "Multi-column relations can 
only be applied to clustering columns but was applied to: %s", def.name);
             checkFalse(names.contains(def), "Column \"%s\" appeared twice in a 
relation: %s", def.name, this);
 
@@ -223,12 +215,12 @@ public class MultiColumnRelation extends Relation
         return names;
     }
 
-    public Relation renameIdentifier(ColumnDefinition.Raw from, 
ColumnDefinition.Raw to)
+    public Relation renameIdentifier(ColumnMetadata.Raw from, 
ColumnMetadata.Raw to)
     {
         if (!entities.contains(from))
             return this;
 
-        List<ColumnDefinition.Raw> newEntities = entities.stream().map(e -> 
e.equals(from) ? to : e).collect(Collectors.toList());
+        List<ColumnMetadata.Raw> newEntities = entities.stream().map(e -> 
e.equals(from) ? to : e).collect(Collectors.toList());
         return new MultiColumnRelation(newEntities, operator(), 
valuesOrMarker, inValues, inMarker);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java 
b/src/java/org/apache/cassandra/cql3/Operation.java
index 85a1eb2..8db9306 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.cql3;
 
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 
 /**
  * An UPDATE or DELETE operation.
@@ -43,13 +43,13 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 public abstract class Operation
 {
     // the column the operation applies to
-    public final ColumnDefinition column;
+    public final ColumnMetadata column;
 
     // Term involved in the operation. In theory this should not be here since 
some operation
     // may require none of more than one term, but most need 1 so it simplify 
things a bit.
     protected final Term t;
 
-    protected Operation(ColumnDefinition column, Term t)
+    protected Operation(ColumnMetadata column, Term t)
     {
         assert column != null;
         this.column = column;
@@ -109,10 +109,12 @@ public abstract class Operation
          * It returns an Operation which can be though as post-preparation 
well-typed
          * Operation.
          *
+         *
+         * @param metadata
          * @param receiver the column this operation applies to.
          * @return the prepared update operation.
          */
-        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) 
throws InvalidRequestException;
+        public Operation prepare(TableMetadata metadata, ColumnMetadata 
receiver) throws InvalidRequestException;
 
         /**
          * @return whether this operation can be applied alongside the {@code
@@ -133,7 +135,7 @@ public abstract class Operation
         /**
          * The name of the column affected by this delete operation.
          */
-        public ColumnDefinition.Raw affectedColumn();
+        public ColumnMetadata.Raw affectedColumn();
 
         /**
          * This method validates the operation (i.e. validate it is well typed)
@@ -144,9 +146,10 @@ public abstract class Operation
          * Operation.
          *
          * @param receiver the "column" this operation applies to.
+         * @param metadata
          * @return the prepared delete operation.
          */
-        public Operation prepare(String keyspace, ColumnDefinition receiver, 
CFMetaData cfm) throws InvalidRequestException;
+        public Operation prepare(String keyspace, ColumnMetadata receiver, 
TableMetadata metadata) throws InvalidRequestException;
     }
 
     public static class SetValue implements RawUpdate
@@ -158,9 +161,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) 
throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata 
receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(cfm.ksName, receiver);
+            Term v = value.prepare(metadata.keyspace, receiver);
 
             if (receiver.type instanceof CounterColumnType)
                 throw new InvalidRequestException(String.format("Cannot set 
the value of counter column %s (counters can only be incremented/decremented, 
not set)", receiver.name));
@@ -210,7 +213,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) 
throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata 
receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 throw new InvalidRequestException(String.format("Invalid 
operation (%s) for non collection column %s", toString(receiver), 
receiver.name));
@@ -220,14 +223,14 @@ public abstract class Operation
             switch (((CollectionType)receiver.type).kind)
             {
                 case LIST:
-                    Term idx = selector.prepare(cfm.ksName, 
Lists.indexSpecOf(receiver));
-                    Term lval = value.prepare(cfm.ksName, 
Lists.valueSpecOf(receiver));
+                    Term idx = selector.prepare(metadata.keyspace, 
Lists.indexSpecOf(receiver));
+                    Term lval = value.prepare(metadata.keyspace, 
Lists.valueSpecOf(receiver));
                     return new Lists.SetterByIndex(receiver, idx, lval);
                 case SET:
                     throw new InvalidRequestException(String.format("Invalid 
operation (%s) for set column %s", toString(receiver), receiver.name));
                 case MAP:
-                    Term key = selector.prepare(cfm.ksName, 
Maps.keySpecOf(receiver));
-                    Term mval = value.prepare(cfm.ksName, 
Maps.valueSpecOf(receiver));
+                    Term key = selector.prepare(metadata.keyspace, 
Maps.keySpecOf(receiver));
+                    Term mval = value.prepare(metadata.keyspace, 
Maps.valueSpecOf(receiver));
                     return new Maps.SetterByKey(receiver, key, mval);
             }
             throw new AssertionError();
@@ -257,7 +260,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) 
throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata 
receiver) throws InvalidRequestException
         {
             if (!receiver.type.isUDT())
                 throw new InvalidRequestException(String.format("Invalid 
operation (%s) for non-UDT column %s", toString(receiver), receiver.name));
@@ -268,7 +271,7 @@ public abstract class Operation
             if (fieldPosition == -1)
                 throw new InvalidRequestException(String.format("UDT column %s 
does not have a field named %s", receiver.name, field));
 
-            Term val = value.prepare(cfm.ksName, 
UserTypes.fieldSpecOf(receiver, fieldPosition));
+            Term val = value.prepare(metadata.keyspace, 
UserTypes.fieldSpecOf(receiver, fieldPosition));
             return new UserTypes.SetterByField(receiver, field, val);
         }
 
@@ -295,9 +298,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) 
throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata 
receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(cfm.ksName, receiver);
+            Term v = value.prepare(metadata.keyspace, receiver);
 
             if (!(receiver.type instanceof CollectionType))
             {
@@ -340,13 +343,13 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) 
throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata 
receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
             {
                 if (!(receiver.type instanceof CounterColumnType))
                     throw new InvalidRequestException(String.format("Invalid 
operation (%s) for non counter column %s", toString(receiver), receiver.name));
-                return new Constants.Substracter(receiver, 
value.prepare(cfm.ksName, receiver));
+                return new Constants.Substracter(receiver, 
value.prepare(metadata.keyspace, receiver));
             }
             else if (!(receiver.type.isMultiCell()))
                 throw new InvalidRequestException(String.format("Invalid 
operation (%s) for frozen collection column %s", toString(receiver), 
receiver.name));
@@ -354,16 +357,16 @@ public abstract class Operation
             switch (((CollectionType)receiver.type).kind)
             {
                 case LIST:
-                    return new Lists.Discarder(receiver, 
value.prepare(cfm.ksName, receiver));
+                    return new Lists.Discarder(receiver, 
value.prepare(metadata.keyspace, receiver));
                 case SET:
-                    return new Sets.Discarder(receiver, 
value.prepare(cfm.ksName, receiver));
+                    return new Sets.Discarder(receiver, 
value.prepare(metadata.keyspace, receiver));
                 case MAP:
                     // The value for a map subtraction is actually a set
                     ColumnSpecification vr = new 
ColumnSpecification(receiver.ksName,
                                                                      
receiver.cfName,
                                                                      
receiver.name,
                                                                      
SetType.getInstance(((MapType)receiver.type).getKeysType(), false));
-                    return new Sets.Discarder(receiver, 
value.prepare(cfm.ksName, vr));
+                    return new Sets.Discarder(receiver, 
value.prepare(metadata.keyspace, vr));
             }
             throw new AssertionError();
         }
@@ -388,9 +391,9 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFMetaData cfm, ColumnDefinition receiver) 
throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata 
receiver) throws InvalidRequestException
         {
-            Term v = value.prepare(cfm.ksName, receiver);
+            Term v = value.prepare(metadata.keyspace, receiver);
 
             if (!(receiver.type instanceof ListType))
                 throw new InvalidRequestException(String.format("Invalid 
operation (%s) for non list column %s", toString(receiver), receiver.name));
@@ -413,19 +416,19 @@ public abstract class Operation
 
     public static class ColumnDeletion implements RawDeletion
     {
-        private final ColumnDefinition.Raw id;
+        private final ColumnMetadata.Raw id;
 
-        public ColumnDeletion(ColumnDefinition.Raw id)
+        public ColumnDeletion(ColumnMetadata.Raw id)
         {
             this.id = id;
         }
 
-        public ColumnDefinition.Raw affectedColumn()
+        public ColumnMetadata.Raw affectedColumn()
         {
             return id;
         }
 
-        public Operation prepare(String keyspace, ColumnDefinition receiver, 
CFMetaData cfm) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnMetadata receiver, 
TableMetadata metadata) throws InvalidRequestException
         {
             // No validation, deleting a column is always "well typed"
             return new Constants.Deleter(receiver);
@@ -434,21 +437,21 @@ public abstract class Operation
 
     public static class ElementDeletion implements RawDeletion
     {
-        private final ColumnDefinition.Raw id;
+        private final ColumnMetadata.Raw id;
         private final Term.Raw element;
 
-        public ElementDeletion(ColumnDefinition.Raw id, Term.Raw element)
+        public ElementDeletion(ColumnMetadata.Raw id, Term.Raw element)
         {
             this.id = id;
             this.element = element;
         }
 
-        public ColumnDefinition.Raw affectedColumn()
+        public ColumnMetadata.Raw affectedColumn()
         {
             return id;
         }
 
-        public Operation prepare(String keyspace, ColumnDefinition receiver, 
CFMetaData cfm) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnMetadata receiver, 
TableMetadata metadata) throws InvalidRequestException
         {
             if (!(receiver.type.isCollection()))
                 throw new InvalidRequestException(String.format("Invalid 
deletion operation for non collection column %s", receiver.name));
@@ -473,21 +476,21 @@ public abstract class Operation
 
     public static class FieldDeletion implements RawDeletion
     {
-        private final ColumnDefinition.Raw id;
+        private final ColumnMetadata.Raw id;
         private final FieldIdentifier field;
 
-        public FieldDeletion(ColumnDefinition.Raw id, FieldIdentifier field)
+        public FieldDeletion(ColumnMetadata.Raw id, FieldIdentifier field)
         {
             this.id = id;
             this.field = field;
         }
 
-        public ColumnDefinition.Raw affectedColumn()
+        public ColumnMetadata.Raw affectedColumn()
         {
             return id;
         }
 
-        public Operation prepare(String keyspace, ColumnDefinition receiver, 
CFMetaData cfm) throws InvalidRequestException
+        public Operation prepare(String keyspace, ColumnMetadata receiver, 
TableMetadata metadata) throws InvalidRequestException
         {
             if (!receiver.type.isUDT())
                 throw new InvalidRequestException(String.format("Invalid field 
deletion operation for non-UDT column %s", receiver.name));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java 
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 57d5eac..afe20d7 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -101,7 +101,7 @@ public abstract class QueryOptions
      * @return the value correspong to column {@code columnName} in the (JSON) 
bind value at index {@code bindIndex}. This may return null if the
      * JSON value has no value for this column.
      */
-    public Term getJsonColumnValue(int bindIndex, ColumnIdentifier columnName, 
Collection<ColumnDefinition> expectedReceivers) throws InvalidRequestException
+    public Term getJsonColumnValue(int bindIndex, ColumnIdentifier columnName, 
Collection<ColumnMetadata> expectedReceivers) throws InvalidRequestException
     {
         if (jsonValuesCache == null)
             jsonValuesCache = new 
ArrayList<>(Collections.<Map<ColumnIdentifier, 
Term>>nCopies(getValues().size(), null));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index a100878..cf0e777 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -39,8 +39,9 @@ import org.slf4j.LoggerFactory;
 import org.antlr.runtime.*;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaChangeListener;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.statements.*;
@@ -163,7 +164,7 @@ public class QueryProcessor implements QueryHandler
 
     private QueryProcessor()
     {
-        MigrationManager.instance.register(new MigrationSubscriber());
+        Schema.instance.registerListener(new StatementInvalidatingListener());
     }
 
     public ParsedStatement.Prepared getPrepared(MD5Digest id)
@@ -559,7 +560,7 @@ public class QueryProcessor implements QueryHandler
         internalStatements.clear();
     }
 
-    private static class MigrationSubscriber extends MigrationListener
+    private static class StatementInvalidatingListener extends 
SchemaChangeListener
     {
         private static void removeInvalidPreparedStatements(String ksName, 
String cfName)
         {
@@ -659,18 +660,18 @@ public class QueryProcessor implements QueryHandler
         {
             // in case there are other overloads, we have to remove all 
overloads since argument type
             // matching may change (due to type casting)
-            if (Schema.instance.getKSMetaData(ksName).functions.get(new 
FunctionName(ksName, functionName)).size() > 1)
+            if (Schema.instance.getKeyspaceMetadata(ksName).functions.get(new 
FunctionName(ksName, functionName)).size() > 1)
                 removeInvalidPreparedStatementsForFunction(ksName, 
functionName);
         }
 
-        public void onUpdateColumnFamily(String ksName, String cfName, boolean 
affectsStatements)
+        public void onAlterTable(String ksName, String cfName, boolean 
affectsStatements)
         {
             logger.trace("Column definitions for {}.{} changed, invalidating 
related prepared statements", ksName, cfName);
             if (affectsStatements)
                 removeInvalidPreparedStatements(ksName, cfName);
         }
 
-        public void onUpdateFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
+        public void onAlterFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
         {
             // Updating a function may imply we've changed the body of the 
function, so we need to invalid statements so that
             // the new definition is picked (the function is resolved at 
preparation time).
@@ -679,7 +680,7 @@ public class QueryProcessor implements QueryHandler
             removeInvalidPreparedStatementsForFunction(ksName, functionName);
         }
 
-        public void onUpdateAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
+        public void onAlterAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
         {
             // Updating a function may imply we've changed the body of the 
function, so we need to invalid statements so that
             // the new definition is picked (the function is resolved at 
preparation time).
@@ -694,7 +695,7 @@ public class QueryProcessor implements QueryHandler
             removeInvalidPreparedStatements(ksName, null);
         }
 
-        public void onDropColumnFamily(String ksName, String cfName)
+        public void onDropTable(String ksName, String cfName)
         {
             logger.trace("Table {}.{} was dropped, invalidating related 
prepared statements", ksName, cfName);
             removeInvalidPreparedStatements(ksName, cfName);

Reply via email to