SSTableScanner enforces its bounds

patch by benedict; reviewed by sylvain for CASSANDRA-8946


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/572ef50d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/572ef50d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/572ef50d

Branch: refs/heads/trunk
Commit: 572ef50dd11fcb501ebe46f1dde6656e42cb96bb
Parents: 69ffd1f
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Wed Mar 18 11:02:35 2015 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Wed Mar 18 11:02:35 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/dht/AbstractBounds.java    |  74 ++++++++++
 src/java/org/apache/cassandra/dht/Bounds.java   |  10 ++
 .../apache/cassandra/dht/ExcludingBounds.java   |  10 ++
 .../cassandra/dht/IncludingExcludingBounds.java |  10 ++
 src/java/org/apache/cassandra/dht/Range.java    |  10 ++
 .../cassandra/io/sstable/SSTableScanner.java    |  42 ++++--
 .../io/sstable/SSTableScannerTest.java          | 143 ++++++++++++++++---
 8 files changed, 270 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2af8df6..36bdb39 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * SSTableScanner enforces its bounds (CASSANDRA-8946)
  * Cleanup cell equality (CASSANDRA-8947)
  * Introduce intra-cluster message coalescing (CASSANDRA-8692)
  * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java 
b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index 90eb6b5..6d2ee43 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -68,6 +68,8 @@ public abstract class AbstractBounds<T extends 
RingPosition<T>> implements Seria
      * instead.
      */
     public abstract Pair<AbstractBounds<T>, AbstractBounds<T>> split(T 
position);
+    public abstract boolean inclusiveLeft();
+    public abstract boolean inclusiveRight();
 
     @Override
     public int hashCode()
@@ -193,4 +195,76 @@ public abstract class AbstractBounds<T extends 
RingPosition<T>> implements Seria
             return size;
         }
     }
+
+    public static <T extends RingPosition<T>> AbstractBounds<T> 
bounds(Boundary<T> min, Boundary<T> max)
+    {
+        return bounds(min.boundary, min.inclusive, max.boundary, 
max.inclusive);
+    }
+    public static <T extends RingPosition<T>> AbstractBounds<T> bounds(T min, 
boolean inclusiveMin, T max, boolean inclusiveMax)
+    {
+        if (inclusiveMin && inclusiveMax)
+            return new Bounds<T>(min, max);
+        else if (inclusiveMax)
+            return new Range<T>(min, max);
+        else if (inclusiveMin)
+            return new IncludingExcludingBounds<T>(min, max);
+        else
+            return new ExcludingBounds<T>(min, max);
+    }
+
+    // represents one side of a bounds (which side is not encoded)
+    public static class Boundary<T extends RingPosition<T>>
+    {
+        public final T boundary;
+        public final boolean inclusive;
+        public Boundary(T boundary, boolean inclusive)
+        {
+            this.boundary = boundary;
+            this.inclusive = inclusive;
+        }
+    }
+
+    public Boundary<T> leftBoundary()
+    {
+        return new Boundary<>(left, inclusiveLeft());
+    }
+
+    public Boundary<T> rightBoundary()
+    {
+        return new Boundary<>(right, inclusiveRight());
+    }
+
+    public static <T extends RingPosition<T>> boolean isEmpty(Boundary<T> 
left, Boundary<T> right)
+    {
+        int c = left.boundary.compareTo(right.boundary);
+        return c > 0 || (c == 0 && !(left.inclusive && right.inclusive));
+    }
+
+    public static <T extends RingPosition<T>> Boundary<T> minRight(Boundary<T> 
right1, T right2, boolean isInclusiveRight2)
+    {
+        return minRight(right1, new Boundary<T>(right2, isInclusiveRight2));
+    }
+
+    public static <T extends RingPosition<T>> Boundary<T> minRight(Boundary<T> 
right1, Boundary<T> right2)
+    {
+        int c = right1.boundary.compareTo(right2.boundary);
+        if (c != 0)
+            return c < 0 ? right1 : right2;
+        // return the exclusive version, if either
+        return right2.inclusive ? right1 : right2;
+    }
+
+    public static <T extends RingPosition<T>> Boundary<T> maxLeft(Boundary<T> 
left1, T left2, boolean isInclusiveLeft2)
+    {
+        return maxLeft(left1, new Boundary<T>(left2, isInclusiveLeft2));
+    }
+
+    public static <T extends RingPosition<T>> Boundary<T> maxLeft(Boundary<T> 
left1, Boundary<T> left2)
+    {
+        int c = left1.boundary.compareTo(left2.boundary);
+        if (c != 0)
+            return c > 0 ? left1 : left2;
+        // equal positions: return the exclusive (tighter) version, if either
+        return left2.inclusive ? left1 : left2;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java 
b/src/java/org/apache/cassandra/dht/Bounds.java
index 396fc30..42eea77 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -61,6 +61,16 @@ public class Bounds<T extends RingPosition<T>> extends 
AbstractBounds<T>
         return Pair.create(lb, rb);
     }
 
+    public boolean inclusiveLeft()
+    {
+        return true;
+    }
+
+    public boolean inclusiveRight()
+    {
+        return true;
+    }
+
     public boolean intersects(Bounds<T> that)
     {
         // We either contains one of the that bounds, or we are fully 
contained into that.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java 
b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 33e48b6..0d09f08 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -56,6 +56,16 @@ public class ExcludingBounds<T extends RingPosition<T>> 
extends AbstractBounds<T
         return Pair.create(lb, rb);
     }
 
+    public boolean inclusiveLeft()
+    {
+        return false;
+    }
+
+    public boolean inclusiveRight()
+    {
+        return false;
+    }
+
     public List<? extends AbstractBounds<T>> unwrap()
     {
         // ExcludingBounds objects never wrap

http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java 
b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index e8e9c74..278a806 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -55,6 +55,16 @@ public class IncludingExcludingBounds<T extends 
RingPosition<T>> extends Abstrac
         return Pair.create(lb, rb);
     }
 
+    public boolean inclusiveLeft()
+    {
+        return true;
+    }
+
+    public boolean inclusiveRight()
+    {
+        return false;
+    }
+
     public List<? extends AbstractBounds<T>> unwrap()
     {
         // IncludingExcludingBounds objects never wrap

http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java 
b/src/java/org/apache/cassandra/dht/Range.java
index 44b76d5..505f1f3 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -230,6 +230,16 @@ public class Range<T extends RingPosition<T>> extends 
AbstractBounds<T> implemen
         return Pair.create(lb, rb);
     }
 
+    public boolean inclusiveLeft()
+    {
+        return false;
+    }
+
+    public boolean inclusiveRight()
+    {
+        return true;
+    }
+
     public List<Range<T>> unwrap()
     {
         @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index c05103b..46ddc24 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -32,6 +32,7 @@ import 
org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
 import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.AbstractBounds.Boundary;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -40,6 +41,10 @@ import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.dht.AbstractBounds.isEmpty;
+import static org.apache.cassandra.dht.AbstractBounds.maxLeft;
+import static org.apache.cassandra.dht.AbstractBounds.minRight;
+
 public class SSTableScanner implements ISSTableScanner
 {
     protected final RandomAccessReader dfile;
@@ -84,28 +89,39 @@ public class SSTableScanner implements ISSTableScanner
         List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
         if (dataRange.isWrapAround())
         {
-            if (dataRange.stopKey().isMinimum(sstable.partitioner)
-                || dataRange.stopKey().compareTo(sstable.last) >= 0
-                || dataRange.startKey().compareTo(sstable.first) <= 0)
+            if (dataRange.stopKey().compareTo(sstable.first) >= 0)
             {
-                boundsList.add(new Bounds<RowPosition>(sstable.first, 
sstable.last, sstable.partitioner));
+                // since we wrap, we must contain the whole sstable prior to 
stopKey()
+                Boundary<RowPosition> left = new 
Boundary<RowPosition>(sstable.first, true);
+                Boundary<RowPosition> right;
+                right = dataRange.keyRange().rightBoundary();
+                right = minRight(right, sstable.last, true);
+                if (!isEmpty(left, right))
+                    boundsList.add(AbstractBounds.bounds(left, right));
             }
-            else
+            if (dataRange.startKey().compareTo(sstable.last) <= 0)
             {
-                if (dataRange.startKey().compareTo(sstable.last) <= 0)
-                    boundsList.add(new Bounds<>(dataRange.startKey(), 
sstable.last, sstable.partitioner));
-                if (dataRange.stopKey().compareTo(sstable.first) >= 0)
-                    boundsList.add(new Bounds<>(sstable.first, 
dataRange.stopKey(), sstable.partitioner));
+                // since we wrap, we must contain the whole sstable after 
dataRange.startKey()
+                Boundary<RowPosition> right = new 
Boundary<RowPosition>(sstable.last, true);
+                Boundary<RowPosition> left;
+                left = dataRange.keyRange().leftBoundary();
+                left = maxLeft(left, sstable.first, true);
+                if (!isEmpty(left, right))
+                    boundsList.add(AbstractBounds.bounds(left, right));
             }
         }
         else
         {
             assert dataRange.startKey().compareTo(dataRange.stopKey()) <= 0 || 
dataRange.stopKey().isMinimum();
-            RowPosition left = Ordering.natural().max(dataRange.startKey(), 
sstable.first);
+            Boundary<RowPosition> left, right;
+            left = dataRange.keyRange().leftBoundary();
+            right = dataRange.keyRange().rightBoundary();
+            left = maxLeft(left, sstable.first, true);
             // apparently isWrapAround() doesn't count Bounds that extend to 
the limit (min) as wrapping
-            RowPosition right = dataRange.stopKey().isMinimum() ? sstable.last 
: Ordering.natural().min(dataRange.stopKey(), sstable.last);
-            if (left.compareTo(right) <= 0)
-                boundsList.add(new Bounds<>(left, right, sstable.partitioner));
+            right = dataRange.stopKey().isMinimum() ? new 
Boundary<RowPosition>(sstable.last, true)
+                                                    : minRight(right, 
sstable.last, true);
+            if (!isEmpty(left, right))
+                boundsList.add(AbstractBounds.bounds(left, right));
         }
         this.rangeIterator = boundsList.iterator();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/572ef50d/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index ff60481..91a820c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@ -18,21 +18,19 @@
 */
 package org.apache.cassandra.io.sstable;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.*;
 
+import com.google.common.collect.Iterables;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.BytesToken;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.dht.AbstractBounds.isEmpty;
 import static org.junit.Assert.*;
 
 public class SSTableScannerTest extends SchemaLoader
@@ -45,17 +43,85 @@ public class SSTableScannerTest extends SchemaLoader
         return String.format("%03d", key);
     }
 
-    private static Bounds<RowPosition> boundsFor(int start, int end)
+    // we produce all DataRange variations that produce an inclusive start and 
exclusive end range
+    private static Iterable<DataRange> dataRanges(int start, int end)
     {
-        return new Bounds<RowPosition>(new 
BytesToken(toKey(start).getBytes()).minKeyBound(),
-                                       new 
BytesToken(toKey(end).getBytes()).maxKeyBound());
+        if (end < start)
+            return dataRanges(start, end, false, true);
+        return Iterables.concat(dataRanges(start, end, false, false),
+                                dataRanges(start, end, false, true),
+                                dataRanges(start, end, true, false),
+                                dataRanges(start, end, true, true)
+        );
     }
 
+    private static Iterable<DataRange> dataRanges(int start, int end, boolean 
inclusiveStart, boolean inclusiveEnd)
+    {
+        List<DataRange> ranges = new ArrayList<>();
+        if (start == end + 1)
+        {
+            assert !inclusiveStart && inclusiveEnd;
+            ranges.add(dataRange(min(start), false, max(end), true));
+            ranges.add(dataRange(min(start), false, min(end + 1), true));
+            ranges.add(dataRange(max(start - 1), false, max(end), true));
+            ranges.add(dataRange(dk(start - 1), false, dk(start - 1), true));
+        }
+        else
+        {
+            for (RowPosition s : starts(start, inclusiveStart))
+            {
+                for (RowPosition e : ends(end, inclusiveEnd))
+                {
+                    if (end < start && e.compareTo(s) > 0)
+                        continue;
+                    if (!isEmpty(new AbstractBounds.Boundary<>(s, 
inclusiveStart), new AbstractBounds.Boundary<>(e, inclusiveEnd)))
+                        continue;
+                    ranges.add(dataRange(s, inclusiveStart, e, inclusiveEnd));
+                }
+            }
+        }
+        return ranges;
+    }
+
+    private static Iterable<RowPosition> starts(int key, boolean inclusive)
+    {
+        return Arrays.asList(min(key), max(key - 1), dk(inclusive ? key : key 
- 1));
+    }
+
+    private static Iterable<RowPosition> ends(int key, boolean inclusive)
+    {
+        return Arrays.asList(max(key), min(key + 1), dk(inclusive ? key : key 
+ 1));
+    }
+
+    private static DecoratedKey dk(int key)
+    {
+        return Util.dk(toKey(key));
+    }
+
+    private static Token token(int key)
+    {
+        return key == Integer.MIN_VALUE ? ByteOrderedPartitioner.MINIMUM : new 
BytesToken(toKey(key).getBytes());
+    }
+
+    private static RowPosition min(int key)
+    {
+        return token(key).minKeyBound();
+    }
+
+    private static RowPosition max(int key)
+    {
+        return token(key).maxKeyBound();
+    }
+
+    private static DataRange dataRange(RowPosition start, boolean 
startInclusive, RowPosition end, boolean endInclusive)
+    {
+        return new DataRange(AbstractBounds.bounds(start, startInclusive, end, 
endInclusive), new IdentityQueryFilter());
+    }
 
     private static Range<Token> rangeFor(int start, int end)
     {
         return new Range<Token>(new BytesToken(toKey(start).getBytes()),
-                                new BytesToken(toKey(end).getBytes()));
+                                end == Integer.MIN_VALUE ? 
ByteOrderedPartitioner.MINIMUM : new BytesToken(toKey(end).getBytes()));
     }
 
     private static Collection<Range<Token>> makeRanges(int ... keys)
@@ -75,18 +141,22 @@ public class SSTableScannerTest extends SchemaLoader
         rm.apply();
     }
 
-    private static void assertScanMatches(SSTableReader sstable, int 
scanStart, int scanEnd, int expectedStart, int expectedEnd)
+    private static void assertScanMatches(SSTableReader sstable, int 
scanStart, int scanEnd, int ... boundaries)
     {
-        ISSTableScanner scanner = sstable.getScanner(new 
DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
-        for (int i = expectedStart; i <= expectedEnd; i++)
-            assertEquals(toKey(i), new 
String(scanner.next().getKey().getKey().array()));
-        assertFalse(scanner.hasNext());
+        assert boundaries.length % 2 == 0;
+        for (DataRange range : dataRanges(scanStart, scanEnd))
+        {
+            ISSTableScanner scanner = sstable.getScanner(range);
+            for (int b = 0 ; b < boundaries.length ; b += 2)
+                for (int i = boundaries[b] ; i <= boundaries[b + 1] ; i++)
+                    assertEquals(toKey(i), new 
String(scanner.next().getKey().getKey().array()));
+            assertFalse(scanner.hasNext());
+        }
     }
 
     private static void assertScanEmpty(SSTableReader sstable, int scanStart, 
int scanEnd)
     {
-        ISSTableScanner scanner = sstable.getScanner(new 
DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
-        assertFalse(String.format("scan of (%03d, %03d] should be empty", 
scanStart, scanEnd), scanner.hasNext());
+        assertScanMatches(sstable, scanStart, scanEnd);
     }
 
     @Test
@@ -132,6 +202,45 @@ public class SSTableScannerTest extends SchemaLoader
         // empty ranges
         assertScanEmpty(sstable, 0, 1);
         assertScanEmpty(sstable, 10, 11);
+
+        // wrapping, starts in middle
+        assertScanMatches(sstable, 5, 3, 2, 3, 5, 9);
+        assertScanMatches(sstable, 5, 2, 2, 2, 5, 9);
+        assertScanMatches(sstable, 5, 1, 5, 9);
+        assertScanMatches(sstable, 5, Integer.MIN_VALUE, 5, 9);
+        // wrapping, starts at end
+        assertScanMatches(sstable, 9, 8, 2, 8, 9, 9);
+        assertScanMatches(sstable, 9, 3, 2, 3, 9, 9);
+        assertScanMatches(sstable, 9, 2, 2, 2, 9, 9);
+        assertScanMatches(sstable, 9, 1, 9, 9);
+        assertScanMatches(sstable, 9, Integer.MIN_VALUE, 9, 9);
+        assertScanMatches(sstable, 8, 3, 2, 3, 8, 9);
+        assertScanMatches(sstable, 8, 2, 2, 2, 8, 9);
+        assertScanMatches(sstable, 8, 1, 8, 9);
+        assertScanMatches(sstable, 8, Integer.MIN_VALUE, 8, 9);
+        // wrapping, starts past end
+        assertScanMatches(sstable, 10, 9, 2, 9);
+        assertScanMatches(sstable, 10, 5, 2, 5);
+        assertScanMatches(sstable, 10, 2, 2, 2);
+        assertScanEmpty(sstable, 10, 1);
+        assertScanEmpty(sstable, 10, Integer.MIN_VALUE);
+        assertScanMatches(sstable, 11, 10, 2, 9);
+        assertScanMatches(sstable, 11, 9, 2, 9);
+        assertScanMatches(sstable, 11, 5, 2, 5);
+        assertScanMatches(sstable, 11, 2, 2, 2);
+        assertScanEmpty(sstable, 11, 1);
+        assertScanEmpty(sstable, 11, Integer.MIN_VALUE);
+        // wrapping, starts at start
+        assertScanMatches(sstable, 3, 1, 3, 9);
+        assertScanMatches(sstable, 3, Integer.MIN_VALUE, 3, 9);
+        assertScanMatches(sstable, 2, 1, 2, 9);
+        assertScanMatches(sstable, 2, Integer.MIN_VALUE, 2, 9);
+        assertScanMatches(sstable, 1, 0, 2, 9);
+        assertScanMatches(sstable, 1, Integer.MIN_VALUE, 2, 9);
+        // wrapping, starts before
+        assertScanMatches(sstable, 1, -1, 2, 9);
+        assertScanMatches(sstable, 1, Integer.MIN_VALUE, 2, 9);
+        assertScanMatches(sstable, 1, 0, 2, 9);
     }
 
     private static void assertScanContainsRanges(ISSTableScanner scanner, int 
... rangePairs)

Reply via email to