Fix growing pending background compaction patch by yukim; reviewed by benedict for CASSANDRA-9662
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f283ed29 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f283ed29 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f283ed29 Branch: refs/heads/cassandra-2.1 Commit: f283ed29814403bde6350a2598cdd6e2c8b983d5 Parents: 452d6a4 Author: Yuki Morishita <yu...@apache.org> Authored: Fri Jun 26 11:50:51 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Jul 8 11:58:17 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 ++- .../apache/cassandra/db/ColumnFamilyStore.java | 13 +------------ .../db/compaction/CompactionManager.java | 18 +++++------------- .../cassandra/metrics/CompactionMetrics.java | 6 +++--- .../cassandra/db/compaction/CompactionsTest.java | 12 +++++++++--- 5 files changed, 20 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bd1db92..40bf463 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 2.0.18 -* Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591) + * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591) + * Fix growing pending background compaction (CASSANDRA-9662) 2.0.17 http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bf1e779..00b2eb8 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -180,20 +180,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { public void run() { - List<ColumnFamilyStore> submitted = new ArrayList<>(); for (Keyspace keyspace : Keyspace.all()) for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - if (!CompactionManager.instance.submitBackground(cfs, false).isEmpty()) - submitted.add(cfs); - - while (!submitted.isEmpty() && CompactionManager.instance.getActiveCompactions() < CompactionManager.instance.getMaximumCompactorThreads()) - { - List<ColumnFamilyStore> submitMore = ImmutableList.copyOf(submitted); - submitted.clear(); - for (ColumnFamilyStore cfs : submitMore) - if (!CompactionManager.instance.submitBackground(cfs, false).isEmpty()) - submitted.add(cfs); - } + CompactionManager.instance.submitBackground(cfs); } }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index c66eeb6..5b5b39e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -128,11 +128,6 @@ public class CompactionManager implements CompactionManagerMBean */ public List<Future<?>> submitBackground(final ColumnFamilyStore cfs) { - return submitBackground(cfs, true); - } - - public List<Future<?>> submitBackground(final ColumnFamilyStore cfs, boolean autoFill) - { if (cfs.isAutoCompactionDisabled()) { logger.debug("Autocompaction is disabled"); @@ -151,14 +146,11 @@ public class CompactionManager implements CompactionManagerMBean cfs.keyspace.getName(), cfs.name, cfs.getCompactionStrategy().getClass().getSimpleName()); - List<Future<?>> futures = new ArrayList<Future<?>>(); + List<Future<?>> futures = new ArrayList<>(); // we must schedule it at least once, otherwise compaction will stop for a CF until next flush - do { - compactingCF.add(cfs); - futures.add(executor.submit(new BackgroundCompactionTask(cfs))); - // if we have room for more compactions, then fill up executor - } while (autoFill && executor.getActiveCount() + futures.size() < executor.getMaximumPoolSize()); + compactingCF.add(cfs); + futures.add(executor.submit(new BackgroundCompactionCandidate(cfs))); return futures; } @@ -173,11 +165,11 @@ public class CompactionManager implements CompactionManagerMBean // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables // are created between task submission and execution, we execute against the most up-to-date information - class BackgroundCompactionTask implements Runnable + class BackgroundCompactionCandidate implements Runnable { private final ColumnFamilyStore cfs; - BackgroundCompactionTask(ColumnFamilyStore cfs) + BackgroundCompactionCandidate(ColumnFamilyStore cfs) { this.cfs = cfs; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/metrics/CompactionMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index b015130..f7a99e1 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -58,14 +58,14 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt public Integer value() { int n = 0; + // add estimate number of compactions need to be done for (String keyspaceName : Schema.instance.getKeyspaces()) { for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores()) n += cfs.getCompactionStrategy().getEstimatedRemainingTasks(); } - for (ThreadPoolExecutor collector : collectors) - n += collector.getTaskCount() - collector.getCompletedTaskCount(); - return n; + // add number of currently running compactions + return n + compactions.size(); } }); completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 1879838..7da8d92 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -82,8 +82,10 @@ public class CompactionsTest extends SchemaLoader // enable compaction, submit background and wait for it to complete store.enableAutoCompaction(); FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store)); - while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0) + do + { TimeUnit.SECONDS.sleep(1); + } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0); // and sstable with ttl should be compacted assertEquals(1, store.getSSTables().size()); @@ -202,8 +204,10 @@ public class CompactionsTest extends SchemaLoader // enable compaction, submit background and wait for it to complete store.enableAutoCompaction(); FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store)); - while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0) + do + { TimeUnit.SECONDS.sleep(1); + } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0); // even though both sstables were candidate for tombstone compaction // it was not executed because they have an overlapping token range @@ -222,8 +226,10 @@ public class CompactionsTest extends SchemaLoader //submit background task again and wait for it to complete FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store)); - while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0) + do + { TimeUnit.SECONDS.sleep(1); + } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0); //we still have 2 sstables, since they were not compacted against each other assertEquals(2, store.getSSTables().size());