Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 3de6e9d32 -> ca85bec1d
  refs/heads/cassandra-3.0 4e0bced5e -> bfa8c8070
  refs/heads/cassandra-3.X 072b5271a -> 90eed50c9
  refs/heads/trunk 0d813fe88 -> 97eeb66ed
Make IndexSummaryManagerTest::testCancelIndex more deterministic Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-12808 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ca85bec1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ca85bec1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ca85bec1 Branch: refs/heads/cassandra-2.2 Commit: ca85bec1df69140485eb956cb61be9b68be708e0 Parents: 3de6e9d Author: Sam Tunnicliffe <s...@beobal.com> Authored: Wed Nov 9 15:49:16 2016 +0000 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Thu Nov 10 09:56:31 2016 +0000 ---------------------------------------------------------------------- .../io/sstable/IndexSummaryManager.java | 14 +++-- .../io/sstable/IndexSummaryManagerTest.java | 58 ++++++++++++++++++-- 2 files changed, 60 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 4438dc1..0e9073f 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -225,7 +225,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables(); try { - redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes); + redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left, + compactingAndNonCompacting.right, + 
this.memoryPoolBytes)); } finally { @@ -237,14 +239,14 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean /** * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on * their recent read rates. - * @param transactions containing the sstables we are to redistribute the memory pool across - * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or - * under, if possible + * @param redistribution encapsulating the transactions containing the sstables we are to redistribute the + * memory pool across and a size (in bytes) that the total index summary space usage + * should stay close to or under, if possible * @return a list of new SSTableReader instances */ @VisibleForTesting - public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException + public static List<SSTableReader> redistributeSummaries(IndexSummaryRedistribution redistribution) throws IOException { - return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes)); + return CompactionManager.instance.runIndexSummaryRedistribution(redistribution); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 6935680..a1c0e77 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -56,7 +56,6 @@ import static java.util.Arrays.asList; import static 
org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD; import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD; -import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -90,8 +89,7 @@ public class IndexSummaryManagerTest SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDRACE) .minIndexInterval(8) .maxIndexInterval(256) - .caching(CachingOptions.NONE) - ); + .caching(CachingOptions.NONE)); } @Before @@ -109,7 +107,7 @@ public class IndexSummaryManagerTest @After public void afterTest() { - for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions()) + for (CompactionInfo.Holder holder : CompactionMetrics.getCompactions()) { holder.stop(); } @@ -193,7 +191,8 @@ public class IndexSummaryManagerTest try { future.get(); - } catch (InterruptedException | ExecutionException e) + } + catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -610,6 +609,8 @@ public class IndexSummaryManagerTest // everything should get cut in half final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>(); + // barrier to control when redistribution runs + final CountDownLatch barrier = new CountDownLatch(1); Thread t = new Thread(new Runnable() { @@ -620,7 +621,10 @@ public class IndexSummaryManagerTest // Don't leave enough space for even the minimal index summaries try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) { - redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace); + IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST, + of(cfs.metadata.cfId, txn), + singleSummaryOffHeapSpace, 
+ barrier)); } } catch (CompactionInterruptedException ex) @@ -635,7 +639,12 @@ public class IndexSummaryManagerTest t.start(); while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive()) Thread.yield(); + // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries + // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution + // to proceed until stopCompaction has been called. CompactionManager.instance.stopCompaction("INDEX_SUMMARY"); + // allows the redistribution to proceed + barrier.countDown(); t.join(); assertNotNull("Expected compaction interrupted exception", exception.get()); @@ -650,4 +659,41 @@ public class IndexSummaryManagerTest validateData(cfs, numRows); } + + private static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, + Map<UUID, LifecycleTransaction> transactions, + long memoryPoolBytes) + throws IOException + { + return IndexSummaryManager.redistributeSummaries(new IndexSummaryRedistribution(compacting, + transactions, + memoryPoolBytes)); + } + + private static class ObservableRedistribution extends IndexSummaryRedistribution + { + CountDownLatch barrier; + + ObservableRedistribution(List<SSTableReader> compacting, + Map<UUID, LifecycleTransaction> transactions, + long memoryPoolBytes, + CountDownLatch barrier) + { + super(compacting, transactions, memoryPoolBytes); + this.barrier = barrier; + } + + public List<SSTableReader> redistributeSummaries() throws IOException + { + try + { + barrier.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException("Interrupted waiting on test barrier"); + } + return super.redistributeSummaries(); + } + } }