This is an automated email from the ASF dual-hosted git repository. bereng pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push: new fa532a6 Prevent broken concurrent schema read/writes fa532a6 is described below commit fa532a61f810b428ccfdf4964684794a7fc0e885 Author: Bereng <berenguerbl...@gmail.com> AuthorDate: Wed Oct 20 10:44:50 2021 +0200 Prevent broken concurrent schema read/writes patch by Berenguer Blasi; reviewed by Caleb Rackliffe for CASSANDRA-16996 Co-authored-by: Berenguer Blasi <berenguerbl...@gmail.com> Co-authored-by: Caleb Rackliffe <calebrackli...@gmail.com> --- src/java/org/apache/cassandra/db/Keyspace.java | 2 +- .../apache/cassandra/schema/SchemaKeyspace.java | 175 +++++++++++---------- .../cassandra/schema/SchemaKeyspaceTables.java | 77 +++++++++ .../org/apache/cassandra/service/ClientState.java | 16 +- .../cassandra/utils/NativeSSTableLoaderClient.java | 41 +++-- .../apache/cassandra/config/CFMetaDataTest.java | 8 +- .../cassandra/cql3/PstmtPersistenceTest.java | 8 +- test/unit/org/apache/cassandra/cql3/ViewTest.java | 3 +- .../cql3/validation/operations/AlterTest.java | 14 +- .../cql3/validation/operations/CreateTest.java | 20 +-- .../operations/InsertUpdateIfConditionTest.java | 11 +- .../cassandra/schema/SchemaKeyspaceTest.java | 91 ++++++++++- .../service/StorageServiceServerTest.java | 17 +- 13 files changed, 340 insertions(+), 143 deletions(-) diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 5e39823..eb3de5a 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -452,7 +452,7 @@ public class Keyspace /** * If apply is blocking, apply must not be deferred - * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending + * Otherwise there is a race condition where ALL mutation workers are being blocked ending * in a complete deadlock of the mutation stage. See CASSANDRA-12689. * * @param mutation the row to write. Must not be modified after calling apply, since commitlog append diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 7dc6b23..6b0089f 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -21,40 +21,82 @@ import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.*; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.CFMetaData.DroppedColumn; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.config.ViewDefinition; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.FieldIdentifier; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.Terms; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.functions.AbstractFunction; +import org.apache.cassandra.cql3.functions.FunctionName; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; import org.apache.cassandra.cql3.statements.SelectStatement; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.ReversedType; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.marshal.TupleType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.RowIterators; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.view.View; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.service.PendingRangeCalculatorService; -import org.apache.cassandra.transport.Server; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import static java.lang.String.format; - import static java.util.stream.Collectors.toList; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; @@ -62,7 +104,7 @@ import static org.apache.cassandra.schema.CQLTypeParser.parse; /** * system_schema.* tables and methods for manipulating them. - */ +*/ public final class SchemaKeyspace { private SchemaKeyspace() @@ -74,49 +116,14 @@ public final class SchemaKeyspace private static final boolean FLUSH_SCHEMA_TABLES = Boolean.parseBoolean(System.getProperty("cassandra.test.flush_local_schema_changes", "true")); private static final boolean IGNORE_CORRUPTED_SCHEMA_TABLES = Boolean.parseBoolean(System.getProperty("cassandra.ignore_corrupted_schema_tables", "false")); - public static final String KEYSPACES = "keyspaces"; - public static final String TABLES = "tables"; - public static final String COLUMNS = "columns"; - public static final String DROPPED_COLUMNS = "dropped_columns"; - public static final String TRIGGERS = "triggers"; - public static final String VIEWS = "views"; - public static final String TYPES = "types"; - public static final String FUNCTIONS = "functions"; - public static final String AGGREGATES = "aggregates"; - public static final String INDEXES = "indexes"; - - /** - * The order in this list matters. - * - * When flushing schema tables, we want to flush them in a way that mitigates the effects of an abrupt shutdown whilst - * the tables are being flushed. On startup, we load the schema from disk before replaying the CL, so we need to - * try to avoid problems like reading a table without columns or types, for example. So columns and types should be - * flushed before tables, which should be flushed before keyspaces. - * - * When truncating, the order should be reversed. For immutable lists this is an efficient operation that simply - * iterates in reverse order. - * - * See CASSANDRA-12213 for more details. - */ - public static final ImmutableList<String> ALL = - ImmutableList.of(COLUMNS, DROPPED_COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES, INDEXES, TABLES, VIEWS, KEYSPACES); - /** * The tables to which we added the cdc column. This is used in {@link #makeUpdateForSchema} below to make sure we skip that * column is cdc is disabled as the columns breaks pre-cdc to post-cdc upgrades (typically, 3.0 -> 3.X). */ - private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(TABLES, VIEWS); - - - /** - * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before CASSANDRA-12213) - * for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559. - */ - public static final ImmutableList<String> ALL_FOR_DIGEST = - ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES); + private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(SchemaKeyspaceTables.TABLES, SchemaKeyspaceTables.VIEWS); private static final CFMetaData Keyspaces = - compile(KEYSPACES, + compile(SchemaKeyspaceTables.KEYSPACES, "keyspace definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -125,7 +132,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name)))"); private static final CFMetaData Tables = - compile(TABLES, + compile(SchemaKeyspaceTables.TABLES, "table definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -151,7 +158,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), table_name))"); private static final CFMetaData Columns = - compile(COLUMNS, + compile(SchemaKeyspaceTables.COLUMNS, "column definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -165,7 +172,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); private static final CFMetaData DroppedColumns = - compile(DROPPED_COLUMNS, + compile(SchemaKeyspaceTables.DROPPED_COLUMNS, "dropped column registry", "CREATE TABLE %s (" + "keyspace_name text," @@ -177,7 +184,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); private static final CFMetaData Triggers = - compile(TRIGGERS, + compile(SchemaKeyspaceTables.TRIGGERS, "trigger definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -187,7 +194,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); private static final CFMetaData Views = - compile(VIEWS, + compile(SchemaKeyspaceTables.VIEWS, "view definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -216,7 +223,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), view_name))"); private static final CFMetaData Indexes = - compile(INDEXES, + compile(SchemaKeyspaceTables.INDEXES, "secondary index definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -227,7 +234,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), table_name, index_name))"); private static final CFMetaData Types = - compile(TYPES, + compile(SchemaKeyspaceTables.TYPES, "user defined type definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -237,7 +244,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), type_name))"); private static final CFMetaData Functions = - compile(FUNCTIONS, + compile(SchemaKeyspaceTables.FUNCTIONS, "user defined function definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -251,7 +258,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), function_name, argument_types))"); private static final CFMetaData Aggregates = - compile(AGGREGATES, + compile(SchemaKeyspaceTables.AGGREGATES, "user defined aggregate definitions", "CREATE TABLE %s (" + "keyspace_name text," @@ -282,7 +289,7 @@ public final class SchemaKeyspace /** * Add entries to system_schema.* for the hardcoded system keyspaces */ - public static void saveSystemKeyspacesSchema() + public static synchronized void saveSystemKeyspacesSchema() { KeyspaceMetadata system = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME); KeyspaceMetadata schema = Schema.instance.getKSMetaData(SchemaConstants.SCHEMA_KEYSPACE_NAME); @@ -290,7 +297,7 @@ public final class SchemaKeyspace long timestamp = FBUtilities.timestampMicros(); // delete old, possibly obsolete entries in schema tables - for (String schemaTable : ALL) + for (String schemaTable : SchemaKeyspaceTables.ALL) { String query = String.format("DELETE FROM %s.%s USING TIMESTAMP ? WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, schemaTable); for (String systemKeyspace : SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES) @@ -302,15 +309,15 @@ public final class SchemaKeyspace makeCreateKeyspaceMutation(schema, timestamp + 1).build().apply(); } - public static void truncate() + public static synchronized void truncate() { - ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking()); + SchemaKeyspaceTables.ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking()); } static void flush() { if (!DatabaseDescriptor.isUnsafeSystem()) - ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush())); + SchemaKeyspaceTables.ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush())); } /** @@ -328,7 +335,7 @@ public final class SchemaKeyspace } @VisibleForTesting - static Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude) + static synchronized Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude) { MessageDigest digest; MessageDigest digest30; @@ -342,7 +349,7 @@ public final class SchemaKeyspace throw new RuntimeException(e); } - for (String table : ALL_FOR_DIGEST) + for (String table : SchemaKeyspaceTables.ALL_FOR_DIGEST) { ReadCommand cmd = getReadCommandForTableSchema(table); try (ReadExecutionController executionController = cmd.executionController(); @@ -387,7 +394,7 @@ public final class SchemaKeyspace { Map<DecoratedKey, Mutation> mutationMap = new HashMap<>(); - for (String table : ALL) + for (String table : SchemaKeyspaceTables.ALL) convertSchemaToMutations(mutationMap, table); return mutationMap.values(); @@ -929,7 +936,7 @@ public final class SchemaKeyspace private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames) { - String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES); + String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES); Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); for (UntypedResultSet.Row row : query(query)) @@ -947,7 +954,7 @@ public final class SchemaKeyspace * We know the keyspace names we are going to query, but we still want to run the SELECT IN * query, to filter out the keyspaces that had been dropped by the applied mutation set. */ - String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES); + String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES); Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames))) @@ -967,7 +974,7 @@ public final class SchemaKeyspace private static KeyspaceParams fetchKeyspaceParams(String keyspaceName) { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES); + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES); UntypedResultSet.Row row = query(query, keyspaceName).one(); boolean durableWrites = row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()); @@ -977,7 +984,7 @@ public final class SchemaKeyspace private static Types fetchTypes(String keyspaceName) { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TYPES); + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TYPES); Types.RawBuilder types = org.apache.cassandra.schema.Types.rawBuilder(keyspaceName); for (UntypedResultSet.Row row : query(query, keyspaceName)) @@ -992,7 +999,7 @@ public final class SchemaKeyspace private static Tables fetchTables(String keyspaceName, Types types) { - String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES); + String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES); Tables.Builder tables = org.apache.cassandra.schema.Tables.builder(); for (UntypedResultSet.Row row : query(query, keyspaceName)) @@ -1010,10 +1017,10 @@ public final class SchemaKeyspace "\"DELETE FROM %s.%s WHERE keyspace_name = '%s' AND table_name = '%s'; " + "DELETE FROM %s.%s WHERE keyspace_name = '%s' AND table_name = '%s';\" " + "If the table is not supposed to be dropped, restore %s.%s sstables from backups.", - keyspaceName, tableName, SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS, - SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES, keyspaceName, tableName, - SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS, keyspaceName, tableName, - SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS); + keyspaceName, tableName, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS, + SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES, keyspaceName, tableName, + SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS, keyspaceName, tableName, + SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS); if (IGNORE_CORRUPTED_SCHEMA_TABLES) { @@ -1031,7 +1038,7 @@ public final class SchemaKeyspace private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types) { - String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES); UntypedResultSet rows = query(query, keyspaceName, tableName); if (rows.isEmpty()) throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName)); @@ -1097,7 +1104,7 @@ public final class SchemaKeyspace private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types) { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS); + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS); UntypedResultSet columnRows = query(query, keyspace, table); if (columnRows.isEmpty()) throw new MissingColumns("Columns not found in schema table for " + keyspace + "." + table); @@ -1132,7 +1139,7 @@ public final class SchemaKeyspace private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table) { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, DROPPED_COLUMNS); + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.DROPPED_COLUMNS); Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>(); for (UntypedResultSet.Row row : query(query, keyspace, table)) { @@ -1162,7 +1169,7 @@ public final class SchemaKeyspace private static Indexes fetchIndexes(String keyspace, String table) { - String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, INDEXES); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.INDEXES); Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder(); query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row))); return indexes.build(); @@ -1178,7 +1185,7 @@ public final class SchemaKeyspace private static Triggers fetchTriggers(String keyspace, String table) { - String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TRIGGERS); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TRIGGERS); Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder(); query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row))); return triggers.build(); @@ -1193,7 +1200,7 @@ public final class SchemaKeyspace private static Views fetchViews(String keyspaceName, Types types) { - String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS); + String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS); Views.Builder views = org.apache.cassandra.schema.Views.builder(); for (UntypedResultSet.Row row : query(query, keyspaceName)) @@ -1203,7 +1210,7 @@ public final class SchemaKeyspace private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types) { - String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS); UntypedResultSet rows = query(query, keyspaceName, viewName); if (rows.isEmpty()) throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName)); @@ -1251,7 +1258,7 @@ public final class SchemaKeyspace private static Functions fetchUDFs(String keyspaceName, Types types) { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, FUNCTIONS); + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.FUNCTIONS); Functions.Builder functions = org.apache.cassandra.schema.Functions.builder(); for (UntypedResultSet.Row row : query(query, keyspaceName)) @@ -1312,7 +1319,7 @@ public final class SchemaKeyspace private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types) { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, AGGREGATES); + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.AGGREGATES); Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder(); for (UntypedResultSet.Row row : query(query, keyspaceName)) diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java b/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java new file mode 100644 index 0000000..a1ae445 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import com.google.common.collect.ImmutableList; + +public class SchemaKeyspaceTables +{ + public static final String KEYSPACES = "keyspaces"; + public static final String TABLES = "tables"; + public static final String COLUMNS = "columns"; + public static final String DROPPED_COLUMNS = "dropped_columns"; + public static final String TRIGGERS = "triggers"; + public static final String VIEWS = "views"; + public static final String TYPES = "types"; + public static final String FUNCTIONS = "functions"; + public static final String AGGREGATES = "aggregates"; + public static final String INDEXES = "indexes"; + + /** + * The order in this list matters. + * + * When flushing schema tables, we want to flush them in a way that mitigates the effects of an abrupt shutdown + * whilst the tables are being flushed. On startup, we load the schema from disk before replaying the CL, so we need + * to try to avoid problems like reading a table without columns or types, for example. So columns and types should + * be flushed before tables, which should be flushed before keyspaces. + * + * When truncating, the order should be reversed. For immutable lists this is an efficient operation that simply + * iterates in reverse order. + * + * See CASSANDRA-12213 for more details. + */ + public static final ImmutableList<String> ALL = ImmutableList.of(COLUMNS, + DROPPED_COLUMNS, + TRIGGERS, + TYPES, + FUNCTIONS, + AGGREGATES, + INDEXES, + TABLES, + VIEWS, + KEYSPACES); + + /** + * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before + * CASSANDRA-12213) for digest calculations, otherwise the nodes will never agree on the schema during a rolling + * upgrade, see CASSANDRA-13559. + */ + public static final ImmutableList<String> ALL_FOR_DIGEST = ImmutableList.of(KEYSPACES, + TABLES, + COLUMNS, + TRIGGERS, + VIEWS, + TYPES, + FUNCTIONS, + AGGREGATES, + INDEXES); + + private SchemaKeyspaceTables() + { + } +} diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 2ab1e18..155fd69 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -27,7 +27,15 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.auth.*; +import org.apache.cassandra.auth.AuthenticatedUser; +import org.apache.cassandra.auth.CassandraAuthorizer; +import org.apache.cassandra.auth.CassandraRoleManager; +import org.apache.cassandra.auth.DataResource; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.IResource; +import org.apache.cassandra.auth.PasswordAuthenticator; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.auth.Resources; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; @@ -40,11 +48,11 @@ import org.apache.cassandra.exceptions.AuthenticationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.UnauthorizedException; -import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.CassandraVersion; /** * State related to a client connection. @@ -64,7 +72,7 @@ public class ClientState for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.PEERS)) READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf)); - SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table))); + SchemaKeyspaceTables.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table))); // neither clients nor tools need authentication/authorization if (DatabaseDescriptor.isDaemonInitialized()) diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java index 1f10d2b..3e17ff0 100644 --- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java +++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java @@ -19,23 +19,40 @@ package org.apache.cassandra.utils; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; -import com.datastax.driver.core.*; - +import com.datastax.driver.core.AuthProvider; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PlainTextAuthProvider; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TokenRange; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.dht.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.ReversedType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.dht.Token.TokenFactory; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.schema.CQLTypeParser; -import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.schema.Types; public class NativeSSTableLoaderClient extends SSTableLoader.Client @@ -110,7 +127,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client private static Types fetchTypes(String keyspace, Session session) { - String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TYPES); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TYPES); Types.RawBuilder types = Types.rawBuilder(keyspace); for (Row row : session.execute(query, keyspace)) @@ -135,7 +152,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client private static Map<String, CFMetaData> fetchTables(String keyspace, Session session, IPartitioner partitioner, Types types) { Map<String, CFMetaData> tables = new HashMap<>(); - String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES); for (Row row : session.execute(query, keyspace)) { @@ -152,7 +169,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client private static Map<String, CFMetaData> fetchViews(String keyspace, Session session, IPartitioner partitioner, Types types) { Map<String, CFMetaData> tables = new HashMap<>(); - String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.VIEWS); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS); for (Row row : session.execute(query, keyspace)) { @@ -181,7 +198,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.COLUMNS); + SchemaKeyspaceTables.COLUMNS); List<ColumnDefinition> defs = new ArrayList<>(); for (Row colRow : session.execute(columnsQuery, keyspace, name)) @@ -200,7 +217,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client String droppedColumnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.DROPPED_COLUMNS); + SchemaKeyspaceTables.DROPPED_COLUMNS); Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = new HashMap<>(); for (Row colRow : session.execute(droppedColumnsQuery, keyspace, name)) { diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java index 78b372e..2b0dfc0 100644 --- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java +++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java @@ -151,15 +151,15 @@ public class CFMetaDataTest // Test schema conversion Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()).build(); - PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES)); - PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS)); + PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES)); + PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS)); - UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES), + UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES), UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds())) .one(); TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow); - UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS), + UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS), UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds())); Set<ColumnDefinition> columns = new HashSet<>(); for (UntypedResultSet.Row row : columnsRows) diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java index 753d6ff..dd477ec 100644 --- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java +++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java @@ -31,13 +31,15 @@ import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.MD5Digest; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; public class PstmtPersistenceTest extends CQLTester { @@ -67,7 +69,7 @@ public class PstmtPersistenceTest extends CQLTester String statement1 = "SELECT * FROM %s WHERE pk = ?"; String statement2 = "SELECT * FROM %s WHERE key = ?"; String statement3 = "SELECT * FROM %S WHERE key = ?"; - stmtIds.add(prepareStatement(statement0, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState)); + stmtIds.add(prepareStatement(statement0, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES, clientState)); stmtIds.add(prepareStatement(statement1, clientState)); stmtIds.add(prepareStatement(statement2, "foo", "bar", clientState)); clientState.setKeyspace("foo"); diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index 4073a10..ba7eb2b 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -56,6 +56,7 @@ import org.apache.cassandra.db.view.View; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.FBUtilities; @@ -1692,7 +1693,7 @@ public class ViewTest extends CQLTester // Test the where clause stored in system_schema.views String schemaQuery = String.format("SELECT where_clause FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.VIEWS); + SchemaKeyspaceTables.VIEWS); assertRows(execute(schemaQuery, keyspace(), viewName), row(expectedSchemaWhereClause)); for (String insert : insertQueries) diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java index 33cd379..9d1c0dc 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java @@ -20,8 +20,8 @@ package org.apache.cassandra.cql3.validation.operations; import org.junit.Assert; import org.junit.Test; -import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -29,7 +29,7 @@ import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import static java.lang.String.format; import static org.junit.Assert.assertEquals; @@ -376,7 +376,7 @@ public class AlterTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor"))); @@ -385,7 +385,7 @@ public class AlterTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor"))); @@ -394,7 +394,7 @@ public class AlterTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor"))); @@ -403,7 +403,7 @@ public class AlterTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("enabled", "false"))); @@ -413,7 +413,7 @@ public class AlterTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("enabled", "false"))); diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java index edb6668..20da909 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java @@ -35,7 +35,7 @@ import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.triggers.ITrigger; import org.apache.cassandra.utils.ByteBufferUtil; @@ -44,8 +44,10 @@ import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.fail; -import static org.apache.cassandra.cql3.Duration.*; -import static org.junit.Assert.assertEquals; +import static org.apache.cassandra.cql3.Duration.NANOS_PER_HOUR; +import static org.apache.cassandra.cql3.Duration.NANOS_PER_MICRO; +import static org.apache.cassandra.cql3.Duration.NANOS_PER_MILLI; +import static org.apache.cassandra.cql3.Duration.NANOS_PER_MINUTE; public class CreateTest extends CQLTester { @@ -700,7 +702,7 @@ public class CreateTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor"))); @@ -710,7 +712,7 @@ public class CreateTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor"))); @@ -720,7 +722,7 @@ public class CreateTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor"))); @@ -730,7 +732,7 @@ public class CreateTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor"))); @@ -740,7 +742,7 @@ public class CreateTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("enabled", "false"))); @@ -750,7 +752,7 @@ public class CreateTest extends CQLTester assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TABLES), + SchemaKeyspaceTables.TABLES), KEYSPACE, currentTable()), row(map("enabled", "false"))); diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java index e86071a..1fb5d2b 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java @@ -29,6 +29,7 @@ import org.apache.cassandra.cql3.Duration; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import static java.lang.String.format; import static org.junit.Assert.assertEquals; @@ -1367,7 +1368,7 @@ public class InsertUpdateIfConditionTest extends CQLTester schemaChange("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} and durable_writes = true "); assertRows(execute(format("select durable_writes from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.KEYSPACES), + SchemaKeyspaceTables.KEYSPACES), keyspace), row(true)); @@ -1376,14 +1377,14 @@ public class InsertUpdateIfConditionTest extends CQLTester assertRows(execute(format("select durable_writes from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.KEYSPACES), + SchemaKeyspaceTables.KEYSPACES), keyspace), row(true)); // drop and confirm schemaChange("DROP KEYSPACE IF EXISTS " + keyspace); - assertEmpty(execute(format("select * from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.KEYSPACES), + assertEmpty(execute(format("select * from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES), keyspace)); } @@ -1461,7 +1462,7 @@ public class InsertUpdateIfConditionTest extends CQLTester execute("CREATE TYPE IF NOT EXISTS mytype (somefield int)"); assertRows(execute(format("SELECT type_name from %s.%s where keyspace_name = ? and type_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TYPES), + SchemaKeyspaceTables.TYPES), KEYSPACE, "mytype"), row("mytype")); @@ -1474,7 +1475,7 @@ public class InsertUpdateIfConditionTest extends CQLTester execute("DROP TYPE IF EXISTS mytype"); assertEmpty(execute(format("SELECT type_name from %s.%s where keyspace_name = ? and type_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, - SchemaKeyspace.TYPES), + SchemaKeyspaceTables.TYPES), KEYSPACE, "mytype")); } diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java index 19f06e5..34590d6 100644 --- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java @@ -23,17 +23,23 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import com.google.common.collect.ImmutableMap; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; @@ -61,12 +67,15 @@ import org.apache.cassandra.thrift.ThriftConversion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +@RunWith(BMUnitRunner.class) public class SchemaKeyspaceTest { private static final String KEYSPACE1 = "CFMetaDataTest1"; @@ -101,8 +110,74 @@ public class SchemaKeyspaceTest @Test public void testSchemaPullSynchoricity() throws Exception { - Method method = SchemaKeyspace.class.getDeclaredMethod("convertSchemaToMutations"); + for (String methodName : Arrays.asList("convertSchemaToMutations", + "truncate", + "saveSystemKeyspacesSchema")) + { + Method method = SchemaKeyspace.class.getDeclaredMethod(methodName); + assertTrue(methodName + " is not thread-safe", Modifier.isSynchronized(method.getModifiers())); + } + + Method method = SchemaKeyspace.class.getDeclaredMethod("calculateSchemaDigest", Set.class); assertTrue(Modifier.isSynchronized(method.getModifiers())); + method = SchemaKeyspace.class.getDeclaredMethod("mergeSchemaAndAnnounceVersion", Collection.class); + assertTrue(Modifier.isSynchronized(method.getModifiers())); + method = SchemaKeyspace.class.getDeclaredMethod("mergeSchema", Collection.class); + assertTrue(Modifier.isSynchronized(method.getModifiers())); + method = SchemaKeyspace.class.getDeclaredMethod("mergeSchema", Keyspaces.class, Keyspaces.class); + assertTrue(Modifier.isSynchronized(method.getModifiers())); + } + + /** See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes */ + @Test + @BMRule(name = "delay partition updates to schema tables", + targetClass = "ColumnFamilyStore", + targetMethod = "apply", + action = "Thread.sleep(5000);", + targetLocation = "AT EXIT") + public void testNoVisiblePartialSchemaUpdates() throws Exception + { + String keyspace = "sandbox"; + ExecutorService pool = Executors.newFixedThreadPool(2); + + SchemaKeyspace.truncate(); // Make sure there's nothing but the create we're about to do + CyclicBarrier barrier = new CyclicBarrier(2); + + Future<Void> creation = pool.submit(() -> { + barrier.await(); + createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)"); + return null; + }); + + Future<Collection<Mutation>> mutationsFromThread = pool.submit(() -> { + barrier.await(); + + // Make sure we actually have a mutation to check for partial modification. + Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations(); + while (mutations.size() == 0) + mutations = SchemaKeyspace.convertSchemaToMutations(); + + return mutations; + }); + + creation.get(); // make sure the creation is finished + + Collection<Mutation> mutationsFromConcurrentAccess = mutationsFromThread.get(); + Collection<Mutation> settledMutations = SchemaKeyspace.convertSchemaToMutations(); + + // If the worker thread picked up the creation at all, it should have the same modifications. + // In other words, we should see all modifications or none. + if (mutationsFromConcurrentAccess.size() == settledMutations.size()) + { + assertEquals(1, settledMutations.size()); + Mutation mutationFromConcurrentAccess = mutationsFromConcurrentAccess.iterator().next(); + Mutation settledMutation = settledMutations.iterator().next(); + + assertEquals("Read partial schema change!", + settledMutation.getColumnFamilyIds(), mutationFromConcurrentAccess.getColumnFamilyIds()); + } + + pool.shutdownNow(); } @Test @@ -211,15 +286,15 @@ public class SchemaKeyspaceTest // Test schema conversion Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()).build(); - PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES)); - PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS)); + PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES)); + PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS)); - UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES), + UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES), UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds())) .one(); TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow); - UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS), + UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS), UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds())); Set<ColumnDefinition> columns = new HashSet<>(); for (UntypedResultSet.Row row : columnsRows) @@ -246,7 +321,7 @@ public class SchemaKeyspaceTest { for (PartitionUpdate p : m.getPartitionUpdates()) { - if (p.metadata().cfName.equals(SchemaKeyspace.TABLES)) + if (p.metadata().cfName.equals(SchemaKeyspaceTables.TABLES)) return true; } return false; @@ -332,7 +407,7 @@ public class SchemaKeyspaceTest KeyspaceParams.simple(1), SchemaLoader.standardCFMD(testKS, testTable)); // Delete partition column in the schema - String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=? and column_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS); + String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=? and column_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS); executeOnceInternal(query, testKS, testTable, "key"); SchemaKeyspace.fetchNonSystemKeyspaces(); } @@ -346,7 +421,7 @@ public class SchemaKeyspaceTest KeyspaceParams.simple(1), SchemaLoader.standardCFMD(testKS, testTable)); // Delete all colmns in the schema - String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS); + String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS); executeOnceInternal(query, testKS, testTable); SchemaKeyspace.fetchNonSystemKeyspaces(); } diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java index 297d19d..d5ab214 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java @@ -24,10 +24,17 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; + import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,9 +42,8 @@ import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.SchemaConstants; -import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.WindowsFailedSnapshotTracker; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -49,9 +55,10 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.PropertyFileSnitch; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.ReplicationParams; -import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals; @@ -175,7 +182,7 @@ public class StorageServiceServerTest public void testTableSnapshot() throws IOException { // no need to insert extra data, even an "empty" database will have a little information in the system keyspace - StorageService.instance.takeTableSnapshot(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.KEYSPACES, UUID.randomUUID().toString()); + StorageService.instance.takeTableSnapshot(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES, UUID.randomUUID().toString()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org