This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 28882ec33d57ee2a48f7073549fc767b6116b289
Merge: d8b2678 dd62420
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Fri Nov 12 11:39:20 2021 +0000

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |  1 +
 .../config/CassandraRelevantProperties.java        |  5 ++
 .../org/apache/cassandra/cql3/QueryProcessor.java  | 69 ++++++++++++++---
 .../metrics/ClientRequestsMetricsHolder.java       | 54 +++++++++++++
 .../org/apache/cassandra/service/StorageProxy.java | 81 +++++++++----------
 .../cassandra/cql3/NodeLocalConsistencyTest.java   | 90 ++++++++++++++++++++++
 6 files changed, 246 insertions(+), 54 deletions(-)

diff --cc CHANGES.txt
index c39e9f6,f23a16f..bd781fe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,62 -1,5 +1,63 @@@
 -4.0.2
 +4.1
 + * Introduce separate rate limiting settings for entire SSTable streaming 
(CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap 
OOMs rather than only supporting direct only (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * commit log was switched from non-daemon to daemon threads, which causes 
the JVM to exit in some cases as no non-daemon threads are active 
(CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys 
(CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused leaking the 
state (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node 
(CASSANDRA-14309)
 + * Allow to GRANT or REVOKE multiple permissions in a single statement 
(CASSANDRA-17030)
 + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent 
(CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution 
(CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and 
default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual 
table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values 
of INSERT/UPDATE statements (CASSANDRA-14337)
 + * add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is 
thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking 
(CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at 
startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much 
data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node 
(CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations 
(CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output 
(CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation 
allows it (CASSANDRA-16806)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` 
(CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies 
(CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go 
past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks 
(CASSANDRA-16842)
 + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for 
which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit 
(CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores 
(CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL 
(CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond 
to single WaitingOnCommit data points (CASSANDRA-16701)
 + * Add a system property to set hostId if not yet initialized 
(CASSANDRA-14582)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version 
changes, fixed to rely on latest version (CASSANDRA-16651)
 +Merged from 4.0:
+  * Queries performed with NODE_LOCAL consistency level do not update request 
metrics (CASSANDRA-17052)
   * Fix multiple full sources can be selected unexpectedly for bootstrap 
streaming (CASSANDRA-16945)
   * Fix cassandra.yaml formatting of parameters (CASSANDRA-17131)
   * Add backward compatibility for CQLSSTableWriter Date fields 
(CASSANDRA-17117)
diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 9e9dd03,0887e1d..9b247ca
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@@ -166,21 -160,10 +166,26 @@@ public enum CassandraRelevantPropertie
      REPLACEMENT_ALLOW_EMPTY("cassandra.allow_empty_replace_address", "true"),
  
      /**
 +     * Directory where Cassandra puts its logs, defaults to "." which is 
current directory.
 +     */
 +    LOG_DIR("cassandra.logdir", "."),
 +
 +    /**
 +     * Directory where Cassandra persists logs from audit logging. If this 
property is not set, the audit log framework
 +     * will set it automatically to {@link 
CassandraRelevantProperties#LOG_DIR} + "/audit".
 +     */
 +    LOG_DIR_AUDIT("cassandra.logdir.audit"),
 +
 +    CONSISTENT_DIRECTORY_LISTINGS("cassandra.consistent_directory_listings", 
"false"),
 +    CLOCK_GLOBAL("cassandra.clock", null),
 +    CLOCK_MONOTONIC_APPROX("cassandra.monotonic_clock.approx", null),
 +    CLOCK_MONOTONIC_PRECISE("cassandra.monotonic_clock.precise", null),
 +
++    /**
+      * Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} 
should be allowed.
+      */
+     ENABLE_NODELOCAL_QUERIES("cassandra.enable_nodelocal_queries", "false"),
+ 
      //cassandra properties (without the "cassandra." prefix)
  
      /**
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 42494e2,31ac023..eea8336
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -58,8 -60,8 +60,9 @@@ import org.apache.cassandra.transport.P
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.*;
  
+ import static 
org.apache.cassandra.config.CassandraRelevantProperties.ENABLE_NODELOCAL_QUERIES;
  import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
  
  public class QueryProcessor implements QueryHandler
  {
@@@ -221,19 -223,67 +224,67 @@@
          statement.authorize(clientState);
          statement.validate(clientState);
  
-         ResultMessage result;
-         if (options.getConsistency() == ConsistencyLevel.NODE_LOCAL)
+         ResultMessage result = options.getConsistency() == 
ConsistencyLevel.NODE_LOCAL
+                              ? processNodeLocalStatement(statement, 
queryState, options)
+                              : statement.execute(queryState, options, 
queryStartNanoTime);
+ 
+         return result == null ? new ResultMessage.Void() : result;
+     }
+ 
+     private ResultMessage processNodeLocalStatement(CQLStatement statement, 
QueryState queryState, QueryOptions options)
+     {
+         if (!ENABLE_NODELOCAL_QUERIES.getBoolean())
+             throw new InvalidRequestException("NODE_LOCAL consistency level 
is highly dangerous and should be used only for debugging purposes");
+ 
+         if (statement instanceof BatchStatement || statement instanceof 
ModificationStatement)
+             return processNodeLocalWrite(statement, queryState, options);
+         else if (statement instanceof SelectStatement)
+             return processNodeLocalSelect((SelectStatement) statement, 
queryState, options);
+         else
+             throw new InvalidRequestException("NODE_LOCAL consistency level 
can only be used with BATCH, UPDATE, INSERT, DELETE, and SELECT statements");
+     }
+ 
+     private ResultMessage processNodeLocalWrite(CQLStatement statement, 
QueryState queryState, QueryOptions options)
+     {
+         ClientRequestMetrics  levelMetrics = 
ClientRequestsMetricsHolder.writeMetricsForLevel(ConsistencyLevel.NODE_LOCAL);
+         ClientRequestMetrics globalMetrics = 
ClientRequestsMetricsHolder.writeMetrics;
+ 
 -        long startTime = System.nanoTime();
++        long startTime = nanoTime();
+         try
          {
-             assert Boolean.getBoolean("cassandra.enable_nodelocal_queries") : 
"Node local consistency level is highly dangerous and should be used only for 
debugging purposes";
-             assert statement instanceof SelectStatement : "Only SELECT 
statements are permitted for node-local execution";
-             logger.info("Statement {} executed with NODE_LOCAL consistency 
level.", statement);
-             result = statement.executeLocally(queryState, options);
+             return statement.executeLocally(queryState, options);
          }
-         else
+         finally
          {
-             result = statement.execute(queryState, options, 
queryStartNanoTime);
 -            long latency = System.nanoTime() - startTime;
++            long latency = nanoTime() - startTime;
+              levelMetrics.addNano(latency);
+             globalMetrics.addNano(latency);
+         }
+     }
+ 
+     private ResultMessage processNodeLocalSelect(SelectStatement statement, 
QueryState queryState, QueryOptions options)
+     {
+         ClientRequestMetrics  levelMetrics = 
ClientRequestsMetricsHolder.readMetricsForLevel(ConsistencyLevel.NODE_LOCAL);
+         ClientRequestMetrics globalMetrics = 
ClientRequestsMetricsHolder.readMetrics;
+ 
+         if (StorageService.instance.isBootstrapMode() && 
!SchemaConstants.isLocalSystemKeyspace(statement.keyspace()))
+         {
+             levelMetrics.unavailables.mark();
+             globalMetrics.unavailables.mark();
+             throw new IsBootstrappingException();
+         }
+ 
 -        long startTime = System.nanoTime();
++        long startTime = nanoTime();
+         try
+         {
+             return statement.executeLocally(queryState, options);
+         }
+         finally
+         {
 -            long latency = System.nanoTime() - startTime;
++            long latency = nanoTime() - startTime;
+              levelMetrics.addNano(latency);
+             globalMetrics.addNano(latency);
          }
-         return result == null ? new ResultMessage.Void() : result;
      }
  
      public static ResultMessage process(String queryString, ConsistencyLevel 
cl, QueryState queryState, long queryStartNanoTime)
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index fd20230,8c4b27a..6f83b20
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -103,13 -100,8 +102,9 @@@ import org.apache.cassandra.locator.Rep
  import org.apache.cassandra.locator.ReplicaPlans;
  import org.apache.cassandra.locator.Replicas;
  import org.apache.cassandra.metrics.CASClientRequestMetrics;
- import org.apache.cassandra.metrics.CASClientWriteRequestMetrics;
- import org.apache.cassandra.metrics.ClientRequestMetrics;
- import org.apache.cassandra.metrics.ClientWriteRequestMetrics;
 +import org.apache.cassandra.metrics.DenylistMetrics;
  import org.apache.cassandra.metrics.ReadRepairMetrics;
  import org.apache.cassandra.metrics.StorageMetrics;
- import org.apache.cassandra.metrics.ViewWriteMetrics;
  import org.apache.cassandra.net.ForwardingInfo;
  import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessageFlag;
@@@ -137,15 -126,23 +132,22 @@@ import org.apache.cassandra.utils.MBean
  import org.apache.cassandra.utils.MonotonicClock;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.concurrent.CountDownLatch;
 +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
  
 +import static com.google.common.collect.Iterables.concat;
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static org.apache.cassandra.net.Message.out;
+ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
+ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
+ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics;
+ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetricsForLevel;
+ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.viewWriteMetrics;
+ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics;
+ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetricsForLevel;
  import static org.apache.cassandra.net.NoPayload.noPayload;
 -import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
 -import static org.apache.cassandra.net.Verb.MUTATION_REQ;
 -import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ;
 -import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
 -import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
 -import static org.apache.cassandra.net.Verb.TRUNCATE_REQ;
 +import static org.apache.cassandra.net.Verb.*;
  import static 
org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup;
  import static org.apache.cassandra.service.paxos.PrepareVerbHandler.doPrepare;
  import static org.apache.cassandra.service.paxos.ProposeVerbHandler.doPropose;
@@@ -175,15 -168,7 +177,9 @@@ public class StorageProxy implements St
              return new AtomicInteger(0);
          }
      };
-     private static final ClientRequestMetrics readMetrics = new 
ClientRequestMetrics("Read");
-     private static final ClientWriteRequestMetrics writeMetrics = new 
ClientWriteRequestMetrics("Write");
-     private static final CASClientWriteRequestMetrics casWriteMetrics = new 
CASClientWriteRequestMetrics("CASWrite");
-     private static final CASClientRequestMetrics casReadMetrics = new 
CASClientRequestMetrics("CASRead");
-     private static final ViewWriteMetrics viewWriteMetrics = new 
ViewWriteMetrics("ViewWrite");
-     private static final Map<ConsistencyLevel, ClientRequestMetrics> 
readMetricsMap = new EnumMap<>(ConsistencyLevel.class);
-     private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> 
writeMetricsMap = new EnumMap<>(ConsistencyLevel.class);
+ 
 +    private static final DenylistMetrics denylistMetrics = new 
DenylistMetrics();
 +
      private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = 
"cassandra.unsafe.disable-serial-reads-linearizability";
      private static final boolean disableSerialReadLinearizability =
          
Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY,
 "false"));
@@@ -373,15 -344,9 +364,15 @@@
          catch (ReadTimeoutException e)
          {
              casWriteMetrics.timeouts.mark();
-             writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
+             writeMetricsForLevel(consistencyForPaxos).timeouts.mark();
              throw e;
          }
 +        catch (ReadAbortException e)
 +        {
 +            casWriteMetrics.markAbort(e);
-             writeMetricsMap.get(consistencyForPaxos).markAbort(e);
++            writeMetricsForLevel(consistencyForPaxos).markAbort(e);
 +            throw e;
 +        }
          catch (WriteFailureException | ReadFailureException e)
          {
              casWriteMetrics.failures.mark();
@@@ -396,9 -361,9 +387,9 @@@
          }
          finally
          {
 -            final long latency = System.nanoTime() - startTimeForMetrics;
 +            final long latency = nanoTime() - startTimeForMetrics;
              casWriteMetrics.addNano(latency);
-             writeMetricsMap.get(consistencyForPaxos).addNano(latency);
+             writeMetricsForLevel(consistencyForPaxos).addNano(latency);
          }
      }
  
@@@ -898,9 -863,9 +889,9 @@@
          }
          finally
          {
 -            long latency = System.nanoTime() - startTime;
 +            long latency = nanoTime() - startTime;
              writeMetrics.addNano(latency);
-             writeMetricsMap.get(consistencyLevel).addNano(latency);
+             writeMetricsForLevel(consistencyLevel).addNano(latency);
              updateCoordinatorWriteLatencyTableMetric(mutations, latency);
          }
      }
@@@ -1197,9 -1143,9 +1188,9 @@@
          }
          finally
          {
 -            long latency = System.nanoTime() - startTime;
 +            long latency = nanoTime() - startTime;
              writeMetrics.addNano(latency);
-             writeMetricsMap.get(consistency_level).addNano(latency);
+             writeMetricsForLevel(consistency_level).addNano(latency);
              updateCoordinatorWriteLatencyTableMetric(mutations, latency);
          }
      }
@@@ -1853,16 -1786,9 +1844,16 @@@
          {
              readMetrics.timeouts.mark();
              casReadMetrics.timeouts.mark();
-             readMetricsMap.get(consistencyLevel).timeouts.mark();
+             readMetricsForLevel(consistencyLevel).timeouts.mark();
              throw e;
          }
 +        catch (ReadAbortException e)
 +        {
 +            readMetrics.markAbort(e);
 +            casReadMetrics.markAbort(e);
-             readMetricsMap.get(consistencyLevel).markAbort(e);
++            readMetricsForLevel(consistencyLevel).markAbort(e);
 +            throw e;
 +        }
          catch (ReadFailureException e)
          {
              readMetrics.failures.mark();
@@@ -1872,10 -1798,10 +1863,10 @@@
          }
          finally
          {
 -            long latency = System.nanoTime() - start;
 +            long latency = nanoTime() - start;
              readMetrics.addNano(latency);
              casReadMetrics.addNano(latency);
-             readMetricsMap.get(consistencyLevel).addNano(latency);
+             readMetricsForLevel(consistencyLevel).addNano(latency);
              
Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name).metric.coordinatorReadLatency.update(latency,
 TimeUnit.NANOSECONDS);
          }
  
@@@ -1908,14 -1834,9 +1899,14 @@@
          catch (ReadTimeoutException e)
          {
              readMetrics.timeouts.mark();
-             readMetricsMap.get(consistencyLevel).timeouts.mark();
+             readMetricsForLevel(consistencyLevel).timeouts.mark();
              throw e;
          }
 +        catch (ReadAbortException e)
 +        {
 +            recordReadRegularAbort(consistencyLevel, e);
 +            throw e;
 +        }
          catch (ReadFailureException e)
          {
              readMetrics.failures.mark();
@@@ -1924,21 -1845,15 +1915,21 @@@
          }
          finally
          {
 -            long latency = System.nanoTime() - start;
 +            long latency = nanoTime() - start;
              readMetrics.addNano(latency);
-             readMetricsMap.get(consistencyLevel).addNano(latency);
+             readMetricsForLevel(consistencyLevel).addNano(latency);
              // TODO avoid giving every command the same latency number.  Can 
fix this in CASSANDRA-5329
              for (ReadCommand command : group.queries)
                  
Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency,
 TimeUnit.NANOSECONDS);
          }
      }
  
 +    public static void recordReadRegularAbort(ConsistencyLevel 
consistencyLevel, Throwable cause)
 +    {
 +        readMetrics.markAbort(cause);
-         readMetricsMap.get(consistencyLevel).markAbort(cause);
++        readMetricsForLevel(consistencyLevel).markAbort(cause);
 +    }
 +
      public static PartitionIterator 
concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair<?, 
?>> repairs)
      {
          PartitionIterator concatenated = PartitionIterators.concat(iterators);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to