Repository: cassandra Updated Branches: refs/heads/trunk c49bc639f -> a5d095e62
Make it possible to compact a given token range Patch by Vishy Kasar; reviewed by marcuse for CASSANDRA-10643 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5d095e6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5d095e6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5d095e6 Branch: refs/heads/trunk Commit: a5d095e62ed459aefbc8c25e2bbcd46969a48eec Parents: c49bc63 Author: Vishy Kasar <vka...@apple.com> Authored: Thu Aug 4 11:06:48 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri Aug 5 09:21:04 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/source/operating/compaction.rst | 6 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 6 +- .../cassandra/db/ColumnFamilyStoreMBean.java | 8 +++ .../db/compaction/CompactionManager.java | 53 ++++++++++++++ .../compaction/CompactionStrategyManager.java | 1 - .../compaction/LeveledCompactionStrategy.java | 9 +-- .../cassandra/service/StorageService.java | 13 +++- .../cassandra/service/StorageServiceMBean.java | 5 ++ .../org/apache/cassandra/tools/NodeProbe.java | 5 ++ .../cassandra/tools/nodetool/Compact.java | 30 ++++++-- .../LeveledCompactionStrategyTest.java | 75 ++++++++++++++++++++ 12 files changed, 201 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index db2e221..23a6eb0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Make it possible to compact a given token range (CASSANDRA-10643) * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) * Collect metrics on queries by consistency level (CASSANDRA-7384) * Add support for GROUP BY to SELECT statement 
(CASSANDRA-10707) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/doc/source/operating/compaction.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/compaction.rst b/doc/source/operating/compaction.rst index 8d70a41..b0f97c4 100644 --- a/doc/source/operating/compaction.rst +++ b/doc/source/operating/compaction.rst @@ -45,6 +45,12 @@ Secondary index rebuild rebuild the secondary indexes on the node. Anticompaction after repair the ranges that were actually repaired are split out of the sstables that existed when repair started. +Sub range compaction + It is possible to compact only a given sub-range. This can be useful if you know a token that has been + misbehaving, for example by gathering many updates or many deletes. ``nodetool compact -st x -et y`` will pick + all sstables containing the range between x and y and issue a compaction for those sstables. For STCS this will + most likely include all sstables, but with LCS it can issue the compaction for a subset of the sstables. With LCS, + the resulting sstable will end up in L0. When is a minor compaction triggered? 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 53f5305..84fcb86 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2109,12 +2109,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean forceMajorCompaction(false); } - public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException { CompactionManager.instance.performMaximal(this, splitOutput); } + public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException + { + CompactionManager.instance.forceCompactionForTokenRange(this, tokenRanges); + } + public static Iterable<ColumnFamilyStore> all() { List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index 4df9f8d..ccaacf6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -24,6 +25,9 @@ import java.util.concurrent.ExecutionException; import javax.management.openmbean.CompositeData; import 
javax.management.openmbean.OpenDataException; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + /** * The MBean interface for ColumnFamilyStore */ @@ -45,6 +49,10 @@ public interface ColumnFamilyStoreMBean public void forceMajorCompaction(boolean splitOutput) throws ExecutionException, InterruptedException; /** + * force a major compaction of specified key range in this column family + */ + public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException; + /** * Gets the minimum number of sstables in queue before compaction kicks off */ public int getMinimumCompactionThreshold(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 1cfc76b..ac6c753 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -44,6 +44,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionInfo.Holder; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -703,11 +704,13 @@ public class CompactionManager implements CompactionManagerMBean return Collections.emptyList(); List<Future<?>> futures = new ArrayList<>(); + int nonEmptyTasks = 0; for (final AbstractCompactionTask task : tasks) { if (task.transaction.originals().size() > 0) nonEmptyTasks++; + Runnable runnable = new WrappedRunnable() { 
protected void runMayThrow() @@ -724,9 +727,59 @@ public class CompactionManager implements CompactionManagerMBean } if (nonEmptyTasks > 1) logger.info("Major compaction will not result in a single sstable - repaired and unrepaired data is kept separate and compaction runs per data_file_directory."); + + return futures; } + public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges) + { + final Collection<AbstractCompactionTask> tasks = cfStore.runWithCompactionsDisabled(() -> + { + Collection<SSTableReader> sstables = sstablesInBounds(cfStore, ranges); + if (sstables == null || sstables.isEmpty()) + { + logger.debug("No sstables found for the provided token range"); + return null; + } + return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds())); + }, false, false); + + if (tasks == null) + return; + + Runnable runnable = new WrappedRunnable() + { + protected void runMayThrow() + { + for (AbstractCompactionTask task : tasks) + if (task != null) + task.execute(metrics); + } + }; + + if (executor.isShutdown()) + { + logger.info("Compaction executor has shut down, not submitting task"); + return; + } + FBUtilities.waitOnFuture(executor.submit(runnable)); + } + + private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> tokenRangeCollection) + { + final Set<SSTableReader> sstables = new HashSet<>(); + Iterable<SSTableReader> liveTables = cfs.getTracker().getView().select(SSTableSet.LIVE); + SSTableIntervalTree tree = SSTableIntervalTree.build(liveTables); + + for (Range<Token> tokenRange : tokenRangeCollection) + { + Iterable<SSTableReader> ssTableReaders = View.sstablesInBounds(tokenRange.left.minKeyBound(), tokenRange.right.maxKeyBound(), tree); + Iterables.addAll(sstables, ssTableReaders); + } + return sstables; + } + public void forceUserDefinedCompaction(String dataFiles) { String[] filenames = 
dataFiles.split(","); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index bf367a3..ce97926 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -743,7 +743,6 @@ public class CompactionStrategyManager implements INotificationConsumer { maybeReload(cfs.metadata); List<AbstractCompactionTask> ret = new ArrayList<>(); - readLock.lock(); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 25c5d20..287e387 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -145,8 +145,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy @Override public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) { - if (sstables.size() != 1) - throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions"); + + if (sstables.isEmpty()) + return null; LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); if (transaction == null) @@ -154,8 +155,8 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy logger.trace("Unable to mark {} for 
compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables); return null; } - int level = sstables.iterator().next().getSSTableLevel(); - return getCompactionTask(transaction, gcBefore, level == 0 ? Integer.MAX_VALUE : getMaxSSTableBytes()); + int level = sstables.size() > 1 ? 0 : sstables.iterator().next().getSSTableLevel(); + return new LeveledCompactionTask(cfs, transaction, level, gcBefore, level == 0 ? Long.MAX_VALUE : getMaxSSTableBytes(), false); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index eade850..0a95827 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2860,10 +2860,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * the tag given to the snapshot; may not be null or empty */ public void takeTableSnapshot(String keyspaceName, String tableName, String tag) - throws IOException { + throws IOException + { takeMultipleTableSnapshot(tag, false, keyspaceName + "." + tableName); } + public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException + { + Collection<Range<Token>> tokenRanges = createRepairRangeFrom(startToken, endToken); + + for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tableNames)) + { + cfStore.forceCompactionForTokenRange(tokenRanges); + } + } + /** * Takes the snapshot for the given keyspaces. A snapshot name must be specified. 
* http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 2e5651a..0f93177 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -255,6 +255,11 @@ public interface StorageServiceMBean extends NotificationEmitter public int relocateSSTables(String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException; public int relocateSSTables(int jobs, String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException; /** + * Forces major compaction of specified token range in a single keyspace + */ + public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException; + + /** * Trigger a cleanup of keys on a single keyspace */ @Deprecated http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index c33dfa4..bd0d8db 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -335,6 +335,11 @@ public class NodeProbe implements AutoCloseable ssProxy.relocateSSTables(jobs, keyspace, cfnames); } + public void forceKeyspaceCompactionForTokenRange(String keyspaceName, final String startToken, final String endToken, String... 
tableNames) throws IOException, ExecutionException, InterruptedException + { + ssProxy.forceKeyspaceCompactionForTokenRange(keyspaceName, startToken, endToken, tableNames); + } + public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException { ssProxy.forceKeyspaceFlush(keyspaceName, tableNames); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/tools/nodetool/Compact.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Compact.java b/src/java/org/apache/cassandra/tools/nodetool/Compact.java index f268f0a..ef10a83 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Compact.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Compact.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.tools.nodetool; +import static org.apache.commons.lang3.StringUtils.EMPTY; + import io.airlift.command.Arguments; import io.airlift.command.Command; import io.airlift.command.Option; @@ -39,14 +41,27 @@ public class Compact extends NodeToolCmd @Option(title = "user-defined", name = {"--user-defined"}, description = "Use --user-defined to submit listed files for user-defined compaction") private boolean userDefined = false; + @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the compaction range starts") + private String startToken = EMPTY; + + @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which compaction range ends") + private String endToken = EMPTY; + + @Override public void execute(NodeProbe probe) { - if (splitOutput && userDefined) + final boolean tokenProvided = !(startToken.isEmpty() && endToken.isEmpty()); + if (splitOutput && (userDefined || tokenProvided)) { - throw new RuntimeException("Invalid option combination: User defined compaction can not be 
split"); + throw new RuntimeException("Invalid option combination: Can not use split-output here"); } - else if (userDefined) + if (userDefined && tokenProvided) + { + throw new RuntimeException("Invalid option combination: Can not provide tokens when using user-defined"); + } + + if (userDefined) { try { @@ -65,7 +80,14 @@ public class Compact extends NodeToolCmd { try { - probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames); + if (tokenProvided) + { + probe.forceKeyspaceCompactionForTokenRange(keyspace, startToken, endToken, tableNames); + } + else + { + probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames); + } } catch (Exception e) { throw new RuntimeException("Error occurred during compaction", e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index bd964ed..12144eb 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -41,6 +42,7 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -368,4 +370,77 @@ public class LeveledCompactionStrategyTest 
assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2)); assertFalse(repaired.manifest.getLevel(1).contains(sstable2)); } + + + + @Test + public void testTokenRangeCompaction() throws Exception + { + // Remove any existing data so we can start out clean with predictable number of sstables + cfs.truncateBlocking(); + + // Disable auto compaction so cassandra does not compact + CompactionManager.instance.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files + + DecoratedKey key1 = Util.dk(String.valueOf(1)); + DecoratedKey key2 = Util.dk(String.valueOf(2)); + List<DecoratedKey> keys = new ArrayList<>(Arrays.asList(key1, key2)); + int numIterations = 10; + int columns = 2; + + // Add enough data to trigger multiple sstables. + + // create 10 sstables that contain data for both key1 and key2 + for (int i = 0; i < numIterations; i++) { + for (DecoratedKey key : keys) { + UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key); + for (int c = 0; c < columns; c++) + update.newRow("column" + c).add("val", value); + update.applyUnsafe(); + } + cfs.forceBlockingFlush(); + } + + // create 20 more sstables with 10 containing data for key1 and other 10 containing data for key2 + for (int i = 0; i < numIterations; i++) { + for (DecoratedKey key : keys) { + UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key); + for (int c = 0; c < columns; c++) + update.newRow("column" + c).add("val", value); + update.applyUnsafe(); + cfs.forceBlockingFlush(); + } + } + + // We should have a total of 30 sstables by now + assertEquals(30, cfs.getLiveSSTables().size()); + + // Compact just the tables with key2 + // Bit hackish to use the key1.token as the prior key but works in BytesToken + Range<Token> tokenRange = new Range<>(key2.getToken(), key2.getToken()); + Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange)); + 
cfs.forceCompactionForTokenRange(tokenRanges); + + while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) { + Thread.sleep(100); + } + + // 20 tables that have key2 should have been compacted in to 1 table resulting in 11 (30-20+1) + assertEquals(11, cfs.getLiveSSTables().size()); + + // Compact just the tables with key1. At this point all 11 tables should have key1 + Range<Token> tokenRange2 = new Range<>(key1.getToken(), key1.getToken()); + Collection<Range<Token>> tokenRanges2 = new ArrayList<>(Arrays.asList(tokenRange2)); + cfs.forceCompactionForTokenRange(tokenRanges2); + + + while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) { + Thread.sleep(100); + } + + // the 11 tables containing key1 should all compact to 1 table + assertEquals(1, cfs.getLiveSSTables().size()); + } }