Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.1 4c6f32569 -> 02492f7bc
Optimize the way we check if a token is repaired in anticompaction

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-10768

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbfeeac1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbfeeac1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbfeeac1

Branch: refs/heads/cassandra-3.1
Commit: dbfeeac177074692bdf71d98ffb2cacb14802fb3
Parents: 5ba69a3
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Nov 25 11:28:51 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Dec 2 08:37:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  4 +-
 src/java/org/apache/cassandra/dht/Range.java    | 44 +++++++++++++
 .../org/apache/cassandra/dht/RangeTest.java     | 67 ++++++++++++++++++++
 4 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ce2da6..b0f9588 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)
  * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b0ad244..2630ba2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1115,11 +1115,11 @@ public class CompactionManager implements CompactionManagerMBean metrics.beginCompaction(ci); try { + Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges); while (iter.hasNext()) { AbstractCompactedRow row = iter.next(); - // if current range from sstable is repaired, save it into the new repaired sstable - if (Range.isInRanges(row.key.getToken(), ranges)) + if (containmentChecker.contains(row.key.getToken())) { repairedSSTableWriter.append(row); repairedKeyCount++; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 505f1f3..81c92a2 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@ -21,6 +21,8 @@ import java.io.Serializable; import java.util.*; import org.apache.commons.lang3.ObjectUtils; + +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Pair; @@ -491,4 +493,46 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen { return new Range<T>(left, newRight); } + + /** + * Helper class to check if a token is contained within a given collection of ranges + */ + public static class OrderedRangeContainmentChecker + { + private final Iterator<Range<Token>> normalizedRangesIterator; + private Token lastToken = null; + private Range<Token> currentRange; + + public OrderedRangeContainmentChecker(Collection<Range<Token>> ranges) + { + normalizedRangesIterator = 
normalize(ranges).iterator(); + assert normalizedRangesIterator.hasNext(); + currentRange = normalizedRangesIterator.next(); + } + + /** + * Returns true if the ranges given in the constructor contains the token, false otherwise. + * + * The tokens passed to this method must be in increasing order + * + * @param t token to check, must be larger than or equal to the last token passed + * @return true if the token is contained within the ranges given to the constructor. + */ + public boolean contains(Token t) + { + assert lastToken == null || lastToken.compareTo(t) <= 0; + lastToken = t; + while (true) + { + if (t.compareTo(currentRange.left) <= 0) + return false; + else if (t.compareTo(currentRange.right) <= 0 || currentRange.right.compareTo(currentRange.left) <= 0) + return true; + + if (!normalizedRangesIterator.hasNext()) + return false; + currentRange = normalizedRangesIterator.next(); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/test/unit/org/apache/cassandra/dht/RangeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java index 906396c..1d8123b 100644 --- a/test/unit/org/apache/cassandra/dht/RangeTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeTest.java @@ -19,9 +19,15 @@ package org.apache.cassandra.dht; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Random; import java.util.Set; + +import com.google.common.base.Joiner; + import static java.util.Arrays.asList; import org.apache.commons.lang3.StringUtils; @@ -29,6 +35,7 @@ import org.junit.Test; import org.apache.cassandra.db.RowPosition; import static org.apache.cassandra.Util.range; +import static org.junit.Assert.*; public class RangeTest @@ -536,4 +543,64 @@ public class RangeTest expected = asList(range("", "")); 
assertNormalize(input, expected); } + + @Test + public void testRandomOrderedRangeContainmentChecker() + { + Random r = new Random(); + for (int j = 0; j < 1000; j++) + { + int numTokens = r.nextInt(300) + 1; + List<Range<Token>> ranges = new ArrayList<>(numTokens); + List<Token> tokens = new ArrayList<>(2 * numTokens); + for (int i = 0; i < 2 * numTokens; i++) + tokens.add(t(r.nextLong())); + + Collections.sort(tokens); + + for (int i = 0; i < tokens.size(); i++) + { + ranges.add(new Range<>(tokens.get(i), tokens.get(i + 1))); + i++; + } + + List<Token> tokensToTest = new ArrayList<>(); + for (int i = 0; i < 10000; i++) + tokensToTest.add(t(r.nextLong())); + + tokensToTest.add(t(Long.MAX_VALUE)); + tokensToTest.add(t(Long.MIN_VALUE)); + tokensToTest.add(t(Long.MAX_VALUE - 1)); + tokensToTest.add(t(Long.MIN_VALUE + 1)); + Collections.sort(tokensToTest); + + Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges); + for (Token t : tokensToTest) + { + if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration + fail(String.format("This should never flap! If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t)); + } + } + } + + @Test + public void testBoundariesORCC() + { + List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE)); + Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges); + assertFalse(checker.contains(t(Long.MIN_VALUE))); + assertTrue(checker.contains(t(Long.MIN_VALUE + 1))); + assertFalse(checker.contains(t(0))); + assertFalse(checker.contains(t(Long.MAX_VALUE - 1))); + assertTrue(checker.contains(t(Long.MAX_VALUE))); + } + + private static Range<Token> r(long left, long right) + { + return new Range<>(t(left), t(right)); + } + private static Token t(long t) + { + return new LongToken(t); + } }