This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 94bcb4e5ec Only reload compaction strategies if disk boundaries change 94bcb4e5ec is described below commit 94bcb4e5ec4fb99b73276d90b9d08def6f3b4d30 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu Sep 1 09:43:47 2022 +0200 Only reload compaction strategies if disk boundaries change Patch by Aleksey Yeschenko and marcuse; reviewed by Aleksey Yeschenko for CASSANDRA-17874 Co-authored-by: Aleksey Yeschenko <alek...@apache.org> --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +- .../org/apache/cassandra/db/DiskBoundaries.java | 8 ++ .../db/compaction/CompactionStrategyManager.java | 153 +++++++++++++-------- ...ompactionStrategyManagerBoundaryReloadTest.java | 103 ++++++++++++++ 5 files changed, 213 insertions(+), 56 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 04ff21265a..160ecef46b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Only reload compaction strategies if disk boundaries change (CASSANDRA-17874) * CEP-10: Simulator Java11 Support (CASSANDRA-17178) * Set the major compaction type correctly for compactionstats (CASSANDRA-18055) * Print exception message without stacktrace when nodetool commands fail on probe.getOwnershipWithPort() (CASSANDRA-18079) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index cb164a030f..b00a77ab40 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -385,7 +385,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner for (ColumnFamilyStore cfs : concatWithIndexes()) cfs.crcCheckChance = new DefaultValue(metadata().params.crcCheckChance); - compactionStrategyManager.maybeReload(metadata()); + compactionStrategyManager.maybeReloadParamsFromSchema(metadata().params.compaction); indexManager.reload(); @@ -418,7 +418,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner { CompactionParams compactionParams = CompactionParams.fromMap(options); compactionParams.validate(); - compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams); + compactionStrategyManager.overrideLocalParams(compactionParams); } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java index c340d2703d..32edcac433 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaries.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db; import java.util.Collections; import java.util.List; +import java.util.Objects; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -163,4 +164,11 @@ public class DiskBoundaries int lastIndex = getDiskIndex(last); return directories.subList(firstIndex, lastIndex + 1); } + + public boolean isEquivalentTo(DiskBoundaries oldBoundaries) + { + return oldBoundaries != null && + Objects.equals(positions, oldBoundaries.positions) && + Objects.equals(directories, oldBoundaries.directories); + } } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index e98e4dd10f..bc0fc0dc81 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -42,7 +42,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; -import org.apache.cassandra.io.util.File; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +66,7 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.File; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; import org.apache.cassandra.notifications.SSTableAddedNotification; @@ -76,7 +76,6 @@ import org.apache.cassandra.notifications.SSTableMetadataChanged; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; import org.apache.cassandra.repair.consistent.admin.CleanupSummary; import org.apache.cassandra.schema.CompactionParams; -import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.TimeUUID; @@ -181,9 +180,12 @@ public class CompactionStrategyManager implements INotificationConsumer this.compactionLogger = new CompactionLogger(cfs, this); this.boundariesSupplier = boundariesSupplier; this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange; - params = cfs.metadata().params.compaction; + + currentBoundaries = boundariesSupplier.get(); + params = schemaCompactionParams = cfs.metadata().params.compaction; enabled = params.isEnabled(); - reload(cfs.metadata().params.compaction); + setStrategy(schemaCompactionParams); + startup(); } /** @@ -456,19 +458,20 @@ public class CompactionStrategyManager implements INotificationConsumer } } - public void maybeReload(TableMetadata metadata) + /** + * Maybe reload the compaction strategies. Called after changing configuration. + */ + public void maybeReloadParamsFromSchema(CompactionParams params) { // compare the old schema configuration to the new one, ignore any locally set changes. - if (metadata.params.compaction.equals(schemaCompactionParams)) + if (params.equals(schemaCompactionParams)) return; writeLock.lock(); try { - // compare the old schema configuration to the new one, ignore any locally set changes. - if (metadata.params.compaction.equals(schemaCompactionParams)) - return; - reload(metadata.params.compaction); + if (!params.equals(schemaCompactionParams)) + reloadParamsFromSchema(params); } finally { @@ -476,18 +479,85 @@ public class CompactionStrategyManager implements INotificationConsumer } } + /** + * @param newParams new CompactionParams set in via CQL + */ + private void reloadParamsFromSchema(CompactionParams newParams) + { + logger.debug("Recreating compaction strategy for {}.{} - compaction parameters changed via CQL", + cfs.keyspace.getName(), cfs.getTableName()); + + /* + * It's possible for compaction to be explicitly enabled/disabled + * via JMX when already enabled/disabled via params. In that case, + * if we now toggle enabled/disabled via params, we'll technically + * be overriding JMX-set value with params-set value. + */ + boolean enabledWithJMX = enabled && !shouldBeEnabled(); + boolean disabledWithJMX = !enabled && shouldBeEnabled(); + + schemaCompactionParams = newParams; + setStrategy(newParams); + + // enable/disable via JMX overrides CQL params, but please see the comment above + if (enabled && !shouldBeEnabled() && !enabledWithJMX) + disable(); + else if (!enabled && shouldBeEnabled() && !disabledWithJMX) + enable(); + + startup(); + } + + private void maybeReloadParamsFromJMX(CompactionParams params) + { + // compare the old local configuration to the new one, ignoring schema + if (params.equals(this.params)) + return; + + writeLock.lock(); + try + { + if (!params.equals(this.params)) + reloadParamsFromJMX(params); + } + finally + { + writeLock.unlock(); + } + } + + /** + * @param newParams new CompactionParams set via JMX + */ + private void reloadParamsFromJMX(CompactionParams newParams) + { + logger.debug("Recreating compaction strategy for {}.{} - compaction parameters changed via JMX", + cfs.keyspace.getName(), cfs.getTableName()); + + setStrategy(newParams); + + // compaction params set via JMX override enable/disable via JMX + if (enabled && !shouldBeEnabled()) + disable(); + else if (!enabled && shouldBeEnabled()) + enable(); + + startup(); + } + /** * Checks if the disk boundaries changed and reloads the compaction strategies * to reflect the most up-to-date disk boundaries. - * + * <p> * This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date * disk locations and boundaries are used. - * + * <p> * This should *never* be called inside by a thread holding the {@link this#readLock}, since it * will potentially acquire the {@link this#writeLock} to update the compaction strategies * what can cause a deadlock. + * <p> + * TODO: improve this to reload after receiving a notification rather than trying to reload on every operation */ - //TODO improve this to reload after receiving a notification rather than trying to reload on every operation @VisibleForTesting protected void maybeReloadDiskBoundaries() { @@ -497,9 +567,8 @@ public class CompactionStrategyManager implements INotificationConsumer writeLock.lock(); try { - if (!currentBoundaries.isOutOfDate()) - return; - reload(params); + if (currentBoundaries.isOutOfDate()) + reloadDiskBoundaries(boundariesSupplier.get()); } finally { @@ -508,34 +577,23 @@ public class CompactionStrategyManager implements INotificationConsumer } /** - * Reload the compaction strategies - * - * Called after changing configuration and at startup. - * @param newCompactionParams + * @param newBoundaries new DiskBoundaries - potentially functionally equivalent to current ones */ - private void reload(CompactionParams newCompactionParams) + private void reloadDiskBoundaries(DiskBoundaries newBoundaries) { - boolean enabledWithJMX = enabled && !shouldBeEnabled(); - boolean disabledWithJMX = !enabled && shouldBeEnabled(); + DiskBoundaries oldBoundaries = currentBoundaries; + currentBoundaries = newBoundaries; - if (currentBoundaries != null) + if (newBoundaries.isEquivalentTo(oldBoundaries)) { - if (!newCompactionParams.equals(schemaCompactionParams)) - logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName()); - else if (currentBoundaries.isOutOfDate()) - logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName()); + logger.debug("Not recreating compaction strategy for {}.{} - disk boundaries are equivalent", + cfs.keyspace.getName(), cfs.getTableName()); + return; } - if (currentBoundaries == null || currentBoundaries.isOutOfDate()) - currentBoundaries = boundariesSupplier.get(); - - setStrategy(newCompactionParams); - schemaCompactionParams = cfs.metadata().params.compaction; - - if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX) - disable(); - else - enable(); + logger.debug("Recreating compaction strategy for {}.{} - disk boundaries are out of date", + cfs.keyspace.getName(), cfs.getTableName()); + setStrategy(params); startup(); } @@ -1142,23 +1200,10 @@ public class CompactionStrategyManager implements INotificationConsumer } } - public void setNewLocalCompactionStrategy(CompactionParams params) + public void overrideLocalParams(CompactionParams params) { - logger.info("Switching local compaction strategy from {} to {}}", this.params, params); - writeLock.lock(); - try - { - setStrategy(params); - if (shouldBeEnabled()) - enable(); - else - disable(); - startup(); - } - finally - { - writeLock.unlock(); - } + logger.info("Switching local compaction strategy from {} to {}", this.params, params); + maybeReloadParamsFromJMX(params); } private int getNumTokenPartitions() diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java new file mode 100644 index 0000000000..1cecfead59 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.net.UnknownHostException; +import java.util.List; + +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DiskBoundaries; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.service.StorageService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class CompactionStrategyManagerBoundaryReloadTest extends CQLTester +{ + @Test + public void testNoReload() + { + createTable("create table %s (id int primary key)"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + List<List<AbstractCompactionStrategy>> strategies = cfs.getCompactionStrategyManager().getStrategies(); + DiskBoundaries db = cfs.getDiskBoundaries(); + StorageService.instance.getTokenMetadata().invalidateCachedRings(); + // make sure the strategy instances are the same (no reload) + assertTrue(isSame(strategies, cfs.getCompactionStrategyManager().getStrategies())); + // but disk boundaries are not .equal (ring version changed) + assertNotEquals(db, cfs.getDiskBoundaries()); + assertTrue(db.isEquivalentTo(cfs.getDiskBoundaries())); + + db = cfs.getDiskBoundaries(); + alterTable("alter table %s with comment = 'abcd'"); + assertTrue(isSame(strategies, cfs.getCompactionStrategyManager().getStrategies())); + // disk boundaries don't change because of alter + assertEquals(db, cfs.getDiskBoundaries()); + } + + @Test + public void testReload() throws UnknownHostException + { + createTable("create table %s (id int primary key)"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + List<List<AbstractCompactionStrategy>> strategies = cfs.getCompactionStrategyManager().getStrategies(); + DiskBoundaries db = cfs.getDiskBoundaries(); + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + byte[] tk1 = new byte[1], tk2 = new byte[1]; + tk1[0] = 2; + tk2[0] = 1; + tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2")); + // make sure the strategy instances have been reloaded + assertFalse(isSame(strategies, + cfs.getCompactionStrategyManager().getStrategies())); + assertNotEquals(db, cfs.getDiskBoundaries()); + db = cfs.getDiskBoundaries(); + + strategies = cfs.getCompactionStrategyManager().getStrategies(); + alterTable("alter table %s with compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': false}"); + assertFalse(isSame(strategies, + cfs.getCompactionStrategyManager().getStrategies())); + assertEquals(db, cfs.getDiskBoundaries()); + + } + + private boolean isSame(List<List<AbstractCompactionStrategy>> a, List<List<AbstractCompactionStrategy>> b) + { + if (a.size() != b.size()) + return false; + for (int i = 0; i < a.size(); i++) + { + if (a.get(i).size() != b.get(i).size()) + return false; + for (int j = 0; j < a.get(i).size(); j++) + if (a.get(i).get(j) != b.get(i).get(j)) + return false; + } + return true; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org