Updated Branches:
  refs/heads/trunk db59808bf -> 0456b7eb2

Allow doing a range slice with a limit of columns instead of rows

patch by slebresne; reviewed by jbellis for CASSANDRA-3742


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0456b7eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0456b7eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0456b7eb

Branch: refs/heads/trunk
Commit: 0456b7eb231275fba368d35fb861b09096fdd8fb
Parents: db59808
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Tue Jan 17 09:40:04 2012 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Jan 17 09:41:45 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 src/java/org/apache/cassandra/db/ColumnFamily.java |    4 -
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   21 +++++-
 .../org/apache/cassandra/db/RangeSliceCommand.java |   49 ++++++++++---
 src/java/org/apache/cassandra/db/Row.java          |    5 +
 .../apache/cassandra/db/filter/ExtendedFilter.java |   45 +++++++++--
 .../org/apache/cassandra/db/filter/IFilter.java    |    1 +
 .../cassandra/db/filter/NamesQueryFilter.java      |    4 +
 .../cassandra/db/filter/SliceQueryFilter.java      |   10 ++-
 .../cassandra/db/index/SecondaryIndexManager.java  |    4 +-
 .../cassandra/db/index/SecondaryIndexSearcher.java |    2 +-
 .../cassandra/db/index/keys/KeysSearcher.java      |   15 ++--
 .../cassandra/service/IndexScanVerbHandler.java    |    8 +-
 .../cassandra/service/RangeSliceVerbHandler.java   |    4 +-
 .../org/apache/cassandra/service/StorageProxy.java |   23 +++++-
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |   58 ++++++++++++++-
 16 files changed, 204 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 704630e..68147d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -43,6 +43,8 @@
  * Fix ClassCastException during hinted handoff (CASSANDRA-3694)
  * Upgrade Thrift to 0.7 (CASSANDRA-3213)
  * Make stress.java insert operation to use microseconds (CASSANDRA-3725)
+ * Allows (internally) doing a range query with a limit of columns instead of
+   rows (CASSANDRA-3742)
 
 
 1.0.7

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java 
b/src/java/org/apache/cassandra/db/ColumnFamily.java
index af089cb..ee92c44 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -124,10 +124,6 @@ public class ColumnFamily extends AbstractColumnContainer
         return cfm;
     }
 
-    /**
-     * FIXME: shouldn't need to hold a reference to a serializer; worse, for 
super cfs,
-     * it will be a _unique_ serializer object per row
-     */
     public IColumnSerializer getColumnSerializer()
     {
         return cfm.getColumnSerializer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2cc2afe..805c55c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1290,20 +1290,31 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public List<Row> getRangeSlice(ByteBuffer superColumn, final 
AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter, 
List<IndexExpression> rowFilter)
     {
-        return filter(getSequentialIterator(superColumn, range, columnFilter), 
ExtendedFilter.create(this, columnFilter, rowFilter, maxResults));
+        return getRangeSlice(superColumn, range, maxResults, columnFilter, 
rowFilter, false);
+    }
+
+    public List<Row> getRangeSlice(ByteBuffer superColumn, final 
AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter, 
List<IndexExpression> rowFilter, boolean maxIsColumns)
+    {
+        return filter(getSequentialIterator(superColumn, range, columnFilter), 
ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, maxIsColumns));
     }
 
     public List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
     {
-        return indexManager.search(clause, range, maxResults, dataFilter);
+        return search(clause, range, maxResults, dataFilter, false);
+    }
+
+    public List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean 
maxIsColumns)
+    {
+        return indexManager.search(clause, range, maxResults, dataFilter, 
maxIsColumns);
     }
 
     public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter 
filter)
     {
          List<Row> rows = new ArrayList<Row>();
+         int columnsCount = 0;
          try
          {
-             while (rowIterator.hasNext() && rows.size() < filter.maxResults)
+             while (rowIterator.hasNext() && rows.size() < filter.maxRows() && 
columnsCount < filter.maxColumns())
              {
                  // get the raw columns requested, and additional columns for 
the expressions if necessary
                  Row rawRow = rowIterator.next();
@@ -1326,6 +1337,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                  // cut the resultset back to what was requested, if necessary
                  data = filter.prune(data);
                  rows.add(new Row(rawRow.key, data));
+                 if (data != null)
+                     columnsCount += data.getLiveColumnCount();
+                 // Update the underlying filter to avoid querying more 
columns per slice than necessary
+                 filter.updateColumnsLimit(columnsCount);
              }
              return rows;
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java 
b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 4ab821e..d7310b4 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -74,27 +74,44 @@ public class RangeSliceCommand implements MessageProducer, 
IReadCommand
     public final List<IndexExpression> row_filter;
 
     public final AbstractBounds<RowPosition> range;
-    public final int max_keys;
+    public final int maxResults;
+    public final boolean maxIsColumns;
 
-    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer 
super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int 
max_keys)
+    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer 
super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int 
maxResults)
     {
-        this(keyspace, column_family, super_column, predicate, range, null, 
max_keys);
+        this(keyspace, column_family, super_column, predicate, range, null, 
maxResults, false);
     }
 
-    public RangeSliceCommand(String keyspace, ColumnParent column_parent, 
SlicePredicate predicate, AbstractBounds<RowPosition> range, 
List<IndexExpression> row_filter, int max_keys)
+    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer 
super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int 
maxResults, boolean maxIsColumns)
     {
-        this(keyspace, column_parent.getColumn_family(), 
column_parent.super_column, predicate, range, row_filter, max_keys);
+        this(keyspace, column_family, super_column, predicate, range, null, 
maxResults, maxIsColumns);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer 
super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, 
List<IndexExpression> row_filter, int max_keys)
+    public RangeSliceCommand(String keyspace, ColumnParent column_parent, 
SlicePredicate predicate, AbstractBounds<RowPosition> range, 
List<IndexExpression> row_filter, int maxResults)
+    {
+        this(keyspace, column_parent.getColumn_family(), 
column_parent.super_column, predicate, range, row_filter, maxResults, false);
+    }
+
+    public RangeSliceCommand(String keyspace, ColumnParent column_parent, 
SlicePredicate predicate, AbstractBounds<RowPosition> range, 
List<IndexExpression> row_filter, int maxResults, boolean maxIsColumns)
+    {
+        this(keyspace, column_parent.getColumn_family(), 
column_parent.super_column, predicate, range, row_filter, maxResults, 
maxIsColumns);
+    }
+
+    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer 
super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, 
List<IndexExpression> row_filter, int maxResults)
+    {
+        this(keyspace, column_family, super_column, predicate, range, 
row_filter, maxResults, false);
+    }
+
+    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer 
super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, 
List<IndexExpression> row_filter, int maxResults, boolean maxIsColumns)
     {
         this.keyspace = keyspace;
         this.column_family = column_family;
         this.super_column = super_column;
         this.predicate = predicate;
         this.range = range;
-        this.max_keys = max_keys;
         this.row_filter = row_filter;
+        this.maxResults = maxResults;
+        this.maxIsColumns = maxIsColumns;
     }
 
     public Message getMessage(Integer version) throws IOException
@@ -115,8 +132,9 @@ public class RangeSliceCommand implements MessageProducer, 
IReadCommand
                ", super_column=" + super_column +
                ", predicate=" + predicate +
                ", range=" + range +
-               ", max_keys=" + max_keys +
                ", row_filter =" + row_filter +
+               ", maxResults=" + maxResults +
+               ", maxIsColumns=" + maxIsColumns +
                '}';
     }
 
@@ -161,7 +179,11 @@ class RangeSliceCommandSerializer implements 
IVersionedSerializer<RangeSliceComm
             }
         }
         AbstractBounds.serializer().serialize(sliceCommand.range, dos, 
version);
-        dos.writeInt(sliceCommand.max_keys);
+        dos.writeInt(sliceCommand.maxResults);
+        if (version >= MessagingService.VERSION_11)
+        {
+            dos.writeBoolean(sliceCommand.maxIsColumns);
+        }
     }
 
     public RangeSliceCommand deserialize(DataInput dis, int version) throws 
IOException
@@ -196,8 +218,13 @@ class RangeSliceCommandSerializer implements 
IVersionedSerializer<RangeSliceComm
         }
         AbstractBounds<RowPosition> range = 
AbstractBounds.serializer().deserialize(dis, version).toRowBounds();
 
-        int maxKeys = dis.readInt();
-        return new RangeSliceCommand(keyspace, columnFamily, superColumn, 
pred, range, rowFilter, maxKeys);
+        int maxResults = dis.readInt();
+        boolean maxIsColumns = false;
+        if (version >= MessagingService.VERSION_11)
+        {
+            maxIsColumns = dis.readBoolean();
+        }
+        return new RangeSliceCommand(keyspace, columnFamily, superColumn, 
pred, range, rowFilter, maxResults, maxIsColumns);
     }
 
     public long serializedSize(RangeSliceCommand rangeSliceCommand, int 
version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java 
b/src/java/org/apache/cassandra/db/Row.java
index 81cfc7b..8856acd 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -45,6 +45,11 @@ public class Row
         this.cf = cf;
     }
 
+    public int getLiveColumnCount()
+    {
+        return cf == null ? 0 : cf.getLiveColumnCount();
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java 
b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index bf56e99..205d3c8 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -23,24 +23,51 @@ public abstract class ExtendedFilter
     private static Logger logger = 
LoggerFactory.getLogger(ExtendedFilter.class);
 
     public final ColumnFamilyStore cfs;
-    public final int maxResults;
     protected final IFilter originalFilter;
+    private final int maxResults;
+    private final boolean maxIsColumns;
 
-    public static ExtendedFilter create(ColumnFamilyStore cfs, IFilter filter, 
List<IndexExpression> clause, int maxResults)
+    public static ExtendedFilter create(ColumnFamilyStore cfs, IFilter filter, 
List<IndexExpression> clause, int maxResults, boolean maxIsColumns)
     {
         if (clause == null || clause.isEmpty())
-            return new EmptyClauseFilter(cfs, filter, maxResults);
+            return new EmptyClauseFilter(cfs, filter, maxResults, 
maxIsColumns);
         else
-            return new FilterWithClauses(cfs, filter, clause, maxResults);
+            return new FilterWithClauses(cfs, filter, clause, maxResults, 
maxIsColumns);
     }
 
-    protected ExtendedFilter(ColumnFamilyStore cfs, IFilter filter, int 
maxResults)
+    protected ExtendedFilter(ColumnFamilyStore cfs, IFilter filter, int 
maxResults, boolean maxIsColumns)
     {
         assert cfs != null;
         assert filter != null;
         this.cfs = cfs;
         this.originalFilter = filter;
         this.maxResults = maxResults;
+        this.maxIsColumns = maxIsColumns;
+        if (maxIsColumns)
+            originalFilter.updateColumnsLimit(maxResults);
+    }
+
+    public int maxRows()
+    {
+        return maxIsColumns ? Integer.MAX_VALUE : maxResults;
+    }
+
+    public int maxColumns()
+    {
+        return maxIsColumns ? maxResults : Integer.MAX_VALUE;
+    }
+
+    /**
+     * Update the filter if necessary given the number of column already
+     * fetched.
+     */
+    public void updateColumnsLimit(int columnsCount)
+    {
+        if (!maxIsColumns)
+            return;
+
+        int remaining = maxResults - columnsCount;
+        initialFilter().updateColumnsLimit(remaining);
     }
 
     /** The initial filter we'll do our first slice with (either the original 
or a superset of it) */
@@ -90,9 +117,9 @@ public abstract class ExtendedFilter
         protected final List<IndexExpression> clause;
         protected final IFilter initialFilter;
 
-        public FilterWithClauses(ColumnFamilyStore cfs, IFilter filter, 
List<IndexExpression> clause, int maxResults)
+        public FilterWithClauses(ColumnFamilyStore cfs, IFilter filter, 
List<IndexExpression> clause, int maxResults, boolean maxIsColumns)
         {
-            super(cfs, filter, maxResults);
+            super(cfs, filter, maxResults, maxIsColumns);
             assert clause != null;
             this.clause = clause;
             this.initialFilter = computeInitialFilter();
@@ -217,9 +244,9 @@ public abstract class ExtendedFilter
 
     private static class EmptyClauseFilter extends ExtendedFilter
     {
-        public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int 
maxResults)
+        public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int 
maxResults, boolean maxIsColumns)
         {
-            super(cfs, filter, maxResults);
+            super(cfs, filter, maxResults, maxIsColumns);
         }
 
         public IFilter initialFilter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IFilter.java 
b/src/java/org/apache/cassandra/db/filter/IFilter.java
index c3a0d05..e226ec6 100644
--- a/src/java/org/apache/cassandra/db/filter/IFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IFilter.java
@@ -75,4 +75,5 @@ public interface IFilter
     public Comparator<IColumn> getColumnComparator(AbstractType comparator);
 
     public boolean isReversed();
+    public void updateColumnsLimit(int newLimit);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java 
b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 9a44aaf..3403727 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -104,4 +104,8 @@ public class NamesQueryFilter implements IFilter
     {
         return false;
     }
+
+    public void updateColumnsLimit(int newLimit)
+    {
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java 
b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index d332134..efe9ec2 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -43,10 +43,9 @@ public class SliceQueryFilter implements IFilter
 {
     private static Logger logger = 
LoggerFactory.getLogger(SliceQueryFilter.class);
 
-    public final ByteBuffer start;
-    public final ByteBuffer finish;
+    public final ByteBuffer start; public final ByteBuffer finish;
     public final boolean reversed;
-    public final int count;
+    public volatile int count;
 
     public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean 
reversed, int count)
     {
@@ -155,4 +154,9 @@ public class SliceQueryFilter implements IFilter
     {
         return reversed;
     }
+
+    public void updateColumnsLimit(int newLimit)
+    {
+        count = newLimit;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 01b795f..12f9d4c 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -550,7 +550,7 @@ public class SecondaryIndexManager
      * @param dataFilter the column range to restrict to
      * @return found indexed rows
      */
-    public List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
+    public List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean 
maxIsColumns)
     {
         List<SecondaryIndexSearcher> indexSearchers = 
getIndexSearchersForQuery(clause);
                
@@ -562,6 +562,6 @@ public class SecondaryIndexManager
             throw new RuntimeException("Unable to search across multiple 
secondary index types");
         
         
-        return indexSearchers.get(0).search(clause, range, maxResults, 
dataFilter);
+        return indexSearchers.get(0).search(clause, range, maxResults, 
dataFilter, maxIsColumns);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 6365c81..a74b0de 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -40,7 +40,7 @@ public abstract class SecondaryIndexSearcher
         this.baseCfs = indexManager.baseCfs;
     }
 
-    public abstract List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter);
+    public abstract List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean 
maxIsColumns);
 
     /**
      * @return true this index is able to handle given clauses.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java 
b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 4f975eb..7237a87 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -84,10 +84,10 @@ public class KeysSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
+    public List<Row> search(List<IndexExpression> clause, 
AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter, boolean 
maxIsColumns)
     {
         assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, 
clause, maxResults);
+        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, 
clause, maxResults, maxIsColumns);
         return baseCfs.filter(getIndexedIterator(range, filter), filter);
     }
 
@@ -121,13 +121,16 @@ public class KeysSearcher extends SecondaryIndexSearcher
 
             protected Row computeNext()
             {
+                int meanColumns = 
Math.max(index.getIndexCfs().getMeanColumns(), 1);
+                // We shouldn't fetch only 1 row as this provides buggy paging 
in case the first row doesn't satisfy all clauses
+                int rowsPerQuery = Math.max(Math.min(filter.maxRows(), 
filter.maxColumns() / meanColumns), 2);
                 while (true)
                 {
                     if (indexColumns == null || !indexColumns.hasNext())
                     {
-                        if (columnsRead < filter.maxResults)
+                        if (columnsRead < rowsPerQuery)
                         {
-                            logger.debug("Read only {} (< {}) last page 
through, must be done", columnsRead, filter.maxResults);
+                            logger.debug("Read only {} (< {}) last page 
through, must be done", columnsRead, rowsPerQuery);
                             return endOfData();
                         }
 
@@ -135,14 +138,12 @@ public class KeysSearcher extends SecondaryIndexSearcher
                             logger.debug(String.format("Scanning index %s 
starting with %s",
                                                        
expressionString(primary), 
index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
 
-                        // We shouldn't fetch only 1 row as this provides 
buggy paging in case the first row doesn't satisfy all clauses
-                        int count = Math.max(filter.maxResults, 2);
                         QueryFilter indexFilter = 
QueryFilter.getSliceFilter(indexKey,
                                                                              
new QueryPath(index.getIndexCfs().getColumnFamilyName()),
                                                                              
lastSeenKey,
                                                                              
endKey,
                                                                              
false,
-                                                                             
count);
+                                                                             
rowsPerQuery);
                         ColumnFamily indexRow = 
index.getIndexCfs().getColumnFamily(indexFilter);
                         logger.debug("fetched {}", indexRow);
                         if (indexRow == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java 
b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 066b2e4..a5a1b20 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -39,10 +39,10 @@ public class IndexScanVerbHandler implements IVerbHandler
         {
             IndexScanCommand command = IndexScanCommand.read(message);
             ColumnFamilyStore cfs = 
Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-            List<Row> rows = 
cfs.indexManager.search(command.index_clause.expressions,
-                                                     command.range,
-                                                     
command.index_clause.count,
-                                                     
QueryFilter.getFilter(command.predicate, cfs.getComparator()));
+            List<Row> rows = cfs.search(command.index_clause.expressions,
+                                        command.range,
+                                        command.index_clause.count,
+                                        
QueryFilter.getFilter(command.predicate, cfs.getComparator()));
             RangeSliceReply reply = new RangeSliceReply(rows);
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java 
b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index 2353b71..76823de 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -45,9 +45,9 @@ public class RangeSliceVerbHandler implements IVerbHandler
         IFilter columnFilter = QueryFilter.getFilter(command.predicate, 
cfs.getComparator());
 
         if (cfs.indexManager.hasIndexFor(command.row_filter))
-            return cfs.search(command.row_filter, command.range, 
command.max_keys, columnFilter);
+            return cfs.search(command.row_filter, command.range, 
command.maxResults, columnFilter, command.maxIsColumns);
         else
-            return cfs.getRangeSlice(command.super_column, command.range, 
command.max_keys, columnFilter, command.row_filter);
+            return cfs.getRangeSlice(command.super_column, command.range, 
command.maxResults, columnFilter, command.row_filter, command.maxIsColumns);
     }
 
     public void doVerb(Message message, String id)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 636e15d..4137632 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -826,7 +826,8 @@ public class StorageProxy implements StorageProxyMBean
         // now scan until we have enough results
         try
         {
-            rows = new ArrayList<Row>(command.max_keys);
+            int columnsCount = 0;
+            rows = new ArrayList<Row>();
             List<AbstractBounds<RowPosition>> ranges = 
getRestrictedRanges(command.range);
             for (AbstractBounds<RowPosition> range : ranges)
             {
@@ -836,7 +837,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                   
command.predicate,
                                                                   range,
                                                                   
command.row_filter,
-                                                                  
command.max_keys);
+                                                                  
command.maxResults,
+                                                                  
command.maxIsColumns);
 
                 List<InetAddress> liveEndpoints = 
StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
                 
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(),
 liveEndpoints);
@@ -849,6 +851,8 @@ public class StorageProxy implements StorageProxyMBean
                     try
                     {
                         
rows.addAll(RangeSliceVerbHandler.executeLocally(nodeCmd));
+                        for (Row row : rows)
+                            columnsCount += row.getLiveColumnCount();
                     }
                     catch (ExecutionException e)
                     {
@@ -877,6 +881,7 @@ public class StorageProxy implements StorageProxyMBean
                         for (Row row : handler.get())
                         {
                             rows.add(row);
+                            columnsCount += row.getLiveColumnCount();
                             logger.debug("range slices read {}", row.key);
                         }
                         FBUtilities.waitOnFutures(resolver.repairResults, 
DatabaseDescriptor.getRpcTimeout());
@@ -894,7 +899,8 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 // if we're done, great, otherwise, move to the next range
-                if (rows.size() >= nodeCmd.max_keys)
+                int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
+                if (count >= nodeCmd.maxResults)
                     break;
             }
         }
@@ -902,7 +908,16 @@ public class StorageProxy implements StorageProxyMBean
         {
             rangeStats.addNano(System.nanoTime() - startTime);
         }
-        return rows.size() > command.max_keys ? rows.subList(0, 
command.max_keys) : rows;
+        return trim(command, rows);
+    }
+
+    private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
+    {
+        // When maxIsColumns, we let the caller trim the result.
+        if (command.maxIsColumns)
+            return rows;
+        else
+            return rows.size() > command.maxResults ? rows.subList(0, 
command.maxResults) : rows;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0456b7eb/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java 
b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index c5c4273..d2b0a0a 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -193,7 +193,7 @@ public class ColumnFamilyStoreTest extends CleanupHelper
         IFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = 
Table.open("Keyspace1").getColumnFamilyStore("Indexed1").indexManager.search(clause,
 range, 100, filter);
+        List<Row> rows = 
Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 
100, filter);
 
         assert rows != null;
         assert rows.size() == 2 : StringUtils.join(rows, ",");
@@ -786,4 +786,60 @@ public class ColumnFamilyStoreTest extends CleanupHelper
         Column column = (Column) cf.getColumn(cname);
         assert column.value().equals(ByteBufferUtil.bytes("a")) : "expecting 
a, got " + ByteBufferUtil.string(column.value());
     }
+
+    private static void assertTotalColCount(Collection<Row> rows, int 
expectedCount) throws CharacterCodingException
+    {
+        int columns = 0;
+        for (Row row : rows)
+        {
+            columns += row.getLiveColumnCount();
+        }
+        assert columns == expectedCount : "Expected " + expectedCount + " live 
columns but got " + columns + ": " + rows;
+    }
+
+    @Test
+    public void testRangeSliceColumnsLimit() throws Throwable
+    {
+        String tableName = "Keyspace1";
+        String cfName = "Standard1";
+        Table table = Table.open(tableName);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+        cfs.clearUnsafe();
+
+        Column[] cols = new Column[5];
+        for (int i = 0; i < 5; i++)
+            cols[i] = column("c" + i, "value", 1);
+
+        putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3], 
cols[4]);
+        putColsStandard(cfs, Util.dk("b"), cols[0], cols[1]);
+        putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]);
+        cfs.forceBlockingFlush();
+
+        SlicePredicate sp = new SlicePredicate();
+        sp.setSlice_range(new SliceRange());
+        sp.getSlice_range().setCount(1);
+        sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
+        sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
+
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 3, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 3);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 5, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 5);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 8, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 8);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 10, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 10);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 100, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 11);
+
+        // Check that when querying by name, we always include all names for a
+        // gien row even if it means returning more columns than requested 
(this is necesseray for CQL)
+        sp = new SlicePredicate();
+        sp.setColumn_names(Arrays.asList(
+            ByteBufferUtil.bytes("c0"),
+            ByteBufferUtil.bytes("c1"),
+            ByteBufferUtil.bytes("c2")
+        ));
+
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 1, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 3);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 4, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 5);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 5, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 5);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 6, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 8);
+        assertTotalColCount(cfs.getRangeSlice(null, Util.range("", ""), 100, 
QueryFilter.getFilter(sp, cfs.getComparator()), null, true), 8);
+    }
 }

Reply via email to