Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0f2266b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0f2266b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0f2266b Branch: refs/heads/cassandra-3.0 Commit: b0f2266b27e493b84dd61b59f1c926fc22754962 Parents: 70c8a53 61ac125 Author: Robert Stupp <sn...@snazy.de> Authored: Sat Feb 20 11:01:19 2016 +0100 Committer: Robert Stupp <sn...@snazy.de> Committed: Sat Feb 20 11:01:19 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/schema/SchemaKeyspace.java | 8 ++- .../validation/operations/AggregationTest.java | 60 +++++++++++++++++++- 3 files changed, 65 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0f2266b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index f0aa996,9d0046b..0c01009 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,19 -1,5 +1,20 @@@ -2.2.6 +3.0.4 + * Prevent logging in sandboxed state (CASSANDRA-11033) + * Disallow drop/alter operations of UDTs used by UDAs (CASSANDRA-10721) + * Add query time validation method on Index (CASSANDRA-11043) + * Avoid potential AssertionError in mixed version cluster (CASSANDRA-11128) + * Properly handle hinted handoff after topology changes (CASSANDRA-5902) + * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156) + * Fix wrong rack counting and invalid conditions check for TokenAllocation + (CASSANDRA-11139) + * Avoid creating empty hint files (CASSANDRA-11090) + * Fix leak detection strong reference loop using weak reference (CASSANDRA-11120) + * Configurie BatchlogManager to stop delayed tasks on shutdown (CASSANDRA-11062) + * Hadoop integration is incompatible with Cassandra Driver 3.0.0 (CASSANDRA-11001) + * Add dropped_columns to the list of schema table so it gets handled + properly (CASSANDRA-11050) +Merged from 2.2: + * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840) * (cqlsh) cqlsh cannot be called through symlink (CASSANDRA-11037) * fix ohc and java-driver pom dependencies in build.xml (CASSANDRA-10793) * Protect from keyspace dropped during repair (CASSANDRA-11065) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0f2266b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 62c78fd,0000000..9e05a73 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@@ -1,1408 -1,0 +1,1410 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.*; +import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.view.View; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.transport.Server; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +import static java.lang.String.format; + +import static java.util.stream.Collectors.toList; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; +import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; +import static org.apache.cassandra.schema.CQLTypeParser.parse; + +/** + * system_schema.* tables and methods for manipulating them. + */ +public final class SchemaKeyspace +{ + private SchemaKeyspace() + { + } + + private static final Logger logger = LoggerFactory.getLogger(SchemaKeyspace.class); + + private static final boolean FLUSH_SCHEMA_TABLES = Boolean.valueOf(System.getProperty("cassandra.test.flush_local_schema_changes", "true")); + + public static final String NAME = "system_schema"; + + public static final String KEYSPACES = "keyspaces"; + public static final String TABLES = "tables"; + public static final String COLUMNS = "columns"; + public static final String DROPPED_COLUMNS = "dropped_columns"; + public static final String TRIGGERS = "triggers"; + public static final String VIEWS = "views"; + public static final String TYPES = "types"; + public static final String FUNCTIONS = "functions"; + public static final String AGGREGATES = "aggregates"; + public static final String INDEXES = "indexes"; + + public static final List<String> ALL = + ImmutableList.of(KEYSPACES, TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES); + + private static final CFMetaData Keyspaces = + compile(KEYSPACES, + "keyspace definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "durable_writes boolean," + + "replication frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name)))"); + + private static final CFMetaData Tables = + compile(TABLES, + "table definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "bloom_filter_fp_chance double," + + "caching frozen<map<text, text>>," + + "comment text," + + "compaction frozen<map<text, text>>," + + "compression frozen<map<text, text>>," + + "crc_check_chance double," + + "dclocal_read_repair_chance double," + + "default_time_to_live int," + + "extensions frozen<map<text, blob>>," + + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND + + "gc_grace_seconds int," + + "id uuid," + + "max_index_interval int," + + "memtable_flush_period_in_ms int," + + "min_index_interval int," + + "read_repair_chance double," + + "speculative_retry text," + + "PRIMARY KEY ((keyspace_name), table_name))"); + + private static final CFMetaData Columns = + compile(COLUMNS, + "column definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "column_name text," + + "clustering_order text," + + "column_name_bytes blob," + + "kind text," + + "position int," + + "type text," + + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); + + private static final CFMetaData DroppedColumns = + compile(DROPPED_COLUMNS, + "dropped column registry", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "column_name text," + + "dropped_time timestamp," + + "type text," + + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); + + private static final CFMetaData Triggers = + compile(TRIGGERS, + "trigger definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "trigger_name text," + + "options frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); + + private static final CFMetaData Views = + compile(VIEWS, + "view definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "base_table_id uuid," + + "base_table_name text," + + "where_clause text," + + "bloom_filter_fp_chance double," + + "caching frozen<map<text, text>>," + + "comment text," + + "compaction frozen<map<text, text>>," + + "compression frozen<map<text, text>>," + + "crc_check_chance double," + + "dclocal_read_repair_chance double," + + "default_time_to_live int," + + "extensions frozen<map<text, blob>>," + + "gc_grace_seconds int," + + "id uuid," + + "include_all_columns boolean," + + "max_index_interval int," + + "memtable_flush_period_in_ms int," + + "min_index_interval int," + + "read_repair_chance double," + + "speculative_retry text," + + "PRIMARY KEY ((keyspace_name), view_name))"); + + private static final CFMetaData Indexes = + compile(INDEXES, + "secondary index definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "index_name text," + + "kind text," + + "options frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name), table_name, index_name))"); + + private static final CFMetaData Types = + compile(TYPES, + "user defined type definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "type_name text," + + "field_names frozen<list<text>>," + + "field_types frozen<list<text>>," + + "PRIMARY KEY ((keyspace_name), type_name))"); + + private static final CFMetaData Functions = + compile(FUNCTIONS, + "user defined function definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "function_name text," + + "argument_types frozen<list<text>>," + + "argument_names frozen<list<text>>," + + "body text," + + "language text," + + "return_type text," + + "called_on_null_input boolean," + + "PRIMARY KEY ((keyspace_name), function_name, argument_types))"); + + private static final CFMetaData Aggregates = + compile(AGGREGATES, + "user defined aggregate definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "aggregate_name text," + + "argument_types frozen<list<text>>," + + "final_func text," + + "initcond text," + + "return_type text," + + "state_func text," + + "state_type text," + + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))"); + + public static final List<CFMetaData> ALL_TABLE_METADATA = + ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes); + + private static CFMetaData compile(String name, String description, String schema) + { + return CFMetaData.compile(String.format(schema, name), NAME) + .comment(description) + .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7)); + } + + public static KeyspaceMetadata metadata() + { + return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), org.apache.cassandra.schema.Tables.of(ALL_TABLE_METADATA)); + } + + /** + * Add entries to system_schema.* for the hardcoded system keyspaces + */ + public static void saveSystemKeyspacesSchema() + { + KeyspaceMetadata system = Schema.instance.getKSMetaData(SystemKeyspace.NAME); + KeyspaceMetadata schema = Schema.instance.getKSMetaData(NAME); + + long timestamp = FBUtilities.timestampMicros(); + + // delete old, possibly obsolete entries in schema tables + for (String schemaTable : ALL) + { + String query = String.format("DELETE FROM %s.%s USING TIMESTAMP ? WHERE keyspace_name = ?", NAME, schemaTable); + for (String systemKeyspace : Schema.SYSTEM_KEYSPACE_NAMES) + executeOnceInternal(query, timestamp, systemKeyspace); + } + + // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added) + makeCreateKeyspaceMutation(system, timestamp + 1).apply(); + makeCreateKeyspaceMutation(schema, timestamp + 1).apply(); + } + + public static void truncate() + { + ALL.forEach(table -> getSchemaCFS(table).truncateBlocking()); + } + + static void flush() + { + if (!Boolean.getBoolean("cassandra.unsafesystem")) + ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush())); + } + + /** + * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest + * will be converted into UUID which would act as content-based version of the schema. + */ + public static UUID calculateSchemaDigest() + { + MessageDigest digest; + try + { + digest = MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) + { + throw new RuntimeException(e); + } + + for (String table : ALL) + { + // Due to CASSANDRA-11050 we want to exclude DROPPED_COLUMNS for schema digest computation. We can and + // should remove that in the next major release (so C* 4.0). + if (table.equals(DROPPED_COLUMNS)) + continue; + + ReadCommand cmd = getReadCommandForTableSchema(table); + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); + PartitionIterator schema = cmd.executeInternal(orderGroup)) + { + while (schema.hasNext()) + { + try (RowIterator partition = schema.next()) + { + if (!isSystemKeyspaceSchemaPartition(partition.partitionKey())) + RowIterators.digest(partition, digest); + } + } + } + } + return UUID.nameUUIDFromBytes(digest.digest()); + } + + /** + * @param schemaTableName The name of the table responsible for part of the schema + * @return CFS responsible to hold low-level serialized schema + */ + private static ColumnFamilyStore getSchemaCFS(String schemaTableName) + { + return Keyspace.open(NAME).getColumnFamilyStore(schemaTableName); + } + + /** + * @param schemaTableName The name of the table responsible for part of the schema. + * @return low-level schema representation + */ + private static ReadCommand getReadCommandForTableSchema(String schemaTableName) + { + ColumnFamilyStore cfs = getSchemaCFS(schemaTableName); + return PartitionRangeReadCommand.allDataRead(cfs.metadata, FBUtilities.nowInSeconds()); + } + + public static Collection<Mutation> convertSchemaToMutations() + { + Map<DecoratedKey, Mutation> mutationMap = new HashMap<>(); + + for (String table : ALL) + convertSchemaToMutations(mutationMap, table); + + return mutationMap.values(); + } + + private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName) + { + ReadCommand cmd = getReadCommandForTableSchema(schemaTableName); + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) + { + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + if (isSystemKeyspaceSchemaPartition(partition.partitionKey())) + continue; + + DecoratedKey key = partition.partitionKey(); + Mutation mutation = mutationMap.get(key); + if (mutation == null) + { + mutation = new Mutation(NAME, key); + mutationMap.put(key, mutation); + } + + mutation.add(PartitionUpdate.fromIterator(partition)); + } + } + } + } + + private static ByteBuffer getSchemaKSKey(String ksName) + { + return AsciiType.instance.fromString(ksName); + } + + private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey) + { + return Schema.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey())); + } + + /* + * Schema entities to mutations + */ + + public static Mutation makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp) + { + RowUpdateBuilder adder = new RowUpdateBuilder(Keyspaces, timestamp, name).clustering(); + return adder.add(KeyspaceParams.Option.DURABLE_WRITES.toString(), params.durableWrites) + .frozenMap(KeyspaceParams.Option.REPLICATION.toString(), params.replication.asMap()) + .build(); + } + + public static Mutation makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp) + { + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + + keyspace.tables.forEach(table -> addTableToSchemaMutation(table, timestamp, true, mutation)); + keyspace.views.forEach(view -> addViewToSchemaMutation(view, timestamp, true, mutation)); + keyspace.types.forEach(type -> addTypeToSchemaMutation(type, timestamp, mutation)); + keyspace.functions.udfs().forEach(udf -> addFunctionToSchemaMutation(udf, timestamp, mutation)); + keyspace.functions.udas().forEach(uda -> addAggregateToSchemaMutation(uda, timestamp, mutation)); + + return mutation; + } + + public static Mutation makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp) + { + int nowInSec = FBUtilities.nowInSeconds(); + Mutation mutation = new Mutation(NAME, Keyspaces.decorateKey(getSchemaKSKey(keyspace.name))); + + for (CFMetaData schemaTable : ALL_TABLE_METADATA) + mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec)); + + return mutation; + } + + public static Mutation makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addTypeToSchemaMutation(type, timestamp, mutation); + return mutation; + } + + static void addTypeToSchemaMutation(UserType type, long timestamp, Mutation mutation) + { + RowUpdateBuilder adder = new RowUpdateBuilder(Types, timestamp, mutation) + .clustering(type.getNameAsString()) + .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(toList())) + .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList())); + + adder.build(); + } + + private static String bbToString(ByteBuffer bb) + { + try + { + return ByteBufferUtil.string(bb); + } + catch (CharacterCodingException e) + { + throw new RuntimeException(e); + } + } + + public static Mutation dropTypeFromSchemaMutation(KeyspaceMetadata keyspace, UserType type, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + return RowUpdateBuilder.deleteRow(Types, timestamp, mutation, type.name); + } + + public static Mutation makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addTableToSchemaMutation(table, timestamp, true, mutation); + return mutation; + } + + static void addTableToSchemaMutation(CFMetaData table, long timestamp, boolean withColumnsAndTriggers, Mutation mutation) + { + RowUpdateBuilder adder = new RowUpdateBuilder(Tables, timestamp, mutation).clustering(table.cfName); + + addTableParamsToSchemaMutation(table.params, adder); + + adder.add("id", table.cfId) + .frozenSet("flags", CFMetaData.flagsToStrings(table.flags())) + .build(); + + if (withColumnsAndTriggers) + { + for (ColumnDefinition column : table.allColumns()) + addColumnToSchemaMutation(table, column, timestamp, mutation); + + for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values()) + addDroppedColumnToSchemaMutation(table, column, timestamp, mutation); + + for (TriggerMetadata trigger : table.getTriggers()) + addTriggerToSchemaMutation(table, trigger, timestamp, mutation); + + for (IndexMetadata index : table.getIndexes()) + addIndexToSchemaMutation(table, index, timestamp, mutation); + } + } + + private static void addTableParamsToSchemaMutation(TableParams params, RowUpdateBuilder adder) + { + adder.add("bloom_filter_fp_chance", params.bloomFilterFpChance) + .add("comment", params.comment) + .add("dclocal_read_repair_chance", params.dcLocalReadRepairChance) + .add("default_time_to_live", params.defaultTimeToLive) + .add("gc_grace_seconds", params.gcGraceSeconds) + .add("max_index_interval", params.maxIndexInterval) + .add("memtable_flush_period_in_ms", params.memtableFlushPeriodInMs) + .add("min_index_interval", params.minIndexInterval) + .add("read_repair_chance", params.readRepairChance) + .add("speculative_retry", params.speculativeRetry.toString()) + .add("crc_check_chance", params.crcCheckChance) + .frozenMap("caching", params.caching.asMap()) + .frozenMap("compaction", params.compaction.asMap()) + .frozenMap("compression", params.compression.asMap()) + .frozenMap("extensions", params.extensions); + } + + public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace, + CFMetaData oldTable, + CFMetaData newTable, + long timestamp, + boolean fromThrift) + { + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + + addTableToSchemaMutation(newTable, timestamp, false, mutation); + + MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldTable.getColumnMetadata(), + newTable.getColumnMetadata()); + + // columns that are no longer needed + for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values()) + { + // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other type + // are being deleted just because they are not here. + if (!fromThrift || + column.kind == ColumnDefinition.Kind.REGULAR || + (newTable.isStaticCompactTable() && column.kind == ColumnDefinition.Kind.STATIC)) + { + dropColumnFromSchemaMutation(oldTable, column, timestamp, mutation); + } + } + + // newly added columns + for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values()) + addColumnToSchemaMutation(newTable, column, timestamp, mutation); + + // old columns with updated attributes + for (ByteBuffer name : columnDiff.entriesDiffering().keySet()) + addColumnToSchemaMutation(newTable, newTable.getColumnDefinition(name), timestamp, mutation); + + // dropped columns + MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff = + Maps.difference(oldTable.getDroppedColumns(), newTable.getDroppedColumns()); + + // newly dropped columns + for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values()) + addDroppedColumnToSchemaMutation(newTable, column, timestamp, mutation); + + // columns added then dropped again + for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet()) + addDroppedColumnToSchemaMutation(newTable, newTable.getDroppedColumns().get(name), timestamp, mutation); + + MapDifference<String, TriggerMetadata> triggerDiff = triggersDiff(oldTable.getTriggers(), newTable.getTriggers()); + + // dropped triggers + for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnLeft().values()) + dropTriggerFromSchemaMutation(oldTable, trigger, timestamp, mutation); + + // newly created triggers + for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values()) + addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation); + + MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.getIndexes(), + newTable.getIndexes()); + + // dropped indexes + for (IndexMetadata index : indexesDiff.entriesOnlyOnLeft().values()) + dropIndexFromSchemaMutation(oldTable, index, timestamp, mutation); + + // newly created indexes + for (IndexMetadata index : indexesDiff.entriesOnlyOnRight().values()) + addIndexToSchemaMutation(newTable, index, timestamp, mutation); + + // updated indexes need to be updated + for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values()) + addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation); + + return mutation; + } + + private static MapDifference<String, IndexMetadata> indexesDiff(Indexes before, Indexes after) + { + Map<String, IndexMetadata> beforeMap = new HashMap<>(); + before.forEach(i -> beforeMap.put(i.name, i)); + + Map<String, IndexMetadata> afterMap = new HashMap<>(); + after.forEach(i -> afterMap.put(i.name, i)); + + return Maps.difference(beforeMap, afterMap); + } + + private static MapDifference<String, TriggerMetadata> triggersDiff(Triggers before, Triggers after) + { + Map<String, TriggerMetadata> beforeMap = new HashMap<>(); + before.forEach(t -> beforeMap.put(t.name, t)); + + Map<String, TriggerMetadata> afterMap = new HashMap<>(); + after.forEach(t -> afterMap.put(t.name, t)); + + return Maps.difference(beforeMap, afterMap); + } + + public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + + RowUpdateBuilder.deleteRow(Tables, timestamp, mutation, table.cfName); + + for (ColumnDefinition column : table.allColumns()) + dropColumnFromSchemaMutation(table, column, timestamp, mutation); + + for (TriggerMetadata trigger : table.getTriggers()) + dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation); + + for (IndexMetadata index : table.getIndexes()) + dropIndexFromSchemaMutation(table, index, timestamp, mutation); + + return mutation; + } + + private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) + { + RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString()); + + AbstractType<?> type = column.type; + if (type instanceof ReversedType) + type = ((ReversedType) type).baseType; + + adder.add("column_name_bytes", column.name.bytes) + .add("kind", column.kind.toString().toLowerCase()) + .add("position", column.position()) + .add("clustering_order", column.clusteringOrder().toString().toLowerCase()) + .add("type", type.asCQL3Type().toString()) + .build(); + } + + private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) + { + // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference). + RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString()); + } + + private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation) + { + RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name); + + adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime))) + .add("type", expandUserTypes(column.type).asCQL3Type().toString()) + .build(); + } + + private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation) + { + new RowUpdateBuilder(Triggers, timestamp, mutation) + .clustering(table.cfName, trigger.name) + .frozenMap("options", Collections.singletonMap("class", trigger.classOption)) + .build(); + } + + private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation) + { + RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name); + } + + public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addViewToSchemaMutation(view, timestamp, true, mutation); + return mutation; + } + + private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation) + { + RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, mutation) + .clustering(view.viewName); + + CFMetaData table = view.metadata; + + builder.add("include_all_columns", view.includeAllColumns) + .add("base_table_id", view.baseTableId) + .add("base_table_name", view.baseTableMetadata().cfName) + .add("where_clause", view.whereClause) + .add("id", table.cfId); + + addTableParamsToSchemaMutation(table.params, builder); + + if (includeColumns) + { + for (ColumnDefinition column : table.allColumns()) + addColumnToSchemaMutation(table, column, timestamp, mutation); + + for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values()) + addDroppedColumnToSchemaMutation(table, column, timestamp, mutation); + } + + builder.build(); + } + + public static Mutation makeDropViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + + RowUpdateBuilder.deleteRow(Views, timestamp, mutation, view.viewName); + + CFMetaData table = view.metadata; + for (ColumnDefinition column : table.allColumns()) + dropColumnFromSchemaMutation(table, column, timestamp, mutation); + + for (IndexMetadata index : table.getIndexes()) + dropIndexFromSchemaMutation(table, index, timestamp, mutation); + + return mutation; + } + + public static Mutation makeUpdateViewMutation(KeyspaceMetadata keyspace, + ViewDefinition oldView, + ViewDefinition newView, + long timestamp) + { + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + + addViewToSchemaMutation(newView, timestamp, false, mutation); + + MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldView.metadata.getColumnMetadata(), + newView.metadata.getColumnMetadata()); + + // columns that are no longer needed + for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values()) + dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, mutation); + + // newly added columns + for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values()) + addColumnToSchemaMutation(newView.metadata, column, timestamp, mutation); + + // old columns with updated attributes + for (ByteBuffer name : columnDiff.entriesDiffering().keySet()) + addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumnDefinition(name), timestamp, mutation); + + // dropped columns + MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff = + Maps.difference(oldView.metadata.getDroppedColumns(), oldView.metadata.getDroppedColumns()); + + // newly dropped columns + for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values()) + addDroppedColumnToSchemaMutation(oldView.metadata, column, timestamp, mutation); + + // columns added then dropped again + for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet()) + addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.getDroppedColumns().get(name), timestamp, mutation); + + return mutation; + } + + private static void addIndexToSchemaMutation(CFMetaData table, + IndexMetadata index, + long timestamp, + Mutation mutation) + { + RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name); + + builder.add("kind", index.kind.toString()); + builder.frozenMap("options", index.options); + builder.build(); + } + + private static void dropIndexFromSchemaMutation(CFMetaData table, + IndexMetadata index, + long timestamp, + Mutation mutation) + { + RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name); + } + + private static void addUpdatedIndexToSchemaMutation(CFMetaData table, + IndexMetadata index, + long timestamp, + Mutation mutation) + { + addIndexToSchemaMutation(table, index, timestamp, mutation); + } + + public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addFunctionToSchemaMutation(function, timestamp, mutation); + return mutation; + } + + static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation) + { + RowUpdateBuilder adder = + new RowUpdateBuilder(Functions, timestamp, mutation).clustering(function.name().name, functionArgumentsList(function)); + + adder.add("body", function.body()) + .add("language", function.language()) + .add("return_type", function.returnType().asCQL3Type().toString()) + .add("called_on_null_input", function.isCalledOnNullInput()) + .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(toList())); + + adder.build(); + } + + private static List<String> functionArgumentsList(AbstractFunction fun) + { + return fun.argTypes() + .stream() + .map(AbstractType::asCQL3Type) + .map(CQL3Type::toString) + .collect(toList()); + } + + public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionArgumentsList(function)); + } + + public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addAggregateToSchemaMutation(aggregate, timestamp, mutation); + return mutation; + } + + static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation) + { + RowUpdateBuilder adder = + new RowUpdateBuilder(Aggregates, timestamp, mutation) .clustering(aggregate.name().name, functionArgumentsList(aggregate)); + - CQL3Type stateCqlType = aggregate.stateType().asCQL3Type(); + adder.add("return_type", aggregate.returnType().asCQL3Type().toString()) + .add("state_func", aggregate.stateFunction().name().name) - .add("state_type", aggregate.stateType() != null ? stateCqlType.toString() : null) ++ .add("state_type", aggregate.stateType().asCQL3Type().toString()) + .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null) - .add("initcond", aggregate.initialCondition() != null ? stateCqlType.asCQLLiteral(aggregate.initialCondition(), Server.CURRENT_VERSION) : null) ++ .add("initcond", aggregate.initialCondition() != null ++ // must use the frozen state type here, as 'null' for unfrozen collections may mean 'empty' ++ ? aggregate.stateType().freeze().asCQL3Type().asCQLLiteral(aggregate.initialCondition(), Server.CURRENT_VERSION) ++ : null) + .build(); + } + + public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionArgumentsList(aggregate)); + } + + /* + * Fetching schema + */ + + public static Keyspaces fetchNonSystemKeyspaces() + { + return fetchKeyspacesWithout(Schema.SYSTEM_KEYSPACE_NAMES); + } + + private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames) + { + String query = format("SELECT keyspace_name FROM %s.%s", NAME, KEYSPACES); + + Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); + for (UntypedResultSet.Row row : query(query)) + { + String keyspaceName = row.getString("keyspace_name"); + if (!excludedKeyspaceNames.contains(keyspaceName)) + keyspaces.add(fetchKeyspace(keyspaceName)); + } + return keyspaces.build(); + } + + private static Keyspaces fetchKeyspacesOnly(Set<String> includedKeyspaceNames) + { + /* + * We know the keyspace names we are going to query, but we still want to run the SELECT IN + * query, to filter out the keyspaces that had been dropped by the applied mutation set. + */ + String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", NAME, KEYSPACES); + + Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); + for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames))) + keyspaces.add(fetchKeyspace(row.getString("keyspace_name"))); + return keyspaces.build(); + } + + private static KeyspaceMetadata fetchKeyspace(String keyspaceName) + { + KeyspaceParams params = fetchKeyspaceParams(keyspaceName); + Types types = fetchTypes(keyspaceName); + Tables tables = fetchTables(keyspaceName, types); + Views views = fetchViews(keyspaceName, types); + Functions functions = fetchFunctions(keyspaceName, types); + return KeyspaceMetadata.create(keyspaceName, params, tables, views, types, functions); + } + + private static KeyspaceParams fetchKeyspaceParams(String keyspaceName) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, KEYSPACES); + + UntypedResultSet.Row row = query(query, keyspaceName).one(); + boolean durableWrites = row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()); + Map<String, String> replication = row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString()); + return KeyspaceParams.create(durableWrites, replication); + } + + private static Types fetchTypes(String keyspaceName) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, TYPES); + + Types.RawBuilder types = org.apache.cassandra.schema.Types.rawBuilder(keyspaceName); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + { + String name = row.getString("type_name"); + List<String> fieldNames = row.getFrozenList("field_names", UTF8Type.instance); + List<String> fieldTypes = row.getFrozenList("field_types", UTF8Type.instance); + types.add(name, fieldNames, fieldTypes); + } + return types.build(); + } + + private static Tables fetchTables(String keyspaceName, Types types) + { + String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", NAME, TABLES); + + Tables.Builder tables = org.apache.cassandra.schema.Tables.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + tables.add(fetchTable(keyspaceName, row.getString("table_name"), types)); + return tables.build(); + } + + private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types) + { + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TABLES); + UntypedResultSet rows = query(query, keyspaceName, tableName); + if (rows.isEmpty()) + throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName)); + UntypedResultSet.Row row = rows.one(); + + UUID id = row.getUUID("id"); + + Set<CFMetaData.Flag> flags = CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance)); + + boolean isSuper = flags.contains(CFMetaData.Flag.SUPER); + boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER); + boolean isDense = flags.contains(CFMetaData.Flag.DENSE); + boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND); + + List<ColumnDefinition> columns = fetchColumns(keyspaceName, tableName, types); + Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, tableName); + Indexes indexes = fetchIndexes(keyspaceName, tableName); + Triggers triggers = fetchTriggers(keyspaceName, tableName); + + return CFMetaData.create(keyspaceName, + tableName, + id, + isDense, + isCompound, + isSuper, + isCounter, + false, + columns, + DatabaseDescriptor.getPartitioner()) + .params(createTableParamsFromRow(row)) + .droppedColumns(droppedColumns) + .indexes(indexes) + .triggers(triggers); + } + + public static TableParams createTableParamsFromRow(UntypedResultSet.Row row) + { + return TableParams.builder() + .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")) + .caching(CachingParams.fromMap(row.getFrozenTextMap("caching"))) + .comment(row.getString("comment")) + .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction"))) + .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression"))) + .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance")) + .defaultTimeToLive(row.getInt("default_time_to_live")) + .extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance)) + .gcGraceSeconds(row.getInt("gc_grace_seconds")) + .maxIndexInterval(row.getInt("max_index_interval")) + .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")) + .minIndexInterval(row.getInt("min_index_interval")) + .readRepairChance(row.getDouble("read_repair_chance")) + .crcCheckChance(row.getDouble("crc_check_chance")) + .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))) + .build(); + } + + private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, COLUMNS); + List<ColumnDefinition> columns = new ArrayList<>(); + query(query, keyspace, table).forEach(row -> columns.add(createColumnFromRow(row, types))); + return columns; + } + + public static ColumnDefinition createColumnFromRow(UntypedResultSet.Row row, Types types) + { + String keyspace = row.getString("keyspace_name"); + String table = row.getString("table_name"); + + ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name")); + + ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase()); + + int position = row.getInt("position"); + ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase()); + + AbstractType<?> type = parse(keyspace, row.getString("type"), types); + if (order == ClusteringOrder.DESC) + type = ReversedType.getInstance(type); + + return new ColumnDefinition(keyspace, table, name, type, position, kind); + } + + private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, DROPPED_COLUMNS); + Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>(); + for (UntypedResultSet.Row row : query(query, keyspace, table)) + { + CFMetaData.DroppedColumn column = createDroppedColumnFromRow(row); + columns.put(UTF8Type.instance.decompose(column.name), column); + } + return columns; + } + + private static CFMetaData.DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row) + { + String keyspace = row.getString("keyspace_name"); + String name = row.getString("column_name"); + /* + * we never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to + * them anymore), so before storing dropped columns in schema we expand UDTs to tuples. See expandUserTypes method. + * Because of that, we can safely pass Types.none() to parse() + */ + AbstractType<?> type = parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none()); + long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time")); + return new CFMetaData.DroppedColumn(name, type, droppedTime); + } + + private static Indexes fetchIndexes(String keyspace, String table) + { + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, INDEXES); + Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder(); + query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row))); + return indexes.build(); + } + + private static IndexMetadata createIndexMetadataFromRow(UntypedResultSet.Row row) + { + String name = row.getString("index_name"); + IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind")); + Map<String, String> options = row.getFrozenTextMap("options"); + return IndexMetadata.fromSchemaMetadata(name, type, options); + } + + private static Triggers fetchTriggers(String keyspace, String table) + { + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TRIGGERS); + Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder(); + query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row))); + return triggers.build(); + } + + private static TriggerMetadata createTriggerFromRow(UntypedResultSet.Row row) + { + String name = row.getString("trigger_name"); + String classOption = row.getFrozenTextMap("options").get("class"); + return new TriggerMetadata(name, classOption); + } + + private static Views fetchViews(String keyspaceName, Types types) + { + String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", NAME, VIEWS); + + Views.Builder views = org.apache.cassandra.schema.Views.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + views.add(fetchView(keyspaceName, row.getString("view_name"), types)); + return views.build(); + } + + private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types) + { + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEWS); + UntypedResultSet rows = query(query, keyspaceName, viewName); + if (rows.isEmpty()) + throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName)); + UntypedResultSet.Row row = rows.one(); + + UUID id = row.getUUID("id"); + UUID baseTableId = row.getUUID("base_table_id"); + String baseTableName = row.getString("base_table_name"); + boolean includeAll = row.getBoolean("include_all_columns"); + String whereClause = row.getString("where_clause"); + + List<ColumnDefinition> columns = fetchColumns(keyspaceName, viewName, types); + + Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, viewName); + + CFMetaData cfm = CFMetaData.create(keyspaceName, + viewName, + id, + false, + true, + false, + false, + true, + columns, + DatabaseDescriptor.getPartitioner()) + .params(createTableParamsFromRow(row)) + .droppedColumns(droppedColumns); + + String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause); + SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect); + + return new ViewDefinition(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm); + } + + private static Functions fetchFunctions(String keyspaceName, Types types) + { + Functions udfs = fetchUDFs(keyspaceName, types); + Functions udas = fetchUDAs(keyspaceName, udfs, types); + + return org.apache.cassandra.schema.Functions.builder() + .add(udfs) + .add(udas) + .build(); + } + + private static Functions fetchUDFs(String keyspaceName, Types types) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, FUNCTIONS); + + Functions.Builder functions = org.apache.cassandra.schema.Functions.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + functions.add(createUDFFromRow(row, types)); + return functions.build(); + } + + private static UDFunction createUDFFromRow(UntypedResultSet.Row row, Types types) + { + String ksName = row.getString("keyspace_name"); + String functionName = row.getString("function_name"); + FunctionName name = new FunctionName(ksName, functionName); + + List<ColumnIdentifier> argNames = new ArrayList<>(); + for (String arg : row.getFrozenList("argument_names", UTF8Type.instance)) + argNames.add(new ColumnIdentifier(arg, true)); + + List<AbstractType<?>> argTypes = new ArrayList<>(); + for (String type : row.getFrozenList("argument_types", UTF8Type.instance)) + argTypes.add(parse(ksName, type, types)); + + AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types); + + String language = row.getString("language"); + String body = row.getString("body"); + boolean calledOnNullInput = row.getBoolean("called_on_null_input"); + + org.apache.cassandra.cql3.functions.Function existing = Schema.instance.findFunction(name, argTypes).orElse(null); + if (existing instanceof UDFunction) + { + // This check prevents duplicate compilation of effectively the same UDF. + // Duplicate compilation attempts can occur on the coordinator node handling the CREATE FUNCTION + // statement, since CreateFunctionStatement needs to execute UDFunction.create but schema migration + // also needs that (since it needs to handle its own change). + UDFunction udf = (UDFunction) existing; + if (udf.argNames().equals(argNames) && // arg types checked in Functions.find call + udf.returnType().equals(returnType) && + !udf.isAggregate() && + udf.language().equals(language) && + udf.body().equals(body) && + udf.isCalledOnNullInput() == calledOnNullInput) + { + logger.trace("Skipping duplicate compilation of already existing UDF {}", name); + return udf; + } + } + + try + { + return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput, language, body); + } + catch (InvalidRequestException e) + { + logger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e); + return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, calledOnNullInput, language, body, e); + } + } + + private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, AGGREGATES); + + Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + aggregates.add(createUDAFromRow(row, udfs, types)); + return aggregates.build(); + } + + private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, Functions functions, Types types) + { + String ksName = row.getString("keyspace_name"); + String functionName = row.getString("aggregate_name"); + FunctionName name = new FunctionName(ksName, functionName); + + List<AbstractType<?>> argTypes = + row.getFrozenList("argument_types", UTF8Type.instance) + .stream() + .map(t -> parse(ksName, t, types)) + .collect(toList()); + + AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types); + + FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func"))); + FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null; + AbstractType<?> stateType = row.has("state_type") ? parse(ksName, row.getString("state_type"), types) : null; + ByteBuffer initcond = row.has("initcond") ? Terms.asBytes(ksName, row.getString("initcond"), stateType) : null; + + try + { + return UDAggregate.create(functions, name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond); + } + catch (InvalidRequestException reason) + { + return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason); + } + } + + private static UntypedResultSet query(String query, Object... variables) + { + return executeInternal(query, variables); + } + + /* + * Merging schema + */ + + /** + * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects + * (which also involves fs operations on add/drop ks/cf) + * + * @param mutations the schema changes to apply + * + * @throws ConfigurationException If one of metadata attributes has invalid value + */ + public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException + { + mergeSchema(mutations); + Schema.instance.updateVersionAndAnnounce(); + } + + public static synchronized void mergeSchema(Collection<Mutation> mutations) + { + // only compare the keyspaces affected by this set of schema mutations + Set<String> affectedKeyspaces = + mutations.stream() + .map(m -> UTF8Type.instance.compose(m.key().getKey())) + .collect(Collectors.toSet()); + + // fetch the current state of schema for the affected keyspaces only + Keyspaces before = Schema.instance.getKeyspaces(affectedKeyspaces); + + // apply the schema mutations and flush + mutations.forEach(Mutation::apply); + if (FLUSH_SCHEMA_TABLES) + flush(); + + // fetch the new state of schema from schema tables (not applied to Schema.instance yet) + Keyspaces after = fetchKeyspacesOnly(affectedKeyspaces); + + // deal with the diff + MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after); + + // dropped keyspaces + for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnLeft().values()) + { + keyspace.functions.udas().forEach(Schema.instance::dropAggregate); + keyspace.functions.udfs().forEach(Schema.instance::dropFunction); + keyspace.views.forEach(v -> Schema.instance.dropView(v.ksName, v.viewName)); + keyspace.tables.forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName)); + keyspace.types.forEach(Schema.instance::dropType); + Schema.instance.dropKeyspace(keyspace.name); + } + + // new keyspaces + for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnRight().values()) + { + Schema.instance.addKeyspace(KeyspaceMetadata.create(keyspace.name, keyspace.params)); + keyspace.types.forEach(Schema.instance::addType); + keyspace.tables.forEach(Schema.instance::addTable); + keyspace.views.forEach(Schema.instance::addView); + keyspace.functions.udfs().forEach(Schema.instance::addFunction); + keyspace.functions.udas().forEach(Schema.instance::addAggregate); + } + + // updated keyspaces + for (Map.Entry<String, MapDifference.ValueDifference<KeyspaceMetadata>> diff : keyspacesDiff.entriesDiffering().entrySet()) + updateKeyspace(diff.getKey(), diff.getValue().leftValue(), diff.getValue().rightValue()); + } + + private static void updateKeyspace(String keyspaceName, KeyspaceMetadata keyspaceBefore, KeyspaceMetadata keyspaceAfter) + { + // calculate the deltas + MapDifference<String, CFMetaData> tablesDiff = keyspaceBefore.tables.diff(keyspaceAfter.tables); + MapDifference<String, ViewDefinition> viewsDiff = keyspaceBefore.views.diff(keyspaceAfter.views); + MapDifference<ByteBuffer, UserType> typesDiff = keyspaceBefore.types.diff(keyspaceAfter.types); + + Map<Pair<FunctionName, List<String>>, UDFunction> udfsBefore = new HashMap<>(); + keyspaceBefore.functions.udfs().forEach(f -> udfsBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + Map<Pair<FunctionName, List<String>>, UDFunction> udfsAfter = new HashMap<>(); + keyspaceAfter.functions.udfs().forEach(f -> udfsAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = Maps.difference(udfsBefore, udfsAfter); + + Map<Pair<FunctionName, List<String>>, UDAggregate> udasBefore = new HashMap<>(); + keyspaceBefore.functions.udas().forEach(f -> udasBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + Map<Pair<FunctionName, List<String>>, UDAggregate> udasAfter = new HashMap<>(); + keyspaceAfter.functions.udas().forEach(f -> udasAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = Maps.difference(udasBefore, udasAfter); + + // update keyspace params, if changed + if (!keyspaceBefore.params.equals(keyspaceAfter.params)) + Schema.instance.updateKeyspace(keyspaceName, keyspaceAfter.params); + + // drop everything removed + udasDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropAggregate); + udfsDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropFunction); + viewsDiff.entriesOnlyOnLeft().values().forEach(v -> Schema.instance.dropView(v.ksName, v.viewName)); + tablesDiff.entriesOnlyOnLeft().values().forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName)); + typesDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropType); + + // add everything created + typesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addType); + tablesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addTable); + viewsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addView); + udfsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addFunction); + udasDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addAggregate); + + // update everything altered + for (MapDifference.ValueDifference<UserType> diff : typesDiff.entriesDiffering().values()) + Schema.instance.updateType(diff.rightValue()); + for (MapDifference.ValueDifference<CFMetaData> diff : tablesDiff.entriesDiffering().values()) + Schema.instance.updateTable(diff.rightValue()); + for (MapDifference.ValueDifference<ViewDefinition> diff : viewsDiff.entriesDiffering().values()) + Schema.instance.updateView(diff.rightValue()); + for (MapDifference.ValueDifference<UDFunction> diff : udfsDiff.entriesDiffering().values()) + Schema.instance.updateFunction(diff.rightValue()); + for (MapDifference.ValueDifference<UDAggregate> diff : udasDiff.entriesDiffering().values()) + Schema.instance.updateAggregate(diff.rightValue()); + } + + /* + * Type parsing and transformation + */ + + /* + * Recursively replaces any instances of UserType with an equivalent TupleType. + * We do it for dropped_columns, to allow safely dropping unused user types without retaining any references + * in dropped_columns. + */ + private static AbstractType<?> expandUserTypes(AbstractType<?> original) + { + if (original instanceof UserType) + return new TupleType(expandUserTypes(((UserType) original).fieldTypes())); + + if (original instanceof TupleType) + return new TupleType(expandUserTypes(((TupleType) original).allTypes())); + + if (original instanceof ListType<?>) + return ListType.getInstance(expandUserTypes(((ListType<?>) original).getElementsType()), original.isMultiCell()); + + if (original instanceof MapType<?,?>) + { + MapType<?, ?> mt = (MapType<?, ?>) original; + return MapType.getInstance(expandUserTypes(mt.getKeysType()), expandUserTypes(mt.getValuesType()), mt.isMultiCell()); + } + + if (original instanceof SetType<?>) + return SetType.getInstance(expandUserTypes(((SetType<?>) original).getElementsType()), original.isMultiCell()); + + // this is very unlikely to ever happen, but it's better to be safe than sorry + if (original instanceof ReversedType<?>) + return ReversedType.getInstance(expandUserTypes(((ReversedType) original).baseType)); + + if (original instanceof CompositeType) + return CompositeType.getInstance(expandUserTypes(original.getComponents())); + + return original; + } + + private static List<AbstractType<?>> expandUserTypes(List<AbstractType<?>> types) + { + return types.stream() + .map(SchemaKeyspace::expandUserTypes) + .collect(toList()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0f2266b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java index 9c05232,e7f47a2..c903127 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java @@@ -1714,110 -1686,57 +1714,168 @@@ public class AggregationTest extends CQ } @Test - public void testEmptyListInitcond() throws Throwable ++ public void testEmptyListAndNullInitcond() throws Throwable + { + String f = createFunction(KEYSPACE, + "list, int", + "CREATE FUNCTION %s(s list<text>, i int) " + + "CALLED ON NULL INPUT " + + "RETURNS list<text> " + + "LANGUAGE java " + + "AS 'if (i != null) s.add(String.valueOf(i)); return s;'"); + + String a = createAggregate(KEYSPACE, + "int", + "CREATE AGGREGATE %s(int) " + + "SFUNC " + shortFunctionName(f) + ' ' + + "STYPE list<text> " + + "INITCOND [ ]"); + + assertRows(execute("SELECT initcond FROM system_schema.aggregates WHERE keyspace_name=? AND aggregate_name=?", KEYSPACE, shortFunctionName(a)), + row("[]")); + + createTable("CREATE TABLE %s (a int primary key, b int)"); + execute("INSERT INTO %s (a, b) VALUES (1, 1)"); + execute("INSERT INTO %s (a, b) VALUES (2, null)"); + execute("INSERT INTO %s (a, b) VALUES (3, 2)"); + assertRows(execute("SELECT " + a + "(b) FROM %s"), row(Arrays.asList("1", "2"))); + } + + @Test + public void testLogbackReload() throws Throwable + { + // see https://issues.apache.org/jira/browse/CASSANDRA-11033 + + // make logback's scan interval 1ms - boilerplate, but necessary for this test + configureLogbackScanPeriod(1L); + try + { + + createTable("CREATE TABLE %s (" + + " year int PRIMARY KEY," + + " country text," + + " title text)"); + + String[] countries = Locale.getISOCountries(); + ThreadLocalRandom rand = ThreadLocalRandom.current(); + for (int i = 0; i < 10000; i++) + { + execute("INSERT INTO %s (year, country, title) VALUES (1980,?,?)", + countries[rand.nextInt(countries.length)], + "title-" + i); + } + + String albumCountByCountry = createFunction(KEYSPACE, + "map<text,bigint>,text,text", + "CREATE FUNCTION IF NOT EXISTS %s(state map<text,bigint>,country text, album_title text)\n" + + " RETURNS NULL ON NULL INPUT\n" + + " RETURNS map<text,bigint>\n" + + " LANGUAGE java\n" + + " AS $$\n" + + " if(state.containsKey(country)) {\n" + + " Long newCount = (Long)state.get(country) + 1;\n" + + " state.put(country, newCount);\n" + + " } else {\n" + + " state.put(country, 1L);\n" + + " }\n" + + " return state;\n" + + " $$;"); + + String releasesByCountry = createAggregate(KEYSPACE, + "text, text", + " CREATE AGGREGATE IF NOT EXISTS %s(text, text)\n" + + " SFUNC " + shortFunctionName(albumCountByCountry) + '\n' + + " STYPE map<text,bigint>\n" + + " INITCOND { };"); + + for (int i = 0; i < 1000; i++) + { + execute("SELECT " + releasesByCountry + "(country,title) FROM %s WHERE year=1980"); + } + } + finally + { + configureLogbackScanPeriod(60000L); + } + } + + private static void configureLogbackScanPeriod(long millis) + { + Logger l = LoggerFactory.getLogger(AggregationTest.class); + ch.qos.logback.classic.Logger logbackLogger = (ch.qos.logback.classic.Logger) l; + LoggerContext ctx = logbackLogger.getLoggerContext(); + TurboFilterList turboFilterList = ctx.getTurboFilterList(); + boolean done = false; + for (TurboFilter turboFilter : turboFilterList) + { + if (turboFilter instanceof ReconfigureOnChangeFilter) + { + ReconfigureOnChangeFilter reconfigureFilter = (ReconfigureOnChangeFilter) turboFilter; + reconfigureFilter.setRefreshPeriod(millis); + reconfigureFilter.stop(); + reconfigureFilter.start(); // start() sets the next check timestammp + done = true; + break; + } + } + assertTrue("ReconfigureOnChangeFilter not in logback's turbo-filter list - do that by adding scan=\"true\" to logback-test.xml's configuration element", done); + } ++ ++ @Test + public void testOrReplaceOptionals() throws Throwable + { + String fState = createFunction(KEYSPACE, + "list<text>, int", + "CREATE FUNCTION %s(s list<text>, i int) " + + "CALLED ON NULL INPUT " + + "RETURNS list<text> " + + "LANGUAGE java " + + "AS 'if (i != null) s.add(String.valueOf(i)); return s;'"); + + String fFinal = shortFunctionName(createFunction(KEYSPACE, + "list<text>", + "CREATE FUNCTION %s(s list<text>) " + + "CALLED ON NULL INPUT " + + "RETURNS list<text> " + + "LANGUAGE java " + + "AS 'return s;'")); + + String a = createAggregate(KEYSPACE, + "int", + "CREATE AGGREGATE %s(int) " + + "SFUNC " + shortFunctionName(fState) + ' ' + + "STYPE list<text> "); + + checkOptionals(a, null, null); + + String ddlPrefix = "CREATE OR REPLACE AGGREGATE " + a + "(int) " + + "SFUNC " + shortFunctionName(fState) + ' ' + + "STYPE list<text> "; + + // Test replacing INITCOND - for (String condition : new String[]{"", "INITCOND null"}) - { - execute(ddlPrefix + "INITCOND [ ] "); - checkOptionals(a, null, ByteBuffer.allocate(4)); ++ execute(ddlPrefix + "INITCOND [ ] "); ++ checkOptionals(a, null, "[]"); + - execute(ddlPrefix + condition); - checkOptionals(a, null, null); - } ++ execute(ddlPrefix); ++ checkOptionals(a, null, null); ++ ++ execute(ddlPrefix + "INITCOND [ ] "); ++ checkOptionals(a, null, "[]"); ++ ++ execute(ddlPrefix + "INITCOND null"); ++ checkOptionals(a, null, null); + + // Test replacing FINALFUNC - execute(ddlPrefix + "FINALFUNC " + shortFunctionName(fFinal) + " "); ++ execute(ddlPrefix + "FINALFUNC " + shortFunctionName(fFinal) + ' '); + checkOptionals(a, shortFunctionName(fFinal), null); + + execute(ddlPrefix); + checkOptionals(a, null, null); + } + - private void checkOptionals(String aggregateName, String finalFunc, ByteBuffer initCond) throws Throwable ++ private void checkOptionals(String aggregateName, String finalFunc, String initCond) throws Throwable + { - assertRows(execute("SELECT final_func, initcond FROM system.schema_aggregates WHERE keyspace_name=? AND aggregate_name=?", KEYSPACE, shortFunctionName(aggregateName)), ++ assertRows(execute("SELECT final_func, initcond FROM system_schema.aggregates WHERE keyspace_name=? AND aggregate_name=?", KEYSPACE, shortFunctionName(aggregateName)), + row(finalFunc, initCond)); + } }