http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index ee0974f..6716652 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -23,7 +23,6 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; import com.google.common.collect.MapDifference; @@ -32,7 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.*; -import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.*; import org.apache.cassandra.cql3.statements.SelectStatement; @@ -42,19 +42,17 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.view.View; -import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import static java.lang.String.format; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static 
org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; -import static org.apache.cassandra.schema.CQLTypeParser.parse; /** * system_schema.* tables and methods for manipulating them. @@ -83,163 +81,167 @@ public final class SchemaKeyspace public static final List<String> ALL = ImmutableList.of(KEYSPACES, TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES); - private static final CFMetaData Keyspaces = - compile(KEYSPACES, - "keyspace definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "durable_writes boolean," - + "replication frozen<map<text, text>>," - + "PRIMARY KEY ((keyspace_name)))"); - - private static final CFMetaData Tables = - compile(TABLES, - "table definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "bloom_filter_fp_chance double," - + "caching frozen<map<text, text>>," - + "comment text," - + "compaction frozen<map<text, text>>," - + "compression frozen<map<text, text>>," - + "crc_check_chance double," - + "dclocal_read_repair_chance double," - + "default_time_to_live int," - + "extensions frozen<map<text, blob>>," - + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND - + "gc_grace_seconds int," - + "id uuid," - + "max_index_interval int," - + "memtable_flush_period_in_ms int," - + "min_index_interval int," - + "read_repair_chance double," - + "speculative_retry text," - + "cdc boolean," - + "PRIMARY KEY ((keyspace_name), table_name))"); - - private static final CFMetaData Columns = - compile(COLUMNS, - "column definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "column_name text," - + "clustering_order text," - + "column_name_bytes blob," - + "kind text," - + "position int," - + "type text," - + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); - - private static final CFMetaData DroppedColumns = - compile(DROPPED_COLUMNS, - "dropped column registry", - "CREATE TABLE %s (" - + "keyspace_name text," 
- + "table_name text," - + "column_name text," - + "dropped_time timestamp," - + "type text," - + "kind text," - + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); - - private static final CFMetaData Triggers = - compile(TRIGGERS, - "trigger definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "trigger_name text," - + "options frozen<map<text, text>>," - + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); - - private static final CFMetaData Views = - compile(VIEWS, - "view definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "base_table_id uuid," - + "base_table_name text," - + "where_clause text," - + "bloom_filter_fp_chance double," - + "caching frozen<map<text, text>>," - + "comment text," - + "compaction frozen<map<text, text>>," - + "compression frozen<map<text, text>>," - + "crc_check_chance double," - + "dclocal_read_repair_chance double," - + "default_time_to_live int," - + "extensions frozen<map<text, blob>>," - + "gc_grace_seconds int," - + "id uuid," - + "include_all_columns boolean," - + "max_index_interval int," - + "memtable_flush_period_in_ms int," - + "min_index_interval int," - + "read_repair_chance double," - + "speculative_retry text," - + "cdc boolean," - + "PRIMARY KEY ((keyspace_name), view_name))"); - - private static final CFMetaData Indexes = - compile(INDEXES, - "secondary index definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "index_name text," - + "kind text," - + "options frozen<map<text, text>>," - + "PRIMARY KEY ((keyspace_name), table_name, index_name))"); - - private static final CFMetaData Types = - compile(TYPES, - "user defined type definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "type_name text," - + "field_names frozen<list<text>>," - + "field_types frozen<list<text>>," - + "PRIMARY KEY ((keyspace_name), type_name))"); - - private static final CFMetaData Functions = - 
compile(FUNCTIONS, - "user defined function definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "function_name text," - + "argument_types frozen<list<text>>," - + "argument_names frozen<list<text>>," - + "body text," - + "language text," - + "return_type text," - + "called_on_null_input boolean," - + "PRIMARY KEY ((keyspace_name), function_name, argument_types))"); - - private static final CFMetaData Aggregates = - compile(AGGREGATES, - "user defined aggregate definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "aggregate_name text," - + "argument_types frozen<list<text>>," - + "final_func text," - + "initcond text," - + "return_type text," - + "state_func text," - + "state_type text," - + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))"); - - public static final List<CFMetaData> ALL_TABLE_METADATA = + private static final TableMetadata Keyspaces = + parse(KEYSPACES, + "keyspace definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "durable_writes boolean," + + "replication frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name)))"); + + private static final TableMetadata Tables = + parse(TABLES, + "table definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "bloom_filter_fp_chance double," + + "caching frozen<map<text, text>>," + + "comment text," + + "compaction frozen<map<text, text>>," + + "compression frozen<map<text, text>>," + + "crc_check_chance double," + + "dclocal_read_repair_chance double," + + "default_time_to_live int," + + "extensions frozen<map<text, blob>>," + + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND + + "gc_grace_seconds int," + + "id uuid," + + "max_index_interval int," + + "memtable_flush_period_in_ms int," + + "min_index_interval int," + + "read_repair_chance double," + + "speculative_retry text," + + "cdc boolean," + + "PRIMARY KEY ((keyspace_name), table_name))"); + + private static final TableMetadata Columns = + 
parse(COLUMNS, + "column definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "column_name text," + + "clustering_order text," + + "column_name_bytes blob," + + "kind text," + + "position int," + + "type text," + + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); + + private static final TableMetadata DroppedColumns = + parse(DROPPED_COLUMNS, + "dropped column registry", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "column_name text," + + "dropped_time timestamp," + + "type text," + + "kind text," + + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); + + private static final TableMetadata Triggers = + parse(TRIGGERS, + "trigger definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "trigger_name text," + + "options frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); + + private static final TableMetadata Views = + parse(VIEWS, + "view definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "base_table_id uuid," + + "base_table_name text," + + "where_clause text," + + "bloom_filter_fp_chance double," + + "caching frozen<map<text, text>>," + + "comment text," + + "compaction frozen<map<text, text>>," + + "compression frozen<map<text, text>>," + + "crc_check_chance double," + + "dclocal_read_repair_chance double," + + "default_time_to_live int," + + "extensions frozen<map<text, blob>>," + + "gc_grace_seconds int," + + "id uuid," + + "include_all_columns boolean," + + "max_index_interval int," + + "memtable_flush_period_in_ms int," + + "min_index_interval int," + + "read_repair_chance double," + + "speculative_retry text," + + "cdc boolean," + + "PRIMARY KEY ((keyspace_name), view_name))"); + + private static final TableMetadata Indexes = + parse(INDEXES, + "secondary index definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "index_name text," 
+ + "kind text," + + "options frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name), table_name, index_name))"); + + private static final TableMetadata Types = + parse(TYPES, + "user defined type definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "type_name text," + + "field_names frozen<list<text>>," + + "field_types frozen<list<text>>," + + "PRIMARY KEY ((keyspace_name), type_name))"); + + private static final TableMetadata Functions = + parse(FUNCTIONS, + "user defined function definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "function_name text," + + "argument_types frozen<list<text>>," + + "argument_names frozen<list<text>>," + + "body text," + + "language text," + + "return_type text," + + "called_on_null_input boolean," + + "PRIMARY KEY ((keyspace_name), function_name, argument_types))"); + + private static final TableMetadata Aggregates = + parse(AGGREGATES, + "user defined aggregate definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "aggregate_name text," + + "argument_types frozen<list<text>>," + + "final_func text," + + "initcond text," + + "return_type text," + + "state_func text," + + "state_type text," + + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))"); + + private static final List<TableMetadata> ALL_TABLE_METADATA = ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes); - private static CFMetaData compile(String name, String description, String schema) + private static TableMetadata parse(String name, String description, String cql) { - return CFMetaData.compile(String.format(schema, name), SchemaConstants.SCHEMA_KEYSPACE_NAME) - .comment(description) - .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7)); + return CreateTableStatement.parse(format(cql, name), SchemaConstants.SCHEMA_KEYSPACE_NAME) + .id(TableId.forSystemTable(SchemaConstants.SCHEMA_KEYSPACE_NAME, name)) + .dcLocalReadRepairChance(0.0) + 
.gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7)) + .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1)) + .comment(description) + .build(); } public static KeyspaceMetadata metadata() @@ -252,8 +254,8 @@ public final class SchemaKeyspace */ public static void saveSystemKeyspacesSchema() { - KeyspaceMetadata system = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME); - KeyspaceMetadata schema = Schema.instance.getKSMetaData(SchemaConstants.SCHEMA_KEYSPACE_NAME); + KeyspaceMetadata system = Schema.instance.getKeyspaceMetadata(SchemaConstants.SYSTEM_KEYSPACE_NAME); + KeyspaceMetadata schema = Schema.instance.getKeyspaceMetadata(SchemaConstants.SCHEMA_KEYSPACE_NAME); long timestamp = FBUtilities.timestampMicros(); @@ -285,7 +287,7 @@ public final class SchemaKeyspace * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest * will be converted into UUID which would act as content-based version of the schema. */ - public static UUID calculateSchemaDigest() + static UUID calculateSchemaDigest() { MessageDigest digest; try @@ -337,10 +339,10 @@ public final class SchemaKeyspace private static ReadCommand getReadCommandForTableSchema(String schemaTableName) { ColumnFamilyStore cfs = getSchemaCFS(schemaTableName); - return PartitionRangeReadCommand.allDataRead(cfs.metadata, FBUtilities.nowInSeconds()); + return PartitionRangeReadCommand.allDataRead(cfs.metadata(), FBUtilities.nowInSeconds()); } - public static Collection<Mutation> convertSchemaToMutations() + static Collection<Mutation> convertSchemaToMutations() { Map<DecoratedKey, Mutation> mutationMap = new HashMap<>(); @@ -364,12 +366,7 @@ public final class SchemaKeyspace continue; DecoratedKey key = partition.partitionKey(); - Mutation mutation = mutationMap.get(key); - if (mutation == null) - { - mutation = new Mutation(SchemaConstants.SCHEMA_KEYSPACE_NAME, key); - mutationMap.put(key, mutation); - } + Mutation mutation = mutationMap.computeIfAbsent(key, k -> new 
Mutation(SchemaConstants.SCHEMA_KEYSPACE_NAME, key)); mutation.add(makeUpdateForSchema(partition, cmd.columnFilter())); } @@ -387,13 +384,13 @@ public final class SchemaKeyspace // This method is used during schema migration tasks, and if cdc is disabled, we want to force excluding the // 'cdc' column from the TABLES schema table because it is problematic if received by older nodes (see #12236 // and #12697). Otherwise though, we just simply "buffer" the content of the partition into a PartitionUpdate. - if (DatabaseDescriptor.isCDCEnabled() || !partition.metadata().cfName.equals(TABLES)) + if (DatabaseDescriptor.isCDCEnabled() || !partition.metadata().name.equals(TABLES)) return PartitionUpdate.fromIterator(partition, filter); // We want to skip the 'cdc' column. A simple solution for that is based on the fact that // 'PartitionUpdate.fromIterator()' will ignore any columns that are marked as 'fetched' but not 'queried'. ColumnFilter.Builder builder = ColumnFilter.allRegularColumnsBuilder(partition.metadata()); - for (ColumnDefinition column : filter.fetchedColumns()) + for (ColumnMetadata column : filter.fetchedColumns()) { if (!column.name.toString().equals("cdc")) builder.add(column); @@ -411,14 +408,15 @@ public final class SchemaKeyspace * Schema entities to mutations */ - private static DecoratedKey decorate(CFMetaData metadata, Object value) + @SuppressWarnings("unchecked") + private static DecoratedKey decorate(TableMetadata metadata, Object value) { - return metadata.decorateKey(((AbstractType)metadata.getKeyValidator()).decompose(value)); + return metadata.partitioner.decorateKey(((AbstractType)metadata.partitionKeyType).decompose(value)); } - public static Mutation.SimpleBuilder makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp) + static Mutation.SimpleBuilder makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp) { - Mutation.SimpleBuilder builder = Mutation.simpleBuilder(Keyspaces.ksName, 
decorate(Keyspaces, name)) + Mutation.SimpleBuilder builder = Mutation.simpleBuilder(Keyspaces.keyspace, decorate(Keyspaces, name)) .timestamp(timestamp); builder.update(Keyspaces) @@ -429,7 +427,7 @@ public final class SchemaKeyspace return builder; } - public static Mutation.SimpleBuilder makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp) + static Mutation.SimpleBuilder makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp) { Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); @@ -442,18 +440,18 @@ public final class SchemaKeyspace return builder; } - public static Mutation.SimpleBuilder makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp) + static Mutation.SimpleBuilder makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp) { Mutation.SimpleBuilder builder = Mutation.simpleBuilder(SchemaConstants.SCHEMA_KEYSPACE_NAME, decorate(Keyspaces, keyspace.name)) .timestamp(timestamp); - for (CFMetaData schemaTable : ALL_TABLE_METADATA) + for (TableMetadata schemaTable : ALL_TABLE_METADATA) builder.update(schemaTable).delete(); return builder; } - public static Mutation.SimpleBuilder makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp) + static Mutation.SimpleBuilder makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); @@ -469,7 +467,7 @@ public final class SchemaKeyspace .add("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList())); } - public static Mutation.SimpleBuilder dropTypeFromSchemaMutation(KeyspaceMetadata keyspace, UserType type, long timestamp) + static Mutation.SimpleBuilder dropTypeFromSchemaMutation(KeyspaceMetadata keyspace, UserType type, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); @@ -477,7 +475,7 @@ public final class SchemaKeyspace return builder; } - public static Mutation.SimpleBuilder makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp) + static Mutation.SimpleBuilder makeCreateTableMutation(KeyspaceMetadata keyspace, TableMetadata table, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); @@ -485,27 +483,27 @@ public final class SchemaKeyspace return builder; } - static void addTableToSchemaMutation(CFMetaData table, boolean withColumnsAndTriggers, Mutation.SimpleBuilder builder) + static void addTableToSchemaMutation(TableMetadata table, boolean withColumnsAndTriggers, Mutation.SimpleBuilder builder) { Row.SimpleBuilder rowBuilder = builder.update(Tables) - .row(table.cfName) - .add("id", table.cfId) - .add("flags", CFMetaData.flagsToStrings(table.flags())); + .row(table.name) + .add("id", table.id.asUUID()) + .add("flags", TableMetadata.Flag.toStringSet(table.flags)); addTableParamsToRowBuilder(table.params, rowBuilder); if (withColumnsAndTriggers) { - for (ColumnDefinition column : table.allColumns()) + for (ColumnMetadata column : table.columns()) addColumnToSchemaMutation(table, column, builder); - for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values()) + for (DroppedColumn column : table.droppedColumns.values()) addDroppedColumnToSchemaMutation(table, column, builder); - for (TriggerMetadata trigger : table.getTriggers()) + for (TriggerMetadata trigger : table.triggers) addTriggerToSchemaMutation(table, trigger, builder); - for (IndexMetadata index : table.getIndexes()) + for (IndexMetadata index : table.indexes) addIndexToSchemaMutation(table, index, builder); } } @@ -534,43 +532,42 @@ public final class SchemaKeyspace builder.add("cdc", params.cdc); } - public static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata keyspace, - CFMetaData oldTable, - CFMetaData newTable, + static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata keyspace, + TableMetadata oldTable, + TableMetadata newTable, long timestamp) { Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); addTableToSchemaMutation(newTable, false, builder); - MapDifference<ByteBuffer, 
ColumnDefinition> columnDiff = Maps.difference(oldTable.getColumnMetadata(), - newTable.getColumnMetadata()); + MapDifference<ByteBuffer, ColumnMetadata> columnDiff = Maps.difference(oldTable.columns, newTable.columns); // columns that are no longer needed - for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values()) + for (ColumnMetadata column : columnDiff.entriesOnlyOnLeft().values()) dropColumnFromSchemaMutation(oldTable, column, builder); // newly added columns - for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values()) + for (ColumnMetadata column : columnDiff.entriesOnlyOnRight().values()) addColumnToSchemaMutation(newTable, column, builder); // old columns with updated attributes for (ByteBuffer name : columnDiff.entriesDiffering().keySet()) - addColumnToSchemaMutation(newTable, newTable.getColumnDefinition(name), builder); + addColumnToSchemaMutation(newTable, newTable.getColumn(name), builder); // dropped columns - MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff = - Maps.difference(oldTable.getDroppedColumns(), newTable.getDroppedColumns()); + MapDifference<ByteBuffer, DroppedColumn> droppedColumnDiff = + Maps.difference(oldTable.droppedColumns, newTable.droppedColumns); // newly dropped columns - for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values()) + for (DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values()) addDroppedColumnToSchemaMutation(newTable, column, builder); // columns added then dropped again for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet()) - addDroppedColumnToSchemaMutation(newTable, newTable.getDroppedColumns().get(name), builder); + addDroppedColumnToSchemaMutation(newTable, newTable.droppedColumns.get(name), builder); - MapDifference<String, TriggerMetadata> triggerDiff = triggersDiff(oldTable.getTriggers(), newTable.getTriggers()); + MapDifference<String, TriggerMetadata> triggerDiff = triggersDiff(oldTable.triggers, 
newTable.triggers); // dropped triggers for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnLeft().values()) @@ -580,8 +577,7 @@ public final class SchemaKeyspace for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values()) addTriggerToSchemaMutation(newTable, trigger, builder); - MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.getIndexes(), - newTable.getIndexes()); + MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.indexes, newTable.indexes); // dropped indexes for (IndexMetadata index : indexesDiff.entriesOnlyOnLeft().values()) @@ -620,33 +616,33 @@ public final class SchemaKeyspace return Maps.difference(beforeMap, afterMap); } - public static Mutation.SimpleBuilder makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp) + static Mutation.SimpleBuilder makeDropTableMutation(KeyspaceMetadata keyspace, TableMetadata table, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - builder.update(Tables).row(table.cfName).delete(); + builder.update(Tables).row(table.name).delete(); - for (ColumnDefinition column : table.allColumns()) + for (ColumnMetadata column : table.columns()) dropColumnFromSchemaMutation(table, column, builder); - for (TriggerMetadata trigger : table.getTriggers()) + for (TriggerMetadata trigger : table.triggers) dropTriggerFromSchemaMutation(table, trigger, builder); - for (IndexMetadata index : table.getIndexes()) + for (IndexMetadata index : table.indexes) dropIndexFromSchemaMutation(table, index, builder); return builder; } - private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, Mutation.SimpleBuilder builder) + private static void addColumnToSchemaMutation(TableMetadata table, ColumnMetadata column, Mutation.SimpleBuilder builder) { AbstractType<?> type = column.type; if (type instanceof ReversedType) type = ((ReversedType) type).baseType; builder.update(Columns) - .row(table.cfName, column.name.toString()) + .row(table.name, column.name.toString()) .add("column_name_bytes", column.name.bytes) .add("kind", column.kind.toString().toLowerCase()) .add("position", column.position()) @@ -654,34 +650,34 @@ public final class SchemaKeyspace .add("type", type.asCQL3Type().toString()); } - private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, Mutation.SimpleBuilder builder) + private static void dropColumnFromSchemaMutation(TableMetadata table, ColumnMetadata column, Mutation.SimpleBuilder builder) { // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference). 
- builder.update(Columns).row(table.cfName, column.name.toString()).delete(); + builder.update(Columns).row(table.name, column.name.toString()).delete(); } - private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, Mutation.SimpleBuilder builder) + private static void addDroppedColumnToSchemaMutation(TableMetadata table, DroppedColumn column, Mutation.SimpleBuilder builder) { builder.update(DroppedColumns) - .row(table.cfName, column.name) + .row(table.name, column.column.name.toString()) .add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime))) - .add("type", expandUserTypes(column.type).asCQL3Type().toString()) - .add("kind", column.kind.toString().toLowerCase()); + .add("type", expandUserTypes(column.column.type).asCQL3Type().toString()) + .add("kind", column.column.kind.toString().toLowerCase()); } - private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, Mutation.SimpleBuilder builder) + private static void addTriggerToSchemaMutation(TableMetadata table, TriggerMetadata trigger, Mutation.SimpleBuilder builder) { builder.update(Triggers) - .row(table.cfName, trigger.name) + .row(table.name, trigger.name) .add("options", Collections.singletonMap("class", trigger.classOption)); } - private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, Mutation.SimpleBuilder builder) + private static void dropTriggerFromSchemaMutation(TableMetadata table, TriggerMetadata trigger, Mutation.SimpleBuilder builder) { - builder.update(Triggers).row(table.cfName, trigger.name).delete(); + builder.update(Triggers).row(table.name, trigger.name).delete(); } - public static Mutation.SimpleBuilder makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp) + static Mutation.SimpleBuilder makeCreateViewMutation(KeyspaceMetadata keyspace, ViewMetadata view, long timestamp) { // Include the serialized keyspace in case the 
target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); @@ -689,110 +685,106 @@ public final class SchemaKeyspace return builder; } - private static void addViewToSchemaMutation(ViewDefinition view, boolean includeColumns, Mutation.SimpleBuilder builder) + private static void addViewToSchemaMutation(ViewMetadata view, boolean includeColumns, Mutation.SimpleBuilder builder) { - CFMetaData table = view.metadata; + TableMetadata table = view.metadata; Row.SimpleBuilder rowBuilder = builder.update(Views) - .row(view.viewName) + .row(view.name) .add("include_all_columns", view.includeAllColumns) - .add("base_table_id", view.baseTableId) - .add("base_table_name", view.baseTableMetadata().cfName) + .add("base_table_id", view.baseTableId.asUUID()) + .add("base_table_name", view.baseTableName) .add("where_clause", view.whereClause) - .add("id", table.cfId); + .add("id", table.id.asUUID()); addTableParamsToRowBuilder(table.params, rowBuilder); if (includeColumns) { - for (ColumnDefinition column : table.allColumns()) + for (ColumnMetadata column : table.columns()) addColumnToSchemaMutation(table, column, builder); - for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values()) + for (DroppedColumn column : table.droppedColumns.values()) addDroppedColumnToSchemaMutation(table, column, builder); } } - public static Mutation.SimpleBuilder makeDropViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp) + static Mutation.SimpleBuilder makeDropViewMutation(KeyspaceMetadata keyspace, ViewMetadata view, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - builder.update(Views).row(view.viewName).delete(); + builder.update(Views).row(view.name).delete(); - CFMetaData table = view.metadata; - for (ColumnDefinition column : table.allColumns()) + TableMetadata table = view.metadata; + for (ColumnMetadata column : table.columns()) dropColumnFromSchemaMutation(table, column, builder); - for (IndexMetadata index : table.getIndexes()) + for (IndexMetadata index : table.indexes) dropIndexFromSchemaMutation(table, index, builder); return builder; } - public static Mutation.SimpleBuilder makeUpdateViewMutation(KeyspaceMetadata keyspace, - ViewDefinition oldView, - ViewDefinition newView, + static Mutation.SimpleBuilder makeUpdateViewMutation(KeyspaceMetadata keyspace, + ViewMetadata oldView, + ViewMetadata newView, long timestamp) { Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); addViewToSchemaMutation(newView, false, builder); - MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldView.metadata.getColumnMetadata(), - newView.metadata.getColumnMetadata()); + MapDifference<ByteBuffer, ColumnMetadata> columnDiff = Maps.difference(oldView.metadata.columns, + newView.metadata.columns); // columns that are no longer needed - for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values()) + for (ColumnMetadata column : columnDiff.entriesOnlyOnLeft().values()) dropColumnFromSchemaMutation(oldView.metadata, column, builder); // newly added columns - for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values()) + for (ColumnMetadata column : columnDiff.entriesOnlyOnRight().values()) addColumnToSchemaMutation(newView.metadata, column, builder); // old columns with updated attributes for (ByteBuffer name : columnDiff.entriesDiffering().keySet()) - addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumnDefinition(name), 
builder); + addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumn(name), builder); // dropped columns - MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff = - Maps.difference(oldView.metadata.getDroppedColumns(), oldView.metadata.getDroppedColumns()); + MapDifference<ByteBuffer, DroppedColumn> droppedColumnDiff = + Maps.difference(oldView.metadata.droppedColumns, oldView.metadata.droppedColumns); // newly dropped columns - for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values()) + for (DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values()) addDroppedColumnToSchemaMutation(oldView.metadata, column, builder); // columns added then dropped again for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet()) - addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.getDroppedColumns().get(name), builder); + addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.droppedColumns.get(name), builder); return builder; } - private static void addIndexToSchemaMutation(CFMetaData table, - IndexMetadata index, - Mutation.SimpleBuilder builder) + private static void addIndexToSchemaMutation(TableMetadata table, IndexMetadata index, Mutation.SimpleBuilder builder) { builder.update(Indexes) - .row(table.cfName, index.name) + .row(table.name, index.name) .add("kind", index.kind.toString()) .add("options", index.options); } - private static void dropIndexFromSchemaMutation(CFMetaData table, - IndexMetadata index, - Mutation.SimpleBuilder builder) + private static void dropIndexFromSchemaMutation(TableMetadata table, IndexMetadata index, Mutation.SimpleBuilder builder) { - builder.update(Indexes).row(table.cfName, index.name).delete(); + builder.update(Indexes).row(table.name, index.name).delete(); } - private static void addUpdatedIndexToSchemaMutation(CFMetaData table, + private static void addUpdatedIndexToSchemaMutation(TableMetadata table, IndexMetadata index, 
Mutation.SimpleBuilder builder) { addIndexToSchemaMutation(table, index, builder); } - public static Mutation.SimpleBuilder makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + static Mutation.SimpleBuilder makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); @@ -803,7 +795,7 @@ public final class SchemaKeyspace static void addFunctionToSchemaMutation(UDFunction function, Mutation.SimpleBuilder builder) { builder.update(Functions) - .row(function.name().name, functionArgumentsList(function)) + .row(function.name().name, function.argumentsList()) .add("body", function.body()) .add("language", function.language()) .add("return_type", function.returnType().asCQL3Type().toString()) @@ -823,24 +815,15 @@ public final class SchemaKeyspace } } - private static List<String> functionArgumentsList(AbstractFunction fun) - { - return fun.argTypes() - .stream() - .map(AbstractType::asCQL3Type) - .map(CQL3Type::toString) - .collect(toList()); - } - - public static Mutation.SimpleBuilder makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + static Mutation.SimpleBuilder makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - builder.update(Functions).row(function.name().name, functionArgumentsList(function)).delete(); + builder.update(Functions).row(function.name().name, function.argumentsList()).delete(); return builder; } - public static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) + static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); @@ -851,7 +834,7 @@ public final class SchemaKeyspace static void addAggregateToSchemaMutation(UDAggregate aggregate, Mutation.SimpleBuilder builder) { builder.update(Aggregates) - .row(aggregate.name().name, functionArgumentsList(aggregate)) + .row(aggregate.name().name, aggregate.argumentsList()) .add("return_type", aggregate.returnType().asCQL3Type().toString()) .add("state_func", aggregate.stateFunction().name().name) .add("state_type", aggregate.stateType().asCQL3Type().toString()) @@ -862,11 +845,11 @@ public final class SchemaKeyspace : null); } - public static Mutation.SimpleBuilder makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) + static Mutation.SimpleBuilder makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - builder.update(Aggregates).row(aggregate.name().name, functionArgumentsList(aggregate)).delete(); + builder.update(Aggregates).row(aggregate.name().name, aggregate.argumentsList()).delete(); return builder; } @@ -874,7 +857,7 @@ public final class SchemaKeyspace * Fetching schema */ - public static Keyspaces fetchNonSystemKeyspaces() + static Keyspaces fetchNonSystemKeyspaces() { return fetchKeyspacesWithout(SchemaConstants.SYSTEM_KEYSPACE_NAMES); } @@ -893,20 +876,6 @@ public final class SchemaKeyspace return keyspaces.build(); } - private static Keyspaces fetchKeyspacesOnly(Set<String> includedKeyspaceNames) - { - /* - * We know the keyspace names we are going to query, but we still want to run the SELECT IN - * query, to filter out the keyspaces that had been dropped by the applied mutation set. - */ - String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES); - - Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); - for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames))) - keyspaces.add(fetchKeyspace(row.getString("keyspace_name"))); - return keyspaces.build(); - } - private static KeyspaceMetadata fetchKeyspace(String keyspaceName) { KeyspaceParams params = fetchKeyspaceParams(keyspaceName); @@ -952,7 +921,7 @@ public final class SchemaKeyspace return tables.build(); } - private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types) + private static TableMetadata fetchTable(String keyspaceName, String tableName, Types types) { String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? 
AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES); UntypedResultSet rows = query(query, keyspaceName, tableName); @@ -960,43 +929,17 @@ public final class SchemaKeyspace throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName)); UntypedResultSet.Row row = rows.one(); - UUID id = row.getUUID("id"); - - Set<CFMetaData.Flag> flags = CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance)); - - boolean isSuper = flags.contains(CFMetaData.Flag.SUPER); - boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER); - boolean isDense = flags.contains(CFMetaData.Flag.DENSE); - boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND); - - List<ColumnDefinition> columns = fetchColumns(keyspaceName, tableName, types); - if (!columns.stream().anyMatch(ColumnDefinition::isPartitionKey)) - { - String msg = String.format("Table %s.%s did not have any partition key columns in the schema tables", keyspaceName, tableName); - throw new AssertionError(msg); - } - - Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, tableName); - Indexes indexes = fetchIndexes(keyspaceName, tableName); - Triggers triggers = fetchTriggers(keyspaceName, tableName); - - return CFMetaData.create(keyspaceName, - tableName, - id, - isDense, - isCompound, - isSuper, - isCounter, - false, - columns, - DatabaseDescriptor.getPartitioner()) - .params(createTableParamsFromRow(row)) - .droppedColumns(droppedColumns) - .indexes(indexes) - .triggers(triggers); + return TableMetadata.builder(keyspaceName, tableName, TableId.fromUUID(row.getUUID("id"))) + .flags(TableMetadata.Flag.fromStringSet(row.getFrozenSet("flags", UTF8Type.instance))) + .params(createTableParamsFromRow(row)) + .addColumns(fetchColumns(keyspaceName, tableName, types)) + .droppedColumns(fetchDroppedColumns(keyspaceName, tableName)) + .indexes(fetchIndexes(keyspaceName, tableName)) + 
.triggers(fetchTriggers(keyspaceName, tableName)) + .build(); } - public static TableParams createTableParamsFromRow(UntypedResultSet.Row row) + static TableParams createTableParamsFromRow(UntypedResultSet.Row row) { return TableParams.builder() .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")) @@ -1014,29 +957,29 @@ public final class SchemaKeyspace .readRepairChance(row.getDouble("read_repair_chance")) .crcCheckChance(row.getDouble("crc_check_chance")) .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))) - .cdc(row.has("cdc") ? row.getBoolean("cdc") : false) + .cdc(row.has("cdc") && row.getBoolean("cdc")) .build(); } - private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types) + private static List<ColumnMetadata> fetchColumns(String keyspace, String table, Types types) { String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS); - List<ColumnDefinition> columns = new ArrayList<>(); + List<ColumnMetadata> columns = new ArrayList<>(); query(query, keyspace, table).forEach(row -> columns.add(createColumnFromRow(row, types))); return columns; } - public static ColumnDefinition createColumnFromRow(UntypedResultSet.Row row, Types types) + static ColumnMetadata createColumnFromRow(UntypedResultSet.Row row, Types types) { String keyspace = row.getString("keyspace_name"); String table = row.getString("table_name"); - ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase()); + ColumnMetadata.Kind kind = ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase()); int position = row.getInt("position"); ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase()); - AbstractType<?> type = parse(keyspace, row.getString("type"), types); + AbstractType<?> type = CQLTypeParser.parse(keyspace, row.getString("type"), types); if (order == 
ClusteringOrder.DESC) type = ReversedType.getInstance(type); @@ -1044,38 +987,41 @@ public final class SchemaKeyspace row.getBytes("column_name_bytes"), row.getString("column_name")); - return new ColumnDefinition(keyspace, table, name, type, position, kind); + return new ColumnMetadata(keyspace, table, name, type, position, kind); } - private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table) + private static Map<ByteBuffer, DroppedColumn> fetchDroppedColumns(String keyspace, String table) { String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, DROPPED_COLUMNS); - Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>(); + Map<ByteBuffer, DroppedColumn> columns = new HashMap<>(); for (UntypedResultSet.Row row : query(query, keyspace, table)) { - CFMetaData.DroppedColumn column = createDroppedColumnFromRow(row); - columns.put(UTF8Type.instance.decompose(column.name), column); + DroppedColumn column = createDroppedColumnFromRow(row); + columns.put(column.column.name.bytes, column); } return columns; } - private static CFMetaData.DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row) + private static DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row) { String keyspace = row.getString("keyspace_name"); + String table = row.getString("table_name"); String name = row.getString("column_name"); /* * we never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to * them anymore), so before storing dropped columns in schema we expand UDTs to tuples. See expandUserTypes method. 
* Because of that, we can safely pass Types.none() to parse() */ - AbstractType<?> type = parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none()); + AbstractType<?> type = CQLTypeParser.parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none()); + ColumnMetadata.Kind kind = row.has("kind") + ? ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase()) + : ColumnMetadata.Kind.REGULAR; + assert kind == ColumnMetadata.Kind.REGULAR || kind == ColumnMetadata.Kind.STATIC + : "Unexpected dropped column kind: " + kind.toString(); + + ColumnMetadata column = new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, ColumnMetadata.NO_POSITION, kind); long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time")); - ColumnDefinition.Kind kind = row.has("kind") - ? ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase()) - : ColumnDefinition.Kind.REGULAR; - assert kind == ColumnDefinition.Kind.REGULAR || kind == ColumnDefinition.Kind.STATIC - : "Unexpected dropped column kind: " + kind; - return new CFMetaData.DroppedColumn(name, type, droppedTime, kind); + return new DroppedColumn(column, droppedTime); } private static Indexes fetchIndexes(String keyspace, String table) @@ -1119,7 +1065,7 @@ public final class SchemaKeyspace return views.build(); } - private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types) + private static ViewMetadata fetchView(String keyspaceName, String viewName, Types types) { String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? 
AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS); UntypedResultSet rows = query(query, keyspaceName, viewName); @@ -1127,33 +1073,25 @@ public final class SchemaKeyspace throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName)); UntypedResultSet.Row row = rows.one(); - UUID id = row.getUUID("id"); - UUID baseTableId = row.getUUID("base_table_id"); + TableId baseTableId = TableId.fromUUID(row.getUUID("base_table_id")); String baseTableName = row.getString("base_table_name"); boolean includeAll = row.getBoolean("include_all_columns"); String whereClause = row.getString("where_clause"); - List<ColumnDefinition> columns = fetchColumns(keyspaceName, viewName, types); + List<ColumnMetadata> columns = fetchColumns(keyspaceName, viewName, types); - Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, viewName); - - CFMetaData cfm = CFMetaData.create(keyspaceName, - viewName, - id, - false, - true, - false, - false, - true, - columns, - DatabaseDescriptor.getPartitioner()) - .params(createTableParamsFromRow(row)) - .droppedColumns(droppedColumns); + TableMetadata metadata = + TableMetadata.builder(keyspaceName, viewName, TableId.fromUUID(row.getUUID("id"))) + .isView(true) + .addColumns(columns) + .droppedColumns(fetchDroppedColumns(keyspaceName, viewName)) + .params(createTableParamsFromRow(row)) + .build(); - String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause); - SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect); + String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause); + SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect); - return new ViewDefinition(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm); + return new 
ViewMetadata(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, metadata); } private static Functions fetchFunctions(String keyspaceName, Types types) @@ -1189,14 +1127,18 @@ public final class SchemaKeyspace List<AbstractType<?>> argTypes = new ArrayList<>(); for (String type : row.getFrozenList("argument_types", UTF8Type.instance)) - argTypes.add(parse(ksName, type, types)); + argTypes.add(CQLTypeParser.parse(ksName, type, types)); - AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types); + AbstractType<?> returnType = CQLTypeParser.parse(ksName, row.getString("return_type"), types); String language = row.getString("language"); String body = row.getString("body"); boolean calledOnNullInput = row.getBoolean("called_on_null_input"); + /* + * TODO: find a way to get rid of Schema.instance dependency; evaluate if the optimisation below makes a difference + * in the first place. Remove if it doesn't. + */ org.apache.cassandra.cql3.functions.Function existing = Schema.instance.findFunction(name, argTypes).orElse(null); if (existing instanceof UDFunction) { @@ -1205,7 +1147,8 @@ public final class SchemaKeyspace // statement, since CreateFunctionStatement needs to execute UDFunction.create but schema migration // also needs that (since it needs to handle its own change).
UDFunction udf = (UDFunction) existing; - if (udf.argNames().equals(argNames) && // arg types checked in Functions.find call + if (udf.argNames().equals(argNames) && + udf.argTypes().equals(argTypes) && udf.returnType().equals(returnType) && !udf.isAggregate() && udf.language().equals(language) && @@ -1247,14 +1190,14 @@ public final class SchemaKeyspace List<AbstractType<?>> argTypes = row.getFrozenList("argument_types", UTF8Type.instance) .stream() - .map(t -> parse(ksName, t, types)) + .map(t -> CQLTypeParser.parse(ksName, t, types)) .collect(toList()); - AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types); + AbstractType<?> returnType = CQLTypeParser.parse(ksName, row.getString("return_type"), types); FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func"))); FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null; - AbstractType<?> stateType = row.has("state_type") ? parse(ksName, row.getString("state_type"), types) : null; + AbstractType<?> stateType = row.has("state_type") ? CQLTypeParser.parse(ksName, row.getString("state_type"), types) : null; ByteBuffer initcond = row.has("initcond") ? Terms.asBytes(ksName, row.getString("initcond"), stateType) : null; try @@ -1277,116 +1220,35 @@ public final class SchemaKeyspace */ /** - * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects - * (which also involves fs operations on add/drop ks/cf) - * - * @param mutations the schema changes to apply - * - * @throws ConfigurationException If one of metadata attributes has invalid value + * Computes the set of names of keyspaces affected by the provided schema mutations. 
*/ - public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException + static Set<String> affectedKeyspaces(Collection<Mutation> mutations) { - mergeSchema(mutations); - Schema.instance.updateVersionAndAnnounce(); + // only compare the keyspaces affected by this set of schema mutations + return mutations.stream() + .map(m -> UTF8Type.instance.compose(m.key().getKey())) + .collect(toSet()); } - public static synchronized void mergeSchema(Collection<Mutation> mutations) + static void applyChanges(Collection<Mutation> mutations) { - // only compare the keyspaces affected by this set of schema mutations - Set<String> affectedKeyspaces = - mutations.stream() - .map(m -> UTF8Type.instance.compose(m.key().getKey())) - .collect(Collectors.toSet()); - - // fetch the current state of schema for the affected keyspaces only - Keyspaces before = Schema.instance.getKeyspaces(affectedKeyspaces); - - // apply the schema mutations and flush mutations.forEach(Mutation::apply); - if (FLUSH_SCHEMA_TABLES) - flush(); - - // fetch the new state of schema from schema tables (not applied to Schema.instance yet) - Keyspaces after = fetchKeyspacesOnly(affectedKeyspaces); - - // deal with the diff - MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after); - - // dropped keyspaces - for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnLeft().values()) - { - keyspace.functions.udas().forEach(Schema.instance::dropAggregate); - keyspace.functions.udfs().forEach(Schema.instance::dropFunction); - keyspace.views.forEach(v -> Schema.instance.dropView(v.ksName, v.viewName)); - keyspace.tables.forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName)); - keyspace.types.forEach(Schema.instance::dropType); - Schema.instance.dropKeyspace(keyspace.name); - } + if (SchemaKeyspace.FLUSH_SCHEMA_TABLES) + SchemaKeyspace.flush(); + } - // new keyspaces - for (KeyspaceMetadata keyspace : 
keyspacesDiff.entriesOnlyOnRight().values()) - { - Schema.instance.addKeyspace(KeyspaceMetadata.create(keyspace.name, keyspace.params)); - keyspace.types.forEach(Schema.instance::addType); - keyspace.tables.forEach(Schema.instance::addTable); - keyspace.views.forEach(Schema.instance::addView); - keyspace.functions.udfs().forEach(Schema.instance::addFunction); - keyspace.functions.udas().forEach(Schema.instance::addAggregate); - } + static Keyspaces fetchKeyspaces(Set<String> toFetch) + { + /* + * We know the keyspace names we are going to query, but we still want to run the SELECT IN + * query, to filter out the keyspaces that had been dropped by the applied mutation set. + */ + String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES); - // updated keyspaces - for (Map.Entry<String, MapDifference.ValueDifference<KeyspaceMetadata>> diff : keyspacesDiff.entriesDiffering().entrySet()) - updateKeyspace(diff.getKey(), diff.getValue().leftValue(), diff.getValue().rightValue()); - } - - private static void updateKeyspace(String keyspaceName, KeyspaceMetadata keyspaceBefore, KeyspaceMetadata keyspaceAfter) - { - // calculate the deltas - MapDifference<String, CFMetaData> tablesDiff = keyspaceBefore.tables.diff(keyspaceAfter.tables); - MapDifference<String, ViewDefinition> viewsDiff = keyspaceBefore.views.diff(keyspaceAfter.views); - MapDifference<ByteBuffer, UserType> typesDiff = keyspaceBefore.types.diff(keyspaceAfter.types); - - Map<Pair<FunctionName, List<String>>, UDFunction> udfsBefore = new HashMap<>(); - keyspaceBefore.functions.udfs().forEach(f -> udfsBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f)); - Map<Pair<FunctionName, List<String>>, UDFunction> udfsAfter = new HashMap<>(); - keyspaceAfter.functions.udfs().forEach(f -> udfsAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f)); - MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = 
Maps.difference(udfsBefore, udfsAfter); - - Map<Pair<FunctionName, List<String>>, UDAggregate> udasBefore = new HashMap<>(); - keyspaceBefore.functions.udas().forEach(f -> udasBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f)); - Map<Pair<FunctionName, List<String>>, UDAggregate> udasAfter = new HashMap<>(); - keyspaceAfter.functions.udas().forEach(f -> udasAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f)); - MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = Maps.difference(udasBefore, udasAfter); - - // update keyspace params, if changed - if (!keyspaceBefore.params.equals(keyspaceAfter.params)) - Schema.instance.updateKeyspace(keyspaceName, keyspaceAfter.params); - - // drop everything removed - udasDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropAggregate); - udfsDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropFunction); - viewsDiff.entriesOnlyOnLeft().values().forEach(v -> Schema.instance.dropView(v.ksName, v.viewName)); - tablesDiff.entriesOnlyOnLeft().values().forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName)); - typesDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropType); - - // add everything created - typesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addType); - tablesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addTable); - viewsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addView); - udfsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addFunction); - udasDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addAggregate); - - // update everything altered - for (MapDifference.ValueDifference<UserType> diff : typesDiff.entriesDiffering().values()) - Schema.instance.updateType(diff.rightValue()); - for (MapDifference.ValueDifference<CFMetaData> diff : tablesDiff.entriesDiffering().values()) - Schema.instance.updateTable(diff.rightValue()); - for (MapDifference.ValueDifference<ViewDefinition> diff 
: viewsDiff.entriesDiffering().values()) - Schema.instance.updateView(diff.rightValue()); - for (MapDifference.ValueDifference<UDFunction> diff : udfsDiff.entriesDiffering().values()) - Schema.instance.updateFunction(diff.rightValue()); - for (MapDifference.ValueDifference<UDAggregate> diff : udasDiff.entriesDiffering().values()) - Schema.instance.updateAggregate(diff.rightValue()); + Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); + for (UntypedResultSet.Row row : query(query, new ArrayList<>(toFetch))) + keyspaces.add(fetchKeyspace(row.getString("keyspace_name"))); + return keyspaces.build(); } /*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java b/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java new file mode 100644 index 0000000..45cf365 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; + +/** + * Sends its current schema state in the form of mutations in reply to the remote node's request. + * Such a request is made when one of the nodes, by means of Gossip, detects schema disagreement in the ring.
+ */ +public final class SchemaPullVerbHandler implements IVerbHandler +{ + private static final Logger logger = LoggerFactory.getLogger(SchemaPullVerbHandler.class); + + public void doVerb(MessageIn message, int id) + { + logger.trace("Received schema pull request from {}", message.from); + + MessageOut<Collection<Mutation>> response = + new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE, + SchemaKeyspace.convertSchemaToMutations(), + MigrationManager.MigrationsSerializer.instance); + + MessagingService.instance().sendReply(response, id, message.from); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java new file mode 100644 index 0000000..f939cda --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; + +/** + * Called when a node receives updated schema state from the schema migration coordinator node. + * This happens when a user makes a local schema migration on one of the nodes in the ring + * (which is going to act as coordinator) and that node sends (pushes) its updated schema state + * (in the form of mutations) to all the alive nodes in the cluster. + */ +public final class SchemaPushVerbHandler implements IVerbHandler<Collection<Mutation>> +{ + private static final Logger logger = LoggerFactory.getLogger(SchemaPushVerbHandler.class); + + public void doVerb(final MessageIn<Collection<Mutation>> message, int id) + { + logger.trace("Received schema push request from {}", message.from); + + StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.mergeAndAnnounceVersion(message.payload)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java b/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java new file mode 100644 index 0000000..0a506e3 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.UUIDSerializer; + +public final class SchemaVersionVerbHandler implements IVerbHandler +{ + private final Logger logger = LoggerFactory.getLogger(SchemaVersionVerbHandler.class); + + public void doVerb(MessageIn message, int id) + { + logger.trace("Received schema version request from {}", message.from); + + MessageOut<UUID> response = + new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE, + Schema.instance.getVersion(), + UUIDSerializer.serializer); + + MessagingService.instance().sendReply(response, id, message.from); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/TableId.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java new file mode 100644 index 0000000..4b2592e --- /dev/null +++ b/src/java/org/apache/cassandra/schema/TableId.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.UUID; + +import org.apache.commons.lang3.ArrayUtils; + +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; + +/** + * The unique identifier of a table. + * <p> + * This is essentially a UUID, but we wrap it as it's used quite a bit in the code and having a nicely named class makes + * the code more readable. + */ +public class TableId +{ + private final UUID id; + + private TableId(UUID id) + { + this.id = id; + } + + public static TableId fromUUID(UUID id) + { + return new TableId(id); + } + + public static TableId generate() + { + return new TableId(UUIDGen.getTimeUUID()); + } + + public static TableId fromString(String idString) + { + return new TableId(UUID.fromString(idString)); + } + + /** + * Creates the UUID of a system table. + * + * This is deterministically based on the table name as system tables are hardcoded and initialized independently + * on each node (they don't go through a CREATE), but we still want them to have the same ID everywhere. + * + * We shouldn't use this for any other table.
+ */ + public static TableId forSystemTable(String keyspace, String table) + { + assert SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(keyspace) + || SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(keyspace); + return new TableId(UUID.nameUUIDFromBytes(ArrayUtils.addAll(keyspace.getBytes(), table.getBytes()))); + } + + public String toHexString() + { + return ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(id)); + } + + public UUID asUUID() + { + return id; + } + + @Override + public final int hashCode() + { + return id.hashCode(); + } + + @Override + public final boolean equals(Object o) + { + return this == o || (o instanceof TableId && this.id.equals(((TableId) o).id)); + } + + @Override + public String toString() + { + return id.toString(); + } + + public void serialize(DataOutput out) throws IOException + { + out.writeLong(id.getMostSignificantBits()); + out.writeLong(id.getLeastSignificantBits()); + } + + public int serializedSize() + { + return 16; + } + + public static TableId deserialize(DataInput in) throws IOException + { + return new TableId(new UUID(in.readLong(), in.readLong())); + } +}