Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 bd89f5623 -> 7aa89a64e


Fix queries with LIMIT and filtering on clustering columns

patch by Benjamin Lerer; reviewed by Stefania Alborghetti for CASSANDRA-11223


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b08843de
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b08843de
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b08843de

Branch: refs/heads/cassandra-3.11
Commit: b08843de67b3c63fa9c0efe10bb9eda07c007f6c
Parents: 5b982d7
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Fri Jul 14 17:11:15 2017 +0200
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Fri Jul 14 17:11:15 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnFamily.java   |   2 +-
 .../cassandra/db/filter/ColumnCounter.java      |  21 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |   2 +-
 .../cassandra/db/filter/SliceQueryFilter.java   |  17 +-
 .../validation/operations/SelectLimitTest.java  | 209 +++++++++++++++++++
 6 files changed, 238 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 122ba54..bda510f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.11
+ * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223)
  * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
  * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
  * Fix nested Tuples/UDTs validation (CASSANDRA-13646)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java 
b/src/java/org/apache/cassandra/db/ColumnFamily.java
index a7243a2..1532439 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -92,7 +92,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, 
IRowCacheEntry
     {
         ColumnCounter counter = getComparator().isDense()
                               ? new ColumnCounter(now)
-                              : new ColumnCounter.GroupByPrefix(now, 
getComparator(), metadata.clusteringColumns().size());
+                              : new ColumnCounter.GroupByPrefix(now, 
getComparator(), metadata.clusteringColumns().size(), true);
         return counter.countAll(this).live();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java 
b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 594fde8..a00d588 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -90,6 +90,7 @@ public class ColumnCounter
     {
         protected final CellNameType type;
         protected final int toGroup;
+        protected final boolean countPartitionsWithOnlyStaticData;
         protected CellName previous;
 
         /**
@@ -101,12 +102,15 @@ public class ColumnCounter
          * @param toGroup the number of composite components on which to group
          *                column. If 0, all columns are grouped, otherwise we 
group
          *                those for which the {@code toGroup} first component 
are equals.
+         * @param countPartitionsWithOnlyStaticData if {@code true} the 
partitions with only static data should be
+         * counted as 1 valid row.
          */
-        public GroupByPrefix(long timestamp, CellNameType type, int toGroup)
+        public GroupByPrefix(long timestamp, CellNameType type, int toGroup, 
boolean countPartitionsWithOnlyStaticData)
         {
             super(timestamp);
             this.type = type;
             this.toGroup = toGroup;
+            this.countPartitionsWithOnlyStaticData = 
countPartitionsWithOnlyStaticData;
 
             assert toGroup == 0 || type != null;
         }
@@ -153,14 +157,16 @@ public class ColumnCounter
                 // We want to count the static group as 1 (CQL) row only if 
it's the only
                 // group in the partition. So, since we have already counted 
it at this point,
                 // just don't count the 2nd group if there is one and the 
first one was static
-                if (previous.isStatic())
+                if (previous.isStatic() && countPartitionsWithOnlyStaticData)
                 {
                     previous = current;
                     return true;
                 }
             }
 
-            live++;
+            if (!current.isStatic() || countPartitionsWithOnlyStaticData)
+                live++;
+
             previous = current;
 
             return true;
@@ -172,9 +178,14 @@ public class ColumnCounter
      */
     public static class GroupByPrefixReversed extends GroupByPrefix
     {
-        public GroupByPrefixReversed(long timestamp, CellNameType type, int 
toGroup)
+        public GroupByPrefixReversed(long timestamp, CellNameType type, int 
toGroup, boolean countPartitionsWithOnlyStaticData)
         {
-            super(timestamp, type, toGroup);
+            // GroupByPrefixReversed ignores countPartitionsWithOnlyStaticData 
because the original problem (CASSANDRA-11223)
+            // only affects range queries and multi-partition queries. Range 
queries do not accept an ORDER BY clause.
+            // Multi-partition queries only accept an ORDER BY clause when 
paging is off. The limit in this case is used
+            // only when the rows with only static data have already been 
discarded. So, in practice
+            // changing GroupByPrefixReversed.count() has no effect.
+            super(timestamp, type, toGroup, countPartitionsWithOnlyStaticData);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java 
b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index c8f63bb..74ca1fd 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -183,7 +183,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
     public ColumnCounter columnCounter(CellNameType comparator, long now)
     {
         return countCQL3Rows
-             ? new ColumnCounter.GroupByPrefix(now, null, 0)
+             ? new ColumnCounter.GroupByPrefix(now, null, 0, false)
              : new ColumnCounter(now);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java 
b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 822d838..95f67e6 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -26,7 +26,6 @@ import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -344,12 +343,16 @@ public class SliceQueryFilter implements IDiskAtomFilter
     {
         if (compositesToGroup < 0)
             return new ColumnCounter(now);
-        else if (compositesToGroup == 0)
-            return new ColumnCounter.GroupByPrefix(now, null, 0);
-        else if (reversed)
-            return new ColumnCounter.GroupByPrefixReversed(now, comparator, 
compositesToGroup);
-        else
-            return new ColumnCounter.GroupByPrefix(now, comparator, 
compositesToGroup);
+
+        boolean countPartitionsWithOnlyStaticData = Arrays.equals(slices, 
ColumnSlice.ALL_COLUMNS_ARRAY);
+
+        if (compositesToGroup == 0)
+            return new ColumnCounter.GroupByPrefix(now, null, 0, 
countPartitionsWithOnlyStaticData);
+
+        if (reversed)
+            return new ColumnCounter.GroupByPrefixReversed(now, comparator, 
compositesToGroup, countPartitionsWithOnlyStaticData);
+
+        return new ColumnCounter.GroupByPrefix(now, comparator, 
compositesToGroup, countPartitionsWithOnlyStaticData);
     }
 
     public ColumnFamily trim(ColumnFamily cf, int trimTo, long now)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
index 5fbf36d..0ffb799 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
@@ -129,4 +129,213 @@ public class SelectLimitTest extends CQLTester
         // strict bound (v > 1) over a range of partitions is not supported 
for compact storage if limit is provided
         assertInvalidThrow(InvalidRequestException.class, "SELECT * FROM %s 
WHERE v > 1 AND v <= 3 LIMIT 6 ALLOW FILTERING");
     }
+
+    @Test
+    public void testFilteringOnClusteringColumnsWithLimitAndStaticColumns() 
throws Throwable
+    {
+        // With only one clustering column
+        createTable("CREATE TABLE %s (a int, b int, s int static, c int, 
primary key (a, b))"
+          + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}");
+
+        for (int i = 0; i < 4; i++)
+        {
+            execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i);
+                for (int j = 0; j < 3; j++)
+                    if (!((i == 0 || i == 3) && j == 1))
+                        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 
i, j, i + j);
+        }
+
+        for (boolean forceFlush : new boolean[]{false, true})
+        {
+            if (forceFlush)
+                flush();
+
+            assertRows(execute("SELECT * FROM %s"),
+                       row(0, 0, 0, 0),
+                       row(0, 2, 0, 2),
+                       row(1, 0, 1, 1),
+                       row(1, 1, 1, 2),
+                       row(1, 2, 1, 3),
+                       row(2, 0, 2, 2),
+                       row(2, 1, 2, 3),
+                       row(2, 2, 2, 4),
+                       row(3, 0, 3, 3),
+                       row(3, 2, 3, 5));
+
+            assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"),
+                       row(1, 1, 1, 2),
+                       row(2, 1, 2, 3));
+
+            // The problem was that the static row of partition 0 used to 
be filtered only in SelectStatement and was
+            // consequently counted as a row, in which case the query 
returned one row fewer.
+            assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW 
FILTERING"),
+                       row(1, 1, 1, 2),
+                       row(2, 1, 2, 3));
+
+            assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 
2 ALLOW FILTERING"),
+                       row(1, 1, 1, 2),
+                       row(2, 1, 2, 3));
+
+            // Test with paging
+            for (int pageSize = 1; pageSize < 4; pageSize++)
+            {
+                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 
1 LIMIT 2 ALLOW FILTERING", pageSize),
+                              row(1, 1, 1, 2),
+                              row(2, 1, 2, 3));
+
+                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b 
>= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                              row(1, 1, 1, 2),
+                              row(2, 1, 2, 3));
+            }
+        }
+
+        assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 
LIMIT 2 ALLOW FILTERING"),
+                   row(1, 1, 1, 2),
+                   row(2, 1, 2, 3));
+
+        assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 
1 AND b <= 1 LIMIT 2 ALLOW FILTERING"),
+                   row(1, 1, 1, 2),
+                   row(2, 1, 2, 3));
+
+        assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 
ORDER BY b DESC LIMIT 2 ALLOW FILTERING"),
+                   row(2, 1, 2, 3),
+                   row(1, 1, 1, 2));
+
+        assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 
1 AND b <= 1 ORDER BY b DESC LIMIT 2 ALLOW FILTERING"),
+                   row(2, 1, 2, 3),
+                   row(1, 1, 1, 2));
+
+        execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3)"); // Load all data 
in the row cache
+
+        assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW 
FILTERING"),
+                   row(1, 1, 1, 2),
+                   row(2, 1, 2, 3));
+
+        assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 
ALLOW FILTERING"),
+                   row(1, 1, 1, 2),
+                   row(2, 1, 2, 3));
+
+        // Test with paging
+        for (int pageSize = 1; pageSize < 4; pageSize++)
+        {
+            assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 
LIMIT 2 ALLOW FILTERING", pageSize),
+                          row(1, 1, 1, 2),
+                          row(2, 1, 2, 3));
+
+            assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 
AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                          row(1, 1, 1, 2),
+                          row(2, 1, 2, 3));
+        }
+
+        // With multiple clustering columns
+        createTable("CREATE TABLE %s (a int, b int, c int, s int static, d 
int, primary key (a, b, c))"
+                + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 
'ALL'}");
+
+        for (int i = 0; i < 3; i++)
+        {
+            execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i);
+            for (int j = 0; j < 3; j++)
+                if (!(i == 0 && j == 1))
+                    execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 
i, j, j, i + j);
+        }
+
+        for (boolean forceFlush : new boolean[]{false, true})
+        {
+            if (forceFlush)
+                flush();
+
+            assertRows(execute("SELECT * FROM %s"),
+                       row(0, 0, 0, 0, 0),
+                       row(0, 2, 2, 0, 2),
+                       row(1, 0, 0, 1, 1),
+                       row(1, 1, 1, 1, 2),
+                       row(1, 2, 2, 1, 3),
+                       row(2, 0, 0, 2, 2),
+                       row(2, 1, 1, 2, 3),
+                       row(2, 2, 2, 2, 4));
+
+            assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"),
+                       row(1, 1, 1, 1, 2),
+                       row(2, 1, 1, 2, 3));
+
+            assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c 
>= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"),
+                       row(1, 1, 1, 1, 2),
+                       row(2, 1, 1, 2, 3));
+
+            // Test with paging
+            for (int pageSize = 1; pageSize < 4; pageSize++)
+            {
+                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 
1 LIMIT 2 ALLOW FILTERING", pageSize),
+                              row(1, 1, 1, 1, 2),
+                              row(2, 1, 1, 2, 3));
+
+                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b 
IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                              row(1, 1, 1, 1, 2),
+                              row(2, 1, 1, 2, 3));
+            }
+        }
+
+        execute("SELECT * FROM %s WHERE a IN (0, 1, 2)"); // Load data in the 
row cache
+
+        assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"),
+                   row(1, 1, 1, 1, 2),
+                   row(2, 1, 1, 2, 3));
+
+        assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 
1 AND c <= 1 LIMIT 2 ALLOW FILTERING"),
+                   row(1, 1, 1, 1, 2),
+                   row(2, 1, 1, 2, 3));
+
+        // Test with paging
+        for (int pageSize = 1; pageSize < 4; pageSize++)
+        {
+            assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 
LIMIT 2 ALLOW FILTERING", pageSize),
+                          row(1, 1, 1, 1, 2),
+                          row(2, 1, 1, 2, 3));
+
+            assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN 
(1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                          row(1, 1, 1, 1, 2),
+                          row(2, 1, 1, 2, 3));
+        }
+    }
+
+    @Test
+    public void testIndexOnRegularColumnWithPartitionWithoutRows() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, c int, s int static, v int, 
PRIMARY KEY(pk, c))");
+        createIndex("CREATE INDEX ON %s (v)");
+
+        execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 
1);
+        execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 
2);
+        execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 
1);
+        execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 4, 1, 9, 
1);
+        flush();
+
+        assertRows(execute("SELECT * FROM %s WHERE v = ?", 1),
+                   row(1, 1, 9, 1),
+                   row(3, 1, 9, 1),
+                   row(4, 1, 9, 1));
+
+        execute("DELETE FROM %s WHERE pk = ? AND c = ?", 3, 1);
+
+        // Test without paging
+        assertRows(execute("SELECT * FROM %s WHERE v = ?", 1),
+                   row(1, 1, 9, 1),
+                   row(4, 1, 9, 1));
+
+        assertRows(execute("SELECT * FROM %s WHERE v = ? LIMIT 2", 1),
+                   row(1, 1, 9, 1),
+                   row(4, 1, 9, 1));
+
+        // Test with paging
+        for (int pageSize = 1; pageSize < 4; pageSize++)
+        {
+            assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1", 
pageSize),
+                          row(1, 1, 9, 1),
+                          row(4, 1, 9, 1));
+
+            assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 
LIMIT 2", pageSize),
+                          row(1, 1, 9, 1),
+                          row(4, 1, 9, 1));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to