Make IndexSummaryManagerTest::testCancelIndex more deterministic

Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-12808


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ca85bec1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ca85bec1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ca85bec1

Branch: refs/heads/trunk
Commit: ca85bec1df69140485eb956cb61be9b68be708e0
Parents: 3de6e9d
Author: Sam Tunnicliffe <s...@beobal.com>
Authored: Wed Nov 9 15:49:16 2016 +0000
Committer: Sam Tunnicliffe <s...@beobal.com>
Committed: Thu Nov 10 09:56:31 2016 +0000

----------------------------------------------------------------------
 .../io/sstable/IndexSummaryManager.java         | 14 +++--
 .../io/sstable/IndexSummaryManagerTest.java     | 58 ++++++++++++++++++--
 2 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 4438dc1..0e9073f 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -225,7 +225,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
         try
         {
-            redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes);
+            redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left,
+                                                                 compactingAndNonCompacting.right,
+                                                                 this.memoryPoolBytes));
         }
         finally
         {
@@ -237,14 +239,14 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
     /**
      * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
      * their recent read rates.
-     * @param transactions containing the sstables we are to redistribute the memory pool across
-     * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
-     *                        under, if possible
+     * @param redistribution encapsulating the transactions containing the sstables we are to redistribute the
+     *                       memory pool across and a size (in bytes) that the total index summary space usage
+     *                       should stay close to or under, if possible
      * @return a list of new SSTableReader instances
      */
     @VisibleForTesting
-    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
+    public static List<SSTableReader> redistributeSummaries(IndexSummaryRedistribution redistribution) throws IOException
     {
-        return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
+        return CompactionManager.instance.runIndexSummaryRedistribution(redistribution);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 6935680..a1c0e77 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -56,7 +56,6 @@ import static java.util.Arrays.asList;
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
 import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
-import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -90,8 +89,7 @@ public class IndexSummaryManagerTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDRACE)
                                                 .minIndexInterval(8)
                                                 .maxIndexInterval(256)
-                                                .caching(CachingOptions.NONE)
-                                    );
+                                                .caching(CachingOptions.NONE));
     }
 
     @Before
@@ -109,7 +107,7 @@ public class IndexSummaryManagerTest
     @After
     public void afterTest()
     {
-        for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+        for (CompactionInfo.Holder holder : CompactionMetrics.getCompactions())
         {
             holder.stop();
         }
@@ -193,7 +191,8 @@ public class IndexSummaryManagerTest
             try
             {
                 future.get();
-            } catch (InterruptedException | ExecutionException e)
+            }
+            catch (InterruptedException | ExecutionException e)
             {
                 throw new RuntimeException(e);
             }
@@ -610,6 +609,8 @@ public class IndexSummaryManagerTest
 
         // everything should get cut in half
         final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+        // barrier to control when redistribution runs
+        final CountDownLatch barrier = new CountDownLatch(1);
 
         Thread t = new Thread(new Runnable()
         {
@@ -620,7 +621,10 @@ public class IndexSummaryManagerTest
                     // Don't leave enough space for even the minimal index summaries
                     try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
                     {
-                        redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+                        IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST,
+                                                                                               of(cfs.metadata.cfId, txn),
+                                                                                               singleSummaryOffHeapSpace,
+                                                                                               barrier));
                     }
                 }
                 catch (CompactionInterruptedException ex)
@@ -635,7 +639,12 @@ public class IndexSummaryManagerTest
         t.start();
         while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
             Thread.yield();
+        // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
+        // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
+        // to proceed until stopCompaction has been called.
         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+        // allows the redistribution to proceed
+        barrier.countDown();
         t.join();
 
         assertNotNull("Expected compaction interrupted exception", exception.get());
@@ -650,4 +659,41 @@ public class IndexSummaryManagerTest
 
         validateData(cfs, numRows);
     }
+
+    private static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting,
+                                                             Map<UUID, LifecycleTransaction> transactions,
+                                                             long memoryPoolBytes)
+    throws IOException
+    {
+        return IndexSummaryManager.redistributeSummaries(new IndexSummaryRedistribution(compacting,
+                                                                                        transactions,
+                                                                                        memoryPoolBytes));
+    }
+
+    private static class ObservableRedistribution extends IndexSummaryRedistribution
+    {
+        CountDownLatch barrier;
+
+        ObservableRedistribution(List<SSTableReader> compacting,
+                                 Map<UUID, LifecycleTransaction> transactions,
+                                 long memoryPoolBytes,
+                                 CountDownLatch barrier)
+        {
+            super(compacting, transactions, memoryPoolBytes);
+            this.barrier = barrier;
+        }
+
+        public List<SSTableReader> redistributeSummaries() throws IOException
+        {
+            try
+            {
+                barrier.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException("Interrupted waiting on test barrier");
+            }
+            return super.redistributeSummaries();
+        }
+    }
 }

Reply via email to