Updated Branches: refs/heads/trunk 4f5242cfb -> 36610aaf7
Correctly handle limits in CompositesSearcher patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-5975 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/caef32e5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/caef32e5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/caef32e5 Branch: refs/heads/trunk Commit: caef32e5d67c3b093de53bf99479cd457169178c Parents: 30f5e56 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Tue Sep 10 17:41:04 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Sep 10 17:41:04 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../db/index/composites/CompositesSearcher.java | 55 +++++++++++--------- 3 files changed, 33 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/caef32e5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a282670..2328bf7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ * Make user-defined compaction JMX blocking (CASSANDRA-4952) * Fix streaming does not transfer wrapped range (CASSANDRA-5948) * Fix loading index summary containing empty key (CASSANDRA-5965) + * Correctly handle limits in CompositesSearcher (CASSANDRA-5975) 1.2.9 http://git-wip-us.apache.org/repos/asf/cassandra/blob/caef32e5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 22b1dd5..745b5ba 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1497,7 +1497,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { - while (rowIterator.hasNext() && rows.size() < filter.maxRows() && columnsCount < filter.maxColumns()) + while (rowIterator.hasNext() && matched < filter.maxRows() && columnsCount < filter.maxColumns()) { // get the raw columns requested, and additional columns for the expressions if necessary Row rawRow = rowIterator.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/caef32e5/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index 4817a00..1e9d59d 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex; @@ -34,8 +37,6 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; import org.apache.cassandra.utils.ByteBufferUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class CompositesSearcher extends SecondaryIndexSearcher { @@ -111,10 +112,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher final CompositeType baseComparator = (CompositeType)baseCfs.getComparator(); final CompositeType indexComparator = (CompositeType)index.getIndexCfs().getComparator(); - CompositeType.Builder builder = null; + final ByteBuffer startPrefix; if (startKey.remaining() > 0) { - builder = indexComparator.builder().add(startKey); + CompositeType.Builder builder = indexComparator.builder().add(startKey); // For names filter, we have no choice but to query from the beginning of the key. This can be highly inefficient however. if (filter.originalFilter() instanceof SliceQueryFilter) { @@ -122,12 +123,17 @@ public class CompositesSearcher extends SecondaryIndexSearcher for (int i = 0; i < Math.min(prefixSize, components.length); ++i) builder.add(components[i]); } + startPrefix = builder.build(); + } + else + { + startPrefix = ByteBufferUtil.EMPTY_BYTE_BUFFER; } - final ByteBuffer startPrefix = startKey.remaining() == 0 ? ByteBufferUtil.EMPTY_BYTE_BUFFER : builder.build(); + final ByteBuffer endPrefix; if (endKey.remaining() > 0) { - builder = indexComparator.builder().add(endKey); + CompositeType.Builder builder = indexComparator.builder().add(endKey); // For names filter, we have no choice but to query until the end of the key. This can be highly inefficient however. if (filter.originalFilter() instanceof SliceQueryFilter) { @@ -135,8 +141,12 @@ public class CompositesSearcher extends SecondaryIndexSearcher for (int i = 0; i < Math.min(prefixSize, components.length); ++i) builder.add(components[i]); } + endPrefix = builder.buildAsEndOfRange(); + } + else + { + endPrefix = ByteBufferUtil.EMPTY_BYTE_BUFFER; } - final ByteBuffer endPrefix = endKey.remaining() == 0 ? ByteBufferUtil.EMPTY_BYTE_BUFFER : builder.buildAsEndOfRange(); // We will need to filter clustering keys based on the user filter. If // it is a names filter, we are really interested on the clustering @@ -150,7 +160,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher { ByteBuffer first = ((NamesQueryFilter)filter.originalFilter()).columns.iterator().next(); ByteBuffer[] components = baseComparator.split(first); - builder = baseComparator.builder(); + CompositeType.Builder builder = baseComparator.builder(); // All all except the last component, since it's the column name for (int i = 0; i < components.length - 1; i++) builder.add(components[i]); @@ -160,11 +170,13 @@ public class CompositesSearcher extends SecondaryIndexSearcher return new ColumnFamilyStore.AbstractScanIterator() { private ByteBuffer lastSeenPrefix = startPrefix; - private Deque<IColumn> indexColumns; + private ArrayDeque<IColumn> indexColumns; private final QueryPath path = new QueryPath(baseCfs.columnFamily); private int columnsRead = Integer.MAX_VALUE; + private int limit = ((SliceQueryFilter)filter.initialFilter()).count; + private int columnsCount = 0; - private final int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1); + private int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1); // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses private final int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2); @@ -176,33 +188,27 @@ public class CompositesSearcher extends SecondaryIndexSearcher private Row makeReturn(DecoratedKey key, ColumnFamily data) { if (data == null) - { return endOfData(); - } - else - { - assert key != null; - return new Row(key, data); - } + + assert key != null; + return new Row(key, data); } protected Row computeNext() { /* - * Our internal index code is wired toward internal rows. So we need to acumulate all results for a given + * Our internal index code is wired toward internal rows. So we need to accumulate all results for a given * row before returning from this method. Which unfortunately means that this method has to do what * CFS.filter does for KeysIndex. */ DecoratedKey currentKey = null; ColumnFamily data = null; - int columnsCount = 0; - int limit = ((SliceQueryFilter)filter.initialFilter()).count; while (true) { - // Did we got more columns that needed to respect the user limit? - // (but we still need to return was fetch already) - if (columnsCount > limit) + // Did we get more columns that needed to respect the user limit? + // (but we still need to return what has been fetched already) + if (columnsCount >= limit) return makeReturn(currentKey, data); if (indexColumns == null || indexColumns.isEmpty()) @@ -229,7 +235,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher Collection<IColumn> sortedColumns = indexRow.getSortedColumns(); columnsRead = sortedColumns.size(); - indexColumns = new ArrayDeque(sortedColumns); + indexColumns = new ArrayDeque<IColumn>(sortedColumns); IColumn firstColumn = sortedColumns.iterator().next(); // Paging is racy, so it is possible the first column of a page is not the last seen one. @@ -283,6 +289,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher logger.trace("Reached end of assigned scan range"); return endOfData(); } + if (!range.contains(dk)) { logger.debug("Skipping entry {} outside of assigned scan range", dk.token);