Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 7de853bff -> 88d2ac4f2
Fix queries with LIMIT and filtering on clustering columns patch by Benjamin Lerer; reviewed by Stefania Alborghetti for CASSANDRA-11223 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b08843de Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b08843de Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b08843de Branch: refs/heads/cassandra-3.0 Commit: b08843de67b3c63fa9c0efe10bb9eda07c007f6c Parents: 5b982d7 Author: Benjamin Lerer <b.le...@gmail.com> Authored: Fri Jul 14 17:11:15 2017 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Fri Jul 14 17:11:15 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamily.java | 2 +- .../cassandra/db/filter/ColumnCounter.java | 21 +- .../cassandra/db/filter/NamesQueryFilter.java | 2 +- .../cassandra/db/filter/SliceQueryFilter.java | 17 +- .../validation/operations/SelectLimitTest.java | 209 +++++++++++++++++++ 6 files changed, 238 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 122ba54..bda510f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.11 + * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223) * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272) * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592) * Fix nested Tuples/UDTs validation (CASSANDRA-13646) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java 
b/src/java/org/apache/cassandra/db/ColumnFamily.java index a7243a2..1532439 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -92,7 +92,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry { ColumnCounter counter = getComparator().isDense() ? new ColumnCounter(now) - : new ColumnCounter.GroupByPrefix(now, getComparator(), metadata.clusteringColumns().size()); + : new ColumnCounter.GroupByPrefix(now, getComparator(), metadata.clusteringColumns().size(), true); return counter.countAll(this).live(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/filter/ColumnCounter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java index 594fde8..a00d588 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java @@ -90,6 +90,7 @@ public class ColumnCounter { protected final CellNameType type; protected final int toGroup; + protected final boolean countPartitionsWithOnlyStaticData; protected CellName previous; /** @@ -101,12 +102,15 @@ public class ColumnCounter * @param toGroup the number of composite components on which to group * column. If 0, all columns are grouped, otherwise we group * those for which the {@code toGroup} first component are equals. + * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be + * counted as 1 valid row. 
*/ - public GroupByPrefix(long timestamp, CellNameType type, int toGroup) + public GroupByPrefix(long timestamp, CellNameType type, int toGroup, boolean countPartitionsWithOnlyStaticData) { super(timestamp); this.type = type; this.toGroup = toGroup; + this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData; assert toGroup == 0 || type != null; } @@ -153,14 +157,16 @@ public class ColumnCounter // We want to count the static group as 1 (CQL) row only if it's the only // group in the partition. So, since we have already counted it at this point, // just don't count the 2nd group if there is one and the first one was static - if (previous.isStatic()) + if (previous.isStatic() && countPartitionsWithOnlyStaticData) { previous = current; return true; } } - live++; + if (!current.isStatic() || countPartitionsWithOnlyStaticData) + live++; + previous = current; return true; @@ -172,9 +178,14 @@ public class ColumnCounter */ public static class GroupByPrefixReversed extends GroupByPrefix { - public GroupByPrefixReversed(long timestamp, CellNameType type, int toGroup) + public GroupByPrefixReversed(long timestamp, CellNameType type, int toGroup, boolean countPartitionsWithOnlyStaticData) { - super(timestamp, type, toGroup); + // GroupByPrefixReversed ignores countPartitionsWithOnlyStaticData because the original problem (CASSANDRA-11223) + // only affects range queries and multi-partition queries. Range queries do not accept an ORDER BY clause. + // Multi-partition queries only accept an ORDER BY clause when paging is off. The limit in this case is used + // only when the rows with only static data have already been discarded. So, in practice, + // changing GroupByPrefixReversed.count() has no effect. 
+ super(timestamp, type, toGroup, countPartitionsWithOnlyStaticData); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java index c8f63bb..74ca1fd 100644 --- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java @@ -183,7 +183,7 @@ public class NamesQueryFilter implements IDiskAtomFilter public ColumnCounter columnCounter(CellNameType comparator, long now) { return countCQL3Rows - ? new ColumnCounter.GroupByPrefix(now, null, 0) + ? new ColumnCounter.GroupByPrefix(now, null, 0, false) : new ColumnCounter(now); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 822d838..95f67e6 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -26,7 +26,6 @@ import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -344,12 +343,16 @@ public class SliceQueryFilter implements IDiskAtomFilter { if (compositesToGroup < 0) return new ColumnCounter(now); - else if (compositesToGroup == 0) - return new ColumnCounter.GroupByPrefix(now, null, 0); - else if (reversed) - 
return new ColumnCounter.GroupByPrefixReversed(now, comparator, compositesToGroup); - else - return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup); + + boolean countPartitionsWithOnlyStaticData = Arrays.equals(slices, ColumnSlice.ALL_COLUMNS_ARRAY); + + if (compositesToGroup == 0) + return new ColumnCounter.GroupByPrefix(now, null, 0, countPartitionsWithOnlyStaticData); + + if (reversed) + return new ColumnCounter.GroupByPrefixReversed(now, comparator, compositesToGroup, countPartitionsWithOnlyStaticData); + + return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup, countPartitionsWithOnlyStaticData); } public ColumnFamily trim(ColumnFamily cf, int trimTo, long now) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08843de/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java index 5fbf36d..0ffb799 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java @@ -129,4 +129,213 @@ public class SelectLimitTest extends CQLTester // strict bound (v > 1) over a range of partitions is not supported for compact storage if limit is provided assertInvalidThrow(InvalidRequestException.class, "SELECT * FROM %s WHERE v > 1 AND v <= 3 LIMIT 6 ALLOW FILTERING"); } + + @Test + public void testFilteringOnClusteringColumnsWithLimitAndStaticColumns() throws Throwable + { + // With only one clustering column + createTable("CREATE TABLE %s (a int, b int, s int static, c int, primary key (a, b))" + + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}"); + + for (int i = 0; i < 4; i++) + { + execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i); + for 
(int j = 0; j < 3; j++) + if (!((i == 0 || i == 3) && j == 1)) + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", i, j, i + j); + } + + for (boolean forceFlush : new boolean[]{false, true}) + { + if (forceFlush) + flush(); + + assertRows(execute("SELECT * FROM %s"), + row(0, 0, 0, 0), + row(0, 2, 0, 2), + row(1, 0, 1, 1), + row(1, 1, 1, 2), + row(1, 2, 1, 3), + row(2, 0, 2, 2), + row(2, 1, 2, 3), + row(2, 2, 2, 4), + row(3, 0, 3, 3), + row(3, 2, 3, 5)); + + assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + // The problem was that the static row of partition 0 used to be filtered only in SelectStatement and was + // consequently counted as a row, so the query returned one row fewer than expected. + assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + } + } + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 ORDER BY b DESC LIMIT 2 ALLOW FILTERING"), + row(2, 1, 2, 3), + row(1, 1, 1, 2)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 1 AND b <= 1 ORDER BY b DESC LIMIT 2 ALLOW FILTERING"), + 
row(2, 1, 2, 3), + row(1, 1, 1, 2)); + + execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3)"); // Load all data in the row cache + + assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + } + + // With multiple clustering columns + createTable("CREATE TABLE %s (a int, b int, c int, s int static, d int, primary key (a, b, c))" + + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}"); + + for (int i = 0; i < 3; i++) + { + execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i); + for (int j = 0; j < 3; j++) + if (!(i == 0 && j == 1)) + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", i, j, j, i + j); + } + + for (boolean forceFlush : new boolean[]{false, true}) + { + if (forceFlush) + flush(); + + assertRows(execute("SELECT * FROM %s"), + row(0, 0, 0, 0, 0), + row(0, 2, 2, 0, 2), + row(1, 0, 0, 1, 1), + row(1, 1, 1, 1, 2), + row(1, 2, 2, 1, 3), + row(2, 0, 0, 2, 2), + row(2, 1, 1, 2, 3), + row(2, 2, 2, 2, 4)); + + assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + 
row(2, 1, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + } + } + + execute("SELECT * FROM %s WHERE a IN (0, 1, 2)"); // Load data in the row cache + + assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + } + } + + @Test + public void testIndexOnRegularColumnWithPartitionWithoutRows() throws Throwable + { + createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY(pk, c))"); + createIndex("CREATE INDEX ON %s (v)"); + + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 1); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 2); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 1); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 4, 1, 9, 1); + flush(); + + assertRows(execute("SELECT * FROM %s WHERE v = ?", 1), + row(1, 1, 9, 1), + row(3, 1, 9, 1), + row(4, 1, 9, 1)); + + execute("DELETE FROM %s WHERE pk = ? AND c = ?", 3, 1); + + // Test without paging + assertRows(execute("SELECT * FROM %s WHERE v = ?", 1), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + + assertRows(execute("SELECT * FROM %s WHERE v = ? 
LIMIT 2", 1), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1", pageSize), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 LIMIT 2", pageSize), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org