This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e256b981728153e13fe273c5410e46c66a4c30b4 Merge: 9f56bf4 371add4 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Feb 7 11:55:39 2022 +0100 Merge branch 'cassandra-4.0' into trunk .../driver/core/PreparedStatementHelper.java | 119 +++++ src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 14 + .../org/apache/cassandra/cql3/QueryHandler.java | 11 +- .../org/apache/cassandra/cql3/QueryProcessor.java | 212 +++++++-- .../apache/cassandra/cql3/UntypedResultSet.java | 10 + .../cassandra/cql3/statements/BatchStatement.java | 17 + .../cql3/statements/QualifiedStatement.java | 5 + .../org/apache/cassandra/db/SystemKeyspace.java | 35 +- src/java/org/apache/cassandra/gms/Gossiper.java | 2 +- .../apache/cassandra/service/CassandraDaemon.java | 2 +- .../org/apache/cassandra/service/ClientState.java | 15 + .../apache/cassandra/service/StorageService.java | 27 ++ .../cassandra/transport/messages/BatchMessage.java | 11 +- .../transport/messages/ExecuteMessage.java | 21 + .../transport/messages/PrepareMessage.java | 13 + .../apache/cassandra/utils/CassandraVersion.java | 8 + .../distributed/test/MixedModeFuzzTest.java | 486 +++++++++++++++++++++ .../test/PrepareBatchStatementsTest.java | 101 +++++ .../distributed/test/ReprepareFuzzTest.java | 351 +++++++++++++++ .../test/ReprepareNewBehaviourTest.java | 55 +++ .../distributed/test/ReprepareTestBase.java | 13 +- test/unit/org/apache/cassandra/cql3/CQLTester.java | 2 +- .../cassandra/cql3/PstmtPersistenceTest.java | 4 +- .../validation/entities/SecondaryIndexTest.java | 4 +- .../cassandra/cql3/validation/entities/UFTest.java | 20 +- .../validation/operations/AggregationTest.java | 4 +- .../org/apache/cassandra/tools/ToolRunner.java | 3 +- .../cassandra/transport/MessagePayloadTest.java | 2 +- 29 files changed, 1477 insertions(+), 92 deletions(-) diff --cc src/java/org/apache/cassandra/config/Config.java index 369fe04,f06c24e..d3e6783 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -80,11 -72,11 +80,13 @@@ public class Confi public boolean auto_bootstrap = true; public volatile boolean hinted_handoff_enabled = true; public Set<String> hinted_handoff_disabled_datacenters = Sets.newConcurrentHashSet(); - public volatile int max_hint_window_in_ms = 3 * 3600 * 1000; // three hours + @Replaces(oldName = "max_hint_window_in_ms", converter = Converters.MILLIS_DURATION, deprecated = true) + public volatile SmallestDurationMilliseconds max_hint_window = new SmallestDurationMilliseconds("3h"); public String hints_directory; + public boolean hint_window_persistent_enabled = true; + public volatile boolean force_new_prepared_statement_behaviour = false; + public ParameterizedClass seed_provider; public DiskAccessMode disk_access_mode = DiskAccessMode.auto; diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index ef9dd43,3b7301c..59cc169 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -3588,236 -3392,17 +3588,250 @@@ public class DatabaseDescripto conf.consecutive_message_errors_threshold = value; } + public static boolean getPartitionDenylistEnabled() + { + return conf.partition_denylist_enabled; + } + + public static void setPartitionDenylistEnabled(boolean enabled) + { + conf.partition_denylist_enabled = enabled; + } + + public static boolean getDenylistWritesEnabled() + { + return conf.denylist_writes_enabled; + } + + public static void setDenylistWritesEnabled(boolean enabled) + { + conf.denylist_writes_enabled = enabled; + } + + public static boolean getDenylistReadsEnabled() + { + return conf.denylist_reads_enabled; + } + + public static void setDenylistReadsEnabled(boolean enabled) + { + conf.denylist_reads_enabled = enabled; + } + + public static boolean getDenylistRangeReadsEnabled() + { + return conf.denylist_range_reads_enabled; + } + + public static void setDenylistRangeReadsEnabled(boolean enabled) + { + conf.denylist_range_reads_enabled = enabled; + } + + public static int getDenylistRefreshSeconds() + { + return conf.denylist_refresh.toSecondsAsInt(); + } + + public static void setDenylistRefreshSeconds(int seconds) + { + if (seconds <= 0) + throw new IllegalArgumentException("denylist_refresh must be a positive integer."); + + conf.denylist_refresh = SmallestDurationSeconds.inSeconds(seconds); + } + + public static int getDenylistInitialLoadRetrySeconds() + { + return conf.denylist_initial_load_retry.toSecondsAsInt(); + } + + public static void setDenylistInitialLoadRetrySeconds(int seconds) + { + if (seconds <= 0) + throw new IllegalArgumentException("denylist_initial_load_retry must be a positive integer."); + + conf.denylist_initial_load_retry = SmallestDurationSeconds.inSeconds(seconds); + } + + public static ConsistencyLevel getDenylistConsistencyLevel() + { + return conf.denylist_consistency_level; + } + + public static void setDenylistConsistencyLevel(ConsistencyLevel cl) + { + conf.denylist_consistency_level = cl; + } + + public static int getDenylistMaxKeysPerTable() + { + return conf.denylist_max_keys_per_table; + } + + public static void setDenylistMaxKeysPerTable(int value) + { + if (value <= 0) + throw new IllegalArgumentException("denylist_max_keys_per_table must be a positive integer."); + conf.denylist_max_keys_per_table = value; + } + + public static int getDenylistMaxKeysTotal() + { + return conf.denylist_max_keys_total; + } + + public static void setDenylistMaxKeysTotal(int value) + { + if (value <= 0) + throw new IllegalArgumentException("denylist_max_keys_total must be a positive integer."); + conf.denylist_max_keys_total = value; + } + + public static boolean getAuthCacheWarmingEnabled() + { + return conf.auth_cache_warming_enabled; + } + + public static SubnetGroups getClientErrorReportingExclusions() + { + return conf.client_error_reporting_exclusions; + } + + public static SubnetGroups getInternodeErrorReportingExclusions() + { + return conf.internode_error_reporting_exclusions; + } + + public static boolean getTrackWarningsEnabled() + { + return conf.track_warnings.enabled; + } + + public static void setTrackWarningsEnabled(boolean value) + { + conf.track_warnings.enabled = value; + } + + public static long getCoordinatorReadSizeWarnThresholdKB() + { + return conf.track_warnings.coordinator_read_size.getWarnThresholdKb(); + } + + public static void setCoordinatorReadSizeWarnThresholdKB(long threshold) + { + conf.track_warnings.coordinator_read_size.setWarnThresholdKb(threshold); + } + + public static long getCoordinatorReadSizeAbortThresholdKB() + { + return conf.track_warnings.coordinator_read_size.getAbortThresholdKb(); + } + + public static void setCoordinatorReadSizeAbortThresholdKB(long threshold) + { + conf.track_warnings.coordinator_read_size.setAbortThresholdKb(threshold); + } + + public static long getLocalReadSizeWarnThresholdKb() + { + return conf.track_warnings.local_read_size.getWarnThresholdKb(); + } + + public static void setLocalReadSizeWarnThresholdKb(long value) + { + conf.track_warnings.local_read_size.setWarnThresholdKb(value); + } + + public static long getLocalReadSizeAbortThresholdKb() + { + return conf.track_warnings.local_read_size.getAbortThresholdKb(); + } + + public static void setLocalReadSizeAbortThresholdKb(long value) + { + conf.track_warnings.local_read_size.setAbortThresholdKb(value); + } + + public static int getRowIndexSizeWarnThresholdKiB() + { + return conf.track_warnings.row_index_size.getWarnThresholdKb(); + } + + public static void setRowIndexSizeWarnThresholdKiB(int value) + { + conf.track_warnings.row_index_size.setWarnThresholdKb(value); + } + + public static int getRowIndexSizeAbortThresholdKiB() + { + return conf.track_warnings.row_index_size.getAbortThresholdKb(); + } + + public static void setRowIndexSizeAbortThresholdKiB(int value) + { + conf.track_warnings.row_index_size.setAbortThresholdKb(value); + } + + public static int getDefaultKeyspaceRF() { return conf.default_keyspace_rf; } + + public static void setDefaultKeyspaceRF(int value) throws ConfigurationException + { + if (value < 1) + { + throw new ConfigurationException("default_keyspace_rf cannot be less than 1"); + } + + if (value < getMinimumKeyspaceRF()) + { + throw new ConfigurationException(String.format("default_keyspace_rf to be set (%d) cannot be less than minimum_keyspace_rf (%d)", value, getMinimumKeyspaceRF())); + } + + conf.default_keyspace_rf = value; + } + + public static int getMinimumKeyspaceRF() { return conf.minimum_keyspace_rf; } + + public static void setMinimumKeyspaceRF(int value) throws ConfigurationException + { + if (value < 0) + { + throw new ConfigurationException("minimum_keyspace_rf cannot be negative"); + } + + if (value > getDefaultKeyspaceRF()) + { + throw new ConfigurationException(String.format("minimum_keyspace_rf to be set (%d) cannot be greater than default_keyspace_rf (%d)", value, getDefaultKeyspaceRF())); + } + + conf.minimum_keyspace_rf = value; + } + + public static boolean getUseStatementsEnabled() + { + return conf.use_statements_enabled; + } + + public static void setUseStatementsEnabled(boolean enabled) + { + if (enabled != conf.use_statements_enabled) + { + logger.info("Setting use_statements_enabled to {}", enabled); + conf.use_statements_enabled = enabled; + } + } ++ + public static boolean getForceNewPreparedStatementBehaviour() + { + return conf.force_new_prepared_statement_behaviour; + } + + public static void setForceNewPreparedStatementBehaviour(boolean value) + { + if (value != conf.force_new_prepared_statement_behaviour) + { + logger.info("Setting force_new_prepared_statement_behaviour to {}", value); + conf.force_new_prepared_statement_behaviour = value; + } + } } diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java index 0e0c043,62b88fb..c996d0d --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@@ -493,14 -530,83 +531,83 @@@ public class QueryProcessor implements return prepare(query, clientState); } - public static ResultMessage.Prepared prepare(String queryString, ClientState clientState) + private volatile boolean newPreparedStatementBehaviour = false; + public boolean useNewPreparedStatementBehaviour() { - ResultMessage.Prepared existing = getStoredPreparedStatement(queryString, clientState.getRawKeyspace()); - if (existing != null) - return existing; + if (newPreparedStatementBehaviour || DatabaseDescriptor.getForceNewPreparedStatementBehaviour()) + return true; - CQLStatement statement = getStatement(queryString, clientState); - Prepared prepared = new Prepared(statement, queryString); + synchronized (this) + { + CassandraVersion minVersion = Gossiper.instance.getMinVersion(DatabaseDescriptor.getWriteRpcTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + if (minVersion != null && + ((minVersion.major == 3 && minVersion.minor == 0 && minVersion.compareTo(NEW_PREPARED_STATEMENT_BEHAVIOUR_SINCE_30) >= 0) || + (minVersion.major == 3 && minVersion.minor > 0 && minVersion.compareTo(NEW_PREPARED_STATEMENT_BEHAVIOUR_SINCE_3X) >= 0) || + (minVersion.compareTo(NEW_PREPARED_STATEMENT_BEHAVIOUR_SINCE_40, true) >= 0))) + { + logger.info("Fully upgraded to at least {}", minVersion); + newPreparedStatementBehaviour = true; + } + + return newPreparedStatementBehaviour; + } + } + + /** + * This method got slightly out of hand, but this is with best intentions: to allow users to be upgraded from any + * prior version, and help implementers avoid previous mistakes by clearly separating fully qualified and non-fully + * qualified statement behaviour. + * + * Basically we need to handle 4 different hashes here; + * 1. fully qualified query with keyspace + * 2. fully qualified query without keyspace + * 3. unqualified query with keyspace + * 4. unqualified query without keyspace + * + * The correct combination to return is 2/3 - the problem is during upgrades (assuming upgrading from < 3.0.26) + * - Existing clients have hash 1 or 3 - * - Query prepared on a 3.0.25/3.11.12/4.0.2 instance needs to return hash 1/3 to be able to execute it on a 3.0.25 instance ++ * - Query prepared on a 3.0.26/3.11.12/4.0.2 instance needs to return hash 1/3 to be able to execute it on a 3.0.25 instance + * - This is handled by the useNewPreparedStatementBehaviour flag - while there still are 3.0.25 instances in + * the cluster we always return hash 1/3 + * - Once fully upgraded we start returning hash 2/3, this will cause a prepared statement id mismatch for existing + * clients, but they will be able to continue using the old prepared statement id after that exception since we + * store the query both with and without keyspace. + */ + public ResultMessage.Prepared prepare(String queryString, ClientState clientState) + { + boolean useNewPreparedStatementBehaviour = useNewPreparedStatementBehaviour(); + MD5Digest hashWithoutKeyspace = computeId(queryString, null); + MD5Digest hashWithKeyspace = computeId(queryString, clientState.getRawKeyspace()); + Prepared cachedWithoutKeyspace = preparedStatements.getIfPresent(hashWithoutKeyspace); + Prepared cachedWithKeyspace = preparedStatements.getIfPresent(hashWithKeyspace); + // We assume it is only safe to return cached prepare if we have both instances + boolean safeToReturnCached = cachedWithoutKeyspace != null && cachedWithKeyspace != null; + + if (safeToReturnCached) + { + if (useNewPreparedStatementBehaviour) + { + if (cachedWithoutKeyspace.fullyQualified) // For fully qualified statements, we always skip keyspace to avoid digest switching + return createResultMessage(hashWithoutKeyspace, cachedWithoutKeyspace); + + if (clientState.getRawKeyspace() != null && !cachedWithKeyspace.fullyQualified) // For non-fully qualified statements, we always include keyspace to avoid ambiguity + return createResultMessage(hashWithKeyspace, cachedWithKeyspace); + + } + else // legacy caches, pre-CASSANDRA-15252 behaviour + { + return createResultMessage(hashWithKeyspace, cachedWithKeyspace); + } + } + else + { + // Make sure the missing one is going to be eventually re-prepared + evictPrepared(hashWithKeyspace); + evictPrepared(hashWithoutKeyspace); + } + + Prepared prepared = parseAndPrepare(queryString, clientState, false); + CQLStatement statement = prepared.statement; int boundTerms = statement.getBindVariables().size(); if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT) @@@ -563,13 -675,16 +676,16 @@@ // (if the keyspace is null, queryString has to have a fully-qualified keyspace so it's fine. long statementSize = ObjectSizes.measureDeep(prepared.statement); // don't execute the statement if it's bigger than the allowed threshold - if (statementSize > capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB())) + if (statementSize > capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMiB())) throw new InvalidRequestException(String.format("Prepared statement of size %d bytes is larger than allowed maximum of %d MB: %s...", statementSize, - DatabaseDescriptor.getPreparedStatementsCacheSizeMB(), + DatabaseDescriptor.getPreparedStatementsCacheSizeMiB(), queryString.substring(0, 200))); MD5Digest statementId = computeId(queryString, keyspace); - preparedStatements.put(statementId, prepared); + Prepared previous = preparedStatements.get(statementId, (ignored_) -> prepared); + if (previous == prepared) + SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString); + SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString); ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared.statement); ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared.statement); diff --cc src/java/org/apache/cassandra/service/ClientState.java index 14a2ccf,f76e7e3..24d6225 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@@ -51,9 -49,8 +51,10 @@@ import org.apache.cassandra.exceptions. import org.apache.cassandra.exceptions.UnauthorizedException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.MD5Digest; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + /** * State related to a client connection. */ diff --cc src/java/org/apache/cassandra/service/StorageService.java index e5ca6c8,2639ce6..882c38c --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -59,7 -46,16 +59,20 @@@ import com.google.common.base.Predicate import com.google.common.base.Predicates; import com.google.common.collect.*; import com.google.common.util.concurrent.*; + ++import org.apache.cassandra.config.CassandraRelevantProperties; ++import org.apache.cassandra.concurrent.*; + import org.apache.cassandra.config.ParameterizedClass; + import org.apache.cassandra.cql3.QueryHandler; + import org.apache.cassandra.dht.RangeStreamer.FetchReplica; + import org.apache.cassandra.fql.FullQueryLogger; + import org.apache.cassandra.fql.FullQueryLoggerOptions; + import org.apache.cassandra.fql.FullQueryLoggerOptionsCompositeData; ++import org.apache.cassandra.io.util.File; + import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; ++import org.apache.cassandra.utils.concurrent.Future; ++import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.commons.lang3.StringUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --cc src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 3a5cffa,afc308a..a524808 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@@ -45,9 -49,8 +49,10 @@@ import org.apache.cassandra.transport.P import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MD5Digest; + import org.apache.cassandra.utils.NoSpamLogger; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + public class BatchMessage extends Message.Request { public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>() diff --cc src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 9c9fe2a,19d40ba..692d183 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@@ -36,11 -40,12 +40,14 @@@ import org.apache.cassandra.transport.P import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MD5Digest; + import org.apache.cassandra.utils.NoSpamLogger; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + public class ExecuteMessage extends Message.Request { + private static final NoSpamLogger nospam = NoSpamLogger.getLogger(logger, 10, TimeUnit.MINUTES); + public static final Message.Codec<ExecuteMessage> codec = new Message.Codec<ExecuteMessage>() { public ExecuteMessage decode(ByteBuf body, ProtocolVersion version) diff --cc src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index ec29f7d,6c60f78..20861d0 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@@ -29,11 -34,13 +34,15 @@@ import org.apache.cassandra.transport.C import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.NoSpamLogger; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + public class PrepareMessage extends Message.Request { + private static final Logger logger = LoggerFactory.getLogger(PrepareMessage.class); + private static final NoSpamLogger nospam = NoSpamLogger.getLogger(logger, 10, TimeUnit.MINUTES); + public static final Message.Codec<PrepareMessage> codec = new Message.Codec<PrepareMessage>() { public PrepareMessage decode(ByteBuf body, ProtocolVersion version) diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java index 7d5be0d,8d0adbf..da35189 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@@ -1271,20 -1021,9 +1271,20 @@@ public abstract class CQLTeste return currentTable == null ? query : String.format(query, keyspace + "." + currentTable); } + public String formatViewQuery(String query) + { + return formatViewQuery(KEYSPACE, query); + } + + public String formatViewQuery(String keyspace, String query) + { + String currentView = currentView(); + return currentView == null ? query : String.format(query, keyspace + "." + currentView); + } + protected ResultMessage.Prepared prepare(String query) throws Throwable { - return QueryProcessor.prepare(formatQuery(query), ClientState.forInternalCalls()); + return QueryProcessor.instance.prepare(formatQuery(query), ClientState.forInternalCalls()); } protected UntypedResultSet execute(String query, Object... values) throws Throwable --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org