SSTableScanner enforces its bounds. Patch by benedict; reviewed by sylvain for CASSANDRA-8946
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/572ef50d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/572ef50d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/572ef50d Branch: refs/heads/trunk Commit: 572ef50dd11fcb501ebe46f1dde6656e42cb96bb Parents: 69ffd1f Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Mar 18 11:02:35 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Mar 18 11:02:35 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/dht/AbstractBounds.java | 74 ++++++++++ src/java/org/apache/cassandra/dht/Bounds.java | 10 ++ .../apache/cassandra/dht/ExcludingBounds.java | 10 ++ .../cassandra/dht/IncludingExcludingBounds.java | 10 ++ src/java/org/apache/cassandra/dht/Range.java | 10 ++ .../cassandra/io/sstable/SSTableScanner.java | 42 ++++-- .../io/sstable/SSTableScannerTest.java | 143 ++++++++++++++++--- 8 files changed, 270 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2af8df6..36bdb39 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.4 + * SSTableScanner enforces its bounds (CASSANDRA-8946) * Cleanup cell equality (CASSANDRA-8947) * Introduce intra-cluster message coalescing (CASSANDRA-8692) * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839) http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/AbstractBounds.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java index 
90eb6b5..6d2ee43 100644 --- a/src/java/org/apache/cassandra/dht/AbstractBounds.java +++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java @@ -68,6 +68,8 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria * instead. */ public abstract Pair<AbstractBounds<T>, AbstractBounds<T>> split(T position); + public abstract boolean inclusiveLeft(); + public abstract boolean inclusiveRight(); @Override public int hashCode() @@ -193,4 +195,76 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria return size; } } + + public static <T extends RingPosition<T>> AbstractBounds<T> bounds(Boundary<T> min, Boundary<T> max) + { + return bounds(min.boundary, min.inclusive, max.boundary, max.inclusive); + } + public static <T extends RingPosition<T>> AbstractBounds<T> bounds(T min, boolean inclusiveMin, T max, boolean inclusiveMax) + { + if (inclusiveMin && inclusiveMax) + return new Bounds<T>(min, max); + else if (inclusiveMax) + return new Range<T>(min, max); + else if (inclusiveMin) + return new IncludingExcludingBounds<T>(min, max); + else + return new ExcludingBounds<T>(min, max); + } + + // represents one side of a bounds (which side is not encoded) + public static class Boundary<T extends RingPosition<T>> + { + public final T boundary; + public final boolean inclusive; + public Boundary(T boundary, boolean inclusive) + { + this.boundary = boundary; + this.inclusive = inclusive; + } + } + + public Boundary<T> leftBoundary() + { + return new Boundary<>(left, inclusiveLeft()); + } + + public Boundary<T> rightBoundary() + { + return new Boundary<>(right, inclusiveRight()); + } + + public static <T extends RingPosition<T>> boolean isEmpty(Boundary<T> left, Boundary<T> right) + { + int c = left.boundary.compareTo(right.boundary); + return c > 0 || (c == 0 && !(left.inclusive && right.inclusive)); + } + + public static <T extends RingPosition<T>> Boundary<T> minRight(Boundary<T> right1, T right2, boolean 
isInclusiveRight2) + { + return minRight(right1, new Boundary<T>(right2, isInclusiveRight2)); + } + + public static <T extends RingPosition<T>> Boundary<T> minRight(Boundary<T> right1, Boundary<T> right2) + { + int c = right1.boundary.compareTo(right2.boundary); + if (c != 0) + return c < 0 ? right1 : right2; + // return the exclusive version, if either + return right2.inclusive ? right1 : right2; + } + + public static <T extends RingPosition<T>> Boundary<T> maxLeft(Boundary<T> left1, T left2, boolean isInclusiveLeft2) + { + return maxLeft(left1, new Boundary<T>(left2, isInclusiveLeft2)); + } + + public static <T extends RingPosition<T>> Boundary<T> maxLeft(Boundary<T> left1, Boundary<T> left2) + { + int c = left1.boundary.compareTo(left2.boundary); + if (c != 0) + return c > 0 ? left1 : left2; + // return the exclusive version, if either + return left2.inclusive ? left1 : left2; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/Bounds.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java index 396fc30..42eea77 100644 --- a/src/java/org/apache/cassandra/dht/Bounds.java +++ b/src/java/org/apache/cassandra/dht/Bounds.java @@ -61,6 +61,16 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T> return Pair.create(lb, rb); } + public boolean inclusiveLeft() + { + return true; + } + + public boolean inclusiveRight() + { + return true; + } + public boolean intersects(Bounds<T> that) { // We either contains one of the that bounds, or we are fully contained into that. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/ExcludingBounds.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java index 33e48b6..0d09f08 100644 --- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java +++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java @@ -56,6 +56,16 @@ public class ExcludingBounds<T extends RingPosition<T>> extends AbstractBounds<T return Pair.create(lb, rb); } + public boolean inclusiveLeft() + { + return false; + } + + public boolean inclusiveRight() + { + return false; + } + public List<? extends AbstractBounds<T>> unwrap() { // ExcludingBounds objects never wrap http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java index e8e9c74..278a806 100644 --- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java +++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java @@ -55,6 +55,16 @@ public class IncludingExcludingBounds<T extends RingPosition<T>> extends Abstrac return Pair.create(lb, rb); } + public boolean inclusiveLeft() + { + return true; + } + + public boolean inclusiveRight() + { + return false; + } + public List<? 
extends AbstractBounds<T>> unwrap() { // IncludingExcludingBounds objects never wrap http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 44b76d5..505f1f3 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@ -230,6 +230,16 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen return Pair.create(lb, rb); } + public boolean inclusiveLeft() + { + return false; + } + + public boolean inclusiveRight() + { + return true; + } + public List<Range<T>> unwrap() { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index c05103b..46ddc24 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -32,6 +32,7 @@ import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; import org.apache.cassandra.db.columniterator.LazyColumnIterator; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.AbstractBounds.Boundary; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -40,6 +41,10 @@ import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.dht.AbstractBounds.isEmpty; +import static 
org.apache.cassandra.dht.AbstractBounds.maxLeft; +import static org.apache.cassandra.dht.AbstractBounds.minRight; + public class SSTableScanner implements ISSTableScanner { protected final RandomAccessReader dfile; @@ -84,28 +89,39 @@ public class SSTableScanner implements ISSTableScanner List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2); if (dataRange.isWrapAround()) { - if (dataRange.stopKey().isMinimum(sstable.partitioner) - || dataRange.stopKey().compareTo(sstable.last) >= 0 - || dataRange.startKey().compareTo(sstable.first) <= 0) + if (dataRange.stopKey().compareTo(sstable.first) >= 0) { - boundsList.add(new Bounds<RowPosition>(sstable.first, sstable.last, sstable.partitioner)); + // since we wrap, we must contain the whole sstable prior to stopKey() + Boundary<RowPosition> left = new Boundary<RowPosition>(sstable.first, true); + Boundary<RowPosition> right; + right = dataRange.keyRange().rightBoundary(); + right = minRight(right, sstable.last, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); } - else + if (dataRange.startKey().compareTo(sstable.last) <= 0) { - if (dataRange.startKey().compareTo(sstable.last) <= 0) - boundsList.add(new Bounds<>(dataRange.startKey(), sstable.last, sstable.partitioner)); - if (dataRange.stopKey().compareTo(sstable.first) >= 0) - boundsList.add(new Bounds<>(sstable.first, dataRange.stopKey(), sstable.partitioner)); + // since we wrap, we must contain the whole sstable after dataRange.startKey() + Boundary<RowPosition> right = new Boundary<RowPosition>(sstable.last, true); + Boundary<RowPosition> left; + left = dataRange.keyRange().leftBoundary(); + left = maxLeft(left, sstable.first, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); } } else { assert dataRange.startKey().compareTo(dataRange.stopKey()) <= 0 || dataRange.stopKey().isMinimum(); - RowPosition left = Ordering.natural().max(dataRange.startKey(), sstable.first); + 
Boundary<RowPosition> left, right; + left = dataRange.keyRange().leftBoundary(); + right = dataRange.keyRange().rightBoundary(); + left = maxLeft(left, sstable.first, true); // apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping - RowPosition right = dataRange.stopKey().isMinimum() ? sstable.last : Ordering.natural().min(dataRange.stopKey(), sstable.last); - if (left.compareTo(right) <= 0) - boundsList.add(new Bounds<>(left, right, sstable.partitioner)); + right = dataRange.stopKey().isMinimum() ? new Boundary<RowPosition>(sstable.last, true) + : minRight(right, sstable.last, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); } this.rangeIterator = boundsList.iterator(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index ff60481..91a820c 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@ -18,21 +18,19 @@ */ package org.apache.cassandra.io.sstable; -import java.util.ArrayList; -import java.util.Collection; +import java.util.*; +import com.google.common.collect.Iterables; import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.dht.BytesToken; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.dht.*; import org.apache.cassandra.utils.ByteBufferUtil; +import static org.apache.cassandra.dht.AbstractBounds.isEmpty; import static 
org.junit.Assert.*; public class SSTableScannerTest extends SchemaLoader @@ -45,17 +43,85 @@ public class SSTableScannerTest extends SchemaLoader return String.format("%03d", key); } - private static Bounds<RowPosition> boundsFor(int start, int end) + // we produce all DataRange variations that produce an inclusive start and exclusive end range + private static Iterable<DataRange> dataRanges(int start, int end) { - return new Bounds<RowPosition>(new BytesToken(toKey(start).getBytes()).minKeyBound(), - new BytesToken(toKey(end).getBytes()).maxKeyBound()); + if (end < start) + return dataRanges(start, end, false, true); + return Iterables.concat(dataRanges(start, end, false, false), + dataRanges(start, end, false, true), + dataRanges(start, end, true, false), + dataRanges(start, end, true, true) + ); } + private static Iterable<DataRange> dataRanges(int start, int end, boolean inclusiveStart, boolean inclusiveEnd) + { + List<DataRange> ranges = new ArrayList<>(); + if (start == end + 1) + { + assert !inclusiveStart && inclusiveEnd; + ranges.add(dataRange(min(start), false, max(end), true)); + ranges.add(dataRange(min(start), false, min(end + 1), true)); + ranges.add(dataRange(max(start - 1), false, max(end), true)); + ranges.add(dataRange(dk(start - 1), false, dk(start - 1), true)); + } + else + { + for (RowPosition s : starts(start, inclusiveStart)) + { + for (RowPosition e : ends(end, inclusiveEnd)) + { + if (end < start && e.compareTo(s) > 0) + continue; + if (!isEmpty(new AbstractBounds.Boundary<>(s, inclusiveStart), new AbstractBounds.Boundary<>(e, inclusiveEnd))) + continue; + ranges.add(dataRange(s, inclusiveStart, e, inclusiveEnd)); + } + } + } + return ranges; + } + + private static Iterable<RowPosition> starts(int key, boolean inclusive) + { + return Arrays.asList(min(key), max(key - 1), dk(inclusive ? 
key : key - 1)); + } + + private static Iterable<RowPosition> ends(int key, boolean inclusive) + { + return Arrays.asList(max(key), min(key + 1), dk(inclusive ? key : key + 1)); + } + + private static DecoratedKey dk(int key) + { + return Util.dk(toKey(key)); + } + + private static Token token(int key) + { + return key == Integer.MIN_VALUE ? ByteOrderedPartitioner.MINIMUM : new BytesToken(toKey(key).getBytes()); + } + + private static RowPosition min(int key) + { + return token(key).minKeyBound(); + } + + private static RowPosition max(int key) + { + return token(key).maxKeyBound(); + } + + private static DataRange dataRange(RowPosition start, boolean startInclusive, RowPosition end, boolean endInclusive) + { + return new DataRange(AbstractBounds.bounds(start, startInclusive, end, endInclusive), new IdentityQueryFilter()); + } private static Range<Token> rangeFor(int start, int end) { return new Range<Token>(new BytesToken(toKey(start).getBytes()), - new BytesToken(toKey(end).getBytes())); + end == Integer.MIN_VALUE ? ByteOrderedPartitioner.MINIMUM : new BytesToken(toKey(end).getBytes())); } private static Collection<Range<Token>> makeRanges(int ... keys) @@ -75,18 +141,22 @@ public class SSTableScannerTest extends SchemaLoader rm.apply(); } - private static void assertScanMatches(SSTableReader sstable, int scanStart, int scanEnd, int expectedStart, int expectedEnd) + private static void assertScanMatches(SSTableReader sstable, int scanStart, int scanEnd, int ... 
boundaries) { - ISSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter())); - for (int i = expectedStart; i <= expectedEnd; i++) - assertEquals(toKey(i), new String(scanner.next().getKey().getKey().array())); - assertFalse(scanner.hasNext()); + assert boundaries.length % 2 == 0; + for (DataRange range : dataRanges(scanStart, scanEnd)) + { + ISSTableScanner scanner = sstable.getScanner(range); + for (int b = 0 ; b < boundaries.length ; b += 2) + for (int i = boundaries[b] ; i <= boundaries[b + 1] ; i++) + assertEquals(toKey(i), new String(scanner.next().getKey().getKey().array())); + assertFalse(scanner.hasNext()); + } } private static void assertScanEmpty(SSTableReader sstable, int scanStart, int scanEnd) { - ISSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter())); - assertFalse(String.format("scan of (%03d, %03d] should be empty", scanStart, scanEnd), scanner.hasNext()); + assertScanMatches(sstable, scanStart, scanEnd); } @Test @@ -132,6 +202,45 @@ public class SSTableScannerTest extends SchemaLoader // empty ranges assertScanEmpty(sstable, 0, 1); assertScanEmpty(sstable, 10, 11); + + // wrapping, starts in middle + assertScanMatches(sstable, 5, 3, 2, 3, 5, 9); + assertScanMatches(sstable, 5, 2, 2, 2, 5, 9); + assertScanMatches(sstable, 5, 1, 5, 9); + assertScanMatches(sstable, 5, Integer.MIN_VALUE, 5, 9); + // wrapping, starts at end + assertScanMatches(sstable, 9, 8, 2, 8, 9, 9); + assertScanMatches(sstable, 9, 3, 2, 3, 9, 9); + assertScanMatches(sstable, 9, 2, 2, 2, 9, 9); + assertScanMatches(sstable, 9, 1, 9, 9); + assertScanMatches(sstable, 9, Integer.MIN_VALUE, 9, 9); + assertScanMatches(sstable, 8, 3, 2, 3, 8, 9); + assertScanMatches(sstable, 8, 2, 2, 2, 8, 9); + assertScanMatches(sstable, 8, 1, 8, 9); + assertScanMatches(sstable, 8, Integer.MIN_VALUE, 8, 9); + // wrapping, starts past end + assertScanMatches(sstable, 10, 9, 2, 9); + 
assertScanMatches(sstable, 10, 5, 2, 5); + assertScanMatches(sstable, 10, 2, 2, 2); + assertScanEmpty(sstable, 10, 1); + assertScanEmpty(sstable, 10, Integer.MIN_VALUE); + assertScanMatches(sstable, 11, 10, 2, 9); + assertScanMatches(sstable, 11, 9, 2, 9); + assertScanMatches(sstable, 11, 5, 2, 5); + assertScanMatches(sstable, 11, 2, 2, 2); + assertScanEmpty(sstable, 11, 1); + assertScanEmpty(sstable, 11, Integer.MIN_VALUE); + // wrapping, starts at start + assertScanMatches(sstable, 3, 1, 3, 9); + assertScanMatches(sstable, 3, Integer.MIN_VALUE, 3, 9); + assertScanMatches(sstable, 2, 1, 2, 9); + assertScanMatches(sstable, 2, Integer.MIN_VALUE, 2, 9); + assertScanMatches(sstable, 1, 0, 2, 9); + assertScanMatches(sstable, 1, Integer.MIN_VALUE, 2, 9); + // wrapping, starts before + assertScanMatches(sstable, 1, -1, 2, 9); + assertScanMatches(sstable, 1, Integer.MIN_VALUE, 2, 9); + assertScanMatches(sstable, 1, 0, 2, 9); } private static void assertScanContainsRanges(ISSTableScanner scanner, int ... rangePairs)