Fix flaky test in IndexSummaryManagerTest. Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-12218.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b447ffc4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b447ffc4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b447ffc4 Branch: refs/heads/trunk Commit: b447ffc4242c1514dd33e4ace96b719d892ff4a2 Parents: a98552e Author: Sam Tunnicliffe <s...@beobal.com> Authored: Tue Jul 19 16:22:22 2016 +0100 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Mon Aug 1 12:32:38 2016 +0100 ---------------------------------------------------------------------- .../io/sstable/IndexSummaryManager.java | 14 +-- .../io/sstable/IndexSummaryManagerTest.java | 95 ++++++++++++++------ 2 files changed, 74 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b447ffc4/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index ddda430..95ade16 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -225,7 +225,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables(); try { - redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes); + redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left, + compactingAndNonCompacting.right, + this.memoryPoolBytes)); } finally { @@ -237,14 +239,14 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean /** * Attempts to fairly distribute a 
fixed pool of memory for index summaries across a set of SSTables based on * their recent read rates. - * @param transactions containing the sstables we are to redistribute the memory pool across - * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or - * under, if possible + * @param redistribution encapsulating the transactions containing the sstables we are to redistribute the + * memory pool across and a size (in bytes) that the total index summary space usage + * should stay close to or under, if possible * @return a list of new SSTableReader instances */ @VisibleForTesting - public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException + public static List<SSTableReader> redistributeSummaries(IndexSummaryRedistribution redistribution) throws IOException { - return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes)); + return CompactionManager.instance.runIndexSummaryRedistribution(redistribution); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b447ffc4/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 0498c68..c0445d5 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -19,16 +19,8 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Map; 
-import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Joiner; @@ -147,7 +139,7 @@ public class IndexSummaryManagerTest try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size()); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size())); } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); @@ -255,7 +247,7 @@ public class IndexSummaryManagerTest long summarySpace = sstable.getIndexSummaryOffHeapSize(); try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace); + redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace)); } sstable = cfs.getLiveSSTables().iterator().next(); @@ -267,7 +259,7 @@ public class IndexSummaryManagerTest int previousSize = sstable.getIndexSummarySize(); try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5)); + redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5))); } sstable = cfs.getLiveSSTables().iterator().next(); assertEquals(previousSize * 1.5, (double) sstable.getIndexSummarySize(), 1); @@ -278,7 +270,7 @@ public class IndexSummaryManagerTest cfs.metadata.minIndexInterval(originalMinIndexInterval); try 
(LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0)); + redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0))); } sstable = cfs.getLiveSSTables().iterator().next(); assertEquals(originalMinIndexInterval * 2, sstable.getEffectiveIndexInterval(), 0.001); @@ -291,7 +283,7 @@ public class IndexSummaryManagerTest cfs.metadata.maxIndexInterval(originalMinIndexInterval * 4); try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10); + redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10)); } sstable = cfs.getLiveSSTables().iterator().next(); assertEquals(cfs.metadata.params.minIndexInterval, sstable.getEffectiveIndexInterval(), 0.001); @@ -314,7 +306,7 @@ public class IndexSummaryManagerTest try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10); + redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10)); } sstables = new ArrayList<>(cfs.getLiveSSTables()); for (SSTableReader sstable : sstables) @@ -324,7 +316,7 @@ public class IndexSummaryManagerTest cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval / 2); try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1); + redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1)); } sstables = new ArrayList<>(cfs.getLiveSSTables()); for (SSTableReader sstable : sstables) @@ -337,7 +329,7 @@ public class 
IndexSummaryManagerTest cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval * 2); try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1); + redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1)); } for (SSTableReader sstable : cfs.getLiveSSTables()) { @@ -368,7 +360,7 @@ public class IndexSummaryManagerTest // there should be enough space to not downsample anything try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables))); } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); @@ -379,7 +371,7 @@ public class IndexSummaryManagerTest assert sstables.size() == 4; try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2))); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)))); } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel()); @@ -388,7 +380,7 @@ public class IndexSummaryManagerTest // everything should get cut to a quarter try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4))); + sstables = 
redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4)))); } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel()); @@ -397,7 +389,7 @@ public class IndexSummaryManagerTest // upsample back up to half try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4)); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4))); } assert sstables.size() == 4; for (SSTableReader sstable : sstables) @@ -407,7 +399,7 @@ public class IndexSummaryManagerTest // upsample back up to the original index summary try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables))); } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); @@ -419,7 +411,7 @@ public class IndexSummaryManagerTest sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0)); try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3))); } Collections.sort(sstables, hotnessComparator); assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(0).getIndexSummarySamplingLevel()); @@ -435,7 +427,7 @@ public class IndexSummaryManagerTest sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, higherRate)); try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3))); } Collections.sort(sstables, hotnessComparator); assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel()); @@ -453,7 +445,7 @@ public class IndexSummaryManagerTest try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50)); } Collections.sort(sstables, hotnessComparator); @@ -477,7 +469,7 @@ public class IndexSummaryManagerTest sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0)); try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL)))); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))))); } Collections.sort(sstables, hotnessComparator); assertEquals(1, sstables.get(0).getIndexSummarySize()); // at the min sampling level @@ -490,7 +482,7 @@ public class IndexSummaryManagerTest // Don't leave enough space for even the minimal index 
summaries try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10); + sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10)); } for (SSTableReader sstable : sstables) assertEquals(1, sstable.getIndexSummarySize()); // at the min sampling level @@ -625,6 +617,9 @@ public class IndexSummaryManagerTest // everything should get cut in half final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>(); + // barrier to control when redistribution runs + final CountDownLatch barrier = new CountDownLatch(1); + Thread t = new Thread(new Runnable() { public void run() @@ -634,7 +629,10 @@ public class IndexSummaryManagerTest // Don't leave enough space for even the minimal index summaries try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace); + redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST, + of(cfs.metadata.cfId, txn), + singleSummaryOffHeapSpace, + barrier)); } } catch (CompactionInterruptedException ex) @@ -649,7 +647,13 @@ public class IndexSummaryManagerTest t.start(); while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive()) Thread.sleep(1); + // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries + // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution + // to proceed until stopCompaction has been called. 
CompactionManager.instance.stopCompaction("INDEX_SUMMARY"); + // allows the redistribution to proceed + barrier.countDown(); + t.join(); assertNotNull("Expected compaction interrupted exception", exception.get()); @@ -664,4 +668,37 @@ public class IndexSummaryManagerTest validateData(cfs, numRows); } + + private static IndexSummaryRedistribution redistribution(List<SSTableReader> compacting, + Map<UUID, LifecycleTransaction> transactions, + long memoryPoolBytes) + { + return new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes); + } + + private static class ObservableRedistribution extends IndexSummaryRedistribution + { + CountDownLatch barrier; + public ObservableRedistribution(List<SSTableReader> compacting, + Map<UUID, LifecycleTransaction> transactions, + long memoryPoolBytes, + CountDownLatch barrier) + { + super(compacting, transactions, memoryPoolBytes); + this.barrier = barrier; + } + + public List<SSTableReader> redistributeSummaries() throws IOException + { + try + { + barrier.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException("Interrupted waiting on test barrier"); + } + return super.redistributeSummaries(); + } + } }