Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.1 4c6f32569 -> 02492f7bc


Optimize the way we check if a token is repaired in anticompaction

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-10768


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbfeeac1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbfeeac1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbfeeac1

Branch: refs/heads/cassandra-3.1
Commit: dbfeeac177074692bdf71d98ffb2cacb14802fb3
Parents: 5ba69a3
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Nov 25 11:28:51 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Dec 2 08:37:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  4 +-
 src/java/org/apache/cassandra/dht/Range.java    | 44 +++++++++++++
 .../org/apache/cassandra/dht/RangeTest.java     | 67 ++++++++++++++++++++
 4 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ce2da6..b0f9588 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Optimize the way we check if a token is repaired in anticompaction 
(CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)
  * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b0ad244..2630ba2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1115,11 +1115,11 @@ public class CompactionManager implements 
CompactionManagerMBean
                 metrics.beginCompaction(ci);
                 try
                 {
+                    Range.OrderedRangeContainmentChecker containmentChecker = 
new Range.OrderedRangeContainmentChecker(ranges);
                     while (iter.hasNext())
                     {
                         AbstractCompactedRow row = iter.next();
-                        // if current range from sstable is repaired, save it 
into the new repaired sstable
-                        if (Range.isInRanges(row.key.getToken(), ranges))
+                        if (containmentChecker.contains(row.key.getToken()))
                         {
                             repairedSSTableWriter.append(row);
                             repairedKeyCount++;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java 
b/src/java/org/apache/cassandra/dht/Range.java
index 505f1f3..81c92a2 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.util.*;
 
 import org.apache.commons.lang3.ObjectUtils;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
@@ -491,4 +493,46 @@ public class Range<T extends RingPosition<T>> extends 
AbstractBounds<T> implemen
     {
         return new Range<T>(left, newRight);
     }
+
+    /**
+     * Helper class to check if a token is contained within a given collection 
of ranges
+     */
+    public static class OrderedRangeContainmentChecker
+    {
+        private final Iterator<Range<Token>> normalizedRangesIterator;
+        private Token lastToken = null;
+        private Range<Token> currentRange;
+
+        public OrderedRangeContainmentChecker(Collection<Range<Token>> ranges)
+        {
+            normalizedRangesIterator = normalize(ranges).iterator();
+            assert normalizedRangesIterator.hasNext();
+            currentRange = normalizedRangesIterator.next();
+        }
+
+        /**
+         * Returns true if the ranges given in the constructor contains the 
token, false otherwise.
+         *
+         * The tokens passed to this method must be in increasing order
+         *
+         * @param t token to check, must be larger than or equal to the last 
token passed
+         * @return true if the token is contained within the ranges given to 
the constructor.
+         */
+        public boolean contains(Token t)
+        {
+            assert lastToken == null || lastToken.compareTo(t) <= 0;
+            lastToken = t;
+            while (true)
+            {
+                if (t.compareTo(currentRange.left) <= 0)
+                    return false;
+                else if (t.compareTo(currentRange.right) <= 0 || 
currentRange.right.compareTo(currentRange.left) <= 0)
+                    return true;
+
+                if (!normalizedRangesIterator.hasNext())
+                    return false;
+                currentRange = normalizedRangesIterator.next();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java 
b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 906396c..1d8123b 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -19,9 +19,15 @@
 package org.apache.cassandra.dht;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
+
+import com.google.common.base.Joiner;
+
 import static java.util.Arrays.asList;
 
 import org.apache.commons.lang3.StringUtils;
@@ -29,6 +35,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.RowPosition;
 import static org.apache.cassandra.Util.range;
+import static org.junit.Assert.*;
 
 
 public class RangeTest
@@ -536,4 +543,64 @@ public class RangeTest
         expected = asList(range("", ""));
         assertNormalize(input, expected);
     }
+
+    @Test
+    public void testRandomOrderedRangeContainmentChecker()
+    {
+        Random r = new Random();
+        for (int j = 0; j < 1000; j++)
+        {
+            int numTokens = r.nextInt(300) + 1;
+            List<Range<Token>> ranges = new ArrayList<>(numTokens);
+            List<Token> tokens = new ArrayList<>(2 * numTokens);
+            for (int i = 0; i < 2 * numTokens; i++)
+                tokens.add(t(r.nextLong()));
+
+            Collections.sort(tokens);
+
+            for (int i = 0; i < tokens.size(); i++)
+            {
+                ranges.add(new Range<>(tokens.get(i), tokens.get(i + 1)));
+                i++;
+            }
+
+            List<Token> tokensToTest = new ArrayList<>();
+            for (int i = 0; i < 10000; i++)
+                tokensToTest.add(t(r.nextLong()));
+
+            tokensToTest.add(t(Long.MAX_VALUE));
+            tokensToTest.add(t(Long.MIN_VALUE));
+            tokensToTest.add(t(Long.MAX_VALUE - 1));
+            tokensToTest.add(t(Long.MIN_VALUE + 1));
+            Collections.sort(tokensToTest);
+
+            Range.OrderedRangeContainmentChecker checker = new 
Range.OrderedRangeContainmentChecker(ranges);
+            for (Token t : tokensToTest)
+            {
+                if (checker.contains(t) != Range.isInRanges(t, ranges)) // 
avoid running Joiner.on(..) every iteration
+                    fail(String.format("This should never flap! If it does, it 
is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t));
+            }
+        }
+    }
+
+    @Test
+    public void testBoundariesORCC()
+    {
+        List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 
1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE));
+        Range.OrderedRangeContainmentChecker checker = new 
Range.OrderedRangeContainmentChecker(ranges);
+        assertFalse(checker.contains(t(Long.MIN_VALUE)));
+        assertTrue(checker.contains(t(Long.MIN_VALUE + 1)));
+        assertFalse(checker.contains(t(0)));
+        assertFalse(checker.contains(t(Long.MAX_VALUE - 1)));
+        assertTrue(checker.contains(t(Long.MAX_VALUE)));
+    }
+
+    private static Range<Token> r(long left, long right)
+    {
+        return new Range<>(t(left), t(right));
+    }
+    private static Token t(long t)
+    {
+        return new LongToken(t);
+    }
 }

Reply via email to