http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 6dc63aa..37af5a3 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.schema;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
@@ -42,6 +41,8 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -110,22 +111,12 @@ public final class Schema
      */
     public void loadFromDisk(boolean updateVersion)
     {
-        load(SchemaKeyspace.fetchNonSystemKeyspaces());
+        SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load);
         if (updateVersion)
             updateVersion();
     }
 
     /**
-     * Load up non-system keyspaces
-     *
-     * @param keyspaceDefs The non-system keyspace definitions
-     */
-    private void load(Iterable<KeyspaceMetadata> keyspaceDefs)
-    {
-        keyspaceDefs.forEach(this::load);
-    }
-
-    /**
      * Update (or insert) new keyspace definition
      *
      * @param ksm The metadata about keyspace
@@ -152,53 +143,49 @@ public final class Schema
            .forEach((name, metadata) -> 
indexMetadataRefs.put(Pair.create(ksm.name, name), new 
TableMetadataRef(metadata)));
     }
 
+    synchronized KeyspacesDiff apply(SchemaTransformation transformation)
+    {
+        Keyspaces before = keyspaces;
+        Keyspaces after = transformation.apply(before);
+
+        KeyspacesDiff diff = Keyspaces.diff(before, after, 
Diff.Mode.IN_MEMORY);
+
+//        keyspaces = after; TODO, more efficient this way
+        merge(diff);
+
+        return diff;
+    }
+
     private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
     {
         Keyspace keyspace = getKeyspaceInstance(updated.name);
-        if (keyspace != null)
+        if (null != keyspace)
             keyspace.setMetadata(updated);
 
-        MapDifference<TableId, TableMetadata> tablesDiff = 
previous.tables.diff(updated.tables);
-        MapDifference<TableId, ViewMetadata> viewsDiff = 
previous.views.diff(updated.views);
+        Tables.TablesDiff tablesDiff = Tables.diff(previous.tables, 
updated.tables, Diff.Mode.IN_MEMORY);
+        Views.ViewsDiff viewsDiff = Views.diff(previous.views, updated.views, 
Diff.Mode.IN_MEMORY);
+
         MapDifference<String, TableMetadata> indexesDiff = 
previous.tables.indexesDiff(updated.tables);
 
         // clean up after removed entries
-
-        tablesDiff.entriesOnlyOnLeft()
-                  .values()
-                  .forEach(table -> metadataRefs.remove(table.id));
-
-        viewsDiff.entriesOnlyOnLeft()
-                 .values()
-                 .forEach(view -> metadataRefs.remove(view.metadata.id));
+        tablesDiff.dropped.forEach(table -> metadataRefs.remove(table.id));
+        viewsDiff.dropped.forEach(view -> 
metadataRefs.remove(view.metadata.id));
 
         indexesDiff.entriesOnlyOnLeft()
                    .values()
                    .forEach(indexTable -> 
indexMetadataRefs.remove(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())));
 
         // load up new entries
-
-        tablesDiff.entriesOnlyOnRight()
-                  .values()
-                  .forEach(table -> metadataRefs.put(table.id, new 
TableMetadataRef(table)));
-
-        viewsDiff.entriesOnlyOnRight()
-                 .values()
-                 .forEach(view -> metadataRefs.put(view.metadata.id, new 
TableMetadataRef(view.metadata)));
+        tablesDiff.created.forEach(table -> metadataRefs.put(table.id, new 
TableMetadataRef(table)));
+        viewsDiff.created.forEach(view -> metadataRefs.put(view.metadata.id, 
new TableMetadataRef(view.metadata)));
 
         indexesDiff.entriesOnlyOnRight()
                    .values()
                    .forEach(indexTable -> 
indexMetadataRefs.put(Pair.create(indexTable.keyspace, 
indexTable.indexName().get()), new TableMetadataRef(indexTable)));
 
         // refresh refs to updated ones
-
-        tablesDiff.entriesDiffering()
-                  .values()
-                  .forEach(diff -> 
metadataRefs.get(diff.rightValue().id).set(diff.rightValue()));
-
-        viewsDiff.entriesDiffering()
-                 .values()
-                 .forEach(diff -> 
metadataRefs.get(diff.rightValue().metadata.id).set(diff.rightValue().metadata));
+        tablesDiff.altered.forEach(diff -> 
metadataRefs.get(diff.after.id).set(diff.after));
+        viewsDiff.altered.forEach(diff -> 
metadataRefs.get(diff.after.metadata.id).set(diff.after.metadata));
 
         indexesDiff.entriesDiffering()
                    .values()
@@ -579,66 +566,57 @@ public final class Schema
         // apply the schema mutations and fetch the new versions of the altered keyspaces
         Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
 
-        MapDifference<String, KeyspaceMetadata> keyspacesDiff = 
before.diff(after);
-
-        // dropped keyspaces
-        keyspacesDiff.entriesOnlyOnLeft().values().forEach(this::dropKeyspace);
-
-        // new keyspaces
-        
keyspacesDiff.entriesOnlyOnRight().values().forEach(this::createKeyspace);
-
-        // updated keyspaces
-        keyspacesDiff.entriesDiffering().entrySet().forEach(diff -> 
alterKeyspace(diff.getValue().leftValue(), diff.getValue().rightValue()));
+        merge(Keyspaces.diff(before, after, Diff.Mode.IN_MEMORY));
     }
 
-    private void alterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after)
+    private void merge(KeyspacesDiff diff)
     {
-        // calculate the deltas
-        MapDifference<TableId, TableMetadata> tablesDiff = 
before.tables.diff(after.tables);
-        MapDifference<TableId, ViewMetadata> viewsDiff = 
before.views.diff(after.views);
-        MapDifference<ByteBuffer, UserType> typesDiff = 
before.types.diff(after.types);
-        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = 
before.functions.udfsDiff(after.functions);
-        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff 
= before.functions.udasDiff(after.functions);
+        diff.dropped.forEach(this::dropKeyspace);
+        diff.created.forEach(this::createKeyspace);
+        diff.altered.forEach(this::alterKeyspace);
+    }
 
+    private void alterKeyspace(KeyspaceDiff delta)
+    {
         // drop tables and views
-        viewsDiff.entriesOnlyOnLeft().values().forEach(this::dropView);
-        tablesDiff.entriesOnlyOnLeft().values().forEach(this::dropTable);
+        delta.views.dropped.forEach(this::dropView);
+        delta.tables.dropped.forEach(this::dropTable);
 
-        load(after);
+        load(delta.after);
 
         // add tables and views
-        tablesDiff.entriesOnlyOnRight().values().forEach(this::createTable);
-        viewsDiff.entriesOnlyOnRight().values().forEach(this::createView);
+        delta.tables.created.forEach(this::createTable);
+        delta.views.created.forEach(this::createView);
 
         // update tables and views
-        tablesDiff.entriesDiffering().values().forEach(diff -> 
alterTable(diff.rightValue()));
-        viewsDiff.entriesDiffering().values().forEach(diff -> 
alterView(diff.rightValue()));
+        delta.tables.altered.forEach(diff -> alterTable(diff.after));
+        delta.views.altered.forEach(diff -> alterView(diff.after));
 
         // deal with all removed, added, and altered views
-        Keyspace.open(before.name).viewManager.reload();
+        Keyspace.open(delta.after.name).viewManager.reload();
 
         // notify on everything dropped
-        
udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate);
-        
udfsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropFunction);
-        viewsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropView);
-        tablesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropTable);
-        typesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropType);
+        delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) 
uda));
+        delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) 
udf));
+        delta.views.dropped.forEach(this::notifyDropView);
+        delta.tables.dropped.forEach(this::notifyDropTable);
+        delta.types.dropped.forEach(this::notifyDropType);
 
         // notify on everything created
-        
typesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateType);
-        
tablesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateTable);
-        
viewsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateView);
-        
udfsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateFunction);
-        
udasDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateAggregate);
+        delta.types.created.forEach(this::notifyCreateType);
+        delta.tables.created.forEach(this::notifyCreateTable);
+        delta.views.created.forEach(this::notifyCreateView);
+        delta.udfs.created.forEach(udf -> notifyCreateFunction((UDFunction) 
udf));
+        delta.udas.created.forEach(uda -> notifyCreateAggregate((UDAggregate) 
uda));
 
         // notify on everything altered
-        if (!before.params.equals(after.params))
-            notifyAlterKeyspace(after);
-        typesDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterType(diff.rightValue()));
-        tablesDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterTable(diff.leftValue(), diff.rightValue()));
-        viewsDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterView(diff.leftValue(), diff.rightValue()));
-        udfsDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterFunction(diff.rightValue()));
-        udasDiff.entriesDiffering().values().forEach(diff -> 
notifyAlterAggregate(diff.rightValue()));
+        if (!delta.before.params.equals(delta.after.params))
+            notifyAlterKeyspace(delta.before, delta.after);
+        delta.types.altered.forEach(diff -> notifyAlterType(diff.before, 
diff.after));
+        delta.tables.altered.forEach(diff -> notifyAlterTable(diff.before, 
diff.after));
+        delta.views.altered.forEach(diff -> notifyAlterView(diff.before, 
diff.after));
+        delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, 
diff.after));
+        delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, 
diff.after));
     }
 
     private void createKeyspace(KeyspaceMetadata keyspace)
@@ -697,7 +675,7 @@ public final class Schema
 
     private void createView(ViewMetadata view)
     {
-        
Keyspace.open(view.keyspace).initCf(metadataRefs.get(view.metadata.id), true);
+        
Keyspace.open(view.keyspace()).initCf(metadataRefs.get(view.metadata.id), true);
     }
 
     private void alterTable(TableMetadata updated)
@@ -707,7 +685,7 @@ public final class Schema
 
     private void alterView(ViewMetadata updated)
     {
-        
Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
+        
Keyspace.open(updated.keyspace()).getColumnFamilyStore(updated.name()).reload();
     }
 
     private void notifyCreateKeyspace(KeyspaceMetadata ksm)
@@ -722,7 +700,7 @@ public final class Schema
 
     private void notifyCreateView(ViewMetadata view)
     {
-        changeListeners.forEach(l -> l.onCreateView(view.keyspace, view.name));
+        changeListeners.forEach(l -> l.onCreateView(view.keyspace(), 
view.name()));
     }
 
     private void notifyCreateType(UserType ut)
@@ -740,36 +718,36 @@ public final class Schema
         changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
     }
 
-    private void notifyAlterKeyspace(KeyspaceMetadata ksm)
+    private void notifyAlterKeyspace(KeyspaceMetadata before, KeyspaceMetadata 
after)
     {
-        changeListeners.forEach(l -> l.onAlterKeyspace(ksm.name));
+        changeListeners.forEach(l -> l.onAlterKeyspace(after.name));
     }
 
-    private void notifyAlterTable(TableMetadata current, TableMetadata updated)
+    private void notifyAlterTable(TableMetadata before, TableMetadata after)
     {
-        boolean changeAffectedPreparedStatements = 
current.changeAffectsPreparedStatements(updated);
-        changeListeners.forEach(l -> l.onAlterTable(updated.keyspace, 
updated.name, changeAffectedPreparedStatements));
+        boolean changeAffectedPreparedStatements = 
before.changeAffectsPreparedStatements(after);
+        changeListeners.forEach(l -> l.onAlterTable(after.keyspace, 
after.name, changeAffectedPreparedStatements));
     }
 
-    private void notifyAlterView(ViewMetadata current, ViewMetadata updated)
+    private void notifyAlterView(ViewMetadata before, ViewMetadata after)
     {
-        boolean changeAffectedPreparedStatements = 
current.metadata.changeAffectsPreparedStatements(updated.metadata);
-        changeListeners.forEach(l ->l.onAlterView(updated.keyspace, 
updated.name, changeAffectedPreparedStatements));
+        boolean changeAffectedPreparedStatements = 
before.metadata.changeAffectsPreparedStatements(after.metadata);
+        changeListeners.forEach(l ->l.onAlterView(after.keyspace(), 
after.name(), changeAffectedPreparedStatements));
     }
 
-    private void notifyAlterType(UserType ut)
+    private void notifyAlterType(UserType before, UserType after)
     {
-        changeListeners.forEach(l -> l.onAlterType(ut.keyspace, 
ut.getNameAsString()));
+        changeListeners.forEach(l -> l.onAlterType(after.keyspace, 
after.getNameAsString()));
     }
 
-    private void notifyAlterFunction(UDFunction udf)
+    private void notifyAlterFunction(UDFunction before, UDFunction after)
     {
-        changeListeners.forEach(l -> l.onAlterFunction(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+        changeListeners.forEach(l -> l.onAlterFunction(after.name().keyspace, 
after.name().name, after.argTypes()));
     }
 
-    private void notifyAlterAggregate(UDAggregate udf)
+    private void notifyAlterAggregate(UDAggregate before, UDAggregate after)
     {
-        changeListeners.forEach(l -> l.onAlterAggregate(udf.name().keyspace, 
udf.name().name, udf.argTypes()));
+        changeListeners.forEach(l -> l.onAlterAggregate(after.name().keyspace, 
after.name().name, after.argTypes()));
     }
 
     private void notifyDropKeyspace(KeyspaceMetadata ksm)
@@ -784,7 +762,7 @@ public final class Schema
 
     private void notifyDropView(ViewMetadata view)
     {
-        changeListeners.forEach(l -> l.onDropView(view.keyspace, view.name));
+        changeListeners.forEach(l -> l.onDropView(view.keyspace(), 
view.name()));
     }
 
     private void notifyDropType(UserType ut)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index d2aa7e1..19b0bb7 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -29,19 +29,19 @@ import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.antlr.runtime.RecognitionException;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -50,6 +50,7 @@ import static java.lang.String.format;
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 
@@ -268,6 +269,38 @@ public final class SchemaKeyspace
         return KeyspaceMetadata.create(SchemaConstants.SCHEMA_KEYSPACE_NAME, 
KeyspaceParams.local(), 
org.apache.cassandra.schema.Tables.of(ALL_TABLE_METADATA));
     }
 
+    static Collection<Mutation> convertSchemaDiffToMutations(KeyspacesDiff 
diff, long timestamp)
+    {
+        Map<String, Mutation> mutations = new HashMap<>();
+
+        diff.created.forEach(k -> mutations.put(k.name, 
makeCreateKeyspaceMutation(k, timestamp).build()));
+        diff.dropped.forEach(k -> mutations.put(k.name, 
makeDropKeyspaceMutation(k, timestamp).build()));
+        diff.altered.forEach(kd ->
+        {
+            KeyspaceMetadata ks = kd.after;
+
+            Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(ks.name, ks.params, timestamp);
+
+            kd.types.created.forEach(t -> addTypeToSchemaMutation(t, builder));
+            kd.types.dropped.forEach(t -> addDropTypeToSchemaMutation(t, 
builder));
+
+            kd.tables.created.forEach(t -> addTableToSchemaMutation(t, true, 
builder));
+            kd.tables.dropped.forEach(t -> addDropTableToSchemaMutation(t, 
builder));
+            kd.tables.altered.forEach(td -> 
addAlterTableToSchemaMutation(td.before, td.after, builder));
+
+            kd.views.created.forEach(v -> addViewToSchemaMutation(v, true, 
builder));
+            kd.views.dropped.forEach(v -> addDropViewToSchemaMutation(v, 
builder));
+            kd.views.altered.forEach(vd -> 
addAlterViewToSchemaMutation(vd.before, vd.after, builder));
+
+            kd.udfs.dropped.forEach(f -> 
addDropFunctionToSchemaMutation((UDFunction) f, builder));
+            kd.udas.dropped.forEach(a -> 
addDropAggregateToSchemaMutation((UDAggregate) a, builder));
+
+            mutations.put(ks.name, builder.build());
+        });
+
+        return mutations.values();
+    }
+
     /**
      * Add entries to system_schema.* for the hardcoded system keyspaces
      */
@@ -296,7 +329,7 @@ public final class SchemaKeyspace
         ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking());
     }
 
-    static void flush()
+    private static void flush()
     {
         if (!DatabaseDescriptor.isUnsafeSystem())
             ALL.forEach(table -> 
FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush()));
@@ -478,7 +511,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    static void addTypeToSchemaMutation(UserType type, Mutation.SimpleBuilder 
mutation)
+    private static void addTypeToSchemaMutation(UserType type, 
Mutation.SimpleBuilder mutation)
     {
         mutation.update(Types)
                 .row(type.getNameAsString())
@@ -486,12 +519,9 @@ public final class SchemaKeyspace
                 .add("field_types", 
type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList()));
     }
 
-    static Mutation.SimpleBuilder dropTypeFromSchemaMutation(KeyspaceMetadata 
keyspace, UserType type, long timestamp)
+    private static void addDropTypeToSchemaMutation(UserType type, 
Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
         builder.update(Types).row(type.name).delete();
-        return builder;
     }
 
     static Mutation.SimpleBuilder makeCreateTableMutation(KeyspaceMetadata 
keyspace, TableMetadata table, long timestamp)
@@ -502,7 +532,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    static void addTableToSchemaMutation(TableMetadata table, boolean 
withColumnsAndTriggers, Mutation.SimpleBuilder builder)
+    private static void addTableToSchemaMutation(TableMetadata table, boolean 
withColumnsAndTriggers, Mutation.SimpleBuilder builder)
     {
         Row.SimpleBuilder rowBuilder = builder.update(Tables)
                                               .row(table.name)
@@ -551,13 +581,8 @@ public final class SchemaKeyspace
             builder.add("cdc", params.cdc);
     }
 
-    static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata 
keyspace,
-                                                                 TableMetadata 
oldTable,
-                                                                 TableMetadata 
newTable,
-                                                                 long 
timestamp)
+    private static void addAlterTableToSchemaMutation(TableMetadata oldTable, 
TableMetadata newTable, Mutation.SimpleBuilder builder)
     {
-        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-
         addTableToSchemaMutation(newTable, false, builder);
 
         MapDifference<ByteBuffer, ColumnMetadata> columnDiff = 
Maps.difference(oldTable.columns, newTable.columns);
@@ -609,7 +634,15 @@ public final class SchemaKeyspace
         // updated indexes need to be updated
         for (MapDifference.ValueDifference<IndexMetadata> diff : 
indexesDiff.entriesDiffering().values())
             addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), 
builder);
+    }
 
+    static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata 
keyspace,
+                                                          TableMetadata 
oldTable,
+                                                          TableMetadata 
newTable,
+                                                          long timestamp)
+    {
+        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addAlterTableToSchemaMutation(oldTable, newTable, builder);
         return builder;
     }
 
@@ -639,7 +672,12 @@ public final class SchemaKeyspace
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addDropTableToSchemaMutation(table, builder);
+        return builder;
+    }
 
+    private static void addDropTableToSchemaMutation(TableMetadata table, 
Mutation.SimpleBuilder builder)
+    {
         builder.update(Tables).row(table.name).delete();
 
         for (ColumnMetadata column : table.columns())
@@ -650,8 +688,6 @@ public final class SchemaKeyspace
 
         for (IndexMetadata index : table.indexes)
             dropIndexFromSchemaMutation(table, index, builder);
-
-        return builder;
     }
 
     private static void addColumnToSchemaMutation(TableMetadata table, 
ColumnMetadata column, Mutation.SimpleBuilder builder)
@@ -696,23 +732,15 @@ public final class SchemaKeyspace
         builder.update(Triggers).row(table.name, trigger.name).delete();
     }
 
-    static Mutation.SimpleBuilder makeCreateViewMutation(KeyspaceMetadata 
keyspace, ViewMetadata view, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addViewToSchemaMutation(view, true, builder);
-        return builder;
-    }
-
     private static void addViewToSchemaMutation(ViewMetadata view, boolean 
includeColumns, Mutation.SimpleBuilder builder)
     {
         TableMetadata table = view.metadata;
         Row.SimpleBuilder rowBuilder = builder.update(Views)
-                                              .row(view.name)
+                                              .row(view.name())
                                               .add("include_all_columns", 
view.includeAllColumns)
                                               .add("base_table_id", 
view.baseTableId.asUUID())
                                               .add("base_table_name", 
view.baseTableName)
-                                              .add("where_clause", 
view.whereClause)
+                                              .add("where_clause", 
view.whereClause.toString())
                                               .add("id", table.id.asUUID());
 
         addTableParamsToRowBuilder(table.params, rowBuilder);
@@ -727,60 +755,39 @@ public final class SchemaKeyspace
         }
     }
 
-    static Mutation.SimpleBuilder makeDropViewMutation(KeyspaceMetadata 
keyspace, ViewMetadata view, long timestamp)
+    private static void addDropViewToSchemaMutation(ViewMetadata view, 
Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-
-        builder.update(Views).row(view.name).delete();
+        builder.update(Views).row(view.name()).delete();
 
         TableMetadata table = view.metadata;
         for (ColumnMetadata column : table.columns())
             dropColumnFromSchemaMutation(table, column, builder);
-
-        for (IndexMetadata index : table.indexes)
-            dropIndexFromSchemaMutation(table, index, builder);
-
-        return builder;
     }
 
-    static Mutation.SimpleBuilder makeUpdateViewMutation(KeyspaceMetadata 
keyspace,
-                                                                ViewMetadata 
oldView,
-                                                                ViewMetadata 
newView,
-                                                                long timestamp)
+    static Mutation.SimpleBuilder makeUpdateViewMutation(KeyspaceMetadata 
keyspace, ViewMetadata oldView, ViewMetadata newView, long timestamp)
     {
         Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addAlterViewToSchemaMutation(oldView, newView, builder);
+        return builder;
+    }
 
-        addViewToSchemaMutation(newView, false, builder);
+    private static void addAlterViewToSchemaMutation(ViewMetadata before, 
ViewMetadata after, Mutation.SimpleBuilder builder)
+    {
+        addViewToSchemaMutation(after, false, builder);
 
-        MapDifference<ByteBuffer, ColumnMetadata> columnDiff = 
Maps.difference(oldView.metadata.columns,
-                                                                               
newView.metadata.columns);
+        MapDifference<ByteBuffer, ColumnMetadata> columnDiff = 
Maps.difference(before.metadata.columns, after.metadata.columns);
 
         // columns that are no longer needed
         for (ColumnMetadata column : columnDiff.entriesOnlyOnLeft().values())
-            dropColumnFromSchemaMutation(oldView.metadata, column, builder);
+            dropColumnFromSchemaMutation(before.metadata, column, builder);
 
         // newly added columns
         for (ColumnMetadata column : columnDiff.entriesOnlyOnRight().values())
-            addColumnToSchemaMutation(newView.metadata, column, builder);
+            addColumnToSchemaMutation(after.metadata, column, builder);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
-            addColumnToSchemaMutation(newView.metadata, 
newView.metadata.getColumn(name), builder);
-
-        // dropped columns
-        MapDifference<ByteBuffer, DroppedColumn> droppedColumnDiff =
-        Maps.difference(oldView.metadata.droppedColumns, 
oldView.metadata.droppedColumns);
-
-        // newly dropped columns
-        for (DroppedColumn column : 
droppedColumnDiff.entriesOnlyOnRight().values())
-            addDroppedColumnToSchemaMutation(oldView.metadata, column, 
builder);
-
-        // columns added then dropped again
-        for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet())
-            addDroppedColumnToSchemaMutation(newView.metadata, 
newView.metadata.droppedColumns.get(name), builder);
-
-        return builder;
+            addColumnToSchemaMutation(after.metadata, 
after.metadata.getColumn(name), builder);
     }
 
     private static void addIndexToSchemaMutation(TableMetadata table, 
IndexMetadata index, Mutation.SimpleBuilder builder)
@@ -811,7 +818,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    static void addFunctionToSchemaMutation(UDFunction function, 
Mutation.SimpleBuilder builder)
+    private static void addFunctionToSchemaMutation(UDFunction function, 
Mutation.SimpleBuilder builder)
     {
         builder.update(Functions)
                .row(function.name().name, function.argumentsList())
@@ -834,12 +841,9 @@ public final class SchemaKeyspace
         }
     }
 
-    static Mutation.SimpleBuilder makeDropFunctionMutation(KeyspaceMetadata 
keyspace, UDFunction function, long timestamp)
+    private static void addDropFunctionToSchemaMutation(UDFunction function, 
Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
         builder.update(Functions).row(function.name().name, 
function.argumentsList()).delete();
-        return builder;
     }
 
     static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata 
keyspace, UDAggregate aggregate, long timestamp)
@@ -850,7 +854,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    static void addAggregateToSchemaMutation(UDAggregate aggregate, 
Mutation.SimpleBuilder builder)
+    private static void addAggregateToSchemaMutation(UDAggregate aggregate, 
Mutation.SimpleBuilder builder)
     {
         builder.update(Aggregates)
                .row(aggregate.name().name, aggregate.argumentsList())
@@ -864,12 +868,9 @@ public final class SchemaKeyspace
                                 : null);
     }
 
-    static Mutation.SimpleBuilder makeDropAggregateMutation(KeyspaceMetadata 
keyspace, UDAggregate aggregate, long timestamp)
+    private static void addDropAggregateToSchemaMutation(UDAggregate 
aggregate, Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
         builder.update(Aggregates).row(aggregate.name().name, 
aggregate.argumentsList()).delete();
-        return builder;
     }
 
     /*
@@ -1008,7 +1009,7 @@ public final class SchemaKeyspace
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND 
table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS);
         UntypedResultSet columnRows = query(query, keyspace, table);
         if (columnRows.isEmpty())
-            throw new MissingColumns("Columns not found in schema table for " 
+ keyspace + "." + table);
+            throw new MissingColumns("Columns not found in schema table for " 
+ keyspace + '.' + table);
 
         List<ColumnMetadata> columns = new ArrayList<>();
         columnRows.forEach(row -> columns.add(createColumnFromRow(row, 
types)));
@@ -1107,7 +1108,7 @@ public final class SchemaKeyspace
 
         Views.Builder views = org.apache.cassandra.schema.Views.builder();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
-            views.add(fetchView(keyspaceName, row.getString("view_name"), 
types));
+            views.put(fetchView(keyspaceName, row.getString("view_name"), 
types));
         return views.build();
     }
 
@@ -1122,7 +1123,7 @@ public final class SchemaKeyspace
         TableId baseTableId = TableId.fromUUID(row.getUUID("base_table_id"));
         String baseTableName = row.getString("base_table_name");
         boolean includeAll = row.getBoolean("include_all_columns");
-        String whereClause = row.getString("where_clause");
+        String whereClauseString = row.getString("where_clause");
 
         List<ColumnMetadata> columns = fetchColumns(keyspaceName, viewName, 
types);
 
@@ -1134,31 +1135,36 @@ public final class SchemaKeyspace
                          .params(createTableParamsFromRow(row))
                          .build();
 
-        String rawSelect = View.buildSelectStatement(baseTableName, columns, 
whereClause);
-        SelectStatement.RawStatement rawStatement = 
(SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+        WhereClause whereClause;
 
-        return new ViewMetadata(keyspaceName, viewName, baseTableId, 
baseTableName, includeAll, rawStatement, whereClause, metadata);
+        try
+        {
+            whereClause = WhereClause.parse(whereClauseString);
+        }
+        catch (RecognitionException e)
+        {
+            // Chain the parser exception as the cause so the underlying syntax problem is not lost.
+            throw new RuntimeException(format("Unexpected error while parsing 
materialized view's where clause for '%s' (got %s)", viewName, 
whereClauseString), e);
+        }
+
+        return new ViewMetadata(baseTableId, baseTableName, includeAll, 
whereClause, metadata);
     }
 
     private static Functions fetchFunctions(String keyspaceName, Types types)
     {
-        Functions udfs = fetchUDFs(keyspaceName, types);
-        Functions udas = fetchUDAs(keyspaceName, udfs, types);
+        Collection<UDFunction> udfs = fetchUDFs(keyspaceName, types);
+        Collection<UDAggregate> udas = fetchUDAs(keyspaceName, udfs, types);
 
-        return org.apache.cassandra.schema.Functions.builder()
-                                                    .add(udfs)
-                                                    .add(udas)
-                                                    .build();
+        return 
org.apache.cassandra.schema.Functions.builder().add(udfs).add(udas).build();
     }
 
-    private static Functions fetchUDFs(String keyspaceName, Types types)
+    private static Collection<UDFunction> fetchUDFs(String keyspaceName, Types 
types)
     {
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", 
SchemaConstants.SCHEMA_KEYSPACE_NAME, FUNCTIONS);
 
-        Functions.Builder functions = 
org.apache.cassandra.schema.Functions.builder();
+        Collection<UDFunction> functions = new ArrayList<>();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
             functions.add(createUDFFromRow(row, types));
-        return functions.build();
+        return functions;
     }
 
     private static UDFunction createUDFFromRow(UntypedResultSet.Row row, Types 
types)
@@ -1217,17 +1223,16 @@ public final class SchemaKeyspace
         }
     }
 
-    private static Functions fetchUDAs(String keyspaceName, Functions udfs, 
Types types)
+    private static Collection<UDAggregate> fetchUDAs(String keyspaceName, 
Collection<UDFunction> udfs, Types types)
     {
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", 
SchemaConstants.SCHEMA_KEYSPACE_NAME, AGGREGATES);
 
-        Functions.Builder aggregates = 
org.apache.cassandra.schema.Functions.builder();
-        for (UntypedResultSet.Row row : query(query, keyspaceName))
-            aggregates.add(createUDAFromRow(row, udfs, types));
-        return aggregates.build();
+        Collection<UDAggregate> aggregates = new ArrayList<>();
+        query(query, keyspaceName).forEach(row -> 
aggregates.add(createUDAFromRow(row, udfs, types)));
+        return aggregates;
     }
 
-    private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, 
Functions functions, Types types)
+    private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, 
Collection<UDFunction> functions, Types types)
     {
         String ksName = row.getString("keyspace_name");
         String functionName = row.getString("aggregate_name");
@@ -1242,18 +1247,12 @@ public final class SchemaKeyspace
         AbstractType<?> returnType = CQLTypeParser.parse(ksName, 
row.getString("return_type"), types);
 
         FunctionName stateFunc = new FunctionName(ksName, 
(row.getString("state_func")));
+
         FunctionName finalFunc = row.has("final_func") ? new 
FunctionName(ksName, row.getString("final_func")) : null;
         AbstractType<?> stateType = row.has("state_type") ? 
CQLTypeParser.parse(ksName, row.getString("state_type"), types) : null;
         ByteBuffer initcond = row.has("initcond") ? Terms.asBytes(ksName, 
row.getString("initcond"), stateType) : null;
 
-        try
-        {
-            return UDAggregate.create(functions, name, argTypes, returnType, 
stateFunc, finalFunc, stateType, initcond);
-        }
-        catch (InvalidRequestException reason)
-        {
-            return UDAggregate.createBroken(name, argTypes, returnType, 
initcond, reason);
-        }
+        return UDAggregate.create(functions, name, argTypes, returnType, 
stateFunc, finalFunc, stateType, initcond);
     }
 
     private static UntypedResultSet query(String query, Object... variables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/SchemaTransformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaTransformation.java 
b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
new file mode 100644
index 0000000..c19ac7c
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+public interface SchemaTransformation
+{
+    /**
+     * Apply a statement transformation to a schema snapshot.
+     * <p>
+     * Implementing methods should be side-effect free.
+     *
+     * @param schema the {@link Keyspaces} snapshot to base the transformation on
+     * @return the {@link Keyspaces} transformed by the statement
+     */
+    Keyspaces apply(Keyspaces schema);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java 
b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 44b1f8a..ff7281c 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -38,6 +38,7 @@ import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 
+import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.transform;
 import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
 
@@ -162,6 +163,21 @@ public final class TableMetadata
                .triggers(triggers);
     }
 
+    public TableMetadata withSwapped(TableParams params) // rebuilds this metadata through unbuild() with the given params set
+    {
+        return unbuild().params(params).build();
+    }
+
+    public TableMetadata withSwapped(Triggers triggers) // rebuilds this metadata through unbuild() with the given triggers set
+    {
+        return unbuild().triggers(triggers).build();
+    }
+
+    public TableMetadata withSwapped(Indexes indexes) // rebuilds this metadata through unbuild() with the given indexes set
+    {
+        return unbuild().indexes(indexes).build();
+    }
+
     public boolean isView()
     {
         return isView;
@@ -430,7 +446,7 @@ public final class TableMetadata
      * This method should only be called for superColumn tables and "static
      * compact" ones. For any other table, all column names are UTF8.
      */
-    public AbstractType<?> staticCompactOrSuperTableColumnNameType()
+    AbstractType<?> staticCompactOrSuperTableColumnNameType()
     {
         if (isSuper())
         {
@@ -505,6 +521,22 @@ public final class TableMetadata
         return unbuild().params(builder.build()).build();
     }
 
+    boolean referencesUserType(ByteBuffer name) // true if the type of at least one column references the named user type
+    {
+        return any(columns(), c -> c.type.referencesUserType(name));
+    }
+
+    public TableMetadata withUpdatedUserType(UserType udt) // table metadata with every column type passed through withUpdatedUserType(udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this; // no column references the type: the same instance is returned
+
+        Builder builder = unbuild();
+        columns().forEach(c -> builder.alterColumnType(c.name, 
c.type.withUpdatedUserType(udt)));
+
+        return builder.build();
+    }
+
     private void except(String format, Object... args)
     {
         throw new ConfigurationException(format(format, args));
@@ -534,6 +566,11 @@ public final class TableMetadata
             && triggers.equals(tm.triggers);
     }
 
+    public boolean equals(TableMetadata other, Diff.Mode mode) // NOTE(review): mode is not consulted here; the result is that of equals(other)
+    {
+        return equals(other);
+    }
+
     @Override
     public int hashCode()
     {
@@ -833,7 +870,7 @@ public final class TableMetadata
             return this;
         }
 
-        public Builder addColumns(Iterable<ColumnMetadata> columns)
+        Builder addColumns(Iterable<ColumnMetadata> columns)
         {
             columns.forEach(this::addColumn);
             return this;
@@ -925,7 +962,7 @@ public final class TableMetadata
             return this;
         }
 
-        public Builder alterColumnType(ColumnIdentifier name, AbstractType<?> 
type)
+        Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type)
         {
             ColumnMetadata column = columns.get(name.bytes);
             if (column == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Tables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Tables.java 
b/src/java/org/apache/cassandra/schema/Tables.java
index a83c061..4e2c75c 100644
--- a/src/java/org/apache/cassandra/schema/Tables.java
+++ b/src/java/org/apache/cassandra/schema/Tables.java
@@ -17,32 +17,38 @@
  */
 package org.apache.cassandra.schema;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
 
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.index.internal.CassandraIndex;
 
-import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
 
 /**
  * An immutable container for a keyspace's Tables.
  */
 public final class Tables implements Iterable<TableMetadata>
 {
+    private static final Tables NONE = builder().build();
+
     private final ImmutableMap<String, TableMetadata> tables;
+    private final ImmutableMap<TableId, TableMetadata> tablesById;
     private final ImmutableMap<String, TableMetadata> indexTables;
 
     private Tables(Builder builder)
     {
         tables = builder.tables.build();
+        tablesById = builder.tablesById.build();
         indexTables = builder.indexTables.build();
     }
 
@@ -53,7 +59,7 @@ public final class Tables implements Iterable<TableMetadata>
 
     public static Tables none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public static Tables of(TableMetadata... tables)
@@ -71,6 +77,11 @@ public final class Tables implements Iterable<TableMetadata>
         return tables.values().iterator();
     }
 
+    public Iterable<TableMetadata> referencingUserType(ByteBuffer name) // the tables for which referencesUserType(name) holds
+    {
+        return Iterables.filter(tables.values(), t -> 
t.referencesUserType(name));
+    }
+
     ImmutableMap<String, TableMetadata> indexTables()
     {
         return indexTables;
@@ -104,10 +115,21 @@ public final class Tables implements 
Iterable<TableMetadata>
         return tables.get(name);
     }
 
-    @Nullable
-    public TableMetadata getIndexTableNullable(String name)
+    TableMetadata getNullable(TableId id) // lookup by table id; null when no table with that id is present
+    {
+        return tablesById.get(id);
+    }
+
+    boolean containsTable(TableId id)
     {
-        return indexTables.get(name);
+        return tablesById.containsKey(id);
+    }
+
+    public Tables filter(Predicate<TableMetadata> predicate) // new Tables built from only the tables accepted by the predicate
+    {
+        Builder builder = builder();
+        tables.values().stream().filter(predicate).forEach(builder::add);
+        return builder.build();
     }
 
     /**
@@ -134,18 +156,19 @@ public final class Tables implements 
Iterable<TableMetadata>
         TableMetadata table =
             get(name).orElseThrow(() -> new 
IllegalStateException(String.format("Table %s doesn't exists", name)));
 
-        return builder().add(filter(this, t -> t != table)).build();
+        return without(table);
     }
 
-    MapDifference<TableId, TableMetadata> diff(Tables other)
+    public Tables without(TableMetadata table)
     {
-        Map<TableId, TableMetadata> thisTables = new HashMap<>();
-        this.forEach(t -> thisTables.put(t.id, t));
-
-        Map<TableId, TableMetadata> otherTables = new HashMap<>();
-        other.forEach(t -> otherTables.put(t.id, t));
+        return filter(t -> t != table);
+    }
 
-        return Maps.difference(thisTables, otherTables);
+    public Tables withUpdatedUserType(UserType udt) // if any table references the UDT, rebuilds from all tables mapped through withUpdatedUserType; otherwise returns this
     {
+        return any(this, t -> t.referencesUserType(udt.name))
+             ? builder().add(transform(this, t -> 
t.withUpdatedUserType(udt))).build()
+             : this;
     }
 
     MapDifference<String, TableMetadata> indexesDiff(Tables other)
@@ -180,6 +203,7 @@ public final class Tables implements Iterable<TableMetadata>
     public static final class Builder
     {
         final ImmutableMap.Builder<String, TableMetadata> tables = new 
ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<TableId, TableMetadata> tablesById = new 
ImmutableMap.Builder<>();
         final ImmutableMap.Builder<String, TableMetadata> indexTables = new 
ImmutableMap.Builder<>();
 
         private Builder()
@@ -195,6 +219,8 @@ public final class Tables implements Iterable<TableMetadata>
         {
             tables.put(table.name, table);
 
+            tablesById.put(table.id, table);
+
             table.indexes
                  .stream()
                  .filter(i -> !i.isCustom())
@@ -217,4 +243,38 @@ public final class Tables implements 
Iterable<TableMetadata>
             return this;
         }
     }
+
+    static TablesDiff diff(Tables before, Tables after, Diff.Mode mode) // delegates to TablesDiff.diff with the same arguments
+    {
+        return TablesDiff.diff(before, after, mode);
+    }
+
+    /**
+     * The difference between two {@link Tables} snapshots, expressed as the tables created,
+     * the tables dropped, and the tables present on both sides that compare as not equal.
+     * Tables are matched between the two snapshots by their {@code id}.
+     */
+    public static final class TablesDiff extends Diff<Tables, TableMetadata>
+    {
+        // Shared empty result, returned when both sides are the same instance.
+        private static final TablesDiff NONE = new TablesDiff(Tables.none(), 
Tables.none(), ImmutableList.of());
+
+        private TablesDiff(Tables created, Tables dropped, 
ImmutableCollection<Altered<TableMetadata>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        /**
+         * @param before the earlier snapshot
+         * @param after the later snapshot
+         * @param mode passed to {@code TableMetadata.equals(TableMetadata, Mode)} when comparing matching tables
+         * @return the diff between the two snapshots; {@code NONE} when {@code before == after}
+         */
+        private static TablesDiff diff(Tables before, Tables after, Mode mode)
+        {
+            if (before == after)
+                return NONE;
+
+            // ids present only in 'after' are created; ids present only in 'before' are dropped
+            Tables created = after.filter(t -> !before.containsTable(t.id));
+            Tables dropped = before.filter(t -> !after.containsTable(t.id));
+
+            // ids present on both sides are altered when the two versions do not compare equal
+            ImmutableList.Builder<Altered<TableMetadata>> altered = 
ImmutableList.builder();
+            before.forEach(tableBefore ->
+            {
+                TableMetadata tableAfter = after.getNullable(tableBefore.id);
+                if (null != tableAfter && !tableBefore.equals(tableAfter, 
mode))
+                    altered.add(new Altered<>(tableBefore, tableAfter));
+            });
+
+            return new TablesDiff(created, dropped, altered.build());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Types.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Types.java 
b/src/java/org/apache/cassandra/schema/Types.java
index c2d8aac..f9c4afc 100644
--- a/src/java/org/apache/cassandra/schema/Types.java
+++ b/src/java/org/apache/cassandra/schema/Types.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.schema;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
@@ -31,8 +32,12 @@ import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 import static java.lang.String.format;
-import static com.google.common.collect.Iterables.filter;
 import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
+
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 /**
@@ -82,6 +87,11 @@ public final class Types implements Iterable<UserType>
         return types.values().iterator();
     }
 
+    public Iterable<UserType> referencingUserType(ByteBuffer name) // the types that reference the named type, excluding the type with that name itself
+    {
+        return Iterables.filter(types.values(), t -> 
t.referencesUserType(name) && !t.name.equals(name));
+    }
+
     /**
      * Get the type with the specified name
      *
@@ -105,6 +115,18 @@ public final class Types implements Iterable<UserType>
         return types.get(name);
     }
 
+    boolean containsType(ByteBuffer name) // true if a type is registered under this name
+    {
+        return types.containsKey(name);
+    }
+
+    Types filter(Predicate<UserType> predicate) // new Types built from only the types accepted by the predicate
+    {
+        Builder builder = builder();
+        types.values().stream().filter(predicate).forEach(builder::add);
+        return builder.build();
+    }
+
     /**
      * Create a Types instance with the provided type added
      */
@@ -124,12 +146,19 @@ public final class Types implements Iterable<UserType>
         UserType type =
             get(name).orElseThrow(() -> new IllegalStateException(format("Type 
%s doesn't exists", name)));
 
-        return builder().add(filter(this, t -> t != type)).build();
+        return without(type);
+    }
+
+    public Types without(UserType type)
+    {
+        return filter(t -> t != type);
     }
 
-    MapDifference<ByteBuffer, UserType> diff(Types other)
+    public Types withUpdatedUserType(UserType udt)
     {
-        return Maps.difference(types, other.types);
+        return any(this, t -> t.referencesUserType(udt.name))
+             ? builder().add(transform(this, t -> 
t.withUpdatedUserType(udt))).build()
+             : this;
     }
 
     @Override
@@ -305,7 +334,7 @@ public final class Types implements Iterable<UserType>
             {
                 List<FieldIdentifier> preparedFieldNames =
                     fieldNames.stream()
-                              .map(t -> FieldIdentifier.forInternalString(t))
+                              .map(FieldIdentifier::forInternalString)
                               .collect(toList());
 
                 List<AbstractType<?>> preparedFieldTypes =
@@ -329,4 +358,38 @@ public final class Types implements Iterable<UserType>
             }
         }
     }
+
+    static TypesDiff diff(Types before, Types after, Diff.Mode mode) // delegates to TypesDiff.diff with the same arguments
+    {
+        return TypesDiff.diff(before, after, mode);
+    }
+
+    static final class TypesDiff extends Diff<Types, UserType> // created / dropped / altered user types between two Types snapshots, matched by name
+    {
+        private static final TypesDiff NONE = new TypesDiff(Types.none(), 
Types.none(), ImmutableList.of());
+
+        private TypesDiff(Types created, Types dropped, 
ImmutableCollection<Altered<UserType>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        private static TypesDiff diff(Types before, Types after, Mode mode)
+        {
+            if (before == after)
+                return NONE; // same instance on both sides: shared empty diff
+
+            Types created = after.filter(t -> !before.containsType(t.name)); // names present only in 'after'
+            Types dropped = before.filter(t -> !after.containsType(t.name)); // names present only in 'before'
+
+            ImmutableList.Builder<Altered<UserType>> altered = 
ImmutableList.builder();
+            before.forEach(typeBefore ->
+            {
+                UserType typeAfter = after.getNullable(typeBefore.name);
+                if (null != typeAfter && !typeBefore.equals(typeAfter, mode)) // present on both sides but not equal under 'mode'
+                    altered.add(new Altered<>(typeBefore, typeAfter));
+            });
+
+            return new TypesDiff(created, dropped, altered.build());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/ViewMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/ViewMetadata.java 
b/src/java/org/apache/cassandra/schema/ViewMetadata.java
index 57f4092..9831f5d 100644
--- a/src/java/org/apache/cassandra/schema/ViewMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ViewMetadata.java
@@ -17,56 +17,52 @@
  */
 package org.apache.cassandra.schema;
 
-import java.util.List;
+import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.stream.Collectors;
-
-import org.antlr.runtime.*;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.view.View;
-import org.apache.cassandra.exceptions.SyntaxException;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.UserType;
+
 public final class ViewMetadata
 {
-    public final String keyspace;
-    public final String name;
     public final TableId baseTableId;
     public final String baseTableName;
+
     public final boolean includeAllColumns;
     public final TableMetadata metadata;
 
-    public final SelectStatement.RawStatement select;
-    public final String whereClause;
+    public final WhereClause whereClause;
 
     /**
-     * @param name              Name of the view
      * @param baseTableId       Internal ID of the table which this view is 
based off of
      * @param includeAllColumns Whether to include all columns or not
      */
-    public ViewMetadata(String keyspace,
-                        String name,
-                        TableId baseTableId,
+    public ViewMetadata(TableId baseTableId,
                         String baseTableName,
                         boolean includeAllColumns,
-                        SelectStatement.RawStatement select,
-                        String whereClause,
+                        WhereClause whereClause,
                         TableMetadata metadata)
     {
-        this.keyspace = keyspace;
-        this.name = name;
         this.baseTableId = baseTableId;
         this.baseTableName = baseTableName;
         this.includeAllColumns = includeAllColumns;
-        this.select = select;
         this.whereClause = whereClause;
         this.metadata = metadata;
     }
 
+    public String keyspace() // keyspace name, read from the view's table metadata
+    {
+        return metadata.keyspace;
+    }
+
+    public String name() // view name, read from the view's table metadata
+    {
+        return metadata.name;
+    }
+
     /**
      * @return true if the view specified by this definition will include the 
column, false otherwise
      */
@@ -77,7 +73,7 @@ public final class ViewMetadata
 
     public ViewMetadata copy(TableMetadata newMetadata)
     {
-        return new ViewMetadata(keyspace, name, baseTableId, baseTableName, 
includeAllColumns, select, whereClause, newMetadata);
+        return new ViewMetadata(baseTableId, baseTableName, includeAllColumns, 
whereClause, newMetadata);
     }
 
     public TableMetadata baseTableMetadata()
@@ -95,20 +91,21 @@ public final class ViewMetadata
             return false;
 
         ViewMetadata other = (ViewMetadata) o;
-        return Objects.equals(keyspace, other.keyspace)
-               && Objects.equals(name, other.name)
-               && Objects.equals(baseTableId, other.baseTableId)
+        return Objects.equals(baseTableId, other.baseTableId)
                && Objects.equals(includeAllColumns, other.includeAllColumns)
                && Objects.equals(whereClause, other.whereClause)
                && Objects.equals(metadata, other.metadata);
     }
 
+    public boolean equals(ViewMetadata other, Diff.Mode mode) // NOTE(review): mode is not consulted here; the result is that of equals(other)
+    {
+        return equals(other);
+    }
+
     @Override
     public int hashCode()
     {
         return new HashCodeBuilder(29, 1597)
-               .append(keyspace)
-               .append(name)
                .append(baseTableId)
                .append(includeAllColumns)
                .append(whereClause)
@@ -120,8 +117,6 @@ public final class ViewMetadata
     public String toString()
     {
         return new ToStringBuilder(this)
-               .append("keyspace", keyspace)
-               .append("name", name)
                .append("baseTableId", baseTableId)
                .append("baseTableName", baseTableName)
                .append("includeAllColumns", includeAllColumns)
@@ -130,68 +125,43 @@ public final class ViewMetadata
                .toString();
     }
 
+    public boolean referencesUserType(ByteBuffer name) // answered by the view's own table metadata
+    {
+        return metadata.referencesUserType(name);
+    }
+
+    public ViewMetadata withUpdatedUserType(UserType udt) // copy carrying updated table metadata when the view references the UDT; otherwise this
+    {
+        return referencesUserType(udt.name)
+             ? copy(metadata.withUpdatedUserType(udt))
+             : this;
+    }
+
     /**
      * Replace the column 'from' with 'to' in this materialized view 
definition's partition,
      * clustering, or included columns.
      * @param from the existing column
      * @param to the new column
      */
-    public ViewMetadata renamePrimaryKeyColumn(ColumnIdentifier from, 
ColumnIdentifier to)
+    public ViewMetadata withRenamedPrimaryKeyColumn(ColumnIdentifier from, 
ColumnIdentifier to)
     {
         // convert whereClause to Relations, rename ids in Relations, then 
convert back to whereClause
-        List<Relation> relations = whereClauseToRelations(whereClause);
-        ColumnMetadata.Raw fromRaw = 
ColumnMetadata.Raw.forQuoted(from.toString());
-        ColumnMetadata.Raw toRaw = ColumnMetadata.Raw.forQuoted(to.toString());
-        List<Relation> newRelations =
-            relations.stream()
-                     .map(r -> r.renameIdentifier(fromRaw, toRaw))
-                     .collect(Collectors.toList());
-
-        String rawSelect = View.buildSelectStatement(baseTableName, 
metadata.columns(), whereClause);
-
-        return new ViewMetadata(keyspace,
-                                name,
-                                baseTableId,
+        ColumnMetadata.Raw rawFrom = 
ColumnMetadata.Raw.forQuoted(from.toString());
+        ColumnMetadata.Raw rawTo = ColumnMetadata.Raw.forQuoted(to.toString());
+
+        return new ViewMetadata(baseTableId,
                                 baseTableName,
                                 includeAllColumns,
-                                (SelectStatement.RawStatement) 
QueryProcessor.parseStatement(rawSelect),
-                                View.relationsToWhereClause(newRelations),
+                                whereClause.renameIdentifier(rawFrom, rawTo),
                                 
metadata.unbuild().renamePrimaryKeyColumn(from, to).build());
     }
 
     public ViewMetadata withAddedRegularColumn(ColumnMetadata column)
     {
-        return new ViewMetadata(keyspace,
-                                name,
-                                baseTableId,
+        return new ViewMetadata(baseTableId,
                                 baseTableName,
                                 includeAllColumns,
-                                select,
                                 whereClause,
                                 metadata.unbuild().addColumn(column).build());
     }
-
-    public ViewMetadata withAlteredColumnType(ColumnIdentifier name, 
AbstractType<?> type)
-    {
-        return new ViewMetadata(keyspace,
-                                this.name,
-                                baseTableId,
-                                baseTableName,
-                                includeAllColumns,
-                                select,
-                                whereClause,
-                                metadata.unbuild().alterColumnType(name, 
type).build());
-    }
-
-    private static List<Relation> whereClauseToRelations(String whereClause)
-    {
-        try
-        {
-            return CQLFragmentParser.parseAnyUnhandled(CqlParser::whereClause, 
whereClause).build().relations;
-        }
-        catch (RecognitionException | SyntaxException exc)
-        {
-            throw new RuntimeException("Unexpected error parsing materialized 
view's where clause while handling column rename: ", exc);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java 
b/src/java/org/apache/cassandra/schema/Views.java
index 5765433..7226a6b 100644
--- a/src/java/org/apache/cassandra/schema/Views.java
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -21,24 +21,26 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
+import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
 
-import static com.google.common.collect.Iterables.filter;
+import org.apache.cassandra.db.marshal.UserType;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
 
 public final class Views implements Iterable<ViewMetadata>
 {
+    private static final Views NONE = builder().build();
+
     private final ImmutableMap<String, ViewMetadata> views;
 
     private Views(Builder builder)
     {
-        views = builder.views.build();
+        views = ImmutableMap.copyOf(builder.views);
     }
 
     public static Builder builder()
@@ -46,9 +48,14 @@ public final class Views implements Iterable<ViewMetadata>
         return new Builder();
     }
 
+    public Builder unbuild()
+    {
+        return builder().put(this);
+    }
+
     public static Views none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public Iterator<ViewMetadata> iterator()
@@ -56,7 +63,7 @@ public final class Views implements Iterable<ViewMetadata>
         return views.values().iterator();
     }
 
-    public Iterable<TableMetadata> metadatas()
+    Iterable<TableMetadata> metadatas()
     {
         return Iterables.transform(views.values(), view -> view.metadata);
     }
@@ -71,9 +78,9 @@ public final class Views implements Iterable<ViewMetadata>
         return views.isEmpty();
     }
 
-    public Iterable<ViewMetadata> forTable(UUID tableId)
+    public Iterable<ViewMetadata> forTable(TableId tableId)
     {
-        return Iterables.filter(this, v -> 
v.baseTableId.asUUID().equals(tableId));
+        return Iterables.filter(this, v -> v.baseTableId.equals(tableId));
     }
 
     /**
@@ -99,20 +106,32 @@ public final class Views implements Iterable<ViewMetadata>
         return views.get(name);
     }
 
+    boolean containsView(String name)
+    {
+        return views.containsKey(name);
+    }
+
+    Views filter(Predicate<ViewMetadata> predicate)
+    {
+        Builder builder = builder();
+        views.values().stream().filter(predicate).forEach(builder::put);
+        return builder.build();
+    }
+
     /**
      * Create a MaterializedViews instance with the provided materialized view 
added
      */
     public Views with(ViewMetadata view)
     {
-        if (get(view.name).isPresent())
-            throw new IllegalStateException(String.format("Materialized View %s already exists", view.name));
+        if (get(view.name()).isPresent())
+            throw new IllegalStateException(String.format("Materialized View %s already exists", view.name()));
 
-        return builder().add(this).add(view).build();
+        return builder().put(this).put(view).build();
     }
 
     public Views withSwapped(ViewMetadata view)
     {
-        return without(view.name).with(view);
+        return without(view.name()).with(view);
     }
 
     /**
@@ -123,18 +142,14 @@ public final class Views implements Iterable<ViewMetadata>
         ViewMetadata materializedView =
             get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
 
-        return builder().add(filter(this, v -> v != materializedView)).build();
+        return filter(v -> v != materializedView);
     }
 
-    MapDifference<TableId, ViewMetadata> diff(Views other)
+    Views withUpdatedUserTypes(UserType udt)
     {
-        Map<TableId, ViewMetadata> thisViews = new HashMap<>();
-        this.forEach(v -> thisViews.put(v.metadata.id, v));
-
-        Map<TableId, ViewMetadata> otherViews = new HashMap<>();
-        other.forEach(v -> otherViews.put(v.metadata.id, v));
-
-        return Maps.difference(thisViews, otherViews);
+        return any(this, v -> v.referencesUserType(udt.name))
+             ? builder().put(transform(this, v -> 
v.withUpdatedUserType(udt))).build()
+             : this;
     }
 
     @Override
@@ -157,7 +172,7 @@ public final class Views implements Iterable<ViewMetadata>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<String, ViewMetadata> views = new 
ImmutableMap.Builder<>();
+        final Map<String, ViewMetadata> views = new HashMap<>();
 
         private Builder()
         {
@@ -168,17 +183,61 @@ public final class Views implements Iterable<ViewMetadata>
             return new Views(this);
         }
 
+        public ViewMetadata get(String name)
+        {
+            return views.get(name);
+        }
+
+        public Builder put(ViewMetadata view)
+        {
+            views.put(view.name(), view);
+            return this;
+        }
 
-        public Builder add(ViewMetadata view)
+        public Builder remove(String name)
         {
-            views.put(view.name, view);
+            views.remove(name);
             return this;
         }
 
-        public Builder add(Iterable<ViewMetadata> views)
+        public Builder put(Iterable<ViewMetadata> views)
         {
-            views.forEach(this::add);
+            views.forEach(this::put);
             return this;
         }
     }
+
+    static ViewsDiff diff(Views before, Views after, Diff.Mode mode)
+    {
+        return ViewsDiff.diff(before, after, mode);
+    }
+
+    static final class ViewsDiff extends Diff<Views, ViewMetadata>
+    {
+        private static final ViewsDiff NONE = new ViewsDiff(Views.none(), 
Views.none(), ImmutableList.of());
+
+        private ViewsDiff(Views created, Views dropped, 
ImmutableCollection<Altered<ViewMetadata>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        private static ViewsDiff diff(Views before, Views after, Mode mode)
+        {
+            if (before == after)
+                return NONE;
+
+            Views created = after.filter(v -> !before.containsView(v.name()));
+            Views dropped = before.filter(v -> !after.containsView(v.name()));
+
+            ImmutableList.Builder<Altered<ViewMetadata>> altered = 
ImmutableList.builder();
+            before.forEach(viewBefore ->
+            {
+                ViewMetadata viewAfter = after.getNullable(viewBefore.name());
+                if (null != viewAfter && !viewBefore.equals(viewAfter, mode))
+                    altered.add(new Altered<>(viewBefore, viewAfter));
+            });
+
+            return new ViewsDiff(created, dropped, altered.build());
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index dfddccd..908e323 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -43,7 +43,6 @@ import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.CassandraVersion;
 
 /**
  * State related to a client connection.
@@ -51,7 +50,6 @@ import org.apache.cassandra.utils.CassandraVersion;
 public class ClientState
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ClientState.class);
-    public static final CassandraVersion DEFAULT_CQL_VERSION = 
org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION;
 
     private static final Set<IResource> READABLE_SYSTEM_RESOURCES = new 
HashSet<>();
     private static final Set<IResource> PROTECTED_AUTH_RESOURCES = new 
HashSet<>();
@@ -155,6 +153,13 @@ public class ClientState
         return new ClientState();
     }
 
+    public static ClientState forInternalCalls(String keyspace)
+    {
+        ClientState state = new ClientState();
+        state.setKeyspace(keyspace);
+        return state;
+    }
+
     /**
      * @return a ClientState object for external clients (native protocol 
users).
      */
@@ -275,7 +280,7 @@ public class ClientState
         return keyspace;
     }
 
-    public void setKeyspace(String ks) throws InvalidRequestException
+    public void setKeyspace(String ks)
     {
         // Skip keyspace validation for non-authenticated users. Apparently, 
some client libraries
         // call set_keyspace() before calling login(), and we have to handle 
that.
@@ -287,7 +292,7 @@ public class ClientState
     /**
      * Attempts to login the given user.
      */
-    public void login(AuthenticatedUser user) throws AuthenticationException
+    public void login(AuthenticatedUser user)
     {
         // Login privilege is not inherited via granted roles, so just
         // verify that the role with the credentials that were actually
@@ -298,40 +303,35 @@ public class ClientState
             throw new AuthenticationException(String.format("%s is not 
permitted to log in", user.getName()));
     }
 
-    public void hasAllKeyspacesAccess(Permission perm) throws 
UnauthorizedException
+    public void ensureAllKeyspacesPermission(Permission perm)
     {
         if (isInternal)
             return;
         validateLogin();
-        ensureHasPermission(perm, DataResource.root());
+        ensurePermission(perm, DataResource.root());
     }
 
-    public void hasKeyspaceAccess(String keyspace, Permission perm) throws 
UnauthorizedException, InvalidRequestException
+    public void ensureKeyspacePermission(String keyspace, Permission perm)
     {
-        hasAccess(keyspace, perm, DataResource.keyspace(keyspace));
+        ensurePermission(keyspace, perm, DataResource.keyspace(keyspace));
     }
 
-    public void hasColumnFamilyAccess(String keyspace, String columnFamily, 
Permission perm)
-    throws UnauthorizedException, InvalidRequestException
+    public void ensureTablePermission(String keyspace, String table, 
Permission perm)
     {
-        Schema.instance.validateTable(keyspace, columnFamily);
-        hasAccess(keyspace, perm, DataResource.table(keyspace, columnFamily));
+        ensurePermission(keyspace, perm, DataResource.table(keyspace, table));
     }
 
-    public void hasColumnFamilyAccess(TableMetadataRef tableRef, Permission 
perm)
-    throws UnauthorizedException, InvalidRequestException
+    public void ensureTablePermission(TableMetadataRef tableRef, Permission 
perm)
     {
-        hasColumnFamilyAccess(tableRef.get(), perm);
+        ensureTablePermission(tableRef.get(), perm);
     }
 
-    public void hasColumnFamilyAccess(TableMetadata table, Permission perm)
-    throws UnauthorizedException, InvalidRequestException
+    public void ensureTablePermission(TableMetadata table, Permission perm)
     {
-        hasAccess(table.keyspace, perm, table.resource);
+        ensurePermission(table.keyspace, perm, table.resource);
     }
 
-    private void hasAccess(String keyspace, Permission perm, DataResource 
resource)
-    throws UnauthorizedException, InvalidRequestException
+    private void ensurePermission(String keyspace, Permission perm, 
DataResource resource)
     {
         validateKeyspace(keyspace);
         if (isInternal)
@@ -343,10 +343,10 @@ public class ClientState
         if (PROTECTED_AUTH_RESOURCES.contains(resource))
             if ((perm == Permission.CREATE) || (perm == Permission.ALTER) || 
(perm == Permission.DROP))
                 throw new UnauthorizedException(String.format("%s schema is 
protected", resource));
-        ensureHasPermission(perm, resource);
+        ensurePermission(perm, resource);
     }
 
-    public void ensureHasPermission(Permission perm, IResource resource) 
throws UnauthorizedException
+    public void ensurePermission(Permission perm, IResource resource)
     {
         if (!DatabaseDescriptor.getAuthorizer().requireAuthorization())
             return;
@@ -356,12 +356,12 @@ public class ClientState
             if 
(((FunctionResource)resource).getKeyspace().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME))
                 return;
 
-        checkPermissionOnResourceChain(perm, resource);
+        ensurePermissionOnResourceChain(perm, resource);
     }
 
-    // Convenience method called from checkAccess method of CQLStatement
+    // Convenience method called from authorize method of CQLStatement
     // Also avoids needlessly creating lots of FunctionResource objects
-    public void ensureHasPermission(Permission permission, Function function)
+    public void ensurePermission(Permission permission, Function function)
     {
         // Save creating a FunctionResource is we don't need to
         if (!DatabaseDescriptor.getAuthorizer().requireAuthorization())
@@ -371,12 +371,12 @@ public class ClientState
         if (function.isNative())
             return;
 
-        checkPermissionOnResourceChain(permission, 
FunctionResource.function(function.name().keyspace,
-                                                                             
function.name().name,
-                                                                             
function.argTypes()));
+        ensurePermissionOnResourceChain(permission, 
FunctionResource.function(function.name().keyspace,
+                                                                              
function.name().name,
+                                                                              
function.argTypes()));
     }
 
-    private void checkPermissionOnResourceChain(Permission perm, IResource 
resource)
+    private void ensurePermissionOnResourceChain(Permission perm, IResource 
resource)
     {
         for (IResource r : Resources.chain(resource))
             if (authorize(r).contains(perm))
@@ -388,7 +388,7 @@ public class ClientState
                                                       resource));
     }
 
-    private void preventSystemKSSchemaModification(String keyspace, 
DataResource resource, Permission perm) throws UnauthorizedException
+    private void preventSystemKSSchemaModification(String keyspace, 
DataResource resource, Permission perm)
     {
         // we only care about schema modification.
         if (!((perm == Permission.ALTER) || (perm == Permission.DROP) || (perm 
== Permission.CREATE)))
@@ -409,26 +409,26 @@ public class ClientState
         }
     }
 
-    public void validateLogin() throws UnauthorizedException
+    public void validateLogin()
     {
         if (user == null)
             throw new UnauthorizedException("You have not logged in");
     }
 
-    public void ensureNotAnonymous() throws UnauthorizedException
+    public void ensureNotAnonymous()
     {
         validateLogin();
         if (user.isAnonymous())
             throw new UnauthorizedException("You have to be logged in and not 
anonymous to perform this request");
     }
 
-    public void ensureIsSuper(String message) throws UnauthorizedException
+    public void ensureIsSuperUser(String message)
     {
         if (DatabaseDescriptor.getAuthenticator().requireAuthentication() && 
(user == null || !user.isSuper()))
             throw new UnauthorizedException(message);
     }
 
-    private static void validateKeyspace(String keyspace) throws 
InvalidRequestException
+    private static void validateKeyspace(String keyspace)
     {
         if (keyspace == null)
             throw new InvalidRequestException("You have not set a keyspace for 
this session");
@@ -439,11 +439,6 @@ public class ClientState
         return user;
     }
 
-    public static CassandraVersion[] getCQLSupportedVersion()
-    {
-        return new CassandraVersion[]{ QueryProcessor.CQL_VERSION };
-    }
-
     private Set<Permission> authorize(IResource resource)
     {
         return user.getPermissions(resource);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8914440..3884da5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -571,8 +572,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public synchronized void initServer(int delay) throws 
ConfigurationException
     {
         logger.info("Cassandra version: {}", 
FBUtilities.getReleaseVersionString());
-        logger.info("CQL supported versions: {} (default: {})",
-                StringUtils.join(ClientState.getCQLSupportedVersion(), ", "), 
ClientState.DEFAULT_CQL_VERSION);
+        logger.info("CQL version: {}", QueryProcessor.CQL_VERSION);
         logger.info("Native protocol supported versions: {} (default: {})",
                     StringUtils.join(ProtocolVersion.supportedVersions(), ", 
"), ProtocolVersion.CURRENT);
 
@@ -1478,7 +1478,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         for (String keyspace : Schema.instance.getUserKeyspaces())
         {
             for (ViewMetadata view: 
Schema.instance.getKeyspaceMetadata(keyspace).views)
-                SystemKeyspace.finishViewBuildStatus(view.keyspace, view.name);
+                SystemKeyspace.finishViewBuildStatus(view.keyspace(), 
view.name());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index 1116575..b88bf0a 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -93,7 +93,7 @@ public class SSTableOfflineRelevel
         Schema.instance.loadFromDisk(false);
 
         if (Schema.instance.getTableMetadataRef(keyspace, columnfamily) == 
null)
-            throw new IllegalArgumentException(String.format("Unknown keyspace/columnFamily %s.%s",
+            throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
                     keyspace,
                     columnfamily));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 20c992c..b88cac5 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index ed77e59..aa1eaa2 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -23,7 +23,10 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Objects;
+
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
 
 public abstract class Event
 {
@@ -267,6 +270,16 @@ public abstract class Event
             this(change, Target.KEYSPACE, keyspace, null);
         }
 
+        public static SchemaChange forFunction(Change change, UDFunction 
function)
+        {
+            return new SchemaChange(change, Target.FUNCTION, 
function.name().keyspace, function.name().name, function.argumentsList());
+        }
+
+        public static SchemaChange forAggregate(Change change, UDAggregate 
aggregate)
+        {
+            return new SchemaChange(change, Target.AGGREGATE, 
aggregate.name().keyspace, aggregate.name().name, aggregate.argumentsList());
+        }
+
         // Assumes the type has already been deserialized
         public static SchemaChange deserializeEvent(ByteBuf cb, 
ProtocolVersion version)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index afd7659..7fd6b9c 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -28,7 +28,6 @@ import io.netty.buffer.ByteBuf;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
-import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
@@ -173,48 +172,47 @@ public class BatchMessage extends Message.Request
             }
 
             QueryHandler handler = ClientState.getCQLQueryHandler();
-            List<ParsedStatement.Prepared> prepared = new 
ArrayList<>(queryOrIdList.size());
+            List<CQLStatement> prepared = new 
ArrayList<>(queryOrIdList.size());
             for (int i = 0; i < queryOrIdList.size(); i++)
             {
                 Object query = queryOrIdList.get(i);
-                ParsedStatement.Prepared p;
+                CQLStatement statement;
                 if (query instanceof String)
                 {
-                    p = QueryProcessor.parseStatement((String)query,
-                                                      
state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
+                    statement = QueryProcessor.parseStatement((String)query, 
state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
                 }
                 else
                 {
-                    p = handler.getPrepared((MD5Digest)query);
-                    if (p == null)
+                    statement = 
handler.getPrepared((MD5Digest)query).statement;
+                    if (null == statement)
                         throw new 
PreparedQueryNotFoundException((MD5Digest)query);
                 }
 
                 List<ByteBuffer> queryValues = values.get(i);
-                if (queryValues.size() != p.statement.getBoundTerms())
+                if (queryValues.size() != statement.getBindVariables().size())
                     throw new InvalidRequestException(String.format("There 
were %d markers(?) in CQL but %d bound variables",
-                                                                    
p.statement.getBoundTerms(),
+                                                                    
statement.getBindVariables().size(),
                                                                     
queryValues.size()));
 
-                prepared.add(p);
+                prepared.add(statement);
             }
 
             BatchQueryOptions batchOptions = 
BatchQueryOptions.withPerStatementVariables(options, values, queryOrIdList);
             List<ModificationStatement> statements = new 
ArrayList<>(prepared.size());
             for (int i = 0; i < prepared.size(); i++)
             {
-                ParsedStatement.Prepared p = prepared.get(i);
-                batchOptions.prepareStatement(i, p.boundNames);
+                CQLStatement statement = prepared.get(i);
+                batchOptions.prepareStatement(i, statement.getBindVariables());
 
-                if (!(p.statement instanceof ModificationStatement))
+                if (!(statement instanceof ModificationStatement))
                     throw new InvalidRequestException("Invalid statement in 
batch: only UPDATE, INSERT and DELETE statements are allowed.");
 
-                statements.add((ModificationStatement)p.statement);
+                statements.add((ModificationStatement) statement);
             }
 
             // Note: It's ok at this point to pass a bogus value for the 
number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a 
clearly wrong one).
-            BatchStatement batch = new BatchStatement(-1, batchType, 
statements, Attributes.none());
+            BatchStatement batch = new BatchStatement(batchType, 
VariableSpecifications.empty(), statements, Attributes.none());
             Message.Response response = handler.processBatch(batch, state, 
batchOptions, getCustomPayload(), queryStartNanoTime);
 
             if (tracingId != null)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to