This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0c37176673ca71a75990a26e421b5991db6eefe1
Merge: 6bd373f5d2 2405d523fa
Author: Jon Meredith <jonmered...@apache.org>
AuthorDate: Wed Apr 27 12:03:44 2022 -0600

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/concurrent/Stage.java     | 55 ++++++++++++++++------
 .../config/CassandraRelevantProperties.java        |  5 ++
 .../apache/cassandra/service/StorageService.java   | 19 ++------
 .../distributed/test/ring/AutoBootstrapTest.java   |  5 +-
 5 files changed, 55 insertions(+), 30 deletions(-)

diff --cc CHANGES.txt
index 8a4e661719,8b59569491..3f3a782cc3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -216,25 -75,7 +216,26 @@@ Merged from 3.11
   * Add key validation to sstablescrub (CASSANDRA-16969)
   * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
   * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 + * Validate SASI tokenizer options before adding index to schema 
(CASSANDRA-15135)
 + * Fixup scrub output when no data post-scrub and clear up old use of row, 
which really means partition (CASSANDRA-16835)
 + * Reduce thread contention in CommitLogSegment and HintsBuffer 
(CASSANDRA-16072)
 + * Make cqlsh use the same set of reserved keywords as the server uses 
(CASSANDRA-15663)
 + * Optimize bytes skipping when reading SSTable files (CASSANDRA-14415)
 + * Enable tombstone compactions when unchecked_tombstone_compaction is set in 
TWCS (CASSANDRA-14496)
 + * Read only the required SSTables for single partition queries 
(CASSANDRA-16737)
  Merged from 3.0:
++ * Schema mutations may not be completed on drain (CASSANDRA-17524)
 + * Fix data corruption in AbstractCompositeType due to static boolean byte 
buffers (CASSANDRA-14752)
 + * Add procps dependency to RPM/Debian packages (CASSANDRA-17516)
 + * Suppress CVE-2021-44521 (CASSANDRA-17492)
 + * ConnectionLimitHandler may leak connection count if remote connection 
drops (CASSANDRA-17252)
 + * Require ant >= 1.10 (CASSANDRA-17428)
 + * Disallow CONTAINS for UPDATE and DELETE (CASSANDRA-15266)
 + * Suppress inapplicable CVEs (CASSANDRA-17368)
 + * Fix flaky test - test_cqlsh_completion.TestCqlshCompletion 
(CASSANDRA-17338)
 + * Fixed TestCqlshOutput failing tests (CASSANDRA-17386)
 + * Lazy transaction log replica creation allows incorrect replica content 
divergence during anticompaction (CASSANDRA-17273)
 + * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272)
   * Fix conversion from megabits to bytes in streaming rate limiter 
(CASSANDRA-17243)
   * Upgrade logback to 1.2.9 (CASSANDRA-17204)
   * Avoid race in AbstractReplicationStrategy endpoint caching 
(CASSANDRA-16673)
diff --cc src/java/org/apache/cassandra/concurrent/Stage.java
index 1c15e4505a,5efaf16cf6..ac609aa774
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@@ -42,29 -49,35 +42,36 @@@ import static org.apache.cassandra.conc
  
  public enum Stage
  {
-     READ              ("ReadStage",             "request",  
DatabaseDescriptor::getConcurrentReaders,        
DatabaseDescriptor::setConcurrentReaders,        
Stage::multiThreadedLowSignalStage),
-     MUTATION          ("MutationStage",         "request",  
DatabaseDescriptor::getConcurrentWriters,        
DatabaseDescriptor::setConcurrentWriters,        
Stage::multiThreadedLowSignalStage),
-     COUNTER_MUTATION  ("CounterMutationStage",  "request",  
DatabaseDescriptor::getConcurrentCounterWriters, 
DatabaseDescriptor::setConcurrentCounterWriters, 
Stage::multiThreadedLowSignalStage),
-     VIEW_MUTATION     ("ViewMutationStage",     "request",  
DatabaseDescriptor::getConcurrentViewWriters,    
DatabaseDescriptor::setConcurrentViewWriters,    
Stage::multiThreadedLowSignalStage),
-     GOSSIP            ("GossipStage",           "internal", () -> 1,          
                               null,                                            
Stage::singleThreadedStage),
-     REQUEST_RESPONSE  ("RequestResponseStage",  "request",  
FBUtilities::getAvailableProcessors,             null,                          
                  Stage::multiThreadedLowSignalStage),
-     ANTI_ENTROPY      ("AntiEntropyStage",      "internal", () -> 1,          
                               null,                                            
Stage::singleThreadedStage),
-     MIGRATION         ("MigrationStage",        "internal", () -> 1,          
                               null,                                            
Stage::migrationStage),
-     MISC              ("MiscStage",             "internal", () -> 1,          
                               null,                                            
Stage::singleThreadedStage),
-     TRACING           ("TracingStage",          "internal", () -> 1,          
                               null,                                            
Stage::tracingStage),
-     INTERNAL_RESPONSE ("InternalResponseStage", "internal", 
FBUtilities::getAvailableProcessors,             null,                          
                  Stage::multiThreadedStage),
-     IMMEDIATE         ("ImmediateStage",        "internal", () -> 0,          
                               null,                                            
Stage::immediateExecutor),
-     PAXOS_REPAIR      ("PaxosRepairStage",      "internal", 
FBUtilities::getAvailableProcessors,             null,                          
                  Stage::multiThreadedStage),
+     READ              (false, "ReadStage",             "request",  
DatabaseDescriptor::getConcurrentReaders,        
DatabaseDescriptor::setConcurrentReaders,        
Stage::multiThreadedLowSignalStage),
+     MUTATION          (true,  "MutationStage",         "request",  
DatabaseDescriptor::getConcurrentWriters,        
DatabaseDescriptor::setConcurrentWriters,        
Stage::multiThreadedLowSignalStage),
+     COUNTER_MUTATION  (true,  "CounterMutationStage",  "request",  
DatabaseDescriptor::getConcurrentCounterWriters, 
DatabaseDescriptor::setConcurrentCounterWriters, 
Stage::multiThreadedLowSignalStage),
+     VIEW_MUTATION     (true,  "ViewMutationStage",     "request",  
DatabaseDescriptor::getConcurrentViewWriters,    
DatabaseDescriptor::setConcurrentViewWriters,    
Stage::multiThreadedLowSignalStage),
+     GOSSIP            (true,  "GossipStage",           "internal", () -> 1,   
                                      null,                                     
       Stage::singleThreadedStage),
+     REQUEST_RESPONSE  (false, "RequestResponseStage",  "request",  
FBUtilities::getAvailableProcessors,             null,                          
                  Stage::multiThreadedLowSignalStage),
+     ANTI_ENTROPY      (false, "AntiEntropyStage",      "internal", () -> 1,   
                                      null,                                     
       Stage::singleThreadedStage),
 -    MIGRATION         (false, "MigrationStage",        "internal", () -> 1,   
                                      null,                                     
       Stage::singleThreadedStage),
++    MIGRATION         (false, "MigrationStage",        "internal", () -> 1,   
                                      null,                                     
       Stage::migrationStage),
+     MISC              (false, "MiscStage",             "internal", () -> 1,   
                                      null,                                     
       Stage::singleThreadedStage),
 -    TRACING           (false, "TracingStage",          "internal", () -> 1,   
                                      null,                                     
       Stage::tracingExecutor),
++    TRACING           (false, "TracingStage",          "internal", () -> 1,   
                                      null,                                     
       Stage::tracingStage),
+     INTERNAL_RESPONSE (false, "InternalResponseStage", "internal", 
FBUtilities::getAvailableProcessors,             null,                          
                  Stage::multiThreadedStage),
 -    IMMEDIATE         (false, "ImmediateStage",        "internal", () -> 0,   
                                      null,                                     
       Stage::immediateExecutor);
++    IMMEDIATE         (false, "ImmediateStage",        "internal", () -> 0,   
                                      null,                                     
       Stage::immediateExecutor),
++    PAXOS_REPAIR      (false, "PaxosRepairStage",      "internal", 
FBUtilities::getAvailableProcessors,             null,                          
                  Stage::multiThreadedStage),
 +    ;
  
 -    public static final long KEEP_ALIVE_SECONDS = 60; // seconds to keep 
"extra" threads alive for when idle
      public final String jmxName;
 +    private final Supplier<ExecutorPlus> executorSupplier;
 +    private volatile ExecutorPlus executor;
+     /** Set true if this executor should be gracefully shutdown before 
stopping
+      * the commitlog allocator. Tasks on executors that issue mutations may
+      * block indefinitely waiting for a new commitlog segment, preventing a
+      * clean drain/shutdown.
+      */
+     public final boolean shutdownBeforeCommitlog;
 -    private final Supplier<LocalAwareExecutorService> initialiser;
 -    private volatile LocalAwareExecutorService executor = null;
  
-     Stage(String jmxName, String jmxType, IntSupplier numThreads, 
LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize, 
ExecutorServiceInitialiser executorSupplier)
 -    Stage(Boolean shutdownBeforeCommitlog, String jmxName, String jmxType, 
IntSupplier numThreads, LocalAwareExecutorService.MaximumPoolSizeListener 
onSetMaximumPoolSize, ExecutorServiceInitialiser initialiser)
++    Stage(boolean shutdownBeforeCommitlog, String jmxName, String jmxType, 
IntSupplier numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener 
onSetMaximumPoolSize, ExecutorServiceInitialiser executorSupplier)
      {
+         this.shutdownBeforeCommitlog = shutdownBeforeCommitlog;
          this.jmxName = jmxName;
 -        this.initialiser = () -> initialiser.init(jmxName,jmxType, 
numThreads.getAsInt(), onSetMaximumPoolSize);
 +        this.executorSupplier = () -> executorSupplier.init(jmxName, jmxType, 
numThreads.getAsInt(), onSetMaximumPoolSize);
      }
  
      private static String normalizeName(String stageName)
@@@ -144,6 -157,14 +151,14 @@@
                       .collect(Collectors.toList());
      }
  
 -    private static List<ExecutorService> mutatingExecutors()
++    private static List<ExecutorPlus> mutatingExecutors()
+     {
+         return Stream.of(Stage.values())
+                      .filter(stage -> stage.shutdownBeforeCommitlog)
+                      .map(Stage::executor)
+                      .collect(Collectors.toList());
+     }
+ 
      /**
       * This method shuts down all registered stages.
       */
@@@ -152,6 -173,18 +167,18 @@@
          ExecutorUtils.shutdownNow(executors());
      }
  
+     public static void shutdownAndAwaitMutatingExecutors(boolean interrupt, 
long timeout, TimeUnit units) throws InterruptedException, TimeoutException
+     {
 -        List<ExecutorService> executors = mutatingExecutors();
++        List<ExecutorPlus> executors = mutatingExecutors();
+         ExecutorUtils.shutdown(interrupt, executors);
+         ExecutorUtils.awaitTermination(timeout, units, executors);
+     }
+ 
+     public static boolean areMutationExecutorsTerminated()
+     {
 -        return 
mutatingExecutors().stream().allMatch(ExecutorService::isTerminated);
++        return 
mutatingExecutors().stream().allMatch(ExecutorPlus::isTerminated);
+     }
+ 
      @VisibleForTesting
      public static void shutdownAndWait(long timeout, TimeUnit units) throws 
InterruptedException, TimeoutException
      {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to