This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0c37176673ca71a75990a26e421b5991db6eefe1 Merge: 6bd373f5d2 2405d523fa Author: Jon Meredith <jonmered...@apache.org> AuthorDate: Wed Apr 27 12:03:44 2022 -0600 Merge branch 'cassandra-4.0' into trunk CHANGES.txt | 1 + .../org/apache/cassandra/concurrent/Stage.java | 55 ++++++++++++++++------ .../config/CassandraRelevantProperties.java | 5 ++ .../apache/cassandra/service/StorageService.java | 19 ++------ .../distributed/test/ring/AutoBootstrapTest.java | 5 +- 5 files changed, 55 insertions(+), 30 deletions(-) diff --cc CHANGES.txt index 8a4e661719,8b59569491..3f3a782cc3 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -216,25 -75,7 +216,26 @@@ Merged from 3.11 * Add key validation to ssstablescrub (CASSANDRA-16969) * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851) * Make assassinate more resilient to missing tokens (CASSANDRA-16847) + * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135) + * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835) + * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072) + * Make cqlsh use the same set of reserved keywords than the server uses (CASSANDRA-15663) + * Optimize bytes skipping when reading SSTable files (CASSANDRA-14415) + * Enable tombstone compactions when unchecked_tombstone_compaction is set in TWCS (CASSANDRA-14496) + * Read only the required SSTables for single partition queries (CASSANDRA-16737) Merged from 3.0: ++ * Schema mutations may not be completed on drain (CASSANDRA-17524) + * Fix data corruption in AbstractCompositeType due to static boolean byte buffers (CASSANDRA-14752) + * Add procps dependency to RPM/Debian packages (CASSANDRA-17516) + * Suppress CVE-2021-44521 (CASSANDRA-17492) + * ConnectionLimitHandler may leaks connection count if remote connection drops (CASSANDRA-17252) + * Require ant >= 1.10 (CASSANDRA-17428) + * Disallow CONTAINS for UPDATE and DELETE (CASSANDRA-15266) + * Suppress inapplicable CVEs (CASSANDRA-17368) + * Fix flaky test - test_cqlsh_completion.TestCqlshCompletion (CASSANDRA-17338) + * Fixed TestCqlshOutput failing tests (CASSANDRA-17386) + * Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction (CASSANDRA-17273) + * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272) * Fix conversion from megabits to bytes in streaming rate limiter (CASSANDRA-17243) * Upgrade logback to 1.2.9 (CASSANDRA-17204) * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673) diff --cc src/java/org/apache/cassandra/concurrent/Stage.java index 1c15e4505a,5efaf16cf6..ac609aa774 --- a/src/java/org/apache/cassandra/concurrent/Stage.java +++ b/src/java/org/apache/cassandra/concurrent/Stage.java @@@ -42,29 -49,35 +42,36 @@@ import static org.apache.cassandra.conc public enum Stage { - READ ("ReadStage", "request", DatabaseDescriptor::getConcurrentReaders, DatabaseDescriptor::setConcurrentReaders, Stage::multiThreadedLowSignalStage), - MUTATION ("MutationStage", "request", DatabaseDescriptor::getConcurrentWriters, DatabaseDescriptor::setConcurrentWriters, Stage::multiThreadedLowSignalStage), - COUNTER_MUTATION ("CounterMutationStage", "request", DatabaseDescriptor::getConcurrentCounterWriters, DatabaseDescriptor::setConcurrentCounterWriters, Stage::multiThreadedLowSignalStage), - VIEW_MUTATION ("ViewMutationStage", "request", DatabaseDescriptor::getConcurrentViewWriters, DatabaseDescriptor::setConcurrentViewWriters, Stage::multiThreadedLowSignalStage), - GOSSIP ("GossipStage", "internal", () -> 1, null, Stage::singleThreadedStage), - REQUEST_RESPONSE ("RequestResponseStage", "request", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedLowSignalStage), - ANTI_ENTROPY ("AntiEntropyStage", "internal", () -> 1, null, Stage::singleThreadedStage), - MIGRATION ("MigrationStage", "internal", () -> 1, null, Stage::migrationStage), - MISC ("MiscStage", "internal", () -> 1, null, Stage::singleThreadedStage), - TRACING ("TracingStage", "internal", () -> 1, null, Stage::tracingStage), - INTERNAL_RESPONSE ("InternalResponseStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage), - IMMEDIATE ("ImmediateStage", "internal", () -> 0, null, Stage::immediateExecutor), - PAXOS_REPAIR ("PaxosRepairStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage), + READ (false, "ReadStage", "request", DatabaseDescriptor::getConcurrentReaders, DatabaseDescriptor::setConcurrentReaders, Stage::multiThreadedLowSignalStage), + MUTATION (true, "MutationStage", "request", DatabaseDescriptor::getConcurrentWriters, DatabaseDescriptor::setConcurrentWriters, Stage::multiThreadedLowSignalStage), + COUNTER_MUTATION (true, "CounterMutationStage", "request", DatabaseDescriptor::getConcurrentCounterWriters, DatabaseDescriptor::setConcurrentCounterWriters, Stage::multiThreadedLowSignalStage), + VIEW_MUTATION (true, "ViewMutationStage", "request", DatabaseDescriptor::getConcurrentViewWriters, DatabaseDescriptor::setConcurrentViewWriters, Stage::multiThreadedLowSignalStage), + GOSSIP (true, "GossipStage", "internal", () -> 1, null, Stage::singleThreadedStage), + REQUEST_RESPONSE (false, "RequestResponseStage", "request", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedLowSignalStage), + ANTI_ENTROPY (false, "AntiEntropyStage", "internal", () -> 1, null, Stage::singleThreadedStage), - MIGRATION (false, "MigrationStage", "internal", () -> 1, null, Stage::singleThreadedStage), ++ MIGRATION (false, "MigrationStage", "internal", () -> 1, null, Stage::migrationStage), + MISC (false, "MiscStage", "internal", () -> 1, null, Stage::singleThreadedStage), - TRACING (false, "TracingStage", "internal", () -> 1, null, Stage::tracingExecutor), ++ TRACING (false, "TracingStage", "internal", () -> 1, null, Stage::tracingStage), + INTERNAL_RESPONSE (false, "InternalResponseStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage), - IMMEDIATE (false, "ImmediateStage", "internal", () -> 0, null, Stage::immediateExecutor); ++ IMMEDIATE (false, "ImmediateStage", "internal", () -> 0, null, Stage::immediateExecutor), ++ PAXOS_REPAIR (false, "PaxosRepairStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage), + ; - public static final long KEEP_ALIVE_SECONDS = 60; // seconds to keep "extra" threads alive for when idle public final String jmxName; + private final Supplier<ExecutorPlus> executorSupplier; + private volatile ExecutorPlus executor; + /** Set true if this executor should be gracefully shutdown before stopping + * the commitlog allocator. Tasks on executors that issue mutations may + * block indefinitely waiting for a new commitlog segment, preventing a + * clean drain/shutdown. + */ + public final boolean shutdownBeforeCommitlog; - private final Supplier<LocalAwareExecutorService> initialiser; - private volatile LocalAwareExecutorService executor = null; - Stage(String jmxName, String jmxType, IntSupplier numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize, ExecutorServiceInitialiser executorSupplier) - Stage(Boolean shutdownBeforeCommitlog, String jmxName, String jmxType, IntSupplier numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize, ExecutorServiceInitialiser initialiser) ++ Stage(boolean shutdownBeforeCommitlog, String jmxName, String jmxType, IntSupplier numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize, ExecutorServiceInitialiser executorSupplier) { + this.shutdownBeforeCommitlog = shutdownBeforeCommitlog; this.jmxName = jmxName; - this.initialiser = () -> initialiser.init(jmxName,jmxType, numThreads.getAsInt(), onSetMaximumPoolSize); + this.executorSupplier = () -> executorSupplier.init(jmxName, jmxType, numThreads.getAsInt(), onSetMaximumPoolSize); } private static String normalizeName(String stageName) @@@ -144,6 -157,14 +151,14 @@@ .collect(Collectors.toList()); } - private static List<ExecutorService> mutatingExecutors() ++ private static List<ExecutorPlus> mutatingExecutors() + { + return Stream.of(Stage.values()) + .filter(stage -> stage.shutdownBeforeCommitlog) + .map(Stage::executor) + .collect(Collectors.toList()); + } + /** * This method shuts down all registered stages. */ @@@ -152,6 -173,18 +167,18 @@@ ExecutorUtils.shutdownNow(executors()); } + public static void shutdownAndAwaitMutatingExecutors(boolean interrupt, long timeout, TimeUnit units) throws InterruptedException, TimeoutException + { - List<ExecutorService> executors = mutatingExecutors(); ++ List<ExecutorPlus> executors = mutatingExecutors(); + ExecutorUtils.shutdown(interrupt, executors); + ExecutorUtils.awaitTermination(timeout, units, executors); + } + + public static boolean areMutationExecutorsTerminated() + { - return mutatingExecutors().stream().allMatch(ExecutorService::isTerminated); ++ return mutatingExecutors().stream().allMatch(ExecutorPlus::isTerminated); + } + @VisibleForTesting public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org