Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 fccc123af -> c8dcc7515
Multi-threaded scrub/cleanup/upgradesstables

Patch by rspitzer and marcuse for CASSANDRA-5547

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c8dcc751
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c8dcc751
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c8dcc751

Branch: refs/heads/cassandra-2.1
Commit: c8dcc7515e9f1d7f73f2d5e6651862a1b1023bea
Parents: fccc123
Author: Marcus Eriksson <marc...@apache.org>
Authored: Thu Apr 24 08:57:46 2014 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Apr 28 08:52:00 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../db/compaction/CompactionManager.java | 328 +++++++++----------
 2 files changed, 153 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8dcc751/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5baaefd..1b67357 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@
  * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
  * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
  * Preemptive opening of compaction result (CASSANDRA-6916)
+ * Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
 Merged from 2.0:
  * Set JMX RMI port to 7199 (CASSANDRA-7087)
  * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8dcc751/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 30d564c..792c962 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -42,6 +42,7 @@ import javax.management.ObjectName; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ConcurrentHashMultiset; @@ -240,127 +241,130 @@ public class CompactionManager implements CompactionManagerMBean } } - private abstract static class UnmarkingRunnable implements Runnable + private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException { - private final ColumnFamilyStore cfs; - private final Iterable<SSTableReader> sstables; - - private UnmarkingRunnable(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables) + Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting(); + if (compactingSSTables == null) { - this.cfs = cfs; - this.sstables = sstables; + logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.ABORTED; } - - protected abstract void runMayThrow() throws IOException; - - public final void run() + if (Iterables.isEmpty(compactingSSTables)) { - try - { - runMayThrow(); - } - catch (Exception e) - { - throw Throwables.propagate(e); - } - finally + logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.SUCCESSFUL; + } + try + { + Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables); + List<Future<Object>> futures = new ArrayList<>(); + + for (final SSTableReader sstable : sstables) { - cfs.getDataTracker().unmarkCompacting(sstables); + futures.add(executor.submit(new Callable<Object>() + { + 
@Override + public Object call() throws Exception + { + operation.execute(sstable); + return this; + } + })); } + + for (Future<Object> f : futures) + f.get(); + } + finally + { + cfs.getDataTracker().unmarkCompacting(compactingSSTables); } + return AllSSTableOpStatus.SUCCESSFUL; + } + + private static interface OneSSTableOperation + { + Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input); + void execute(SSTableReader input) throws IOException; } public enum AllSSTableOpStatus { ABORTED, SUCCESSFUL } public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted) throws InterruptedException, ExecutionException { - final Iterable<SSTableReader> sstables = cfs.markAllCompacting(); - if (sstables == null) - { - logger.info("Aborting scrub of {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.ABORTED; - } - if (Iterables.isEmpty(sstables)) + assert !cfs.isIndex(); + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { - logger.info("No sstables to scrub for {}.{}", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.SUCCESSFUL; - } + @Override + public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) + { + return input; + } - Runnable runnable = new UnmarkingRunnable(cfs, sstables) - { - protected void runMayThrow() throws IOException + @Override + public void execute(SSTableReader input) throws IOException { - doScrub(cfs, sstables, skipCorrupted); + scrubOne(cfs, input, skipCorrupted); } - }; - executor.submit(runnable).get(); - return AllSSTableOpStatus.SUCCESSFUL; + }); } public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException { - final Iterable<SSTableReader> sstables = cfs.markAllCompacting(); - if (sstables == null) - { - logger.info("Aborting sstable format upgrade of {}.{} after failing 
to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.ABORTED; - } - if (Iterables.isEmpty(sstables)) - { - logger.info("No sstables to upgrade for {}.{}", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.SUCCESSFUL; - } - - Runnable runnable = new UnmarkingRunnable(cfs, sstables) + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { - protected void runMayThrow() throws IOException + @Override + public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) { - for (final SSTableReader sstable : sstables) + return Iterables.filter(input, new Predicate<SSTableReader>() { - if (excludeCurrentVersion && sstable.descriptor.version.equals(Descriptor.Version.CURRENT)) - continue; - - // SSTables are marked by the caller - // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace()) - AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(sstable), NO_GC, Long.MAX_VALUE); - task.setUserDefined(true); - task.setCompactionType(OperationType.UPGRADE_SSTABLES); - task.execute(metrics); - } + @Override + public boolean apply(SSTableReader sstable) + { + return !(excludeCurrentVersion && sstable.descriptor.version.equals(Descriptor.Version.CURRENT)); + } + }); } - }; - executor.submit(runnable).get(); - return AllSSTableOpStatus.SUCCESSFUL; + + @Override + public void execute(SSTableReader input) throws IOException + { + AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(input), NO_GC, Long.MAX_VALUE); + task.setUserDefined(true); + task.setCompactionType(OperationType.UPGRADE_SSTABLES); + task.execute(metrics); + } + }); } public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException { - final Iterable<SSTableReader> sstables = cfStore.markAllCompacting(); - if 
(sstables == null) + assert !cfStore.isIndex(); + Keyspace keyspace = cfStore.keyspace; + final Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); + if (ranges.isEmpty()) { - logger.info("Aborting cleanup of {}.{} after failing to interrupt other compaction operations", cfStore.keyspace.getName(), cfStore.name); + logger.info("Cleanup cannot run before a node has joined the ring"); return AllSSTableOpStatus.ABORTED; } - if (Iterables.isEmpty(sstables)) - { - logger.info("No sstables to cleanup for {}.{}", cfStore.keyspace.getName(), cfStore.name); - return AllSSTableOpStatus.SUCCESSFUL; - } - - Runnable runnable = new UnmarkingRunnable(cfStore, sstables) + final boolean hasIndexes = cfStore.indexManager.hasIndexes(); + final CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges); + return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() { - protected void runMayThrow() throws IOException + @Override + public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) { - // Sort the column families in order of SSTable size, so cleanup of smaller CFs - // can free up space for larger ones - List<SSTableReader> sortedSSTables = Lists.newArrayList(sstables); + List<SSTableReader> sortedSSTables = Lists.newArrayList(input); Collections.sort(sortedSSTables, new SSTableReader.SizeComparator()); + return sortedSSTables; + } - doCleanupCompaction(cfStore, sortedSSTables); + @Override + public void execute(SSTableReader input) throws IOException + { + doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes); } - }; - executor.submit(runnable).get(); - return AllSSTableOpStatus.SUCCESSFUL; + }); } public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs, @@ -567,20 +571,6 @@ public class CompactionManager implements CompactionManagerMBean } } - /** - * Deserialize everything in the CFS and re-serialize w/ the newest version. 
Also attempts to recover - * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted - * from early ByteBuffer bugs. - * - * @throws IOException - */ - private void doScrub(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, boolean skipCorrupted) throws IOException - { - assert !cfs.isIndex(); - for (final SSTableReader sstable : sstables) - scrubOne(cfs, sstable, skipCorrupted); - } - private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException { Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false); @@ -652,108 +642,94 @@ public class CompactionManager implements CompactionManagerMBean } /** - * This function goes over each file and removes the keys that the node is not responsible for + * This function goes over a file and removes the keys that the node is not responsible for * and only keeps keys that this node is responsible for. * * @throws IOException */ - private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException + private void doCleanupOne(final ColumnFamilyStore cfs, SSTableReader sstable, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException { assert !cfs.isIndex(); - Keyspace keyspace = cfs.keyspace; - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); - if (ranges.isEmpty()) + + if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges)) { - logger.info("Cleanup cannot run before a node has joined the ring"); + cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP); return; } - - boolean hasIndexes = cfs.indexManager.hasIndexes(); - CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges); - - for (SSTableReader sstable : sstables) + if (!needsCleanup(sstable, 
ranges)) { - if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges)) - { - cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP); - continue; - } - if (!needsCleanup(sstable, ranges)) - { - logger.debug("Skipping {} for cleanup; all rows should be kept", sstable); - continue; - } + logger.debug("Skipping {} for cleanup; all rows should be kept", sstable); + return; + } - CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)); - long start = System.nanoTime(); + long start = System.nanoTime(); - long totalkeysWritten = 0; + long totalkeysWritten = 0; - int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), - (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)))); - if (logger.isDebugEnabled()) - logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize); + int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), + (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)))); + if (logger.isDebugEnabled()) + logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize); - logger.info("Cleaning up {}", sstable); + logger.info("Cleaning up {}", sstable); - File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables(); - if (compactionFileLocation == null) - throw new IOException("disk full"); + File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables(); + if (compactionFileLocation == null) + throw new IOException("disk full"); - ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter()); - CleanupInfo ci = new CleanupInfo(sstable, scanner); + ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter()); + CleanupInfo ci = new CleanupInfo(sstable, scanner); - metrics.beginCompaction(ci); - SSTableRewriter writer = 
new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false); + metrics.beginCompaction(ci); + SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false); - try + try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs))) + { + writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable)); + + while (scanner.hasNext()) { - writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable)); + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); - while (scanner.hasNext()) - { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - - SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); - row = cleanupStrategy.cleanup(row); - if (row == null) - continue; - AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row)); - if (writer.append(compactedRow) != null) - totalkeysWritten++; - } + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + row = cleanupStrategy.cleanup(row); + if (row == null) + continue; + AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row)); + if (writer.append(compactedRow) != null) + totalkeysWritten++; + } - // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd - cfs.indexManager.flushIndexesBlocking(); + // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd + cfs.indexManager.flushIndexesBlocking(); - writer.finish(); - } - catch (Throwable e) - { - writer.abort(); - throw Throwables.propagate(e); - } - finally - 
{ - controller.close(); - scanner.close(); - metrics.finishCompaction(ci); - } + writer.finish(); + } + catch (Throwable e) + { + writer.abort(); + throw Throwables.propagate(e); + } + finally + { + scanner.close(); + metrics.finishCompaction(ci); + } - List<SSTableReader> results = writer.finished(); - if (!results.isEmpty()) - { - String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms."; - long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = sstable.onDiskLength(); - long endsize = 0; - for (SSTableReader newSstable : results) - endsize += newSstable.onDiskLength(); - double ratio = (double) endsize / (double) startsize; - logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime)); - } + List<SSTableReader> results = writer.finished(); + if (!results.isEmpty()) + { + String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms."; + long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + long startsize = sstable.onDiskLength(); + long endsize = 0; + for (SSTableReader newSstable : results) + endsize += newSstable.onDiskLength(); + double ratio = (double) endsize / (double) startsize; + logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime)); } + } private static abstract class CleanupStrategy