This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 28882ec33d57ee2a48f7073549fc767b6116b289 Merge: d8b2678 dd62420 Author: Aleksey Yeschenko <alek...@apache.org> AuthorDate: Fri Nov 12 11:39:20 2021 +0000 Merge branch 'cassandra-4.0' into trunk CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 5 ++ .../org/apache/cassandra/cql3/QueryProcessor.java | 69 ++++++++++++++--- .../metrics/ClientRequestsMetricsHolder.java | 54 +++++++++++++ .../org/apache/cassandra/service/StorageProxy.java | 81 +++++++++---------- .../cassandra/cql3/NodeLocalConsistencyTest.java | 90 ++++++++++++++++++++++ 6 files changed, 246 insertions(+), 54 deletions(-) diff --cc CHANGES.txt index c39e9f6,f23a16f..bd781fe --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,62 -1,5 +1,63 @@@ -4.0.2 +4.1 + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065) + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914) + * Actively update auth cache in the background (CASSANDRA-16957) + * Add unix time conversion functions (CASSANDRA-17029) + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128) + * Forbid other Future implementations with checkstyle (CASSANDRA-17055) + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085) + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106) + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054) + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023) + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309) + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030) + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027) + * Log time spent writing keys during compaction (CASSANDRA-17037) + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976) + * Add metrics and logging around index summary redistribution (CASSANDRA-17036) + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557) + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795) + * Add broadcast_rpc_address to system.local (CASSANDRA-11181) + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337) + * add credentials file support to CQLSH (CASSANDRA-16983) + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026) + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016) + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879) + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666) + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896) + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290) + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988) + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844) + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153) + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806) + * Include SASI components to snapshots (CASSANDRA-15134) + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938) + * Reduce native transport max frame size to 16MB (CASSANDRA-16886) + * Add support for filtering using IN restrictions (CASSANDRA-14344) + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404) + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880) + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850) + * Add TTL support to nodetool snapshots (CASSANDRA-16789) + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842) + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859) + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663) + * Implement nodetool getauditlog command (CASSANDRA-16725) + * Clean up repair code (CASSANDRA-13720) + * Background schedule to clean up orphaned hints files (CASSANDRA-16815) + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776) + * Batch the token metadata update to improve the speed (CASSANDRA-15291) + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775) + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760) + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671) + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510) + * Log when compacting many tombstones (CASSANDRA-16780) + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799) + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701) + * Add a system property to set hostId if not yet initialized (CASSANDRA-14582) + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651) +Merged from 4.0: + * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052) * Fix multiple full sources can be select unexpectedly for bootstrap streaming (CASSANDRA-16945) * Fix cassandra.yaml formatting of parameters (CASSANDRA-17131) * Add backward compatibility for CQLSSTableWriter Date fields (CASSANDRA-17117) diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 9e9dd03,0887e1d..9b247ca --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@@ -166,21 -160,10 +166,26 @@@ public enum CassandraRelevantPropertie REPLACEMENT_ALLOW_EMPTY("cassandra.allow_empty_replace_address", "true"), /** + * Directory where Cassandra puts its logs, defaults to "." which is current directory. + */ + LOG_DIR("cassandra.logdir", "."), + + /** + * Directory where Cassandra persists logs from audit logging. If this property is not set, the audit log framework + * will set it automatically to {@link CassandraRelevantProperties#LOG_DIR} + "/audit". + */ + LOG_DIR_AUDIT("cassandra.logdir.audit"), + + CONSISTENT_DIRECTORY_LISTINGS("cassandra.consistent_directory_listings", "false"), + CLOCK_GLOBAL("cassandra.clock", null), + CLOCK_MONOTONIC_APPROX("cassandra.monotonic_clock.approx", null), + CLOCK_MONOTONIC_PRECISE("cassandra.monotonic_clock.precise", null), + ++ /* + * Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed. + */ + ENABLE_NODELOCAL_QUERIES("cassandra.enable_nodelocal_queries", "false"), + //cassandra properties (without the "cassandra." prefix) /** diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java index 42494e2,31ac023..eea8336 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@@ -58,8 -60,8 +60,9 @@@ import org.apache.cassandra.transport.P import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.*; + import static org.apache.cassandra.config.CassandraRelevantProperties.ENABLE_NODELOCAL_QUERIES; import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; public class QueryProcessor implements QueryHandler { @@@ -221,19 -223,67 +224,67 @@@ statement.authorize(clientState); statement.validate(clientState); - ResultMessage result; - if (options.getConsistency() == ConsistencyLevel.NODE_LOCAL) + ResultMessage result = options.getConsistency() == ConsistencyLevel.NODE_LOCAL + ? processNodeLocalStatement(statement, queryState, options) + : statement.execute(queryState, options, queryStartNanoTime); + + return result == null ? new ResultMessage.Void() : result; + } + + private ResultMessage processNodeLocalStatement(CQLStatement statement, QueryState queryState, QueryOptions options) + { + if (!ENABLE_NODELOCAL_QUERIES.getBoolean()) + throw new InvalidRequestException("NODE_LOCAL consistency level is highly dangerous and should be used only for debugging purposes"); + + if (statement instanceof BatchStatement || statement instanceof ModificationStatement) + return processNodeLocalWrite(statement, queryState, options); + else if (statement instanceof SelectStatement) + return processNodeLocalSelect((SelectStatement) statement, queryState, options); + else + throw new InvalidRequestException("NODE_LOCAL consistency level can only be used with BATCH, UPDATE, INSERT, DELETE, and SELECT statements"); + } + + private ResultMessage processNodeLocalWrite(CQLStatement statement, QueryState queryState, QueryOptions options) + { + ClientRequestMetrics levelMetrics = ClientRequestsMetricsHolder.writeMetricsForLevel(ConsistencyLevel.NODE_LOCAL); + ClientRequestMetrics globalMetrics = ClientRequestsMetricsHolder.writeMetrics; + - long startTime = System.nanoTime(); ++ long startTime = nanoTime(); + try { - assert Boolean.getBoolean("cassandra.enable_nodelocal_queries") : "Node local consistency level is highly dangerous and should be used only for debugging purposes"; - assert statement instanceof SelectStatement : "Only SELECT statements are permitted for node-local execution"; - logger.info("Statement {} executed with NODE_LOCAL consistency level.", statement); - result = statement.executeLocally(queryState, options); + return statement.executeLocally(queryState, options); } - else + finally { - result = statement.execute(queryState, options, queryStartNanoTime); - long latency = System.nanoTime() - startTime; ++ long latency = nanoTime() - startTime; + levelMetrics.addNano(latency); + globalMetrics.addNano(latency); + } + } + + private ResultMessage processNodeLocalSelect(SelectStatement statement, QueryState queryState, QueryOptions options) + { + ClientRequestMetrics levelMetrics = ClientRequestsMetricsHolder.readMetricsForLevel(ConsistencyLevel.NODE_LOCAL); + ClientRequestMetrics globalMetrics = ClientRequestsMetricsHolder.readMetrics; + + if (StorageService.instance.isBootstrapMode() && !SchemaConstants.isLocalSystemKeyspace(statement.keyspace())) + { + levelMetrics.unavailables.mark(); + globalMetrics.unavailables.mark(); + throw new IsBootstrappingException(); + } + - long startTime = System.nanoTime(); ++ long startTime = nanoTime(); + try + { + return statement.executeLocally(queryState, options); + } + finally + { - long latency = System.nanoTime() - startTime; ++ long latency = nanoTime() - startTime; + levelMetrics.addNano(latency); + globalMetrics.addNano(latency); } - return result == null ? new ResultMessage.Void() : result; } public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState, long queryStartNanoTime) diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index fd20230,8c4b27a..6f83b20 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -103,13 -100,8 +102,9 @@@ import org.apache.cassandra.locator.Rep import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.metrics.CASClientRequestMetrics; - import org.apache.cassandra.metrics.CASClientWriteRequestMetrics; - import org.apache.cassandra.metrics.ClientRequestMetrics; - import org.apache.cassandra.metrics.ClientWriteRequestMetrics; +import org.apache.cassandra.metrics.DenylistMetrics; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.metrics.StorageMetrics; - import org.apache.cassandra.metrics.ViewWriteMetrics; import org.apache.cassandra.net.ForwardingInfo; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageFlag; @@@ -137,15 -126,23 +132,22 @@@ import org.apache.cassandra.utils.MBean import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.CountDownLatch; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; +import static com.google.common.collect.Iterables.concat; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.net.Message.out; + import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics; + import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics; + import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; + import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetricsForLevel; + import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.viewWriteMetrics; + import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics; + import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetricsForLevel; import static org.apache.cassandra.net.NoPayload.noPayload; -import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ; -import static org.apache.cassandra.net.Verb.MUTATION_REQ; -import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ; -import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ; -import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ; -import static org.apache.cassandra.net.Verb.TRUNCATE_REQ; +import static org.apache.cassandra.net.Verb.*; import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup; import static org.apache.cassandra.service.paxos.PrepareVerbHandler.doPrepare; import static org.apache.cassandra.service.paxos.ProposeVerbHandler.doPropose; @@@ -175,15 -168,7 +177,9 @@@ public class StorageProxy implements St return new AtomicInteger(0); } }; - private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read"); - private static final ClientWriteRequestMetrics writeMetrics = new ClientWriteRequestMetrics("Write"); - private static final CASClientWriteRequestMetrics casWriteMetrics = new CASClientWriteRequestMetrics("CASWrite"); - private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead"); - private static final ViewWriteMetrics viewWriteMetrics = new ViewWriteMetrics("ViewWrite"); - private static final Map<ConsistencyLevel, ClientRequestMetrics> readMetricsMap = new EnumMap<>(ConsistencyLevel.class); - private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class); + + private static final DenylistMetrics denylistMetrics = new DenylistMetrics(); + private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = "cassandra.unsafe.disable-serial-reads-linearizability"; private static final boolean disableSerialReadLinearizability = Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false")); @@@ -373,15 -344,9 +364,15 @@@ catch (ReadTimeoutException e) { casWriteMetrics.timeouts.mark(); - writeMetricsMap.get(consistencyForPaxos).timeouts.mark(); + writeMetricsForLevel(consistencyForPaxos).timeouts.mark(); throw e; } + catch (ReadAbortException e) + { + casWriteMetrics.markAbort(e); - writeMetricsMap.get(consistencyForPaxos).markAbort(e); ++ writeMetricsForLevel(consistencyForPaxos).markAbort(e); + throw e; + } catch (WriteFailureException | ReadFailureException e) { casWriteMetrics.failures.mark(); @@@ -396,9 -361,9 +387,9 @@@ } finally { - final long latency = System.nanoTime() - startTimeForMetrics; + final long latency = nanoTime() - startTimeForMetrics; casWriteMetrics.addNano(latency); - writeMetricsMap.get(consistencyForPaxos).addNano(latency); + writeMetricsForLevel(consistencyForPaxos).addNano(latency); } } @@@ -898,9 -863,9 +889,9 @@@ } finally { - long latency = System.nanoTime() - startTime; + long latency = nanoTime() - startTime; writeMetrics.addNano(latency); - writeMetricsMap.get(consistencyLevel).addNano(latency); + writeMetricsForLevel(consistencyLevel).addNano(latency); updateCoordinatorWriteLatencyTableMetric(mutations, latency); } } @@@ -1197,9 -1143,9 +1188,9 @@@ } finally { - long latency = System.nanoTime() - startTime; + long latency = nanoTime() - startTime; writeMetrics.addNano(latency); - writeMetricsMap.get(consistency_level).addNano(latency); + writeMetricsForLevel(consistency_level).addNano(latency); updateCoordinatorWriteLatencyTableMetric(mutations, latency); } } @@@ -1853,16 -1786,9 +1844,16 @@@ { readMetrics.timeouts.mark(); casReadMetrics.timeouts.mark(); - readMetricsMap.get(consistencyLevel).timeouts.mark(); + readMetricsForLevel(consistencyLevel).timeouts.mark(); throw e; } + catch (ReadAbortException e) + { + readMetrics.markAbort(e); + casReadMetrics.markAbort(e); - readMetricsMap.get(consistencyLevel).markAbort(e); ++ readMetricsForLevel(consistencyLevel).markAbort(e); + throw e; + } catch (ReadFailureException e) { readMetrics.failures.mark(); @@@ -1872,10 -1798,10 +1863,10 @@@ } finally { - long latency = System.nanoTime() - start; + long latency = nanoTime() - start; readMetrics.addNano(latency); casReadMetrics.addNano(latency); - readMetricsMap.get(consistencyLevel).addNano(latency); + readMetricsForLevel(consistencyLevel).addNano(latency); Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } @@@ -1908,14 -1834,9 +1899,14 @@@ catch (ReadTimeoutException e) { readMetrics.timeouts.mark(); - readMetricsMap.get(consistencyLevel).timeouts.mark(); + readMetricsForLevel(consistencyLevel).timeouts.mark(); throw e; } + catch (ReadAbortException e) + { + recordReadRegularAbort(consistencyLevel, e); + throw e; + } catch (ReadFailureException e) { readMetrics.failures.mark(); @@@ -1924,21 -1845,15 +1915,21 @@@ } finally { - long latency = System.nanoTime() - start; + long latency = nanoTime() - start; readMetrics.addNano(latency); - readMetricsMap.get(consistencyLevel).addNano(latency); + readMetricsForLevel(consistencyLevel).addNano(latency); // TODO avoid giving every command the same latency number. Can fix this in CASSADRA-5329 for (ReadCommand command : group.queries) Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } } + public static void recordReadRegularAbort(ConsistencyLevel consistencyLevel, Throwable cause) + { + readMetrics.markAbort(cause); - readMetricsMap.get(consistencyLevel).markAbort(cause); ++ readMetricsForLevel(consistencyLevel).markAbort(cause); + } + public static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair<?, ?>> repairs) { PartitionIterator concatenated = PartitionIterators.concat(iterators); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org